using Easy.Snowflakes;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System.Text.Json;

namespace Easy.Realization;

/// <summary>
/// Hands out snowflake worker ids from a Redis sorted set.
/// The set holds one member per worker id (0..<see cref="DriftingSnowflakeIdGenerator.MaxWorkerId"/>),
/// scored with a Unix-millisecond timestamp; the member with the lowest score is the next one handed out.
/// </summary>
public class WorkerNode
{
    // Composite format for the per-service sorted-set key.
    // NOTE(review): the previous template was "Snowflake:{{0}}:WorkIds"; the doubled braces are
    // escapes, so string.Format produced the literal "Snowflake:{0}:WorkIds" and ignored the
    // service name. Existing deployments therefore hold their data under that literal key —
    // plan the rollout accordingly (old and new instances must not allocate from different sets).
    private const string WorkerIdSortedSetCacheKey = "Snowflake:{0}:WorkIds";

    // Reads the lowest-scored member, re-scores it with @score and returns it, in one server-side step.
    private const string AcquireWorkerIdScript =
        @"local workerids = redis.call('ZRANGE', @key, @start,@stop) redis.call('ZADD',@key,@score,workerids[1]) return workerids[1]";

    private readonly ILogger _logger;
    private readonly IDatabase _database;

    /// <summary>Creates a worker-node allocator on top of the given Redis database.</summary>
    /// <param name="logger">Logger used for allocation diagnostics.</param>
    /// <param name="database">Redis database that stores the worker-id sorted set.</param>
    public WorkerNode(ILogger logger, IDatabase database)
    {
        _database = database;
        _logger = logger;
    }

    /// <summary>
    /// Creates the worker-id sorted set for <paramref name="serviceName"/> if it does not exist yet.
    /// Population is guarded by a lock; callers that fail to take the lock wait 300 ms and retry.
    /// </summary>
    /// <param name="serviceName">Service whose worker-id pool should be initialised.</param>
    public async Task InitWorkerNodesAsync(string serviceName)
    {
        var workerIdSortedSetCacheKey = BuildCacheKey(serviceName);

        if (await _database.KeyExistsAsync(workerIdSortedSetCacheKey))
        {
            _logger.LogInformation("Exists WorkerNodes:{0}", workerIdSortedSetCacheKey);
            return;
        }

        _logger.LogInformation("Starting InitWorkerNodes:{0}", workerIdSortedSetCacheKey);

        var flag = await _database.LockAsync(workerIdSortedSetCacheKey, 5, true);
        if (!flag.Success)
        {
            await Task.Delay(300);
            await InitWorkerNodesAsync(serviceName);
            // The retry did all the work. Without this return the method fell through and
            // populated / unlocked the set without ever owning the lock.
            return;
        }

        long count;
        try
        {
            // Another instance may have finished initialising between the first existence check
            // and the moment the lock was obtained; re-adding would reset every score.
            if (await _database.KeyExistsAsync(workerIdSortedSetCacheKey))
            {
                _logger.LogInformation("Exists WorkerNodes:{0}", workerIdSortedSetCacheKey);
                return;
            }

            var now = UnixNowMilliseconds();
            var set = new Dictionary<long, double>();
            for (long index = 0; index <= DriftingSnowflakeIdGenerator.MaxWorkerId; index++)
            {
                set.Add(index, now);
            }

            count = await ZAddAsync(workerIdSortedSetCacheKey, set);
        }
        finally
        {
            // Exceptions propagate unchanged; the lock is released either way.
            await _database.SafedUnLockAsync(workerIdSortedSetCacheKey, flag.LockValue);
        }

        _logger.LogInformation("Finlished InitWorkerNodes:{0}:{1}", workerIdSortedSetCacheKey, count);
    }

    /// <summary>
    /// Takes the worker id with the lowest score from the set of <paramref name="serviceName"/>
    /// and re-scores it with the current Unix time in milliseconds.
    /// </summary>
    /// <param name="serviceName">Service whose worker-id pool is used.</param>
    /// <returns>The allocated worker id.</returns>
    /// <exception cref="InvalidOperationException">The script returned no worker id.</exception>
    public async Task<long> GetWorkerIdAsync(string serviceName)
    {
        var workerIdSortedSetCacheKey = BuildCacheKey(serviceName);

        var parameters = new
        {
            // Typed as RedisKey so the library sends it in KEYS (key prefixing / cluster routing apply).
            key = (RedisKey)workerIdSortedSetCacheKey,
            start = 0,
            stop = 0,
            score = UnixNowMilliseconds()
        };

        var prepared = LuaScript.Prepare(AcquireWorkerIdScript);
        var luaResult = (byte[])await _database.ScriptEvaluateAsync(prepared, parameters, CommandFlags.None);
        if (luaResult == null)
        {
            throw new InvalidOperationException(
                string.Format("No worker id available in {0}", workerIdSortedSetCacheKey));
        }

        // Members are stored as UTF-8 JSON (see ZAddAsync), so they are read back the same way.
        var workerId = JsonSerializer.Deserialize<long>(luaResult);

        _logger.LogInformation("Get WorkerNodes:{0}", workerId);
        return workerId;
    }

    /// <summary>
    /// Updates the score of <paramref name="workerId"/> in the set of <paramref name="serviceName"/>.
    /// </summary>
    /// <param name="serviceName">Service whose worker-id pool is used.</param>
    /// <param name="workerId">Worker id to refresh; must be within 0..MaxWorkerId.</param>
    /// <param name="workerIdScore">Score to store; defaults to the current Unix time in milliseconds.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerId"/> is out of range.</exception>
    public async Task RefreshWorkerIdScoreAsync(string serviceName, long workerId, double? workerIdScore = null)
    {
        if (workerId < 0 || workerId > DriftingSnowflakeIdGenerator.MaxWorkerId)
        {
            throw new ArgumentOutOfRangeException(
                nameof(workerId),
                workerId,
                string.Format("worker Id can't be greater than {0} or less than 0", DriftingSnowflakeIdGenerator.MaxWorkerId));
        }

        var workerIdSortedSetCacheKey = BuildCacheKey(serviceName);
        double score = workerIdScore ?? UnixNowMilliseconds();

        await ZAddAsync(workerIdSortedSetCacheKey, new Dictionary<long, double> { { workerId, score } });
        _logger.LogDebug("Refresh WorkerNodes:{0}:{1}", workerId, score);
    }

    /// <summary>
    /// Adds or updates the given members in the sorted set at <paramref name="cacheKey"/>.
    /// Each key is stored as its UTF-8 JSON representation, with the dictionary value as score.
    /// </summary>
    /// <param name="cacheKey">Redis key of the sorted set.</param>
    /// <param name="cacheValues">Member → score pairs to write.</param>
    /// <returns>The count returned by the sorted-set add command; 0 when there is nothing to add.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="cacheValues"/> is null.</exception>
    public async Task<long> ZAddAsync(string cacheKey, Dictionary<long, double> cacheValues)
    {
        if (cacheValues == null)
        {
            throw new ArgumentNullException(nameof(cacheValues));
        }

        if (cacheValues.Count == 0)
        {
            return 0;
        }

        var entries = new List<SortedSetEntry>(cacheValues.Count);
        foreach (var item in cacheValues)
        {
            entries.Add(new SortedSetEntry(JsonSerializer.SerializeToUtf8Bytes(item.Key), item.Value));
        }

        return await _database.SortedSetAddAsync(cacheKey, entries.ToArray());
    }

    // Builds the sorted-set key for one service.
    private static string BuildCacheKey(string serviceName) =>
        string.Format(WorkerIdSortedSetCacheKey, serviceName);

    // Current instant as Unix milliseconds; same value the previous
    // new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds() produced.
    private static long UnixNowMilliseconds() =>
        DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}