You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
87 lines
2.3 KiB
87 lines
2.3 KiB
using Connected.Annotations;
|
|
using Connected.Configuration;
|
|
using Connected.Data.Sharding;
|
|
using Connected.Data.Sql;
|
|
using Connected.Entities;
|
|
using Connected.Entities.Annotations;
|
|
using Connected.Entities.Storage;
|
|
using Connected.Middleware;
|
|
|
|
namespace Connected.Data.Schema.Sql;
|
|
|
|
internal enum ConstraintNameType
|
|
{
|
|
Index = 1,
|
|
PrimaryKey = 2,
|
|
Default = 3
|
|
}
|
|
|
|
[Priority(0)]
|
|
internal sealed class SqlSchemaMiddleware : MiddlewareComponent, ISchemaMiddleware
|
|
{
|
|
public SqlSchemaMiddleware(IMiddlewareService middleware, IStorageProvider storage, IConfigurationService configuration)
|
|
{
|
|
Middleware = middleware;
|
|
Storage = storage;
|
|
Configuration = configuration;
|
|
}
|
|
|
|
private IMiddlewareService Middleware { get; }
|
|
private IStorageProvider Storage { get; }
|
|
public IConfigurationService Configuration { get; }
|
|
|
|
public Type ConnectionType => typeof(SqlDataConnection);
|
|
|
|
public string DefaultConnectionString => Configuration.Storage.Databases.DefaultConnectionString;
|
|
|
|
public async Task<bool> IsEntitySupported(Type entityType)
|
|
{
|
|
await Task.CompletedTask;
|
|
/*
|
|
* By default, all entities are supported by this middleware.
|
|
*/
|
|
return entityType.IsAssignableTo(typeof(IEntity));
|
|
}
|
|
|
|
public async Task Synchronize(Type entity, ISchema schema)
|
|
{
|
|
await Synchronize(schema, DefaultConnectionString);
|
|
/*
|
|
* First query all sharding middleware because we must perform synchronization
|
|
* on all nodes.
|
|
*/
|
|
if (await ResolveShardingMiddleware(entity) is IShardingMiddleware sharding)
|
|
{
|
|
foreach (var node in await sharding.ProvideNodes(entity))
|
|
await Synchronize(schema, node.ConnectionString);
|
|
}
|
|
}
|
|
|
|
private async Task Synchronize(ISchema schema, string connectionString)
|
|
{
|
|
var args = new SchemaExecutionContext(Storage, schema, connectionString);
|
|
/*
|
|
* Sinchronize schema object first.
|
|
*/
|
|
await new SchemaSynchronize().Execute(args);
|
|
/*
|
|
* Only tables are supported
|
|
*/
|
|
if (string.IsNullOrWhiteSpace(schema.Type) || string.Equals(schema.Type, SchemaAttribute.SchemaTypeTable, StringComparison.OrdinalIgnoreCase))
|
|
await new TableSynchronize().Execute(args);
|
|
}
|
|
|
|
private async Task<IShardingMiddleware?> ResolveShardingMiddleware(Type entityType)
|
|
{
|
|
var all = await Middleware.Query<IShardingMiddleware>();
|
|
|
|
foreach (var middleware in all)
|
|
{
|
|
if (middleware.SupportsEntity(entityType))
|
|
return middleware;
|
|
}
|
|
|
|
return null;
|
|
}
|
|
}
|