You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Connected.Framework/Connected.Collections/Concurrent/DispatcherJob.cs

103 lines
2.0 KiB

2 years ago
using System.Collections.Concurrent;
using System.ComponentModel;
using Connected.Data;
namespace Connected.Collections.Concurrent;
/// <summary>
/// This class acts as a job unit of the <see cref="IDispatcher{T}"/>.
/// </summary>
/// <remarks>
/// A job drains a shared queue on a background task, hands every dequeued item to
/// <see cref="OnInvoke"/> and raises <see cref="Completed"/> once the drain ends.
/// </remarks>
/// <typeparam name="TArgs">The type of the items dequeued and processed by the job.</typeparam>
public abstract class DispatcherJob<TArgs> : IDispatcherJob<TArgs>, IDisposable
{
    /// <summary>
    /// Raised on the background task every time a drain started by <c>Run</c> ends,
    /// whether it ended normally, by cancellation, by disposal or because of an exception.
    /// </summary>
    public event EventHandler? Completed;

    /// <summary>
    /// Gets a value indicating whether a drain is currently executing on the background task.
    /// </summary>
    public bool IsRunning { get; private set; }

    /// <summary>
    /// Gets a value indicating whether <see cref="Dispose()"/> has already been called.
    /// </summary>
    protected bool IsDisposed { get; private set; }

    /// <summary>
    /// The token received by the most recent <c>Run</c> call; forwarded to <see cref="OnInvoke"/>.
    /// </summary>
    private CancellationToken CancellationToken { get; set; }

    /// <summary>
    /// Starts draining <paramref name="queue"/> on a background task unless a drain is already running.
    /// </summary>
    /// <param name="queue">The queue from which the items are dequeued.</param>
    /// <param name="cancellationToken">The token that stops the drain before the next item is dequeued.</param>
    internal void Run(ConcurrentQueue<TArgs> queue, CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;

        if (IsRunning)
            return;

        // Fire and forget: the outcome is reported through the Completed event.
        _ = Task.Run(async () =>
        {
            IsRunning = true;

            TArgs? item = default;

            try
            {
                // Cancellation and disposal are evaluated before the dequeue so that an item is
                // never taken out of the queue by a job that is no longer allowed to process it.
                while (!cancellationToken.IsCancellationRequested && !IsDisposed && queue.TryDequeue(out item))
                {
                    if (item is null)
                        continue;

                    // NOTE(review): items whose NextVisible is not in the future are skipped,
                    // presumably because the pop receipt has expired - confirm against IPopReceipt.
                    if (item is IPopReceipt pr && pr.NextVisible <= DateTime.UtcNow)
                        continue;

                    await Invoke(item);
                }
            }
            catch (Exception ex)
            {
                // The first failure ends the drain; the item that failed is passed to the handler.
                await HandleException(item, ex);
            }
            finally
            {
                // Must run even if the exception handler itself throws, otherwise the job
                // would stay in the running state forever and reject every subsequent Run.
                IsRunning = false;
                Completed?.Invoke(this, EventArgs.Empty);
            }
        }, cancellationToken);
    }

    /// <summary>
    /// Processes a single item by delegating to <see cref="OnInvoke"/>.
    /// </summary>
    /// <param name="args">The item to process.</param>
    public async Task Invoke(TArgs args)
    {
        await OnInvoke(args, CancellationToken);
    }

    /// <summary>
    /// Override to implement the processing of a single item. The default implementation does nothing.
    /// </summary>
    /// <param name="args">The item to process.</param>
    /// <param name="cancellationToken">The token received by the most recent <c>Run</c> call.</param>
    protected virtual async Task OnInvoke(TArgs args, CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
    }

    /// <summary>
    /// Forwards an exception thrown during the drain to <see cref="OnHandleEception"/>.
    /// </summary>
    private async Task HandleException(TArgs? args, Exception ex)
    {
        await OnHandleEception(args, ex);
    }

    /// <summary>
    /// Override to react to an exception thrown while draining the queue. The default implementation does nothing.
    /// </summary>
    /// <remarks>
    /// The misspelled name is kept on purpose because derived classes may already override it.
    /// </remarks>
    /// <param name="args">The item that was dequeued last when the exception occurred, if any.</param>
    /// <param name="ex">The exception that ended the drain.</param>
    protected virtual async Task OnHandleEception(TArgs? args, Exception ex)
    {
        await Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the job. A running drain stops before it dequeues the next item.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Performs the disposal once; subsequent calls are ignored.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>, in which case <see cref="OnDisposing"/> is invoked.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (IsDisposed)
            return;

        if (disposing)
            OnDisposing();

        IsDisposed = true;
    }

    /// <summary>
    /// Override to release resources owned by a derived job. The default implementation does nothing.
    /// </summary>
    protected virtual void OnDisposing()
    {
    }
}