You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

114 lines
4.2 KiB

using Adnc.Infra.Caching.Core;
using Easy.Snowflakes;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace Easy.Realization;
public class WorkerNode
{
private readonly ILogger<WorkerNode> _logger;
private readonly IDatabase Database;
public static string WorkerIdSortedSetCacheKey = "Snowflake:{{0}}:WorkIds";
public WorkerNode(ILogger<WorkerNode> logger, IDatabase database)
{
Database = database;
_logger = logger;
}
public async Task InitWorkerNodesAsync(string serviceName)
{
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
if (!Database.KeyExists(workerIdSortedSetCacheKey))
{
_logger.LogInformation("Starting InitWorkerNodes:{0}", workerIdSortedSetCacheKey);
var flag = await Database.LockAsync(workerIdSortedSetCacheKey, 5, true);
if (!flag.Success)
{
await Task.Delay(300);
await InitWorkerNodesAsync(serviceName);
}
long count = 0;
try
{
var set = new Dictionary<long, double>();
for (long index = 0; index <= DriftingSnowflakeIdGenerator.MaxWorkerId; index++)
{
set.Add(index, new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds());
}
count = await ZAddAsync(workerIdSortedSetCacheKey, set);
}
catch (Exception ex)
{
throw new Exception(ex.Message, ex);
}
finally
{
await Database.SafedUnLockAsync(workerIdSortedSetCacheKey, flag.LockValue);
}
_logger.LogInformation("Finlished InitWorkerNodes:{0}:{1}", workerIdSortedSetCacheKey, count);
}
else
_logger.LogInformation("Exists WorkerNodes:{0}", workerIdSortedSetCacheKey);
}
public async Task<long> GetWorkerIdAsync(string serviceName)
{
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
var script = @"local workerids = redis.call('ZRANGE', @key, @start,@stop)
redis.call('ZADD',@key,@score,workerids[1])
return workerids[1]";
var parameters = new { key = workerIdSortedSetCacheKey, start = 0, stop = 0, score = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds() };
var prepared = LuaScript.Prepare(script);
var luaResult = (byte[])await Database.ScriptEvaluateAsync(prepared, parameters, CommandFlags.None);
using var ms = new MemoryStream(luaResult);
var workerId = await JsonSerializer.DeserializeAsync<long>(ms);
_logger.LogInformation("Get WorkerNodes:{0}", workerId);
return workerId;
}
public async Task RefreshWorkerIdScoreAsync(string serviceName, long workerId, double? workerIdScore = null)
{
if (workerId < 0 || workerId > DriftingSnowflakeIdGenerator.MaxWorkerId)
throw new Exception(string.Format("worker Id can't be greater than {0} or less than 0", DriftingSnowflakeIdGenerator.MaxWorkerId));
var workerIdSortedSetCacheKey = string.Format(WorkerIdSortedSetCacheKey, serviceName);
var score = workerIdScore == null ? new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds() : workerIdScore.Value;
await ZAddAsync(workerIdSortedSetCacheKey, new Dictionary<long, double> { { workerId, score } });
_logger.LogDebug("Refresh WorkerNodes:{0}:{1}", workerId, score);
}
public async Task<long> ZAddAsync<T>(string cacheKey, Dictionary<T, double> cacheValues)
{
var param = new List<SortedSetEntry>();
foreach (var item in cacheValues)
{
param.Add(new SortedSetEntry(JsonSerializer.SerializeToUtf8Bytes(item.Key), item.Value));
}
var len = await Database.SortedSetAddAsync(cacheKey, param.ToArray());
return len;
}
}