using System.Collections;
using System.Collections.Immutable;
using System.Data;
using System.Linq.Expressions;
using Connected.Data.DataProtection;
using Connected.Entities;
using Connected.Entities.Storage;
using Connected.Middleware;
using Connected.ServiceModel.Transactions;
using Connected.Validation;
namespace Connected.Data.Storage;
///
/// Provides read and write operations on the supported storage providers.
///
/// >The type of the entitiy on which operations are performed.
internal class EntityStorage : IAsyncEnumerable, IStorage
where TEntity : IEntity
{
private IQueryProvider _provider;
///
/// Creates a new instance.
///
/// Middleware for protecting transactions and data access.
/// Middleware for recurring validation.
/// Middleware for providing storage connections.
/// Saga Transactions orchestration.
/// The storage connection behavior.
public EntityStorage(IEntityProtectionService dataProtection, IMiddlewareService middleware, IConnectionProvider connections,
ITransactionContext transactions)
{
EntityProtection = dataProtection;
Middleware = middleware;
Connections = connections;
Transactions = transactions;
Expression = Expression.Constant(this);
}
///
/// The middleware used when performing entity operations.
///
private IStorageMiddleware StorageMiddleware
{
get
{
if (Provider is not IStorageMiddleware result)
throw new InvalidCastException(nameof(IStorageMiddleware));
return result;
}
}
///
/// The expression used for retrieving entities.
///
public Expression Expression { get; }
///
/// The entity type for which operations are performed.
///
public Type ElementType => typeof(TEntity);
///
/// The provider used when querying entities. It is based on the .
///
public IQueryProvider Provider => _provider;
///
/// Middleware used for protecting data access and manipulation.
///
private IEntityProtectionService EntityProtection { get; }
///
/// Middleware used for validation in concurrency transactions.
///
private IMiddlewareService Middleware { get; }
///
/// Middleware for retrieving storage connections.
///
private IConnectionProvider Connections { get; }
///
/// Middleware providing saga transactions orchestration.
///
private ITransactionContext Transactions { get; }
///
/// Gets enumerator for entities retrieved via .
///
/// Enumerator containing entities.
public IEnumerator GetEnumerator()
{
var result = Provider.Execute(Expression);
/*
* Make sure we always return non nullable value.
*/
if (result is null)
return new List().GetEnumerator();
return ((IEnumerable)result).GetEnumerator();
}
///
/// Gets enumerator for entities retrieved via .
///
/// Enumerator containing entities.
IEnumerator IEnumerable.GetEnumerator()
{
var result = Provider.Execute(Expression);
/*
* Make sure we always return non nullable value.
*/
if (result is null)
return new List().GetEnumerator();
return ((IEnumerable)result).GetEnumerator();
}
///
/// Gets enumerator for asynchronoues entity retrieval via .
///
/// Token that enables operation to be cancelled.
/// Asynchronous enumerator containing entities.
public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
var result = Provider.Execute(Expression);
if (result is IEnumerable en)
{
var enumerator = en.GetEnumerator();
while (enumerator.MoveNext())
{
await Task.CompletedTask;
yield return (TEntity)enumerator.Current;
}
}
}
public override string ToString()
{
if (Expression.NodeType == ExpressionType.Constant && ((ConstantExpression)Expression).Value == this)
return "Query(" + typeof(TEntity) + ")";
else
return Expression.ToString();
}
///
/// Performs the update on the specified entity.
///
/// The entity to be updated.
/// Entity with an id if insert has been executed, the same entity otherwise.
/// Thrown if the storage statement could no be created.
public async Task Update(TEntity? entity)
{
if (entity is null)
return entity;
await EntityProtection.Invoke(new EntityProtection.EntityProtectionArgs(entity, entity.State));
var operation = StorageMiddleware.CreateOperation(entity);
await Execute(new StorageContextArgs(operation));
return entity;
}
///
/// Performs the update on the specified entity with optional concurrency callback support.
///
/// The type of the arguments used to update the entity.
/// The entity to update.
/// The arguments that supplied updated values.
/// The retry delegate for preparing new update.
/// The entity with the newly inserted id if insert was performed, the same entity otherwise.
public async Task Update(TEntity? entity, TArgs args, Func>? concurrencyRetrying)
where TArgs : IDto
{
return await Update(entity, args, concurrencyRetrying, null);
}
///
/// Updates the entity to the underlying storage with concurrency check.
///
/// The type of the arguments used to update the entity.
/// The entity to update.
/// The arguments that supplied updated values.
/// The retry delegate for preparing new update.
/// An optional merge callback if default merge is not sufficient.
/// The entity with the newly inserted id if insert was performed, the same entity otherwise.
public async Task Update(TEntity? entity, TArgs args, Func>? concurrencyRetrying, Func>? merging)
where TArgs : IDto
{
if (entity is null)
return entity;
DBConcurrencyException? lastException = null;
/*
* Merge the updating entity with the supplied arguments. If callback is provided it is used instead of the default merge.
*/
var currentEntity = merging is null ? entity.Merge(args, entity.State) : await merging(entity);
/*
* There will be 3 retries. If none is succedded an exception will be thrown.
*/
for (var i = 0; i < 3; i++)
{
try
{
/*
* Perform the update. Note that provider should check for concurrency only if
* the entity is updating. Concurrency is not used for inserting and deleting operations.
*/
await Update(currentEntity);
/*
* Provider will merge the updating entity with a new id if the operation is Insert. For updating and
* deleting operations the same entity is returned.
*/
return currentEntity;
}
catch (DBConcurrencyException ex)
{
/*
* Concurrency exception occurred. If the callback is not passed we return immediatelly.
*/
if (concurrencyRetrying is null)
throw;
lastException = ex;
/*
* Wait a small amount of time if the system is currently under heavy load to increase the probabillity
* of successful update.
*/
await Task.Delay(i * i * 50);
/*
* We must perform validation again since the state of the entities has possibly changed. Note that
* only middleware validation is performed not the argument (attribute based).
*/
if (await Middleware.Query>() is ImmutableList> items)
{
foreach (var item in items)
await item.Validate(args);
}
/*
* If validation succedded invoke callback which usually refreshes the cache which causes the entity to be
* reloaded from the data source.
*/
currentEntity = await concurrencyRetrying();
/*
* Entity must be supplied and the merge is performed again.
*/
if (currentEntity is not null)
currentEntity = merging is null ? currentEntity.Merge(args, entity.State) : await merging(currentEntity);
else
throw new NullReferenceException(nameof(entity));
}
}
/*
* This is not good. We couldn't update the entity after 3 retries. The system is most probably either under heavy load and
* the entity is updating very frequently.
*/
if (lastException is not null)
throw lastException;
return default;
}
///
/// Executes storage operation agains one or mode storages.
///
/// The arguments containing data about operation to be performed.
/// The number of records affected in the physical storage.
/// If concurrency is supported on the operation and
/// no records have been affected and the actual operation is UPDATE this exception is thrown.
public async Task Execute(StorageContextArgs args)
{
await using var writer = await OpenWriter(args);
/*
* Execute operation against the storage. This method should return the number of records affected.
*/
var recordsAffected = await writer.Execute();
/*
* It is not necessary that concurrency is actualy considered. Concurrency should be disabled
* if the operation is not UPDATE or the entity does not supports it (does not have an Etag or similar property).
*/
if (recordsAffected == 0 && args.Operation.Concurrency == DataConcurrencyMode.Enabled)
throw new DBConcurrencyException($"{SR.ErrDataConcurrency} ({typeof(Entity).Name})");
/*
* Bind storage parameters with operation parameters.
*/
ReturnValueBinder.Bind(writer, args.Operation);
return recordsAffected;
}
///
/// Opens one or more readers for the specified entity.
///
///
/// If the entity does not support sharding, only one reader is returned. If arguments require more
/// than one shard to be read this method will return one for every shard.
///
/// The arguments containing data about operation to be performed.
/// One or more .
private async Task>> OpenEntityReaders(StorageContextArgs args)
{
/*
* Connection middleware will return one connection for every shard. If sharding is not supported only
* one connection will be returned.
*/
var connections = await Connections.Open(args);
var result = new List>();
foreach (var connection in connections)
result.Add(OpenReader(args.Operation, connection));
return result.ToImmutableList();
}
///
/// Opens one or more readers for the specified entity.
///
/// The arguments containing data about operation to be performed.
/// One or more .
public async Task> OpenReaders(StorageContextArgs args)
{
/*
* Connection middleware will return one connection for every shard. If sharding is not supported only
* one connection will be returned.
*/
var connections = await Connections.Open(args);
var result = new List();
foreach (var connection in connections)
{
/*
* Temporarly create a full database reader. We won't actually need it but it is
* the only way to get to the actual IDataReader.
*/
await using var r = StorageMiddleware.OpenReader(args.Operation, connection);
/*
* Now open reader and add it to the result.
*/
result.Add(await r.OpenReader());
}
return result.ToImmutableList();
}
///
/// Opens the on the underlying connection.
///
/// The operation to be performed on the data reader.
/// The connection to be used when opening the reader.
/// The .
private IStorageReader OpenReader(IStorageOperation operation, IStorageConnection connection)
{
return StorageMiddleware.OpenReader(operation, connection);
}
///
/// Opens the on the urderlying connection.
///
/// The arguments containing operation to be performed.
/// The .
private async Task OpenWriter(StorageContextArgs args)
{
var connections = await Connections.Open(args);
/*
* Only one connection should be returned when performing transactions
* on the single entity.
*/
if (connections.Count != 1)
throw new InvalidOperationException("Only one connection expected.");
return OpenWriter(args.Operation, connections[0]);
}
///
/// Opens the on the urderlying connection.
///
/// The operation to be performed.
/// The connection to be used on the writer.
/// The .
private IStorageWriter OpenWriter(IStorageOperation operation, IStorageConnection connection)
{
/*
* Signal transaction orchestration that we are going to use transactions.
*/
Transactions.IsDirty = true;
return StorageMiddleware.OpenWriter(operation, connection);
}
///
/// Performs a query for the specified operation.
///
/// Arguments containing data about operation to be performed.
/// A List of entities that were returned from the storage.
public async Task?> Query(StorageContextArgs args)
{
var readers = await OpenEntityReaders(args);
/*
* In a sharding model it is possible that more than one reader will be returned since
* data could reside in more than one shard, for example:
* we have a projects, each having its work items in it own shard. It's fine to query
* work items for the project since they are definitely in the same shard. But whyt about
* querying work items for the specific user. If the user has access to the more then one
* project it is very likely that work items are in more than one shard.
*/
if (readers.Count == 1)
{
var result = await readers[0].Query();
await readers[0].DisposeAsync();
return result;
}
else
{
/*
* It's a sharding scenario
*/
var results = new List();
var tasks = new List();
foreach (var reader in readers)
{
tasks.Add(Task.Run(async () =>
{
if (await reader.Query() is ImmutableList r && !r.IsEmpty)
{
lock (results)
results.AddRange(r);
}
}));
}
await Task.WhenAll(tasks);
/*
* Need to manually dispose all readers.
*/
foreach (var reader in readers)
await reader.DisposeAsync();
return results.ToImmutableList();
}
}
///
/// Performs a single entity select for the specified operation.
///
/// Arguments containing data about operation to be performed.
/// An entity if found, null otherwise.
public async Task Select(StorageContextArgs args)
{
var readers = await OpenEntityReaders(args);
/*
* In a sharding model, it is possible that a middleware won't know
* exactly in which shard the record resides. This is not ideal but very much
* possible scenario. This is why we will perform a call on all available
* readers and then, if more then one record returned, selects only the first one.
* -------------------------------------------------------------------------
* Q: should we throw an exception if more than one record is found?
* -------------------------------------------------------------------------
*/
TEntity? result = default;
if (readers.Count == 1)
result = await readers[0].Select();
else
{
var results = new List();
var tasks = new List();
foreach (var reader in readers)
{
tasks.Add(Task.Run(async () =>
{
if (await reader.Select() is TEntity r)
{
lock (results)
results.Add(r);
}
}));
}
await Task.WhenAll(tasks);
if (results.Any())
result = results[0];
}
/*
* Need to manually dispose all readers.
*/
foreach (var reader in readers)
await reader.DisposeAsync();
return result;
}
///
/// Resolved provider used based on the entity type.
///
///
///
private async Task ResolveProvider()
{
/*
* We need to resolve provider based on an entity type. At least one
* provider must respond to the entity type. On the other hand, only
* one provider should handle entity type. This means sharding is not
* supported on nodes with different connection types.
*/
var middlewares = await Middleware.Query();
if (!middlewares.Any())
throw new NullReferenceException(nameof(IStorageMiddleware));
foreach (var middleware in middlewares)
{
/*
* The first middleware supporting the entity wins.
*/
if (middleware.SupportsEntity(ElementType))
{
_provider = middleware;
break;
}
}
if (_provider is null)
throw new NullReferenceException($"{nameof(IStorageMiddleware)} -> {ElementType.Name}");
}
public async Task Initialize()
{
await ResolveProvider();
}
}