You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Connected.Logistics/Logistics.Stock/StockOps.cs

188 lines
5.7 KiB

using Connected.Caching;
using Connected.Collections.Queues;
using Connected.Entities;
using Connected.Entities.Storage;
using Connected.Notifications.Events;
using Connected.ServiceModel;
using Connected.Services;
using Connected.Threading;
using Logistics.Stock.Services;
using Logistics.Types.Serials;
using Logistics.Types.WarehouseLocations;
namespace Logistics.Stock;
internal sealed class StockOps
{
public const string StockQueue = "Stock";
static StockOps()
{
Locker = new();
}
private static AsyncLockerSlim Locker { get; }
/// <summary>
/// This method ensures that a stock (parent) record exists.
/// </summary>
/// <remarks>
/// The stock record is not created explicitly since this would introduce unnecessary complexity. It is
/// instead created on the fly when the first request is made. The tricky part is it must be thread safe
/// so we need an async locker since lock statement does not support async calls.
/// </remarks>
private static async Task<IStock> Ensure(IStorageProvider storage, IStockService stock, EntityArgs args)
{
/*
* First check for existence so we don't need to perform a lock if the record is found.
*/
if (await stock.Select(args) is IStock existing)
return existing;
/*
* Doesn't exist.
* Perform an async lock to ensure no one else is trying to insert the item.
*/
return await Locker.LockAsync(async () =>
{
/*
* Read again if two or more threads were competing for the insert. The thing is this
* is happening quite frequently even in semi loaded warehouse systems.
*/
if (await stock.Select(args) is IStock existing2)
return existing2;
/*
* Still nothing. We are safe to insert a new stock descriptor. Note that in scalable environments
* there is still a possibillity that two requests would made it here but from different processes.
* Thus we should have a unique constraint on the entity ensuring only one request will win, all the others
* lose. This also means the provider owning the entity must support unique constraints.
*/
var entity = await storage.Open<Stock>().Update(args.AsEntity<Stock>(State.New));
var result = await stock.Select(entity.Id);
/*
* This should not happen anyway but we'll do it for the sake of sompiler warning.
*/
if (result is null)
throw new NullReferenceException(nameof(IStock));
return result;
});
}
public sealed class Update : ServiceFunction<UpdateStockArgs, long>
{
public Update(IStorageProvider storage, ICacheContext cache, IEventService events, IStockService stock,
ISerialService serials, IQueueService queue, IWarehouseLocationService locations)
{
Storage = storage;
Cache = cache;
Events = events;
Stock = stock;
Serials = serials;
Queue = queue;
Locations = locations;
}
private IStorageProvider Storage { get; }
private ICacheContext Cache { get; }
private IEventService Events { get; }
private IStockService Stock { get; }
private ISerialService Serials { get; }
private IQueueService Queue { get; }
private IWarehouseLocationService Locations { get; }
private bool IsLeaf { get; set; }
protected override async Task<long> OnInvoke()
{
/*
* We need this info for queueing aggregations.
*/
IsLeaf = (await Locations.Select(Arguments.Location)).ItemCount == 0;
/*
* Validators should validate the existence. Serials don't get deleted.
*/
if (await Serials.Select(Arguments.Serial) is not ISerial serial)
return 0;
/*
* Ensure the stock record exists.
*/
var stock = await Ensure(Storage, Stock, serial.AsArguments<EntityArgs, long>());
/*
* Now we must check if the stock item exists for the specified serial and
* warehouse location. If so we'll only update the quantity.
*/
if (await FindExisting(stock) is not StockItem existing)
return await InsertItem(stock);
else
return await UpdateItem(existing);
}
private async Task<long> InsertItem(IStock stock)
{
return await Locker.LockAsync(async () =>
{
/*
* Query again if someone overtook us.
*/
if (await FindExisting(stock) is not StockItem existing)
{
/*
* Still doesn't exist, it's safe to insert it since we are in the locked area.
*/
return (await Storage.Open<StockItem>().Update(Arguments.AsEntity<StockItem>(State.New))).Id;
}
else
{
/*
* Indeed, there was a record inserted in the meantime.
*/
return await UpdateItem(existing);
}
});
}
/// <summary>
/// Performs the update on the existing stock item.
/// </summary>
/// <param name="item">The stock item to be updated.</param>
private async Task<long> UpdateItem(StockItem item)
{
await Storage.Open<StockItem>().Update(item, Arguments, async () =>
{
await Cache.Remove(StockItem.EntityKey, item.Id);
return SetState((await Stock.SelectItem(item.Id)) as StockItem);
},
async (e) =>
{
var quantity = item.Quantity + Arguments.Quantity;
await Task.CompletedTask;
return e.Merge(Arguments, State.Default, new { Quantity = quantity });
});
return item.Id;
}
private async Task<StockItem?> FindExisting(IStock stock)
{
var items = await Stock.QueryItems(new QueryStockItemsArgs
{
Id = stock.Id,
Location = Arguments.Location,
Serial = Arguments.Serial
});
if (items.IsEmpty || items[0] is not StockItem existing)
return null;
return existing;
}
protected override async Task OnCommitted()
{
await Cache.Remove(StockItem.EntityKey, Result);
await Events.Enqueue(this, Stock, nameof(IStockService.Updated), new PrimaryKeyArgs<long> { Id = Result });
if (IsLeaf)
await Queue.Enqueue<StockAggregator, PrimaryKeyQueueArgs<long>>(new PrimaryKeyQueueArgs<long> { Id = Result });
}
}
}