// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
// Connected.Common/Common/Collections/QueueOps.cs
//
// 98 lines
// 2.4 KiB
//
// 2 years ago
using System.Collections.Immutable;
using Connected.Collections.Queues;
using Connected.Entities;
using Connected.Interop;
using Connected.Services;
namespace Common.Collections;
internal sealed class QueueOps
{
/// <summary>
/// Service function that picks currently visible, non-expired messages from the queue cache
/// for the queues listed in <c>Arguments.Queues</c>, marks each one as dequeued and returns
/// the messages it managed to mark.
/// </summary>
public sealed class Dequeue : ServiceFunction<DequeueArgs, ImmutableList<IQueueMessage>>
{
	public Dequeue(IQueueCache cache)
	{
		Cache = cache;
	}

	private IQueueCache Cache { get; }

	/// <summary>
	/// Writes an updated copy of every selected message back to the cache. The copy carries
	/// an incremented <c>DequeueCount</c>, a fresh <c>PopReceipt</c>, the current UTC time as
	/// <c>DequeueTimestamp</c> and a <c>NextVisible</c> moved forward by
	/// <c>Arguments.NextVisible</c>.
	/// </summary>
	/// <returns>
	/// The updated messages whose cache update did not throw; an empty list when nothing was
	/// selected or every update failed.
	/// </returns>
	protected override async Task<ImmutableList<IQueueMessage>?> OnInvoke()
	{
		var targets = await SelectTargets();

		if (targets.IsEmpty)
			return ImmutableList<IQueueMessage>.Empty;

		var result = new List<IQueueMessage>(targets.Count);

		foreach (var message in targets)
		{
			// Single timestamp per message so DequeueTimestamp and NextVisible are consistent.
			var now = DateTime.UtcNow;

			var modified = new QueueMessage
			{
				DequeueTimestamp = now,
				Arguments = message.Arguments,
				Created = message.Created,
				DequeueCount = message.DequeueCount + 1,
				Id = message.Id,
				NextVisible = now.Add(Arguments.NextVisible),
				PopReceipt = Guid.NewGuid(),
				Queue = message.Queue,
				State = State.Default,
				Sync = message.Sync,
				ETag = message.ETag
			};

			try
			{
				Cache.Update(modified);
				result.Add(modified);
			}
			catch
			{
				// Deliberate best effort: a failed update is treated as "another consumer was
				// faster" and the message is simply left out of the result.
				// NOTE(review): this also hides every other failure of Cache.Update; narrow the
				// catch to the cache's concurrency exception type once that type is confirmed.
			}
		}

		return result.ToImmutableList();
	}

	/// <summary>
	/// Queries the cache for messages that are visible now (<c>NextVisible</c> not in the
	/// future), not yet expired, and whose queue name matches one of <c>Arguments.Queues</c>
	/// (ordinal, case-insensitive).
	/// </summary>
	/// <returns>
	/// At most <c>Arguments.MaxCount</c> messages ordered by <c>NextVisible</c> and then by
	/// <c>Id</c>; an empty list when nothing matches.
	/// </returns>
	private async Task<ImmutableList<QueueMessage>> SelectTargets()
	{
		var items = await (from dc in Cache
						   where dc.NextVisible <= DateTime.UtcNow
						   && dc.Arguments.Options.Expire > DateTime.UtcNow
						   && Arguments.Queues.Any(f => string.Equals(f, dc.Queue, StringComparison.OrdinalIgnoreCase))
						   select dc).AsEntities();

		if (!items.Any())
			return ImmutableList<QueueMessage>.Empty;

		// Order the query result itself. Take already yields the whole sequence when it holds
		// MaxCount items or fewer, so no separate Count() branch is needed.
		return items
			.OrderBy(f => f.NextVisible)
			.ThenBy(f => f.Id)
			.Take(Arguments.MaxCount)
			.ToImmutableList();
	}
}
/// <summary>
/// Service action that stores its arguments in the queue cache as a <c>QueueMessage</c>
/// in state <c>State.New</c>, using the full type name of <typeparamref name="TClient"/>
/// as the queue name.
/// </summary>
/// <typeparam name="TClient">Type whose <c>FullName</c> is written to the message's <c>Queue</c>.</typeparam>
public sealed class Enqueue<TClient> : ServiceAction<QueueArgs>
{
	public Enqueue(IQueueCache cache)
	{
		Cache = cache;
	}

	private IQueueCache Cache { get; }

	/// <summary>
	/// Serializes the arguments, builds the message entity from them and hands it to the cache.
	/// </summary>
	/// <returns>An already completed task; all work is done synchronously.</returns>
	protected override Task OnInvoke()
	{
		var serialized = Serializer.Serialize(Arguments);

		// Extra values merged into the entity on top of the arguments themselves.
		var overrides = new
		{
			Arguments,
			SerializedMessage = serialized,
			Queue = typeof(TClient).FullName
		};

		var entity = Arguments.AsEntity<QueueMessage>(State.New, overrides);

		Cache.Update(entity);

		return Task.CompletedTask;
	}
}
}