You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Connected.Framework/Connected.Notifications/Events/EventService.cs

150 lines
3.7 KiB

2 years ago
using System.Reflection;
using Connected.Interop;
using Connected.Middleware;
using Connected.Net.Server;
using Connected.Notifications.Events.Server;
using Connected.ServiceModel;
using Connected.ServiceModel.Transactions;
namespace Connected.Notifications.Events;
/// <summary>
/// Queues events on the <see cref="EventDispatcher"/>, and when an event is triggered raises it
/// in-process, forwards it through the <see cref="EventServer"/> and invokes the matching
/// <see cref="IEventListener{TArgs}"/> middleware.
/// </summary>
/// <remarks>
/// The service subscribes to endpoint and backplane events in its constructor and releases
/// those subscriptions in <see cref="Dispose()"/>.
/// </remarks>
internal class EventService : IEventService, IDisposable
{
    /// <summary>
    /// Raised in-process at the start of <see cref="Trigger"/> for every dispatched event.
    /// </summary>
    public event ServiceEventHandler<EventServiceArgs>? Event;

    /// <summary>
    /// Creates the service and subscribes to endpoint changes and backplane notifications.
    /// </summary>
    public EventService(IEndpointServer endpoints, EventDispatcher dispatcher, EventServer server, EventServerConnection backplaneClient, IContextProvider provider)
    {
        Dispatcher = dispatcher;
        Server = server;
        BackplaneClient = backplaneClient;
        Provider = provider;
        Endpoints = endpoints;

        Endpoints.Changed += OnServerChanged;
        Endpoints.Initialized += OnServerInitialized;
        BackplaneClient.Received += OnReceived;
    }

    private IEndpointServer Endpoints { get; }
    // Nullable because Dispose clears it after cancelling the dispatcher.
    private EventDispatcher? Dispatcher { get; set; }
    private EventServer Server { get; }
    private EventServerConnection BackplaneClient { get; }
    private IContextProvider Provider { get; }
    private bool IsDisposed { get; set; }

    private async void OnServerInitialized(object? sender, EventArgs e)
    {
        await Reinitialize();
    }

    private async void OnServerChanged(object? sender, ServerChangedArgs e)
    {
        await Reinitialize();
    }

    /// <summary>
    /// Runs <see cref="Initialize"/> on behalf of the <c>async void</c> event handlers.
    /// </summary>
    private async Task Reinitialize()
    {
        try
        {
            await Initialize();
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler cannot be observed by the caller,
            // so it is reported here instead of being allowed to propagate.
            System.Diagnostics.Trace.TraceError("EventService failed to reinitialize the backplane connection: {0}", ex);
        }
    }

    /// <summary>
    /// Disconnects the backplane client and, when this instance is not the server,
    /// reconnects it to the current server URL.
    /// </summary>
    /// <remarks>
    /// Failures while reconnecting are traced and otherwise ignored (best effort).
    /// A failure of the initial disconnect is propagated to the caller.
    /// </remarks>
    public async Task Initialize()
    {
        await BackplaneClient.Disconnect();

        try
        {
            if (await Endpoints.IsServer())
                return;

            await BackplaneClient.Initialize(Endpoints.ServerUrl);
            await BackplaneClient.Connect();
        }
        catch (Exception ex)
        {
            // Server probably not initialized yet; a later Changed/Initialized event retries.
            System.Diagnostics.Trace.TraceWarning("EventService could not connect to the event backplane: {0}", ex);
        }
    }

    /// <summary>
    /// Handles a notification received from the backplane connection.
    /// </summary>
    private void OnReceived(object? sender, EventNotificationArgs e)
    {
        //TODO: we are going to need some additional info here, like assembly
        var serviceType = Type.GetType(e.Service);

        if (serviceType is null)
            return;

        //Enqueue(serviceType, e.Event, EventOrigin.Remote, e.Arguments);
    }

    /// <summary>
    /// Raises <see cref="Event"/>, sends the notification through the event server and then
    /// invokes every matching event listener middleware, committing the transaction context
    /// of the created scope when one is available.
    /// </summary>
    /// <param name="args">The event being dispatched.</param>
    internal async Task Trigger(EventServiceArgs args)
    {
        Event?.Invoke(args.Sender, args);

        await Server.Send(new EventNotificationArgs
        {
            Arguments = await Serializer.Serialize(args.Arguments),
            Event = args.Event,
            // args.Service already holds the service's Type (see Enqueue); calling GetType()
            // on it would describe the Type object itself rather than the service.
            Service = ResolveServiceName(args.Service)
        });

        using var context = Provider.Create();

        var middlewareService = context.GetService<IMiddlewareService>();
        var argumentsType = args.Arguments.GetType();
        var listenerType = typeof(IEventListener<>).MakeGenericType(argumentsType);
        var listeners = await middlewareService.Query(listenerType, new CallerContext(args.Service, args.Event));

        // The Invoke signature is the same for every listener, so build it once.
        var signature = new[] { typeof(IOperationState), argumentsType };

        foreach (var listener in listeners)
        {
            if (listener.GetType().GetMethod(nameof(IEventListener<IDto>.Invoke), BindingFlags.Public | BindingFlags.Instance, signature) is not MethodInfo method)
                continue;

            await method.InvokeAsync(listener, args.Sender, args.Arguments);
        }

        if (context.GetService<ITransactionContext>() is ITransactionContext transactionContext)
            await transactionContext.Commit();
    }

    /// <summary>
    /// Returns a name for the service type that <see cref="Type.GetType(string)"/> can resolve.
    /// </summary>
    /// <param name="service">Either the service's <see cref="Type"/> or a service instance.</param>
    private static string ResolveServiceName(object service)
    {
        var type = service as Type ?? service.GetType();

        return type.AssemblyQualifiedName ?? type.FullName ?? type.Name;
    }

    /// <summary>
    /// Queues an event with <see cref="EventOrigin.InProcess"/> origin.
    /// </summary>
    public async Task Enqueue<TService, TArgs>(IOperationState sender, TService service, string @event, TArgs args)
    {
        await Enqueue(sender, service, @event, EventOrigin.InProcess, args);
    }

    /// <summary>
    /// Queues an event on the dispatcher.
    /// </summary>
    /// <param name="sender">The operation state passed to subscribers and listeners.</param>
    /// <param name="service">The service raising the event; only its runtime type is stored.</param>
    /// <param name="event">The name of the event.</param>
    /// <param name="origin">Where the event originated.</param>
    /// <param name="args">The event arguments.</param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task Enqueue<TService, TArgs>(IOperationState sender, TService service, string @event, EventOrigin origin, TArgs args)
    {
        ArgumentNullException.ThrowIfNull(service);

        var dispatcher = Dispatcher ?? throw new ObjectDisposedException(nameof(EventService));

        dispatcher.Enqueue(new EventServiceArgs
        {
            Sender = sender,
            Service = service.GetType(),
            Event = @event,
            Origin = origin,
            Arguments = args
        });

        await Task.CompletedTask;
    }

    /// <summary>
    /// Releases event subscriptions and cancels the dispatcher.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (IsDisposed)
            return;

        if (disposing)
        {
            // Unsubscribe so the publishers no longer reference or call this instance.
            Endpoints.Changed -= OnServerChanged;
            Endpoints.Initialized -= OnServerInitialized;
            BackplaneClient.Received -= OnReceived;

            Dispatcher?.Cancel();
            Dispatcher = null;
        }

        IsDisposed = true;
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}