// Source: LMS.service/LMS.Tools/MJPackage/TokenSyncService.cs

using System.Data;
using System.Diagnostics;
using System.Text;
using System.Threading;
using LMS.Common.Extensions;
using LMS.DAO;
using LMS.Repository.MJPackage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Quartz;
namespace LMS.Tools.MJPackage
{
/// <summary>
/// Quartz job that writes the in-memory token usage counters held by
/// <see cref="TokenUsageTracker"/> into the <c>MJApiTokenUsage</c> table.
/// </summary>
/// <remarks>
/// <para>
/// <c>[DisallowConcurrentExecution]</c> asks Quartz not to run two instances of this job
/// definition at the same time, so a slow run is never overlapped by the next trigger.
/// </para>
/// <para>
/// NOTE(review): the "30 秒" in the start-up log line is plain text; the real schedule is
/// whatever the Quartz trigger is configured with, which is not visible in this file.
/// </para>
/// </remarks>
[DisallowConcurrentExecution]
public class TokenSyncService : IJob
{
    /// <summary>Maximum number of rows written by one INSERT statement.</summary>
    private const int BatchSize = 500;

    /// <summary>Number of SQL parameters bound per row (one per inserted column).</summary>
    private const int ColumnsPerRow = 5;

    private readonly TokenUsageTracker _usageTracker;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<TokenSyncService> _logger;

    /// <summary>
    /// Activity window passed to <c>GetActiveTokens</c>; only tokens returned for this
    /// window are synchronized.
    /// </summary>
    private readonly TimeSpan _activityThreshold = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Inactivity window passed to <c>RemoveNotActiveTokens</c> before each sync.
    /// </summary>
    private readonly TimeSpan _inactiveRemovalThreshold = TimeSpan.FromMinutes(10);

    /// <summary>
    /// Creates the job.
    /// </summary>
    /// <param name="usageTracker">In-memory tracker that owns the token usage counters.</param>
    /// <param name="serviceProvider">Root provider used to open a DI scope per run.</param>
    /// <param name="logger">Logger for progress and failure messages.</param>
    public TokenSyncService(
        TokenUsageTracker usageTracker,
        IServiceProvider serviceProvider,
        ILogger<TokenSyncService> logger)
    {
        _usageTracker = usageTracker;
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Quartz entry point: runs one synchronization pass and logs its outcome.
    /// </summary>
    /// <param name="context">
    /// Quartz execution context. Only its cancellation token is used; a <c>null</c> context
    /// is tolerated and treated as "not cancellable".
    /// </param>
    /// <returns>A task that completes when the pass has finished or failed.</returns>
    /// <remarks>
    /// Every exception is logged and swallowed so that a failed pass does not propagate
    /// to the scheduler; the next trigger simply tries again.
    /// </remarks>
    public async Task Execute(IJobExecutionContext context)
    {
        _logger.LogInformation(
            "开始同步Token信息 - 同步间隔: 30 秒, 活动阈值: {ActivityThresholdMinutes}分钟 (使用EF Core)",
            _activityThreshold.TotalMinutes);

        // A Stopwatch is monotonic; subtracting two wall-clock readings can yield a wrong
        // (even negative) duration if the system clock is adjusted during the run.
        var stopwatch = Stopwatch.StartNew();
        try
        {
            var cancellationToken = context?.CancellationToken ?? CancellationToken.None;
            var syncResult = await SyncActiveTokensToDatabase(cancellationToken);
            var elapsedMs = stopwatch.Elapsed.TotalMilliseconds;

            if (syncResult.ActiveTokenCount > 0)
            {
                _logger.LogInformation(
                    "Token同步完成: {ActiveTokens}/{TotalTokens} 个活跃Token已同步, 耗时: {Duration}ms, 更新记录: {RecordsUpdated}",
                    syncResult.ActiveTokenCount,
                    syncResult.TotalTokenCount,
                    elapsedMs,
                    syncResult.RecordsUpdated);
            }
            else
            {
                _logger.LogDebug(
                    "Token同步跳过: 无活跃Token (总计: {TotalTokens}, 耗时: {Duration}ms)",
                    syncResult.TotalTokenCount,
                    elapsedMs);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Token同步失败耗时: {Duration}ms", stopwatch.Elapsed.TotalMilliseconds);
        }
    }

    /// <summary>
    /// Prunes inactive tokens from the tracker, then writes the currently active tokens
    /// to the database inside a single transaction.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database work.</param>
    /// <returns>Total/active token counts and the affected-row total reported by the database.</returns>
    /// <exception cref="Exception">
    /// Any database failure is logged, the transaction is rolled back, and the original
    /// exception is rethrown.
    /// </exception>
    private async Task<SyncResult> SyncActiveTokensToDatabase(CancellationToken cancellationToken = default)
    {
        // Prune first. The tracker returns a pair that is logged as
        // (tokens remaining, tokens removed).
        var (remainingCount, removedCount) = _usageTracker.RemoveNotActiveTokens(_inactiveRemovalThreshold);
        _logger.LogInformation(
            "删除不活跃的 Token 数 {RemovedCount},删除后活跃 Token 数:{RemainingCount},判断不活跃时间:{InactiveMinutes} 分钟",
            removedCount,
            remainingCount,
            _inactiveRemovalThreshold.TotalMinutes);

        // 1. Snapshot the tokens the tracker reports as active within the activity window.
        var activeTokens = _usageTracker.GetActiveTokens(_activityThreshold).ToList();
        var totalTokens = _usageTracker.GetAllTokens().Count();

        if (activeTokens.Count == 0)
        {
            _logger.LogInformation("0 条活跃Token跳过同步");
            return new SyncResult
            {
                TotalTokenCount = totalTokens,
                ActiveTokenCount = 0,
                RecordsUpdated = 0
            };
        }

        // 2. Resolve the DbContext from a fresh scope so it is disposed when this run ends.
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        // 3. Build the rows to upsert; every row is stamped with today's Beijing date.
        var today = BeijingTimeExtension.GetBeijingTime().Date;
        var batchData = activeTokens
            .Select(token => new TokenUsageData
            {
                TokenId = token.Id,
                Date = today,
                DailyUsage = token.DailyUsage,
                TotalUsage = token.TotalUsage,
                LastActivityTime = token.LastActivityTime
            })
            .ToList();

        // 4. Write all batches in one transaction so a failure leaves no partial sync.
        //    The transaction is opened asynchronously, so it is disposed asynchronously too.
        var recordsUpdated = 0;
        await using var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken);
        try
        {
            recordsUpdated = await BatchUpdateTokenUsageWithEfCore(dbContext, batchData, cancellationToken);
            await transaction.CommitAsync(cancellationToken);

            _logger.LogDebug(
                "批量更新完成: {RecordsUpdated} 条活跃Token记录已更新跳过 {SkippedCount} 条非活跃Token",
                recordsUpdated,
                totalTokens - activeTokens.Count);
        }
        catch (Exception ex)
        {
            try
            {
                // Roll back with CancellationToken.None: if the failure was a cancellation,
                // reusing the cancelled token would make the rollback throw immediately.
                await transaction.RollbackAsync(CancellationToken.None);
                _logger.LogError(ex, "数据库事务失败,已回滚");
            }
            catch (Exception rollbackEx)
            {
                // A failing rollback must not replace the original error that callers see.
                _logger.LogError(rollbackEx, "数据库事务回滚失败");
                _logger.LogError(ex, "数据库事务失败");
            }

            throw;
        }

        return new SyncResult
        {
            TotalTokenCount = totalTokens,
            ActiveTokenCount = activeTokens.Count,
            RecordsUpdated = recordsUpdated
        };
    }

    /// <summary>
    /// Upserts token usage rows into <c>MJApiTokenUsage</c> with one multi-row
    /// <c>INSERT ... ON DUPLICATE KEY UPDATE</c> statement per batch of up to
    /// <see cref="BatchSize"/> rows.
    /// </summary>
    /// <param name="dbContext">Database context; the caller owns the surrounding transaction.</param>
    /// <param name="batchData">Rows to upsert.</param>
    /// <param name="cancellationToken">Token used to cancel the SQL commands.</param>
    /// <returns>
    /// The sum of the affected-row counts returned by the database for every batch.
    /// NOTE(review): for <c>ON DUPLICATE KEY UPDATE</c>, MySQL reports an updated row
    /// differently from an inserted one, so this is not necessarily the number of
    /// records in <paramref name="batchData"/> — confirm against the target server/driver settings.
    /// </returns>
    private async Task<int> BatchUpdateTokenUsageWithEfCore(
        ApplicationDbContext dbContext,
        List<TokenUsageData> batchData,
        CancellationToken cancellationToken = default)
    {
        if (batchData.Count == 0)
        {
            return 0;
        }

        var recordsUpdated = 0;

        for (int offset = 0; offset < batchData.Count; offset += BatchSize)
        {
            // GetRange copies exactly this slice instead of re-walking the list via Skip/Take.
            var batch = batchData.GetRange(offset, Math.Min(BatchSize, batchData.Count - offset));

            var sqlBuilder = new StringBuilder();
            var parameters = new List<object>(batch.Count * ColumnsPerRow);

            sqlBuilder.AppendLine("INSERT INTO MJApiTokenUsage (TokenId, Date, DailyUsage, TotalUsage, LastActivityAt) VALUES ");

            // One "({n}, {n+1}, ...)" tuple per row. The {n} placeholders are turned into
            // real DbParameters by ExecuteSqlRawAsync, so no value is concatenated into SQL.
            for (int j = 0; j < batch.Count; j++)
            {
                if (j > 0)
                {
                    sqlBuilder.Append(", ");
                }

                var paramIndex = j * ColumnsPerRow;
                sqlBuilder.Append($"({{{paramIndex}}}, {{{paramIndex + 1}}}, {{{paramIndex + 2}}}, {{{paramIndex + 3}}}, {{{paramIndex + 4}}})");

                parameters.AddRange(new object[]
                {
                    batch[j].TokenId,
                    batch[j].Date,
                    batch[j].DailyUsage,
                    batch[j].TotalUsage,
                    batch[j].LastActivityTime
                });
            }

            // When a row collides with an existing key, overwrite it with the new values.
            sqlBuilder.AppendLine();
            sqlBuilder.AppendLine("ON DUPLICATE KEY UPDATE");
            sqlBuilder.AppendLine("Date = VALUES(Date),");
            sqlBuilder.AppendLine("DailyUsage = VALUES(DailyUsage),");
            sqlBuilder.AppendLine("TotalUsage = VALUES(TotalUsage),");
            sqlBuilder.AppendLine("LastActivityAt = VALUES(LastActivityAt)");

            // Execute the whole batch as a single statement.
            var affectedRows = await dbContext.Database.ExecuteSqlRawAsync(
                sqlBuilder.ToString(),
                parameters,
                cancellationToken);

            recordsUpdated += affectedRows;

            _logger.LogInformation(
                "批量更新完成: 批次 {BatchNumber}, 记录数: {BatchCount}, 影响行数: {AffectedRows}",
                offset / BatchSize + 1,
                batch.Count,
                affectedRows);
        }

        return recordsUpdated;
    }
}
}