using Connected.Caching; using Connected.Collections.Queues; using Connected.Entities; using Connected.Entities.Storage; using Connected.Logistics.Stock.Services; using Connected.Logistics.Types.Serials; using Connected.Logistics.Types.WarehouseLocations; using Connected.Notifications.Events; using Connected.ServiceModel; using Connected.Services; using Connected.Threading; namespace Connected.Logistics.Stock; internal sealed class StockOps { public const string StockQueue = "Stock"; static StockOps() { Locker = new(); } private static AsyncLockerSlim Locker { get; } /// /// This method ensures that a stock (parent) record exists. /// /// /// The stock record is not created explicitly since this would introduce unnecessary complexity. It is /// instead created on the fly when the first request is made. The tricky part is it must be thread safe /// so we need an async locker since lock statement does not support async calls. /// private static async Task Ensure(IStorageProvider storage, IStockService stock, EntityArgs args) { /* * First check for existence so we don't need to perform a lock if the record is found. */ if (await stock.Select(args) is IStock existing) return existing; /* * Doesn't exist. * Perform an async lock to ensure no one else is trying to insert the item. */ return await Locker.LockAsync(async () => { /* * Read again if two or more threads were competing for the insert. The thing is this * is happening quite frequently even in semi loaded warehouse systems. */ if (await stock.Select(args) is IStock existing2) return existing2; /* * Still nothing. We are safe to insert a new stock descriptor. Note that in scalable environments * there is still a possibillity that two requests would made it here but from different processes. * Thus we should have a unique constraint on the entity ensuring only one request will win, all the others * lose. This also means the provider owning the entity must support unique constraints. */ var entity = await storage.Open().Update(args.AsEntity(State.New)); var result = await stock.Select(entity.Id); /* * This should not happen anyway but we'll do it for the sake of sompiler warning. */ if (result is null) throw new NullReferenceException(nameof(IStock)); return result; }); } public sealed class Update : ServiceFunction { public Update(IStorageProvider storage, ICacheContext cache, IEventService events, IStockService stock, ISerialService serials, IQueueService queue, IWarehouseLocationService locations) { Storage = storage; Cache = cache; Events = events; Stock = stock; Serials = serials; Queue = queue; Locations = locations; } private IStorageProvider Storage { get; } private ICacheContext Cache { get; } private IEventService Events { get; } private IStockService Stock { get; } private ISerialService Serials { get; } private IQueueService Queue { get; } private IWarehouseLocationService Locations { get; } private bool IsLeaf { get; set; } protected override async Task OnInvoke() { /* * We need this info for queueing aggregations. */ IsLeaf = (await Locations.Select(Arguments.Location)).ItemCount == 0; /* * Validators should validate the existence. Serials don't get deleted. */ if (await Serials.Select(Arguments.Serial) is not ISerial serial) return 0; /* * Ensure the stock record exists. */ var stock = await Ensure(Storage, Stock, serial.AsArguments()); /* * Now we must check if the stock item exists for the specified serial and * warehouse location. If so we'll only update the quantity. */ if (await FindExisting(stock) is not StockItem existing) return await InsertItem(stock); else return await UpdateItem(existing); } private async Task InsertItem(IStock stock) { return await Locker.LockAsync(async () => { /* * Query again if someone overtook us. */ if (await FindExisting(stock) is not StockItem existing) { /* * Still doesn't exist, it's safe to insert it since we are in the locked area. */ return (await Storage.Open().Update(Arguments.AsEntity(State.New))).Id; } else { /* * Indeed, there was a record inserted in the meantime. */ return await UpdateItem(existing); } }); } /// /// Performs the update on the existing stock item. /// /// The stock item to be updated. private async Task UpdateItem(StockItem item) { await Storage.Open().Update(item, Arguments, async () => { await Cache.Remove(StockItem.EntityKey, item.Id); return SetState((await Stock.SelectItem(item.Id)) as StockItem); }, async (e) => { var quantity = item.Quantity + Arguments.Quantity; await Task.CompletedTask; return e.Merge(Arguments, State.Default, new { Quantity = quantity }); }); return item.Id; } private async Task FindExisting(IStock stock) { var items = await Stock.QueryItems(new QueryStockItemsArgs { Id = stock.Id, Location = Arguments.Location, Serial = Arguments.Serial }); if (items.IsEmpty || items[0] is not StockItem existing) return null; return existing; } protected override async Task OnCommitted() { await Cache.Remove(StockItem.EntityKey, Result); await Events.Enqueue(this, Stock, nameof(IStockService.Updated), new PrimaryKeyArgs { Id = Result }); if (IsLeaf) await Queue.Enqueue>(new PrimaryKeyQueueArgs { Id = Result }); } } }