You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
107 lines
4.0 KiB
107 lines
4.0 KiB
using Easy.Snowflakes;
|
|
using Microsoft.Extensions.Logging;
|
|
using StackExchange.Redis;
|
|
using System.Text.Json;
|
|
|
|
namespace Easy.Realization;
|
|
public class WorkerNode
|
|
{
|
|
private readonly ILogger<WorkerNode> _logger;
|
|
private readonly IDatabase Database;
|
|
|
|
private static readonly string WorkerIdSortedSetCacheKey = "Snowflake:{{0}}:WorkIds";
|
|
public WorkerNode(ILogger<WorkerNode> logger, IDatabase database)
|
|
{
|
|
Database = database;
|
|
_logger = logger;
|
|
}
|
|
public async Task InitWorkerNodesAsync(string serviceName)
|
|
{
|
|
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
|
|
|
|
|
|
if (!Database.KeyExists(workerIdSortedSetCacheKey))
|
|
{
|
|
_logger.LogInformation("Starting InitWorkerNodes:{0}", workerIdSortedSetCacheKey);
|
|
var flag = await Database.LockAsync(workerIdSortedSetCacheKey, 5, true);
|
|
if (!flag.Success)
|
|
{
|
|
await Task.Delay(300);
|
|
await InitWorkerNodesAsync(serviceName);
|
|
}
|
|
|
|
long count = 0;
|
|
try
|
|
{
|
|
var set = new Dictionary<long, double>();
|
|
for (long index = 0; index <= DriftingSnowflakeIdGenerator.MaxWorkerId; index++)
|
|
{
|
|
set.Add(index, new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds());
|
|
}
|
|
count = await ZAddAsync(workerIdSortedSetCacheKey, set);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
throw new Exception(ex.Message, ex);
|
|
}
|
|
finally
|
|
{
|
|
await Database.SafedUnLockAsync(workerIdSortedSetCacheKey, flag.LockValue);
|
|
}
|
|
|
|
_logger.LogInformation("Finlished InitWorkerNodes:{0}:{1}", workerIdSortedSetCacheKey, count);
|
|
}
|
|
else
|
|
_logger.LogInformation("Exists WorkerNodes:{0}", workerIdSortedSetCacheKey);
|
|
}
|
|
|
|
public async Task<long> GetWorkerIdAsync(string serviceName)
|
|
{
|
|
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
|
|
|
|
var script = @"local workerids = redis.call('ZRANGE', @key, @start,@stop)
|
|
redis.call('ZADD',@key,@score,workerids[1])
|
|
return workerids[1]";
|
|
|
|
var parameters = new { key = workerIdSortedSetCacheKey, start = 0, stop = 0, score = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds() };
|
|
|
|
var prepared = LuaScript.Prepare(script);
|
|
var luaResult = (byte[])await Database.ScriptEvaluateAsync(prepared, parameters, CommandFlags.None);
|
|
|
|
using var ms = new MemoryStream(luaResult);
|
|
|
|
var workerId = await JsonSerializer.DeserializeAsync<long>(ms);
|
|
|
|
_logger.LogInformation("Get WorkerNodes:{0}", workerId);
|
|
|
|
return workerId;
|
|
}
|
|
|
|
public async Task RefreshWorkerIdScoreAsync(string serviceName, long workerId, double? workerIdScore = null)
|
|
{
|
|
if (workerId < 0 || workerId > DriftingSnowflakeIdGenerator.MaxWorkerId)
|
|
throw new Exception(string.Format("worker Id can't be greater than {0} or less than 0", DriftingSnowflakeIdGenerator.MaxWorkerId));
|
|
|
|
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
|
|
|
|
var score = workerIdScore == null ? new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds() : workerIdScore.Value;
|
|
await ZAddAsync(workerIdSortedSetCacheKey, new Dictionary<long, double> { { workerId, score } });
|
|
_logger.LogDebug("Refresh WorkerNodes:{0}:{1}", workerId, score);
|
|
}
|
|
|
|
|
|
|
|
public async Task<long> ZAddAsync<T>(string cacheKey, Dictionary<T, double> cacheValues)
|
|
{
|
|
var param = new List<SortedSetEntry>();
|
|
|
|
foreach (var item in cacheValues)
|
|
{
|
|
param.Add(new SortedSetEntry(JsonSerializer.SerializeToUtf8Bytes(item.Key), item.Value));
|
|
}
|
|
|
|
var len = await Database.SortedSetAddAsync(cacheKey, param.ToArray());
|
|
|
|
return len;
|
|
}
|
|
}
|
|
|