199 lines
7.7 KiB
C#
199 lines
7.7 KiB
C#
using LMS.Common.Extensions;
|
||
using LMS.DAO;
|
||
using LMS.Repository.MJPackage;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using Microsoft.Extensions.DependencyInjection;
|
||
using Microsoft.Extensions.Logging;
|
||
using Quartz;
|
||
using System.Data;
|
||
using System.Text;
|
||
|
||
namespace LMS.Tools.MJPackage
|
||
{
|
||
[DisallowConcurrentExecution]
|
||
public class TokenSyncService : IJob
|
||
{
|
||
private readonly TokenUsageTracker _usageTracker;
|
||
private readonly IServiceProvider _serviceProvider;
|
||
private readonly ILogger<TokenSyncService> _logger;
|
||
|
||
// 活动阈值:5分钟内有活动的Token才同步
|
||
private readonly TimeSpan _activityThreshold = TimeSpan.FromMinutes(5);
|
||
|
||
public TokenSyncService(
|
||
TokenUsageTracker usageTracker,
|
||
IServiceProvider serviceProvider,
|
||
ILogger<TokenSyncService> logger)
|
||
{
|
||
_usageTracker = usageTracker;
|
||
_serviceProvider = serviceProvider;
|
||
_logger = logger;
|
||
}
|
||
|
||
public async Task Execute(IJobExecutionContext context)
|
||
{
|
||
_logger.LogInformation($"开始同步Token信息 - 同步间隔: 30 秒, 活动阈值: {_activityThreshold.TotalMinutes}分钟 (使用EF Core)");
|
||
|
||
var startTime = BeijingTimeExtension.GetBeijingTime();
|
||
try
|
||
{
|
||
var syncResult = await SyncActiveTokensToDatabase();
|
||
var duration = BeijingTimeExtension.GetBeijingTime() - startTime;
|
||
|
||
if (syncResult.ActiveTokenCount > 0)
|
||
{
|
||
_logger.LogInformation(
|
||
"Token同步完成: {ActiveTokens}/{TotalTokens} 个活跃Token已同步, 耗时: {Duration}ms, 更新记录: {RecordsUpdated}",
|
||
syncResult.ActiveTokenCount,
|
||
syncResult.TotalTokenCount,
|
||
duration.TotalMilliseconds,
|
||
syncResult.RecordsUpdated);
|
||
}
|
||
else
|
||
{
|
||
_logger.LogDebug(
|
||
"Token同步跳过: 无活跃Token (总计: {TotalTokens}, 耗时: {Duration}ms)",
|
||
syncResult.TotalTokenCount,
|
||
duration.TotalMilliseconds);
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
var duration = BeijingTimeExtension.GetBeijingTime() - startTime;
|
||
_logger.LogError(ex, "Token同步失败,耗时: {Duration}ms", duration.TotalMilliseconds);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 同步活跃Token数据到数据库
|
||
/// </summary>
|
||
/// <returns>同步结果</returns>
|
||
private async Task<SyncResult> SyncActiveTokensToDatabase()
|
||
{
|
||
// 先 删除10分钟内不活跃得Token
|
||
var (act, nact) = _usageTracker.RemoveNotActiveTokens(TimeSpan.FromMinutes(10));
|
||
_logger.LogInformation($"删除不活跃的 Token 数 {nact},删除后活跃 Token 数:{act},判断不活跃时间:{10} 分钟");
|
||
|
||
// 1. 获取活跃Token(最近5分钟内有活动的Token)
|
||
var activeTokens = _usageTracker.GetActiveTokens(_activityThreshold).ToList();
|
||
var totalTokens = _usageTracker.GetAllTokens().Count();
|
||
|
||
if (!activeTokens.Any())
|
||
{
|
||
_logger.LogInformation("0 条活跃Token,跳过同步!");
|
||
return new SyncResult
|
||
{
|
||
TotalTokenCount = totalTokens,
|
||
ActiveTokenCount = 0,
|
||
RecordsUpdated = 0
|
||
};
|
||
}
|
||
|
||
// 2. 创建数据库上下文
|
||
using var scope = _serviceProvider.CreateScope();
|
||
var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
|
||
|
||
var today = BeijingTimeExtension.GetBeijingTime().Date;
|
||
var recordsUpdated = 0;
|
||
|
||
// 3. 构造批量数据
|
||
var batchData = activeTokens.Select(token => new TokenUsageData
|
||
{
|
||
TokenId = token.Id,
|
||
Date = today,
|
||
DailyUsage = token.DailyUsage,
|
||
TotalUsage = token.TotalUsage,
|
||
LastActivityTime = token.LastActivityTime
|
||
}).ToList();
|
||
|
||
// 4. 使用EF Core事务批量更新数据库
|
||
using var transaction = await dbContext.Database.BeginTransactionAsync();
|
||
try
|
||
{
|
||
recordsUpdated = await BatchUpdateTokenUsageWithEfCore(dbContext, batchData);
|
||
await transaction.CommitAsync();
|
||
|
||
_logger.LogDebug(
|
||
"批量更新完成: {RecordsUpdated} 条活跃Token记录已更新,跳过 {SkippedCount} 条非活跃Token",
|
||
recordsUpdated,
|
||
totalTokens - activeTokens.Count);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
await transaction.RollbackAsync();
|
||
_logger.LogError(ex, "数据库事务失败,已回滚");
|
||
throw;
|
||
}
|
||
|
||
return new SyncResult
|
||
{
|
||
TotalTokenCount = totalTokens,
|
||
ActiveTokenCount = activeTokens.Count,
|
||
RecordsUpdated = recordsUpdated
|
||
};
|
||
}
|
||
|
||
/// <summary>
|
||
/// 使用EF Core批量更新Token使用数据
|
||
/// </summary>
|
||
/// <param name="dbContext">数据库上下文</param>
|
||
/// <param name="batchData">批量数据</param>
|
||
/// <returns>更新的记录数</returns>
|
||
private async Task<int> BatchUpdateTokenUsageWithEfCore(ApplicationDbContext dbContext, List<TokenUsageData> batchData)
|
||
{
|
||
int batchSize = 500;
|
||
if (!batchData.Any()) return 0;
|
||
|
||
var recordsUpdated = 0;
|
||
|
||
// 分批处理
|
||
for (int i = 0; i < batchData.Count; i += batchSize)
|
||
{
|
||
var batch = batchData.Skip(i).Take(batchSize).ToList();
|
||
|
||
// 构建真正的批量 SQL
|
||
var sqlBuilder = new StringBuilder();
|
||
var parameters = new List<object>();
|
||
|
||
sqlBuilder.AppendLine("INSERT INTO MJApiTokenUsage (TokenId, Date, DailyUsage, TotalUsage, LastActivityAt) VALUES ");
|
||
|
||
// 为每条记录构建 VALUES 子句
|
||
for (int j = 0; j < batch.Count; j++)
|
||
{
|
||
if (j > 0) sqlBuilder.Append(", ");
|
||
|
||
var paramIndex = j * 5;
|
||
sqlBuilder.Append($"({{{paramIndex}}}, {{{paramIndex + 1}}}, {{{paramIndex + 2}}}, {{{paramIndex + 3}}}, {{{paramIndex + 4}}})");
|
||
|
||
parameters.AddRange(new object[]
|
||
{
|
||
batch[j].TokenId,
|
||
batch[j].Date,
|
||
batch[j].DailyUsage,
|
||
batch[j].TotalUsage,
|
||
batch[j].LastActivityTime
|
||
});
|
||
}
|
||
|
||
sqlBuilder.AppendLine(@"
|
||
ON DUPLICATE KEY UPDATE
|
||
Date = VALUES(Date),
|
||
DailyUsage = VALUES(DailyUsage),
|
||
TotalUsage = VALUES(TotalUsage),
|
||
LastActivityAt = VALUES(LastActivityAt)");
|
||
|
||
// 一次性执行整个批次
|
||
var affectedRows = await dbContext.Database.ExecuteSqlRawAsync(
|
||
sqlBuilder.ToString(),
|
||
parameters.ToArray());
|
||
|
||
recordsUpdated += affectedRows;
|
||
|
||
_logger.LogInformation($"批量更新完成: 批次 {i / batchSize + 1}, 记录数: {batch.Count}, 影响行数: {affectedRows}");
|
||
}
|
||
|
||
return recordsUpdated;
|
||
}
|
||
}
|
||
}
|