613 lines
21 KiB
C#
613 lines
21 KiB
C#
using LMS.Common.Extensions;
|
||
using LMS.Repository.DB;
|
||
using LMS.Repository.MJPackage;
|
||
using Microsoft.Extensions.Logging;
|
||
using System.Collections.Concurrent;
|
||
|
||
namespace LMS.Tools.MJPackage
|
||
{
|
||
public class TokenUsageTracker
|
||
{
|
||
// Cached token items, keyed by the token string.
private readonly ConcurrentDictionary<string, TokenCacheItem> _tokenCache =
    new ConcurrentDictionary<string, TokenCacheItem>();

// Cached tasks, keyed by the third-party task id.
private readonly ConcurrentDictionary<string, MJApiTasks> _taskCache =
    new ConcurrentDictionary<string, MJApiTasks>();

// Lazily created concurrency controllers, keyed by the token string.
private readonly ConcurrentDictionary<string, Lazy<ConcurrencyController>> _concurrencyControllers =
    new ConcurrentDictionary<string, Lazy<ConcurrencyController>>();

// Reader/writer lock shared by the cache operations; created with recursion enabled.
private readonly ReaderWriterLockSlim _cacheLock =
    new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

private readonly ILogger<TokenUsageTracker> _logger;

// Backing fields for the OriginToken and MJAPIBasicUrl properties.
private string _originToken = string.Empty;
private string _mjAPIBasicUrl = string.Empty;

/// <summary>
/// Creates the tracker and writes an informational log entry announcing initialization.
/// </summary>
/// <param name="logger">Logger used by the tracker and by the concurrency controllers it creates.</param>
public TokenUsageTracker(ILogger<TokenUsageTracker> logger)
{
    _logger = logger;
    _logger.LogInformation("TokenUsageTracker服务已初始化");
}
|
||
|
||
/// <summary>
/// Per-token concurrency gate whose limit can be raised or lowered while tasks are running.
/// </summary>
/// <remarks>
/// Slots are tracked with a counter guarded by a lock instead of a <see cref="SemaphoreSlim"/>.
/// A semaphore constructed with <c>maxCount == initialLimit</c> throws
/// <see cref="SemaphoreFullException"/> when it is released past that maximum, so the limit
/// could not be raised; it also cannot withdraw permits, so lowering the limit never took
/// effect. Acquisition is non-blocking, so no wait queue is needed.
/// </remarks>
private class ConcurrencyController
{
    private readonly object _lock = new object();
    private readonly ILogger _logger;
    private int _maxCount;
    private int _currentlyExecuting;

    /// <summary>
    /// Creates a controller that admits at most <paramref name="initialLimit"/> concurrent holders.
    /// </summary>
    /// <param name="initialLimit">Initial concurrency limit; must be greater than zero.</param>
    /// <param name="logger">Logger used for permit and limit-change messages.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialLimit"/> is zero or negative.</exception>
    public ConcurrencyController(int initialLimit, ILogger logger)
    {
        if (initialLimit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(initialLimit), initialLimit, "并发限制必须大于0");
        }

        _maxCount = initialLimit;
        _currentlyExecuting = 0;
        _logger = logger;
    }

    /// <summary>
    /// Current concurrency limit.
    /// </summary>
    public int MaxCount
    {
        get
        {
            lock (_lock)
            {
                return _maxCount;
            }
        }
    }

    /// <summary>
    /// Number of permits currently held.
    /// </summary>
    public int CurrentlyExecuting
    {
        get
        {
            lock (_lock)
            {
                return _currentlyExecuting;
            }
        }
    }

    /// <summary>
    /// Number of permits that can still be acquired. Reports zero while the number of
    /// holders exceeds a recently lowered limit.
    /// </summary>
    public int AvailableCount
    {
        get
        {
            lock (_lock)
            {
                return Math.Max(0, _maxCount - _currentlyExecuting);
            }
        }
    }

    /// <summary>
    /// Tries to acquire a permit without waiting.
    /// </summary>
    /// <param name="token">Token the permit is requested for; used only in log output.</param>
    /// <returns>
    /// An already completed task whose result is <c>true</c> when a permit was acquired and
    /// <c>false</c> when the limit has been reached.
    /// </returns>
    public Task<bool> WaitAsync(string token)
    {
        int executing;
        int limit;

        lock (_lock)
        {
            if (_currentlyExecuting >= _maxCount)
            {
                return Task.FromResult(false);
            }

            // Snapshot the counters inside the lock so the log line is consistent.
            executing = ++_currentlyExecuting;
            limit = _maxCount;
        }

        _logger.LogInformation($"Token获取并发许可: {token}, 当前执行中: {executing}/{limit}");
        return Task.FromResult(true);
    }

    /// <summary>
    /// Returns a previously acquired permit. Logs a warning and does nothing when no
    /// permit is currently held.
    /// </summary>
    /// <param name="token">Token the permit belongs to; used only in log output.</param>
    public void Release(string token)
    {
        lock (_lock)
        {
            if (_currentlyExecuting > 0)
            {
                _currentlyExecuting--;
                _logger.LogInformation($"Token释放并发许可: {token}, 当前执行中: {_currentlyExecuting}/{_maxCount}");
            }
            else
            {
                _logger.LogWarning($"Token释放并发许可: {token}, 尝试释放许可但当前执行数已为0: {token}");
            }
        }
    }

    /// <summary>
    /// Changes the concurrency limit. Raising the limit makes extra permits available
    /// immediately; lowering it never interrupts current holders, it only blocks new
    /// acquisitions until the number of holders drops below the new limit.
    /// </summary>
    /// <param name="newLimit">New limit; must be greater than zero.</param>
    /// <param name="token">Token the controller belongs to; used only in log output.</param>
    /// <returns><c>true</c> when the limit changed; <c>false</c> when it already had that value.</returns>
    /// <exception cref="ArgumentException"><paramref name="newLimit"/> is zero or negative.</exception>
    public bool AdjustLimitAsync(int newLimit, string token)
    {
        if (newLimit <= 0)
        {
            throw new ArgumentException("并发限制必须大于0", nameof(newLimit));
        }

        lock (_lock)
        {
            if (_maxCount == newLimit)
            {
                return false;
            }

            var oldLimit = _maxCount;
            _maxCount = newLimit;

            if (newLimit > oldLimit)
            {
                _logger.LogInformation($"Token并发限制已扩大: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
                return true;
            }

            var excessExecuting = _currentlyExecuting - newLimit;
            if (excessExecuting > 0)
            {
                _logger.LogWarning($"Token并发限制缩小但有超额任务: {token}, {oldLimit} -> {newLimit}, 超额: {excessExecuting}, 将等待任务自然完成");
            }
            else
            {
                _logger.LogInformation($"Token并发限制已缩小: {token}, {oldLimit} -> {newLimit}, 当前执行: {_currentlyExecuting}");
            }

            return true;
        }
    }

    /// <summary>
    /// Kept so existing callers that dispose removed controllers continue to compile.
    /// The controller no longer owns a disposable resource, so this does nothing.
    /// </summary>
    public void Dispose()
    {
    }
}
|
||
|
||
/// <summary>
/// Looks up a token in the token cache.
/// </summary>
/// <param name="token">Token string used as the cache key.</param>
/// <param name="cacheItem">Receives the cached item when the lookup succeeds.</param>
/// <returns><c>true</c> when the token is cached; otherwise <c>false</c>.</returns>
public bool TryGetToken(string token, out TokenCacheItem cacheItem)
{
    if (!_tokenCache.TryGetValue(token, out cacheItem))
    {
        return false;
    }

    _logger.LogDebug($"从缓存中找到Token: {token}");
    return true;
}
|
||
|
||
/// <summary>
/// Stores the token item in the cache (replacing any existing entry for the same token)
/// and makes sure its concurrency controller exists and uses the item's concurrency limit.
/// Despite the name, this method runs synchronously.
/// </summary>
/// <param name="tokenItem">Token item to cache; its <c>Token</c> value is the cache key.</param>
public void AddOrUpdateTokenAsync(TokenCacheItem tokenItem)
{
    _cacheLock.EnterWriteLock();
    try
    {
        var tokenKey = tokenItem.Token;
        _tokenCache[tokenKey] = tokenItem;

        // Reuse the existing controller for this token, or register a lazily built one.
        Lazy<ConcurrencyController> lazyGate = _concurrencyControllers.GetOrAdd(
            tokenKey,
            _ => new Lazy<ConcurrencyController>(
                () => new ConcurrencyController(tokenItem.ConcurrencyLimit, _logger)));

        ConcurrencyController gate = lazyGate.Value;

        // Apply the item's limit to the controller; returns false when nothing changed.
        bool limitChanged = gate.AdjustLimitAsync(tokenItem.ConcurrencyLimit, tokenItem.Token);
        if (limitChanged)
        {
            _logger.LogInformation($"Token并发限制已调整: {tokenItem.Token}, 新限制: {tokenItem.ConcurrencyLimit}");
        }

        _logger.LogDebug($"Token已添加到缓存: {tokenItem.Token}, 日限制: {tokenItem.DailyLimit}, 总限制: {tokenItem.TotalLimit}, 并发限制: {tokenItem.ConcurrencyLimit}");
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Backward-compatible alias that forwards to <see cref="AddOrUpdateTokenAsync"/>.
/// Both methods execute synchronously.
/// </summary>
/// <param name="tokenItem">Token item to cache.</param>
public void AddOrUpdateToken(TokenCacheItem tokenItem) => AddOrUpdateTokenAsync(tokenItem);
|
||
|
||
/// <summary>
/// Increments the daily and total usage counters of a cached token and stamps its
/// last-activity time. Logs a warning when the token is not cached.
/// </summary>
/// <param name="token">Token whose usage is incremented.</param>
public void IncrementUsage(string token)
{
    _cacheLock.EnterUpgradeableReadLock();
    try
    {
        if (!_tokenCache.TryGetValue(token, out var entry))
        {
            _logger.LogWarning($"尝试更新未缓存的Token使用量: {token}");
            return;
        }

        // Upgrade to a write lock only once the entry is known to exist.
        _cacheLock.EnterWriteLock();
        try
        {
            entry.DailyUsage++;
            entry.TotalUsage++;
            entry.LastActivityTime = BeijingTimeExtension.GetBeijingTime();

            _logger.LogDebug($"Token使用量已更新: {token}, 今日使用: {entry.DailyUsage}, 总使用: {entry.TotalUsage}");
        }
        finally
        {
            _cacheLock.ExitWriteLock();
        }
    }
    finally
    {
        _cacheLock.ExitUpgradeableReadLock();
    }
}
|
||
|
||
/// <summary>
/// Requests a concurrency permit from the controller registered for the token.
/// </summary>
/// <param name="token">Token to acquire a permit for.</param>
/// <returns>
/// The controller's answer, or <c>false</c> (with a warning logged) when no controller
/// is registered for the token.
/// </returns>
public async Task<bool> WaitForConcurrencyPermitAsync(string token)
{
    if (!_concurrencyControllers.TryGetValue(token, out var lazyGate))
    {
        _logger.LogWarning($"未找到Token的并发控制器: {token}");
        return false;
    }

    var granted = await lazyGate.Value.WaitAsync(token);
    return granted;
}
|
||
|
||
/// <summary>
/// Hands a concurrency permit back to the controller registered for the token.
/// Logs a warning when no controller is registered.
/// </summary>
/// <param name="token">Token whose permit is released.</param>
public void ReleaseConcurrencyPermit(string token)
{
    if (!_concurrencyControllers.TryGetValue(token, out var lazyGate))
    {
        _logger.LogWarning($"未找到Token的并发控制器无法释放: {token}");
        return;
    }

    lazyGate.Value.Release(token);
}
|
||
|
||
/// <summary>
/// Reports the concurrency figures of the controller registered for the token.
/// </summary>
/// <param name="token">Token to inspect.</param>
/// <returns>
/// The controller's limit, number of held permits and number of free permits, or
/// <c>(0, 0, 0)</c> when no controller is registered for the token.
/// </returns>
public (int maxCount, int currentlyExecuting, int available) GetConcurrencyStatus(string token)
{
    _cacheLock.EnterReadLock();
    try
    {
        if (!_concurrencyControllers.TryGetValue(token, out var lazyGate))
        {
            return (0, 0, 0);
        }

        var gate = lazyGate.Value;
        return (gate.MaxCount, gate.CurrentlyExecuting, gate.AvailableCount);
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
|
||
/// <summary>
/// Returns the cached tokens whose last activity is more recent than
/// the current Beijing time minus <paramref name="activityThreshold"/>.
/// </summary>
/// <param name="activityThreshold">Maximum age of the last activity for a token to count as active.</param>
/// <returns>A new list containing the active token items.</returns>
public List<TokenCacheItem> GetActiveTokens(TimeSpan activityThreshold)
{
    _cacheLock.EnterReadLock();
    try
    {
        var oldestAllowed = BeijingTimeExtension.GetBeijingTime() - activityThreshold;

        var result = new List<TokenCacheItem>();
        foreach (var item in _tokenCache.Values)
        {
            if (item.LastActivityTime > oldestAllowed)
            {
                result.Add(item);
            }
        }

        _logger.LogDebug($"找到 {result.Count} 个活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
        return result;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Removes every cached token whose last activity is at or before the current Beijing
/// time minus <paramref name="activityThreshold"/>, together with its concurrency controller.
/// </summary>
/// <param name="activityThreshold">Maximum age of the last activity for a token to be kept.</param>
/// <returns>
/// A tuple of the number of tokens left in the cache and the number of tokens removed.
/// When nothing qualifies for removal the first value is the cache size measured at the
/// start of the call and the second is zero.
/// </returns>
public (int activateTokenCount, int notActivateTokenCount) RemoveNotActiveTokens(TimeSpan activityThreshold)
{
    _cacheLock.EnterWriteLock();
    try
    {
        var oldestAllowed = BeijingTimeExtension.GetBeijingTime() - activityThreshold;
        var countBefore = _tokenCache.Count;

        // Collect the keys first so the cache is not modified while it is being scanned.
        var staleKeys = new List<string>();
        foreach (var pair in _tokenCache)
        {
            if (pair.Value.LastActivityTime <= oldestAllowed)
            {
                staleKeys.Add(pair.Key);
            }
        }

        if (staleKeys.Count == 0)
        {
            _logger.LogDebug($"没有找到需要移除的不活跃Token (阈值: {activityThreshold.TotalMinutes} 分钟)");
            return (countBefore, 0);
        }

        var removed = 0;
        foreach (var key in staleKeys)
        {
            if (!_tokenCache.TryRemove(key, out var staleItem))
            {
                continue;
            }

            // Drop the matching controller; dispose it only if it was actually built.
            if (_concurrencyControllers.TryRemove(key, out var lazyGate) && lazyGate.IsValueCreated)
            {
                lazyGate.Value.Dispose();
            }

            removed++;
            _logger.LogDebug($"移除不活跃Token: {key}, 最后活跃时间: {staleItem.LastActivityTime:yyyy-MM-dd HH:mm:ss}");
        }

        _logger.LogInformation($"清理不活跃Token完成: 移除 {removed} 个Token (阈值: {activityThreshold.TotalMinutes} 分钟), 剩余: {_tokenCache.Count} 个");

        return (_tokenCache.Count, removed);
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Removes every cache entry whose item carries the given token, together with the
/// concurrency controller stored under the same key.
/// </summary>
/// <param name="token">Token value to match against the cached items' <c>Token</c>.</param>
/// <returns>
/// <c>0</c> when no cached item matches; otherwise the number of tokens remaining in
/// the cache after the removal.
/// NOTE(review): the number of removed entries is not returned — confirm callers expect
/// the remaining count.
/// </returns>
public int RemoveToken(string token)
{
    _cacheLock.EnterWriteLock();
    try
    {
        // Collect matching keys first so the cache is not modified while it is being scanned.
        var matchingKeys = new List<string>();
        foreach (var pair in _tokenCache)
        {
            if (pair.Value.Token == token)
            {
                matchingKeys.Add(pair.Key);
            }
        }

        if (matchingKeys.Count == 0)
        {
            return 0;
        }

        foreach (var key in matchingKeys)
        {
            if (!_tokenCache.TryRemove(key, out _))
            {
                continue;
            }

            // Drop the matching controller; dispose it only if it was actually built.
            if (_concurrencyControllers.TryRemove(key, out var lazyGate) && lazyGate.IsValueCreated)
            {
                lazyGate.Value.Dispose();
            }
        }

        return _tokenCache.Count;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Returns all cached token items.
/// </summary>
/// <returns>A new list holding the items that were cached at the time of the call.</returns>
public IEnumerable<TokenCacheItem> GetAllTokens()
{
    _cacheLock.EnterReadLock();
    try
    {
        var copy = new List<TokenCacheItem>(_tokenCache.Values);
        return copy;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Builds aggregate statistics over the cached tokens: total count, how many were active
/// within the last five minutes (Beijing time), how many were not, and the summed daily
/// and total usage.
/// </summary>
/// <returns>A freshly populated <see cref="TokenCacheStats"/>.</returns>
public TokenCacheStats GetCacheStats()
{
    _cacheLock.EnterReadLock();
    try
    {
        // A token counts as active when its last activity is newer than this instant.
        var activeSince = BeijingTimeExtension.GetBeijingTime().AddMinutes(-5);
        var snapshot = new List<TokenCacheItem>(_tokenCache.Values);

        return new TokenCacheStats
        {
            TotalTokens = snapshot.Count,
            ActiveTokens = snapshot.Count(item => item.LastActivityTime > activeSince),
            InactiveTokens = snapshot.Count(item => item.LastActivityTime <= activeSince),
            TotalDailyUsage = snapshot.Sum(item => item.DailyUsage),
            TotalUsage = snapshot.Sum(item => item.TotalUsage)
        };
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Sets the daily usage counter of every cached token back to zero.
/// </summary>
public void ResetDailyUsage()
{
    _cacheLock.EnterWriteLock();
    try
    {
        var cachedItems = _tokenCache.Values;
        foreach (var item in cachedItems)
        {
            item.DailyUsage = 0;
        }
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Origin token. Assigning a null, empty or whitespace-only value is ignored: the
/// previous value is kept and a warning is logged.
/// </summary>
public string OriginToken
{
    get
    {
        return _originToken;
    }
    set
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            // Keep the current token rather than overwrite it with a blank value.
            _logger.LogWarning("尝试设置OriginToken为空值,可能请求原始的请求不可用!!");
            return;
        }

        _originToken = value;
    }
}
|
||
|
||
/// <summary>
/// Base URL of the MJ API, without surrounding whitespace or trailing slashes.
/// </summary>
/// <remarks>
/// The value is normalised once, when it is assigned, so reading the property has no
/// side effects. Assigning a null, empty or whitespace-only value falls back to
/// <c>https://laitool.net</c> and logs a warning. Until the property is first assigned
/// it returns the backing field's initial value.
/// </remarks>
public string MJAPIBasicUrl
{
    get => _mjAPIBasicUrl;
    set
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            // Fall back to the default endpoint instead of storing a blank URL.
            _mjAPIBasicUrl = "https://laitool.net";
            _logger.LogWarning("尝试设置MJAPIBasicUrl为空值,可能请求原始的请求不可用!!");
            return;
        }

        // Trim whitespace first so a trailing slash followed by spaces is still removed.
        _mjAPIBasicUrl = value.Trim().TrimEnd('/');
    }
}
|
||
|
||
/// <summary>
/// Looks up a cached task by its third-party task id.
/// </summary>
/// <param name="thirdPartyId">Third-party task id used as the cache key.</param>
/// <returns>The cached task, or <c>null</c> (with a warning logged) when none is cached.</returns>
public MJApiTasks? TryGetTaskCache(string thirdPartyId)
{
    _cacheLock.EnterReadLock();
    try
    {
        if (_taskCache.TryGetValue(thirdPartyId, out var cachedTask))
        {
            return cachedTask;
        }

        _logger.LogWarning($"未找到任务缓存: {thirdPartyId}");
        return null;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Returns all cached tasks.
/// </summary>
/// <returns>A new list holding the tasks cached at the time of the call; empty when there are none.</returns>
public List<MJApiTasks> GetAllTaskCaches()
{
    _cacheLock.EnterReadLock();
    try
    {
        var copy = new List<MJApiTasks>(_taskCache.Values);
        return copy;
    }
    finally
    {
        _cacheLock.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Stores a task in the task cache under its third-party task id, replacing any
/// existing entry with the same id.
/// </summary>
/// <param name="task">Task to cache; it must have a non-blank task id and third-party task id.</param>
/// <returns>
/// <c>true</c> when the task was stored; <c>false</c> (with a warning logged) when the
/// task is null or either id is blank.
/// </returns>
public bool AddOrUpdateTaskCache(MJApiTasks task)
{
    _cacheLock.EnterWriteLock();
    try
    {
        if (task is null
            || string.IsNullOrWhiteSpace(task.TaskId)
            || string.IsNullOrWhiteSpace(task.ThirdPartyTaskId))
        {
            _logger.LogWarning("尝试添加或更新任务缓存时,任务、任务ID或者第三方任务ID为空");
            return false;
        }

        var cacheKey = task.ThirdPartyTaskId;
        _taskCache[cacheKey] = task;

        _logger.LogDebug($"任务缓存已增加或更新: {task.TaskId}, 状态: {task.Status}, 第三方任务ID: {task.ThirdPartyTaskId}");
        return true;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Removes the cached task stored under the given third-party task id.
/// </summary>
/// <param name="thirdPartyId">Third-party task id used as the cache key.</param>
/// <returns>
/// <c>true</c> when an entry was removed; <c>false</c> (with a warning logged) when no
/// entry exists for the id.
/// </returns>
public bool RemoveTaskCache(string thirdPartyId)
{
    _cacheLock.EnterWriteLock();
    try
    {
        if (_taskCache.TryRemove(thirdPartyId, out var removedTask))
        {
            _logger.LogDebug($"任务缓存已移除: {removedTask.TaskId}, 状态: {removedTask.Status}, 第三方任务ID: {removedTask.ThirdPartyTaskId}");
            return true;
        }

        // The out value is null when nothing was removed, so log the requested id instead.
        _logger.LogWarning($"尝试移除但未找到的第三方任务的缓存: {thirdPartyId}");
        return false;
    }
    finally
    {
        _cacheLock.ExitWriteLock();
    }
}
|
||
}
|
||
}
|