From c12e1b5d65b2e3bb2f321bbd08fa28334e89e194 Mon Sep 17 00:00:00 2001 From: lq1405 <2769838458@qq.com> Date: Mon, 16 Jun 2025 20:12:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=BC=93=E5=AD=98=20=E9=98=B2=E6=AD=A2=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E9=A2=91=E7=B9=81=E4=BF=AE=E6=94=B9=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MJPackage/ITaskConcurrencyManager.cs | 7 +- LMS.Tools/MJPackage/ITokenService.cs | 4 + LMS.Tools/MJPackage/TaskConcurrencyManager.cs | 160 ++++++++---------- LMS.Tools/MJPackage/TaskService.cs | 4 +- LMS.Tools/MJPackage/TaskStatusCheckService.cs | 23 ++- LMS.Tools/MJPackage/TaskSyncService .cs | 27 +++ LMS.Tools/MJPackage/TokenService.cs | 36 ++++ LMS.Tools/MJPackage/TokenUsageTracker.cs | 108 ++++++++++++ .../QuartzTaskSchedulerConfig.cs | 13 ++ .../Controllers/MJPackageController.cs | 8 +- .../Attributes/RateLimitAttribute.cs | 4 +- .../Service/MJPackage/MJPackageService.cs | 34 +++- SQL/v1.1.3/FileUploads.sql | 19 +++ 13 files changed, 333 insertions(+), 114 deletions(-) create mode 100644 LMS.Tools/MJPackage/TaskSyncService .cs create mode 100644 SQL/v1.1.3/FileUploads.sql diff --git a/LMS.Tools/MJPackage/ITaskConcurrencyManager.cs b/LMS.Tools/MJPackage/ITaskConcurrencyManager.cs index 6a67758..83fef90 100644 --- a/LMS.Tools/MJPackage/ITaskConcurrencyManager.cs +++ b/LMS.Tools/MJPackage/ITaskConcurrencyManager.cs @@ -8,12 +8,9 @@ namespace LMS.Tools.MJPackage { Task CreateTaskAsync(string token, string thirdPartyTaskId); Task UpdateTaskInDatabase(MJApiTasks mJApiTasks); - Task GetTaskInfoAsync(string taskId); - Task GetTaskInfoByThirdPartyIdAsync(string taskId); + Task BatchUpdateTaskChaheToDatabaseAsync(); - Task> GetRunningTasksAsync(string token = null); - Task<(int maxConcurrency, int running, int available)> GetConcurrencyStatusAsync(string token); - Task CleanupTimeoutTasksAsync(TimeSpan timeout); + Task GetTaskInfoByThirdPartyIdAsync(string taskId); } } diff --git a/LMS.Tools/MJPackage/ITokenService.cs b/LMS.Tools/MJPackage/ITokenService.cs index f7b39c0..36817ef 100644 --- a/LMS.Tools/MJPackage/ITokenService.cs +++ b/LMS.Tools/MJPackage/ITokenService.cs @@ -18,5 +18,9 @@ namespace LMS.Tools.MJPackage Task LoadOriginTokenAsync(); Task GetOriginToken(); + + Task LoadMJAPIBasicUrlAsync(); + + Task GetMJAPIBasicUrl(); } } diff --git a/LMS.Tools/MJPackage/TaskConcurrencyManager.cs b/LMS.Tools/MJPackage/TaskConcurrencyManager.cs index 5e714c0..c877d62 100644 --- a/LMS.Tools/MJPackage/TaskConcurrencyManager.cs +++ b/LMS.Tools/MJPackage/TaskConcurrencyManager.cs @@ -12,23 +12,18 @@ namespace LMS.Tools.MJPackage { public class TaskConcurrencyManager : ITaskConcurrencyManager { - private readonly ConcurrentDictionary _activeTasks = new(); - private readonly ConcurrentDictionary _thirdPartyTaskMap = new(); // ThirdPartyTaskId -> TaskId private readonly TokenUsageTracker _usageTracker; - private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger _logger; private readonly ApplicationDbContext _dbContext; private readonly ITokenService _tokenService; public TaskConcurrencyManager( TokenUsageTracker usageTracker, - IServiceScopeFactory scopeFactory, ILogger logger, ApplicationDbContext dbContext, ITokenService tokenService) { _usageTracker = usageTracker; - _scopeFactory = scopeFactory; _logger = logger; _dbContext = dbContext; _tokenService = tokenService; @@ -63,7 +58,8 @@ namespace LMS.Tools.MJPackage }; // 5. 持久化任务信息到数据库 - await SaveTaskToDatabase(mJApiTasks); + await _dbContext.AddAsync(mJApiTasks); + await _dbContext.SaveChangesAsync(); } catch (Exception ex) { @@ -71,98 +67,28 @@ namespace LMS.Tools.MJPackage } } - - /// - /// 获取任务信息 - /// - public async Task GetTaskInfoAsync(string taskId) - { - if (_activeTasks.TryGetValue(taskId, out var taskInfo)) - { - return taskInfo; - } - - // 如果内存中没有,尝试从数据库加载 - return await LoadTaskFromDatabase(taskId); - } - /// /// 通过第三方ID获取数据 /// /// /// - public async Task GetTaskInfoByThirdPartyIdAsync(string thirdPartyId) + public async Task GetTaskInfoByThirdPartyIdAsync(string thirdPartyId) { if (string.IsNullOrWhiteSpace(thirdPartyId)) { _logger.LogWarning("第三方任务ID为空"); return null; } - MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.ThirdPartyTaskId == thirdPartyId); - return mJApiTasks; - } - - /// - /// 获取运行中的任务列表 - /// - public async Task> GetRunningTasksAsync(string token = null) - { - var runningTasks = _activeTasks.Values - .Where(t => t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.Status != MJTaskStatus.CANCEL) - .Where(t => string.IsNullOrEmpty(token) || t.Token == token) - .OrderBy(t => t.StartTime) - .ToList(); - - _logger.LogDebug($"当前运行中的任务数: {runningTasks.Count}" + (string.IsNullOrEmpty(token) ? "" : $", Token={token}")); - - return await Task.FromResult(runningTasks); - } - - /// - /// 获取Token的并发状态 - /// - public async Task<(int maxConcurrency, int running, int available)> GetConcurrencyStatusAsync(string token) - { - var status = _usageTracker.GetConcurrencyStatus(token); - return await Task.FromResult((status.maxCount, status.currentlyExecuting, status.available)); - } - - /// - /// 清理超时任务 - /// - public async Task CleanupTimeoutTasksAsync(TimeSpan timeout) - { - _logger.LogInformation($"开始清理超时任务,超时阈值: {timeout.TotalMinutes}分钟"); - - var cutoffTime = BeijingTimeExtension.GetBeijingTime() - timeout; - var timeoutTasks = _activeTasks.Values - .Where(t => t.StartTime < cutoffTime && t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.Status != MJTaskStatus.CANCEL) - .ToList(); - - _logger.LogInformation($"发现 {timeoutTasks.Count} 个超时任务"); - - foreach (var task in timeoutTasks) + // 先尝试从内存中获取 + MJApiTasks? mjApiTasks = _usageTracker.TryGetTaskCache(thirdPartyId); + // 从数据库获取 + mjApiTasks ??= await LoadTaskFromDatabaseByThirdPartyId(thirdPartyId); + if (mjApiTasks == null) { - _logger.LogWarning($"清理超时任务: TaskId={task.TaskId}, Token={task.Token}, 开始时间={task.StartTime:yyyy-MM-dd HH:mm:ss}"); - _usageTracker.ReleaseConcurrencyPermit(task.Token); - } - } - - /// - /// 保存任务到数据库 - /// - private async Task SaveTaskToDatabase(MJApiTasks mJApiTasks) - { - try - { - await _dbContext.MJApiTasks.AddAsync(mJApiTasks); - await _dbContext.SaveChangesAsync(); - _logger.LogInformation($"任务已保存到数据库: TaskId={mJApiTasks.TaskId}, Token={mJApiTasks.Token}"); - } - catch (Exception ex) - { - _logger.LogError(ex, $"保存任务到数据库失败: TaskId={mJApiTasks.TaskId}"); + _logger.LogWarning($"缓存和数据库中均未找到任务: ThirdPartyTaskId={thirdPartyId}"); + return null; } + return mjApiTasks; } /// @@ -190,25 +116,79 @@ namespace LMS.Tools.MJPackage } } + public async Task BatchUpdateTaskChaheToDatabaseAsync() + { + var startTime = BeijingTimeExtension.GetBeijingTime(); + try + { + // 获取所有缓存中的任务 + var tasks = _usageTracker.GetAllTaskCaches(); + if (tasks == null || tasks.Count == 0) + { + _logger.LogInformation("缓存中没有需要更新的任务"); + return; + } + // 批量同步 + var taskList = new List(); + foreach (var task in tasks) + { + // 从缓存中获取任务 + MJApiTasks? mJApiTasks = _usageTracker.TryGetTaskCache(task.ThirdPartyTaskId); + if (mJApiTasks != null) + { + taskList.Add(mJApiTasks); + } + } + if (taskList.Count == 0) + { + _logger.LogInformation("缓存中没有需要更新的任务"); + return; + } + // 批量更新到数据库 + _dbContext.MJApiTasks.UpdateRange(taskList); + await _dbContext.SaveChangesAsync(); + + int count = 0; + // 删除缓存中状态为已完成的任务 + for (int i = 0; i < taskList.Count; i++) + { + + var task = taskList[i]; + if (task.Status == MJTaskStatus.SUCCESS || task.Status == MJTaskStatus.FAILURE || task.Status == MJTaskStatus.CANCEL) + { + bool removeResult = _usageTracker.RemoveTaskCache(task.ThirdPartyTaskId); + if (removeResult == true) + { + count++; + } + } + } + + var duration = BeijingTimeExtension.GetBeijingTime() - startTime; + _logger.LogInformation($"批量更新了 {taskList.Count} 个缓存中的任务到数据库,耗费时间: {duration.TotalMilliseconds}, 缓存中删除了 {count} 个完成的任务"); + } + catch (Exception ex) + { + _logger.LogError(ex, "批量更新任务到数据库失败"); + } + } + /// /// 从数据库加载任务 /// - private async Task LoadTaskFromDatabase(string taskId) + private async Task LoadTaskFromDatabaseByThirdPartyId(string thirdPartyId) { try { - MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.TaskId == taskId); + MJApiTasks? mJApiTasks = await _dbContext.MJApiTasks.FirstOrDefaultAsync(x => x.ThirdPartyTaskId == thirdPartyId); if (mJApiTasks == null) { - _logger.LogWarning($"未找到任务: TaskId={taskId}"); return null; } - return mJApiTasks; } catch (Exception ex) { - _logger.LogError(ex, $"从数据库加载任务失败: TaskId={taskId}"); return null; } } diff --git a/LMS.Tools/MJPackage/TaskService.cs b/LMS.Tools/MJPackage/TaskService.cs index 4fa0e91..3d65154 100644 --- a/LMS.Tools/MJPackage/TaskService.cs +++ b/LMS.Tools/MJPackage/TaskService.cs @@ -85,7 +85,8 @@ namespace LMS.Tools.MJPackage } private async Task TryBackupApiAsync(string id, string useToken) { - const string backupUrlTemplate = "https://api.laitool.cc/mj/task/{0}/fetch"; + string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl(); + string backupUrl = $"{mjAPIBasicUrl}/mj/task/{id}/fetch"; const int maxRetries = 3; const int baseDelayMs = 1000; @@ -93,7 +94,6 @@ namespace LMS.Tools.MJPackage client.DefaultRequestHeaders.Add("Authorization", "sk-" + useToken); client.Timeout = TimeSpan.FromSeconds(30); - var backupUrl = string.Format(backupUrlTemplate, id); for (int attempt = 1; attempt <= maxRetries; attempt++) { diff --git a/LMS.Tools/MJPackage/TaskStatusCheckService.cs b/LMS.Tools/MJPackage/TaskStatusCheckService.cs index 9049353..c44e7fd 100644 --- a/LMS.Tools/MJPackage/TaskStatusCheckService.cs +++ b/LMS.Tools/MJPackage/TaskStatusCheckService.cs @@ -9,13 +9,14 @@ using Quartz; namespace LMS.Tools.MJPackage { [DisallowConcurrentExecution] - public class TaskStatusCheckService(ITokenService tokenService, ApplicationDbContext dbContext, ILogger logger, ITaskService taskService, ITaskConcurrencyManager taskConcurrencyManager) : IJob + public class TaskStatusCheckService(ITokenService tokenService, ApplicationDbContext dbContext, ILogger logger, ITaskService taskService, ITaskConcurrencyManager taskConcurrencyManager, TokenUsageTracker tokenUsageTracker) : IJob { private readonly ITokenService _tokenService = tokenService; private readonly ApplicationDbContext _dbContext = dbContext; private readonly ILogger _logger = logger; private readonly ITaskService _taskService = taskService; private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager; + private readonly TokenUsageTracker _tokenUsageTracker = tokenUsageTracker; public async Task Execute(IJobExecutionContext context) { @@ -24,9 +25,14 @@ namespace LMS.Tools.MJPackage var startTime = BeijingTimeExtension.GetBeijingTime(); try { - // 强制同步数据库数据 + // 强制同步数据库数据,原请求Token await _tokenService.LoadOriginTokenAsync(); + // 强制同步数据 MJ API 的 Basic URL + + await _tokenService.LoadMJAPIBasicUrlAsync(); + + // 检查Task状态和返回值 // 获取所有超过五分钟没有完成的人物 List tasks = await _dbContext.MJApiTasks.Where(t => t.Status != MJTaskStatus.CANCEL && t.Status != MJTaskStatus.SUCCESS && t.Status != MJTaskStatus.FAILURE && t.StartTime < BeijingTimeExtension.GetBeijingTime().AddMinutes(-5)).ToListAsync(); @@ -54,6 +60,10 @@ namespace LMS.Tools.MJPackage }; task.EndTime = BeijingTimeExtension.GetBeijingTime(); task.Properties = JsonConvert.SerializeObject(newProperties); + // 尝试释放 当前缓存中的任务 + _tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId); + _logger.LogWarning("任务轮询检查,未请求到对应的MJ数据,释放Token,释放任务" + task.Token); + _tokenUsageTracker.ReleaseConcurrencyPermit(task.Token); } else { @@ -75,6 +85,10 @@ namespace LMS.Tools.MJPackage // 开始修改数据 task.EndTime = BeijingTimeExtension.GetBeijingTime(); task.Properties = JsonConvert.SerializeObject(properties); + _logger.LogInformation("任务轮询检查,已请求到对应的MJ数据,并且状态为成功,失败,取消,释放Token,释放任务" + task.Token); + _tokenUsageTracker.ReleaseConcurrencyPermit(task.Token); + // 尝试释放 当前缓存中的任务 + _tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId); } else { @@ -89,7 +103,7 @@ namespace LMS.Tools.MJPackage catch (Exception ex) { // 报错 - _logger.LogError(ex, "检查任务 {TaskId} 时发生错误", task.TaskId); + _logger.LogError(ex, $"检查任务 {task.Token} 时发生错误,释放Token,释放任务", task.TaskId); task.Status = MJTaskStatus.FAILURE; var newProperties = new { @@ -97,6 +111,9 @@ namespace LMS.Tools.MJPackage }; task.EndTime = BeijingTimeExtension.GetBeijingTime(); task.Properties = JsonConvert.SerializeObject(newProperties); + _tokenUsageTracker.ReleaseConcurrencyPermit(task.Token); + // 尝试释放 当前缓存中的任务 + _tokenUsageTracker.RemoveTaskCache(task.ThirdPartyTaskId); // 开始修改数据 await _taskConcurrencyManager.UpdateTaskInDatabase(task); } diff --git a/LMS.Tools/MJPackage/TaskSyncService .cs b/LMS.Tools/MJPackage/TaskSyncService .cs new file mode 100644 index 0000000..1c1a698 --- /dev/null +++ b/LMS.Tools/MJPackage/TaskSyncService .cs @@ -0,0 +1,27 @@ +using LMS.Common.Extensions; +using LMS.DAO; +using LMS.Repository.MJPackage; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Quartz; +using System.Data; +using System.Text; + +namespace LMS.Tools.MJPackage +{ + [DisallowConcurrentExecution] + public class TaskSyncService( + ILogger logger, + ITaskConcurrencyManager taskConcurrencyManager) : IJob + { + private readonly ILogger _logger = logger; + private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager; + + public async Task Execute(IJobExecutionContext context) + { + _logger.LogInformation($"开始 Task 信息 - 同步间隔: 15 秒, (使用EF Core)"); + await _taskConcurrencyManager.BatchUpdateTaskChaheToDatabaseAsync(); + } + } +} diff --git a/LMS.Tools/MJPackage/TokenService.cs b/LMS.Tools/MJPackage/TokenService.cs index 2de2ce1..9a1a7c0 100644 --- a/LMS.Tools/MJPackage/TokenService.cs +++ b/LMS.Tools/MJPackage/TokenService.cs @@ -181,6 +181,42 @@ namespace LMS.Tools.MJPackage return await LoadOriginTokenAsync(); } + public async Task LoadMJAPIBasicUrlAsync() + { + // 没找到 从数据库中获取 + Options? oprions = await _dbContext.Options.Where(x => x.Key == "MJAPIBasicUrl").FirstOrDefaultAsync(); + if (oprions == null) + { + _logger.LogWarning("未找到配置的MJAPI Basic URL, 使用默认的!"); + _usageTracker.MJAPIBasicUrl = string.Empty; + return _usageTracker.MJAPIBasicUrl; + } + else + { + // 处理数据 + string mjBasicUrl = oprions.GetValueObject() ?? string.Empty; + if (string.IsNullOrWhiteSpace(mjBasicUrl)) + { + _logger.LogWarning("未找到配置的MJAPI Basic URL 数据, 使用默认的!"); + _usageTracker.MJAPIBasicUrl = string.Empty; + return _usageTracker.MJAPIBasicUrl; + } + _usageTracker.MJAPIBasicUrl = mjBasicUrl; + return mjBasicUrl; + } + } + + public async Task GetMJAPIBasicUrl() + { + // 缓存中就有 直接返回 + if (!string.IsNullOrWhiteSpace(_usageTracker.MJAPIBasicUrl)) + { + return _usageTracker.MJAPIBasicUrl; + } + // 缓存中没有 从数据库中获取 + return await LoadMJAPIBasicUrlAsync(); + } + /// /// 重置Token的使用数据 diff --git a/LMS.Tools/MJPackage/TokenUsageTracker.cs b/LMS.Tools/MJPackage/TokenUsageTracker.cs index dfc716b..906e0b7 100644 --- a/LMS.Tools/MJPackage/TokenUsageTracker.cs +++ b/LMS.Tools/MJPackage/TokenUsageTracker.cs @@ -9,9 +9,11 @@ namespace LMS.Tools.MJPackage public class TokenUsageTracker { private readonly ConcurrentDictionary _tokenCache = new(); + private readonly ConcurrentDictionary _taskCache = new(); private readonly ConcurrentDictionary> _concurrencyControllers = new(); private readonly ReaderWriterLockSlim _cacheLock = new(LockRecursionPolicy.SupportsRecursion); private string _originToken = string.Empty; + private string _mjAPIBasicUrl = string.Empty; private readonly ILogger _logger; @@ -500,5 +502,111 @@ namespace LMS.Tools.MJPackage } } } + + public string MJAPIBasicUrl + { + get + { + if (_mjAPIBasicUrl.EndsWith('/')) + { + // 删除最后一个 / + return _mjAPIBasicUrl = _mjAPIBasicUrl.TrimEnd('/'); + } + else + { + return _mjAPIBasicUrl.Trim(); + } + } + set + { + if (!string.IsNullOrWhiteSpace(value)) + { + _mjAPIBasicUrl = value; + } + else + { + // 设置初始值 + _mjAPIBasicUrl = "https://laitool.net"; + // 如果尝试设置为空值,记录警告日志,可能请求原始的请求不可用 + _logger.LogWarning("尝试设置OriginToken为空值,可能请求原始的请求不可用!!"); + } + } + } + + public MJApiTasks? TryGetTaskCache(string thirdPartyId) + { + _cacheLock.EnterReadLock(); + try + { + if (_taskCache.TryGetValue(thirdPartyId, out var task)) + { + return task; + } + else + { + _logger.LogWarning($"未找到任务缓存: {thirdPartyId}"); + return null; + } + } + finally + { + _cacheLock.ExitReadLock(); + } + } + + public List GetAllTaskCaches() + { + _cacheLock.EnterReadLock(); + try + { + return _taskCache.Values.ToList() ?? []; + } + finally + { + _cacheLock.ExitReadLock(); + } + } + + public bool AddOrUpdateTaskCache(MJApiTasks task) + { + _cacheLock.EnterWriteLock(); + try + { + if (task == null || string.IsNullOrWhiteSpace(task.TaskId) || string.IsNullOrWhiteSpace(task.ThirdPartyTaskId)) + { + _logger.LogWarning("尝试添加或更新任务缓存时,任务、任务ID或者第三方任务ID为空"); + return false; + } + _taskCache[task.ThirdPartyTaskId] = task; + _logger.LogDebug($"任务缓存已增加或更新: {task.TaskId}, 状态: {task.Status}, 第三方任务ID: {task.ThirdPartyTaskId}"); + return true; + } + finally + { + _cacheLock.ExitWriteLock(); + } + } + + public bool RemoveTaskCache(string thirdPartyId) + { + _cacheLock.EnterWriteLock(); + try + { + if (_taskCache.TryRemove(thirdPartyId, out var removedTask)) + { + _logger.LogDebug($"任务缓存已移除: {removedTask.TaskId}, 状态: {removedTask.Status}, 第三方任务ID: {removedTask.ThirdPartyTaskId}"); + return true; + } + else + { + _logger.LogWarning($"尝试移除但未找到的第三方任务的缓存: {removedTask}"); + return false; + } + } + finally + { + _cacheLock.ExitWriteLock(); + } + } } } diff --git a/LMS.service/Configuration/QuartzTaskSchedulerConfig.cs b/LMS.service/Configuration/QuartzTaskSchedulerConfig.cs index 0b9cb14..7e81e8d 100644 --- a/LMS.service/Configuration/QuartzTaskSchedulerConfig.cs +++ b/LMS.service/Configuration/QuartzTaskSchedulerConfig.cs @@ -20,6 +20,8 @@ public static class QuartzTaskSchedulerConfig // 每30秒任务配置 ConfigureThirtySecondTask(q, chinaTimeZone); + ConfigureFiftySecondTask(q, chinaTimeZone); + ConfigureFiveMinuteTask(q, chinaTimeZone); }); @@ -33,6 +35,7 @@ public static class QuartzTaskSchedulerConfig services.AddTransient(); services.AddTransient(); services.AddTransient(); + services.AddTransient(); } private static TimeZoneInfo GetChinaTimeZone() @@ -88,6 +91,16 @@ public static class QuartzTaskSchedulerConfig .WithCronSchedule("*/30 * * * * ?", x => x.InTimeZone(timeZone))); // 每30秒执行一次 } + private static void ConfigureFiftySecondTask(IServiceCollectionQuartzConfigurator q, TimeZoneInfo timeZone) + { + var jobKey = new JobKey("FiftySecondTask", "DefaultGroup"); + q.AddJob(opts => opts.WithIdentity(jobKey)); + q.AddTrigger(opts => opts + .ForJob(jobKey) + .WithIdentity("FiftySecondTaskTrigger", "DefaultGroup") + .WithCronSchedule("*/15 * * * * ?", x => x.InTimeZone(timeZone))); // 每30秒执行一次 + } + private static void ConfigureFiveMinuteTask(IServiceCollectionQuartzConfigurator q, TimeZoneInfo timeZone) { var jobKey = new JobKey("FiveMinuteTask", "DefaultGroup"); diff --git a/LMS.service/Controllers/MJPackageController.cs b/LMS.service/Controllers/MJPackageController.cs index f4815c4..51d3efb 100644 --- a/LMS.service/Controllers/MJPackageController.cs +++ b/LMS.service/Controllers/MJPackageController.cs @@ -12,12 +12,13 @@ namespace LMS.service.Controllers { [Route("api/[controller]")] [ApiController] - public class MJPackageController(TokenUsageTracker usageTracker, ITaskConcurrencyManager taskConcurrencyManager, ILogger logger, IMJPackageService mJPackageService) : ControllerBase + public class MJPackageController(TokenUsageTracker usageTracker, ITaskConcurrencyManager taskConcurrencyManager, ILogger logger, IMJPackageService mJPackageService, ITokenService tokenService) : ControllerBase { private readonly TokenUsageTracker _usageTracker = usageTracker; private readonly ILogger _logger = logger; private readonly ITaskConcurrencyManager _taskConcurrencyManager = taskConcurrencyManager; private readonly IMJPackageService _mJPackageService = mJPackageService; + private readonly ITokenService _tokenService = tokenService; [HttpPost("mj/submit/imagine")] [RateLimit] @@ -42,7 +43,10 @@ namespace LMS.service.Controllers string body = JsonConvert.SerializeObject(model); client.Timeout = Timeout.InfiniteTimeSpan; - string mjUrl = "https://api.laitool.cc/mj/submit/imagine"; + + string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl(); + + string mjUrl = $"{mjAPIBasicUrl}/mj/submit/imagine"; var response = await client.PostAsync(mjUrl, new StringContent(body, Encoding.UTF8, "application/json")); // 读取响应内容 string content = await response.Content.ReadAsStringAsync(); diff --git a/LMS.service/Extensions/Attributes/RateLimitAttribute.cs b/LMS.service/Extensions/Attributes/RateLimitAttribute.cs index 70859ef..0979834 100644 --- a/LMS.service/Extensions/Attributes/RateLimitAttribute.cs +++ b/LMS.service/Extensions/Attributes/RateLimitAttribute.cs @@ -150,11 +150,9 @@ namespace LMS.service.Extensions.Attributes // 在异常情况下也要释放并发许可 if (_concurrencyAcquired) { - usageTracker.ReleaseConcurrencyPermit(_token); - } - logger.LogError(ex, $"处理Token请求时发生错误: {_token},已释放Token许可!"); + logger.LogError(ex, $"处理Token请求时发生错误: {_token},是否有并发 {_concurrencyAcquired},已释放Token许可!"); context.Result = new ObjectResult("Internal server error") { StatusCode = StatusCodes.Status500InternalServerError diff --git a/LMS.service/Service/MJPackage/MJPackageService.cs b/LMS.service/Service/MJPackage/MJPackageService.cs index 40f801c..0cdd50d 100644 --- a/LMS.service/Service/MJPackage/MJPackageService.cs +++ b/LMS.service/Service/MJPackage/MJPackageService.cs @@ -6,7 +6,6 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; using System.Net.Sockets; using System.Text.Json; -using static Betalgo.Ranul.OpenAI.ObjectModels.StaticValues.AssistantsStatics.MessageStatics; namespace LMS.service.Service.MJPackage { @@ -94,6 +93,7 @@ namespace LMS.service.Service.MJPackage { TaskId = mjTask.TaskId, Token = mjTask.Token, + TokenId = mjTask.TokenId, Status = status, StartTime = mjTask.StartTime, EndTime = null, @@ -106,20 +106,36 @@ namespace LMS.service.Service.MJPackage // 当前任务已经被释放过了 // 开始修改数据 mJApiTasks.EndTime = BeijingTimeExtension.GetBeijingTime(); - await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks); - + // 不直接修改数据库了 改为修改缓存 + bool modifySatus = _usageTracker.AddOrUpdateTaskCache(mJApiTasks); + if (modifySatus == false) + { + // 缓存修改失败,可能是因为任务不存在或状态不匹配,尝试修改数据库 + await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks); + } return new OkObjectResult(null); } if (status == MJTaskStatus.SUCCESS || status == MJTaskStatus.FAILURE || status == MJTaskStatus.CANCEL) { mJApiTasks.EndTime = BeijingTimeExtension.GetBeijingTime(); + // 任务状态为成功、失败或取消,释放Token + _logger.LogInformation("MJNotifyHook 回调: 任务状态为成功、失败或取消,释放Token " + mjTask.Token); + // 开始修改数据,然后在释放 _usageTracker.ReleaseConcurrencyPermit(mjTask.Token); } - + else + { + // 只修改状态,不释放Token + mJApiTasks.EndTime = null; // 处理中没有结束时间 + } // 开始修改数据 - await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks); - + bool updateStatus = _usageTracker.AddOrUpdateTaskCache(mJApiTasks); + if (updateStatus == false) + { + // 缓存修改失败,可能是因为任务不存在或状态不匹配,尝试修改数据库 + await _taskConcurrencyManager.UpdateTaskInDatabase(mJApiTasks); + } return new OkObjectResult(null); } catch (Exception ex) @@ -146,7 +162,7 @@ namespace LMS.service.Service.MJPackage private async Task?> TryOriginApiAsync(string id) { - const string originUrl = "https://mjapi.bzu.cn/mj/task/{0}/fetch"; + string originUrl = $"https://mjapi.bzu.cn/mj/task/{id}/fetch"; // 判断 原始token 不存在 直接 返回空 string orginToken = await _tokenService.GetOriginToken(); @@ -188,12 +204,12 @@ namespace LMS.service.Service.MJPackage private async Task?> TryBackupApiAsync(string id, string useToken) { - const string backupUrlTemplate = "https://api.laitool.cc/mj/task/{0}/fetch"; + string mjAPIBasicUrl = await _tokenService.GetMJAPIBasicUrl(); + string backupUrl = $"{mjAPIBasicUrl}/mj/task/{id}/fetch"; const int maxRetries = 3; const int baseDelayMs = 1000; using var client = CreateHttpClient($"Bearer sk-{useToken}", true); - var backupUrl = string.Format(backupUrlTemplate, id); for (int attempt = 1; attempt <= maxRetries; attempt++) { diff --git a/SQL/v1.1.3/FileUploads.sql b/SQL/v1.1.3/FileUploads.sql new file mode 100644 index 0000000..edf65b3 --- /dev/null +++ b/SQL/v1.1.3/FileUploads.sql @@ -0,0 +1,19 @@ +-- 文件上传记录表 +CREATE TABLE FileUploads ( + Id BIGINT AUTO_INCREMENT PRIMARY KEY, + UserId BIGINT NOT NULL, -- 用户ID + FileName VARCHAR(255) NOT NULL, -- 原始文件名 + FileKey VARCHAR(500) NOT NULL, -- 七牛云存储key + FileSize BIGINT NOT NULL, -- 文件大小 + ContentType VARCHAR(100) NOT NULL, -- 内容类型 + Hash VARCHAR(100) NOT NULL, -- 文件哈希值 + QiniuUrl VARCHAR(1000) NOT NULL, -- 七牛云访问URL + UploadTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + Status VARCHAR(20) NOT NULL DEFAULT 'active', + CreatedAt DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +-- 创建索引 +CREATE INDEX IX_FileUploads_UserId ON FileUploads(UserId); +CREATE INDEX IX_FileUploads_FileKey ON FileUploads(FileKey); +CREATE INDEX IX_FileUploads_UploadTime ON FileUploads(UploadTime); \ No newline at end of file