using System.Collections;
using System.Collections.Immutable;
using System.Data;
using System.Linq.Expressions;
using Connected.Data.DataProtection;
using Connected.Entities;
using Connected.Entities.Storage;
using Connected.Middleware;
using Connected.ServiceModel.Transactions;
using Connected.Validation;

namespace Connected.Data.Storage;

/// <summary>
/// Provides read and write operations on the supported storage providers.
/// </summary>
/// <remarks>
/// <see cref="Initialize"/> must be awaited before the storage is used; it resolves the
/// query provider that every other member depends on.
/// </remarks>
/// <typeparam name="TEntity">The type of the entity on which operations are performed.</typeparam>
internal class EntityStorage<TEntity> : IAsyncEnumerable<TEntity>, IStorage<TEntity>
	where TEntity : IEntity
{
	/*
	 * Assigned by ResolveProvider, not by the constructor.
	 */
	private IQueryProvider _provider;

	/// <summary>
	/// Creates a new instance.
	/// </summary>
	/// <param name="dataProtection">Service invoked with the entity and its state before an entity is written.</param>
	/// <param name="middleware">Service used to query validators and storage middleware.</param>
	/// <param name="connections">Provider used for opening storage connections.</param>
	/// <param name="transactions">Transaction context which is marked dirty whenever a writer is opened.</param>
	public EntityStorage(IEntityProtectionService dataProtection, IMiddlewareService middleware, IConnectionProvider connections, ITransactionContext transactions)
	{
		EntityProtection = dataProtection;
		Middleware = middleware;
		Connections = connections;
		Transactions = transactions;

		Expression = Expression.Constant(this);
	}

	/// <summary>
	/// The resolved <see cref="Provider"/> viewed as a storage middleware.
	/// </summary>
	/// <exception cref="InvalidCastException">Thrown if the provider is not an <see cref="IStorageMiddleware"/>
	/// (which includes the case where no provider has been resolved yet).</exception>
	private IStorageMiddleware StorageMiddleware
	{
		get
		{
			if (Provider is not IStorageMiddleware result)
				throw new InvalidCastException(nameof(IStorageMiddleware));

			return result;
		}
	}

	/// <summary>
	/// The expression used for retrieving entities. It is a constant expression wrapping this instance.
	/// </summary>
	public Expression Expression { get; }

	/// <summary>
	/// The entity type for which operations are performed.
	/// </summary>
	public Type ElementType => typeof(TEntity);

	/// <summary>
	/// The provider used when querying entities. It is the first storage middleware
	/// that reported support for <see cref="ElementType"/> during <see cref="Initialize"/>.
	/// </summary>
	public IQueryProvider Provider => _provider;

	/// <summary>
	/// Service invoked before an entity is written to the storage.
	/// </summary>
	private IEntityProtectionService EntityProtection { get; }

	/// <summary>
	/// Service used for querying validators (on concurrency retries) and storage middleware.
	/// </summary>
	private IMiddlewareService Middleware { get; }

	/// <summary>
	/// Provider for retrieving storage connections.
	/// </summary>
	private IConnectionProvider Connections { get; }

	/// <summary>
	/// Transaction context which is flagged as dirty when a writer is opened.
	/// </summary>
	private ITransactionContext Transactions { get; }

	/// <summary>
	/// Gets the enumerator for entities retrieved by executing <see cref="Expression"/> on the <see cref="Provider"/>.
	/// </summary>
	/// <returns>Enumerator containing entities. An empty enumerator is returned if the provider returned null.</returns>
	public IEnumerator<TEntity> GetEnumerator()
	{
		var result = Provider.Execute(Expression);
		/*
		 * Make sure we always return a non nullable value.
		 */
		if (result is null)
			return new List<TEntity>().GetEnumerator();

		return ((IEnumerable<TEntity>)result).GetEnumerator();
	}

	/// <summary>
	/// Gets the non generic enumerator. It returns the same sequence as <see cref="GetEnumerator"/>.
	/// </summary>
	/// <returns>Enumerator containing entities.</returns>
	IEnumerator IEnumerable.GetEnumerator()
	{
		return GetEnumerator();
	}

	/// <summary>
	/// Gets the enumerator for asynchronous entity retrieval by executing <see cref="Expression"/> on the <see cref="Provider"/>.
	/// </summary>
	/// <remarks>
	/// The provider call itself is synchronous; only the iteration is exposed asynchronously.
	/// Nothing is yielded if the provider result is not enumerable.
	/// </remarks>
	/// <param name="cancellationToken">Token that enables the enumeration to be cancelled. It is checked before every item.</param>
	/// <returns>Asynchronous enumerator containing entities.</returns>
	public async IAsyncEnumerator<TEntity> GetAsyncEnumerator(CancellationToken cancellationToken = default)
	{
		var result = Provider.Execute(Expression);

		if (result is not IEnumerable en)
			yield break;
		/*
		 * foreach disposes the underlying enumerator if it is disposable.
		 */
		foreach (var item in en)
		{
			cancellationToken.ThrowIfCancellationRequested();
			/*
			 * There is no real asynchronous work here, the await only satisfies the async iterator.
			 */
			await Task.CompletedTask;

			yield return (TEntity)item;
		}
	}

	/// <summary>
	/// Returns <c>Query(entity type)</c> if the expression is the root constant wrapping this
	/// instance, the textual representation of the expression otherwise.
	/// </summary>
	public override string ToString()
	{
		if (Expression.NodeType == ExpressionType.Constant && ((ConstantExpression)Expression).Value == this)
			return "Query(" + typeof(TEntity) + ")";

		return Expression.ToString();
	}

	/// <summary>
	/// Performs the update on the specified entity.
	/// </summary>
	/// <remarks>
	/// The entity is first passed to the entity protection service, then a storage operation
	/// is created by the storage middleware and executed via <see cref="Execute"/>.
	/// </remarks>
	/// <param name="entity">The entity to be updated. Null is returned unchanged without any storage call.</param>
	/// <returns>The entity that was passed in.</returns>
	/// <exception cref="DBConcurrencyException">Thrown by <see cref="Execute"/> if concurrency is enabled
	/// on the operation and no records were affected.</exception>
	public async Task<TEntity?> Update(TEntity? entity)
	{
		if (entity is null)
			return entity;

		await EntityProtection.Invoke(new EntityProtection.EntityProtectionArgs<TEntity>(entity, entity.State));

		var operation = StorageMiddleware.CreateOperation(entity);

		await Execute(new StorageContextArgs(operation));

		return entity;
	}

	/// <summary>
	/// Performs the update on the specified entity with optional concurrency callback support.
	/// </summary>
	/// <typeparam name="TArgs">The type of the arguments used to update the entity.</typeparam>
	/// <param name="entity">The entity to update.</param>
	/// <param name="args">The arguments that supply updated values.</param>
	/// <param name="concurrencyRetrying">The retry delegate for preparing a new update.</param>
	/// <returns>The merged entity that was written, or null if <paramref name="entity"/> was null.</returns>
	public async Task<TEntity?> Update<TArgs>(TEntity? entity, TArgs args, Func<Task<TEntity?>>? concurrencyRetrying)
		where TArgs : IDto
	{
		return await Update(entity, args, concurrencyRetrying, null);
	}

	/// <summary>
	/// Updates the entity to the underlying storage with concurrency check.
	/// </summary>
	/// <typeparam name="TArgs">The type of the arguments used to update the entity.</typeparam>
	/// <param name="entity">The entity to update.</param>
	/// <param name="args">The arguments that supply updated values.</param>
	/// <param name="concurrencyRetrying">The retry delegate for preparing a new update. If null, a concurrency
	/// exception is rethrown immediately.</param>
	/// <param name="merging">An optional merge callback used instead of the default merge.</param>
	/// <returns>The merged entity that was written, or null if <paramref name="entity"/> was null.</returns>
	/// <exception cref="DBConcurrencyException">Thrown if no retry delegate was supplied or all three attempts failed.</exception>
	/// <exception cref="NullReferenceException">Thrown if the retry delegate returned null.</exception>
	public async Task<TEntity?> Update<TArgs>(TEntity? entity, TArgs args, Func<Task<TEntity?>>? concurrencyRetrying, Func<TEntity, Task<TEntity>>? merging)
		where TArgs : IDto
	{
		if (entity is null)
			return entity;

		DBConcurrencyException? lastException = null;
		/*
		 * Merge the updating entity with the supplied arguments. If the callback is provided it is used instead of the default merge.
		 */
		TEntity? currentEntity = merging is null ? entity.Merge(args, entity.State) : await merging(entity);
		/*
		 * There will be 3 attempts. If none succeeds an exception will be thrown.
		 */
		for (var i = 0; i < 3; i++)
		{
			try
			{
				/*
				 * Perform the update. Note that the provider should check for concurrency only if
				 * the entity is updating. Concurrency is not used for inserting and deleting operations.
				 */
				await Update(currentEntity);

				return currentEntity;
			}
			catch (DBConcurrencyException ex)
			{
				/*
				 * Concurrency exception occurred. If the callback is not passed we rethrow immediately.
				 */
				if (concurrencyRetrying is null)
					throw;

				lastException = ex;
				/*
				 * Back off (0 ms, 50 ms, 200 ms) so a system under heavy load has a better chance
				 * of a successful update on the next attempt.
				 */
				await Task.Delay(i * i * 50);
				/*
				 * We must perform validation again since the state of the entities has possibly changed. Note that
				 * only middleware validation is performed, not the argument (attribute based) one.
				 */
				if (await Middleware.Query<IValidator<TArgs>>() is ImmutableList<IValidator<TArgs>> items)
				{
					foreach (var item in items)
						await item.Validate(args);
				}
				/*
				 * If validation succeeded invoke the callback which is expected to supply a fresh entity.
				 */
				currentEntity = await concurrencyRetrying();
				/*
				 * Entity must be supplied and the merge is performed again.
				 */
				if (currentEntity is null)
					throw new NullReferenceException(nameof(entity));

				currentEntity = merging is null ? currentEntity.Merge(args, entity.State) : await merging(currentEntity);
			}
		}
		/*
		 * We couldn't update the entity after 3 attempts. Rethrow the last concurrency exception
		 * without resetting its original stack trace.
		 */
		if (lastException is not null)
			System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(lastException).Throw();

		return default;
	}

	/// <summary>
	/// Executes the storage operation against the storage.
	/// </summary>
	/// <param name="args">The arguments containing data about the operation to be performed.</param>
	/// <returns>The number of records affected as reported by the writer.</returns>
	/// <exception cref="DBConcurrencyException">Thrown if concurrency is enabled on the operation and
	/// no records have been affected.</exception>
	/// <exception cref="InvalidOperationException">Thrown if the connection provider did not return exactly one connection.</exception>
	public async Task<int> Execute(StorageContextArgs args)
	{
		await using var writer = await OpenWriter(args);
		/*
		 * Execute the operation against the storage. This call returns the number of records affected.
		 */
		var recordsAffected = await writer.Execute();
		/*
		 * Concurrency is considered only if the operation has it enabled.
		 */
		if (recordsAffected == 0 && args.Operation.Concurrency == DataConcurrencyMode.Enabled)
			throw new DBConcurrencyException($"{SR.ErrDataConcurrency} ({typeof(TEntity).Name})");
		/*
		 * Bind storage parameters with operation parameters.
		 */
		ReturnValueBinder.Bind(writer, args.Operation);

		return recordsAffected;
	}

	/// <summary>
	/// Opens one entity reader for every connection returned by the connection provider.
	/// </summary>
	/// <param name="args">The arguments containing data about the operation to be performed.</param>
	/// <returns>The opened readers. The caller is responsible for disposing them.</returns>
	private async Task<ImmutableList<IStorageReader<TEntity>>> OpenEntityReaders(StorageContextArgs args)
	{
		/*
		 * One reader is created per returned connection (one connection per shard is expected).
		 */
		var connections = await Connections.Open(args);
		var result = new List<IStorageReader<TEntity>>();

		foreach (var connection in connections)
			result.Add(OpenReader(args.Operation, connection));

		return result.ToImmutableList();
	}

	/// <summary>
	/// Opens one data reader for every connection returned by the connection provider.
	/// </summary>
	/// <param name="args">The arguments containing data about the operation to be performed.</param>
	/// <returns>The opened data readers.</returns>
	public async Task<ImmutableList<IDataReader>> OpenReaders(StorageContextArgs args)
	{
		var connections = await Connections.Open(args);
		var result = new List<IDataReader>();

		foreach (var connection in connections)
		{
			/*
			 * Temporarily create a full storage reader. We won't actually need it but it is
			 * the only way to get to the actual data reader. It is disposed at the end of the iteration.
			 */
			await using var r = StorageMiddleware.OpenReader<TEntity>(args.Operation, connection);

			result.Add(await r.OpenReader());
		}

		return result.ToImmutableList();
	}

	/// <summary>
	/// Opens the entity reader on the specified connection.
	/// </summary>
	/// <param name="operation">The operation to be performed by the reader.</param>
	/// <param name="connection">The connection to be used by the reader.</param>
	/// <returns>The reader created by the storage middleware.</returns>
	private IStorageReader<TEntity> OpenReader(IStorageOperation operation, IStorageConnection connection)
	{
		return StorageMiddleware.OpenReader<TEntity>(operation, connection);
	}

	/// <summary>
	/// Opens the writer for the specified arguments.
	/// </summary>
	/// <param name="args">The arguments containing the operation to be performed.</param>
	/// <returns>The writer created by the storage middleware.</returns>
	/// <exception cref="InvalidOperationException">Thrown if the connection provider did not return exactly one connection.</exception>
	private async Task<IStorageWriter> OpenWriter(StorageContextArgs args)
	{
		var connections = await Connections.Open(args);
		/*
		 * Only one connection should be returned when performing transactions
		 * on the single entity.
		 */
		if (connections.Count != 1)
			throw new InvalidOperationException("Only one connection expected.");

		return OpenWriter(args.Operation, connections[0]);
	}

	/// <summary>
	/// Opens the writer on the specified connection and marks the transaction context as dirty.
	/// </summary>
	/// <param name="operation">The operation to be performed.</param>
	/// <param name="connection">The connection to be used by the writer.</param>
	/// <returns>The writer created by the storage middleware.</returns>
	private IStorageWriter OpenWriter(IStorageOperation operation, IStorageConnection connection)
	{
		/*
		 * Signal the transaction orchestration that we are going to use transactions.
		 */
		Transactions.IsDirty = true;

		return StorageMiddleware.OpenWriter(operation, connection);
	}

	/// <summary>
	/// Performs a query for the specified operation.
	/// </summary>
	/// <remarks>
	/// With exactly one reader its result is returned as is. Otherwise all readers are queried
	/// in parallel and their non empty results are combined; the order in which results of
	/// different readers are appended depends on task completion order.
	/// </remarks>
	/// <param name="args">Arguments containing data about the operation to be performed.</param>
	/// <returns>A list of entities that were returned from the storage.</returns>
	public async Task<ImmutableList<TEntity>?> Query(StorageContextArgs args)
	{
		var readers = await OpenEntityReaders(args);

		try
		{
			/*
			 * In a sharding model it is possible that more than one reader will be returned since
			 * data could reside in more than one shard.
			 */
			if (readers.Count == 1)
				return await readers[0].Query();

			var results = new List<TEntity>();
			var tasks = new List<Task>();

			foreach (var reader in readers)
			{
				tasks.Add(Task.Run(async () =>
				{
					if (await reader.Query() is ImmutableList<TEntity> r && !r.IsEmpty)
					{
						lock (results)
							results.AddRange(r);
					}
				}));
			}

			await Task.WhenAll(tasks);

			return results.ToImmutableList();
		}
		finally
		{
			/*
			 * Readers must be disposed manually, also when a query fails.
			 */
			foreach (var reader in readers)
				await reader.DisposeAsync();
		}
	}

	/// <summary>
	/// Performs a single entity select for the specified operation.
	/// </summary>
	/// <remarks>
	/// With more than one reader all of them are asked in parallel and the first collected
	/// record is returned; which one that is depends on task completion order.
	/// </remarks>
	/// <param name="args">Arguments containing data about the operation to be performed.</param>
	/// <returns>An entity if found, null otherwise.</returns>
	public async Task<TEntity?> Select(StorageContextArgs args)
	{
		var readers = await OpenEntityReaders(args);

		try
		{
			/*
			 * In a sharding model it is possible that a middleware won't know
			 * exactly in which shard the record resides. This is why all available
			 * readers are called and, if more than one record is returned, only the first one is used.
			 * -------------------------------------------------------------------------
			 * Q: should we throw an exception if more than one record is found?
			 * -------------------------------------------------------------------------
			 */
			if (readers.Count == 1)
				return await readers[0].Select();

			var results = new List<TEntity>();
			var tasks = new List<Task>();

			foreach (var reader in readers)
			{
				tasks.Add(Task.Run(async () =>
				{
					if (await reader.Select() is TEntity r)
					{
						lock (results)
							results.Add(r);
					}
				}));
			}

			await Task.WhenAll(tasks);

			return results.Count > 0 ? results[0] : default;
		}
		finally
		{
			/*
			 * Readers must be disposed manually, also when a select fails.
			 */
			foreach (var reader in readers)
				await reader.DisposeAsync();
		}
	}

	/// <summary>
	/// Resolves the provider based on the entity type.
	/// </summary>
	/// <exception cref="NullReferenceException">Thrown if no storage middleware is registered or none of
	/// them supports <see cref="ElementType"/>.</exception>
	private async Task ResolveProvider()
	{
		/*
		 * At least one middleware must respond to the entity type and only the first
		 * one that does is used.
		 */
		var middlewares = await Middleware.Query<IStorageMiddleware>();

		if (middlewares is null || !middlewares.Any())
			throw new NullReferenceException(nameof(IStorageMiddleware));

		foreach (var middleware in middlewares)
		{
			/*
			 * The first middleware supporting the entity wins.
			 */
			if (middleware.SupportsEntity(ElementType))
			{
				_provider = middleware;
				break;
			}
		}

		if (_provider is null)
			throw new NullReferenceException($"{nameof(IStorageMiddleware)} -> {ElementType.Name}");
	}

	/// <summary>
	/// Initializes the storage by resolving the provider for the entity type.
	/// </summary>
	public async Task Initialize()
	{
		await ResolveProvider();
	}
}