387 lines
12 KiB
TypeScript
387 lines
12 KiB
TypeScript
import { BookBackTaskListService } from '../../define/db/service/Book/bookBackTaskListService'
|
||
import { BookBackTaskStatus, BookBackTaskType, TaskExecuteType } from '../../define/enum/bookEnum'
|
||
import { SoftwareService } from '../../define/db/service/SoftWare/softwareService'
|
||
import { errorMessage, successMessage } from '../Public/generalTools'
|
||
import { BasicReverse } from './Book/basicReverse'
|
||
import { ReverseBook } from './Book/ReverseBook'
|
||
import { GeneralResponse } from '../../model/generalResponse'
|
||
import { DEFINE_STRING } from '../../define/define_string'
|
||
import { MJOpt } from './MJ/mj'
|
||
import { SDOpt } from './SD/sd'
|
||
import { D3Opt } from './d3'
|
||
import { FluxOpt } from './Flux/flux'
|
||
import { AsyncQueue } from '../../main/quene'
|
||
import { SoftWareServiceBasic } from './ServiceBasic/softwareServiceBasic'
|
||
import { MJSetting } from '../../model/Setting/mjSetting'
|
||
|
||
export class TaskManager {
|
||
isExecuting: boolean = false;
|
||
currentTaskList: TaskModal.Task[] = [];
|
||
globalConfig: any;
|
||
reverseBook: ReverseBook = new ReverseBook();
|
||
basicReverse: BasicReverse = new BasicReverse();
|
||
softwareService!: SoftwareService;
|
||
bookBackTaskListService!: BookBackTaskListService;
|
||
eventListeners: Record<string | number, Function[]> = {};
|
||
softWareServiceBasic: SoftWareServiceBasic
|
||
|
||
mjSetting: MJSetting.MjSetting
|
||
spaceTime: number = 5000;
|
||
count = 0;
|
||
isListening = false;
|
||
intervalId: any; // 用于存储 setInterval 的 ID
|
||
mjOpt: MJOpt
|
||
sdOpt: SDOpt
|
||
d3Opt: D3Opt
|
||
fluxOpt: FluxOpt
|
||
|
||
constructor() {
|
||
this.isExecuting = false;
|
||
this.currentTaskList = [];
|
||
this.globalConfig = global.config;
|
||
this.basicReverse = new BasicReverse();
|
||
this.reverseBook = new ReverseBook();
|
||
this.mjOpt = new MJOpt();
|
||
this.sdOpt = new SDOpt();
|
||
this.d3Opt = new D3Opt()
|
||
this.softWareServiceBasic = new SoftWareServiceBasic();
|
||
this.fluxOpt = new FluxOpt()
|
||
}
|
||
|
||
async InitService(getMJsetting = false) {
|
||
if (!this.softwareService) {
|
||
this.softwareService = await SoftwareService.getInstance();
|
||
}
|
||
if (!this.bookBackTaskListService) {
|
||
this.bookBackTaskListService = await BookBackTaskListService.getInstance();
|
||
}
|
||
if (getMJsetting) {
|
||
// 初始化MJ设置
|
||
this.mjSetting = await this.softWareServiceBasic.GetMjSetting()
|
||
}
|
||
}
|
||
|
||
async GetGlobalConfig() {
|
||
try {
|
||
await this.InitService();
|
||
let softData = this.softwareService.GetSoftwareData();
|
||
if (softData.data.length <= 0) {
|
||
throw new Error('获取软件数据失败');
|
||
}
|
||
let config = softData.data[0];
|
||
global.config = JSON.parse(config.globalSetting);
|
||
this.globalConfig = global.config;
|
||
return successMessage(global.config, '获取全局配置完成', 'TaskManager_GetGlobalConfig');
|
||
} catch (error) {
|
||
console.error('GetGlobalConfig Error:', error);
|
||
return errorMessage(`获取全局配置失败,失败信息如下:${error.message}`, 'TaskManager_GetGlobalConfig');
|
||
}
|
||
}
|
||
|
||
|
||
// 初始化事件监听方法
|
||
InitListeners() {
|
||
if (this.isListening) return; // 如果已经在监听,直接返回
|
||
|
||
this.isListening = true; // 标记为已开始监听
|
||
|
||
const executeWithDynamicInterval = async () => {
|
||
await this.ExecuteAutoTask();
|
||
this.count++;
|
||
console.log("等待时间--" + this.spaceTime, this.count);
|
||
// 动态调整等待时间
|
||
clearInterval(this.intervalId);
|
||
this.intervalId = setInterval(executeWithDynamicInterval, this.spaceTime);
|
||
};
|
||
this.intervalId = setInterval(executeWithDynamicInterval, this.spaceTime);
|
||
}
|
||
|
||
// 停止监听的方法
|
||
StopListeners() {
|
||
this.isListening = false; // 标记为停止监听
|
||
clearInterval(this.intervalId); // 清除定时器
|
||
}
|
||
|
||
async ExecuteAutoTask() {
|
||
await this.InitService();
|
||
|
||
// 加之前先判断是不是还能执行任务
|
||
let waitTask = global.requestQuene.getWaitingQueue();
|
||
if (waitTask > 20) // 最懂同时等待二十个
|
||
{
|
||
console.log('等待中的任务太多,等待中的任务数量:', waitTask);
|
||
this.spaceTime = 20000;
|
||
return;
|
||
}
|
||
|
||
// 判断MJ队列是不是存在
|
||
if (!global.mjQueue) {
|
||
// MJ 队列不存在,创建
|
||
global.mjQueue = new AsyncQueue(global, 1);
|
||
}
|
||
|
||
// 开始添加
|
||
// 查任务
|
||
const tasks = this.bookBackTaskListService.GetWaitTaskAndSlice(TaskExecuteType.AUTO, 20 - waitTask);
|
||
if (tasks.code == 0) {
|
||
return errorMessage(`获取等待中的任务失败,失败信息如下:${tasks.message}`, 'TaskManager_ExecuteAutoTask');
|
||
}
|
||
|
||
if (!tasks.data || tasks.data.length <= 0) {
|
||
console.log('没有等待中的任务');
|
||
this.spaceTime = 20000;
|
||
return;
|
||
}
|
||
|
||
this.spaceTime = 5000;
|
||
await this.InitService(true);
|
||
//循环添加任务
|
||
for (let index = 0; index < tasks.data.length; index++) {
|
||
const element = tasks.data[index];
|
||
if (element.type == BookBackTaskType.MJ_IMAGE || element.type == BookBackTaskType.MJ_REVERSE) {
|
||
// 判断任务数量是不是又修改
|
||
let taskNumber = global.mjQueue.getConcurrencyLimit();
|
||
if (taskNumber != this.mjSetting.taskCount) {
|
||
global.mjQueue.concurrencyLimit = this.mjSetting.taskCount // 重置并发执行的数量
|
||
}
|
||
|
||
if (global.mjQueue.getWaitingQueue() > 10) {
|
||
console.log('MJ等待中的任务太多,等待中的任务数量:', global.mjQueue.getWaitingQueue());
|
||
this.spaceTime = 20000;
|
||
return;
|
||
}
|
||
// MJ任务
|
||
let res = await this.AddQueue(element);
|
||
continue;
|
||
} else {
|
||
// 其他任务
|
||
let res = await this.AddQueue(element);
|
||
}
|
||
// 添加完成,修改一下提交时间 // 要判断是否超时
|
||
}
|
||
}
|
||
|
||
//#region 添加任务到内存任务中
|
||
/**
|
||
* 添加分镜计算任务
|
||
* @param task 任务信息
|
||
*/
|
||
AddGetFrameDataTask(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.GET_FRAME_DATA;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.basicReverse.GetFrameData(task);
|
||
}, `${batch}_${task.id}`, batch);
|
||
|
||
}
|
||
|
||
/**
|
||
* 添加视频分镜任务
|
||
* @param task 任务信息
|
||
*/
|
||
AddCutVideoData(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.FRAMING;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.basicReverse.CutVideoData(task);
|
||
}, `${batch}_${task.id}`, batch)
|
||
}
|
||
|
||
|
||
/**
|
||
* 添加切割视频任务
|
||
* @param task 任务信息
|
||
*/
|
||
AddSplitAudioData(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.SPLI_TAUDIO;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.basicReverse.SplitAudioData(task);
|
||
}, `${batch}_${task.id}`, batch)
|
||
}
|
||
|
||
/**
|
||
* 添加抽帧的任务
|
||
* @param task 任务信息
|
||
*/
|
||
AddGetFrame(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.GET_FRAME;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.basicReverse.GetFrame(task);
|
||
}, `${batch}_${task.id}`, batch)
|
||
}
|
||
|
||
/**
|
||
* 添加识别字幕任务
|
||
* @param task 任务信息
|
||
*/
|
||
AddExtractSubtitlesData(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.GET_COPYWRITING;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.basicReverse.ExtractSubtitlesData(task);
|
||
}, `${batch}_${task.id}`, batch)
|
||
|
||
}
|
||
|
||
/**
|
||
* 添加单独反推任务
|
||
* @param task
|
||
*/
|
||
AddSingleReversePrompt(task: TaskModal.Task): void {
|
||
let batch = DEFINE_STRING.BOOK.ADD_REVERSE_PROMPT;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.reverseBook.SingleReversePrompt(task);
|
||
}, `${batch}_${task.id}`, batch)
|
||
}
|
||
|
||
/**
|
||
* 将MJ生图生成任务添加内存任务中
|
||
* @param task
|
||
*/
|
||
async AddImageMJImage(task: TaskModal.Task) {
|
||
// 判断是不是MJ的任务
|
||
let batch = DEFINE_STRING.MJ.MJ_IMAGE;
|
||
global.mjQueue.enqueue(async () => {
|
||
await this.mjOpt.MJImagine(task);
|
||
}, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`);
|
||
}
|
||
|
||
|
||
/**
|
||
* 将SD生图任务添加到内存任务中
|
||
* @param task
|
||
*/
|
||
async AddSDImage(task: TaskModal.Task) {
|
||
let batch = DEFINE_STRING.SD.TXT2IMG
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.sdOpt.SDImageGenerate(task);
|
||
}, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`)
|
||
}
|
||
|
||
/**
|
||
* 异步添加D3图像生成任务
|
||
*
|
||
* 此方法接受一个任务对象,将基于该任务生成D3图像
|
||
* 它使用全局请求队列来管理任务,确保并发处理的效率和稳定性
|
||
*
|
||
* @param task 任务对象,包含任务的具体信息和标识
|
||
*/
|
||
async AddD3Image(task: TaskModal.Task) {
|
||
let batch = task.messageName;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.d3Opt.D3ImageGenerate(task);
|
||
}, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`)
|
||
}
|
||
|
||
|
||
/**
|
||
* 添加 flux forge 任务到内存队列中
|
||
* @param task
|
||
*/
|
||
async AddFluxForgeImage(task: TaskModal.Task) {
|
||
let batch = task.messageName
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.fluxOpt.FluxForgeImage(task);
|
||
}, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`)
|
||
}
|
||
|
||
/**
|
||
* 添加 FLUX api 到内存队列中
|
||
* @param task
|
||
*/
|
||
async AddFluxAPIImage(task: TaskModal.Task) {
|
||
let batch = task.messageName;
|
||
global.requestQuene.enqueue(async () => {
|
||
await this.fluxOpt.FluxAPIImage(task);
|
||
}, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`)
|
||
}
|
||
|
||
/**
|
||
* 添加任务到内存队列中,分流
|
||
* @param task 任务相关
|
||
* @returns
|
||
*/
|
||
async AddQueue(task: TaskModal.Task) {
|
||
let _bookBackTaskListService = await BookBackTaskListService.getInstance();
|
||
try {
|
||
switch (task.type) {
|
||
case BookBackTaskType.STORYBOARD:
|
||
this.AddGetFrameDataTask(task);
|
||
break;
|
||
case BookBackTaskType.SPLIT:
|
||
this.AddCutVideoData(task);
|
||
break;
|
||
case BookBackTaskType.AUDIO:
|
||
this.AddSplitAudioData(task);
|
||
break;
|
||
case BookBackTaskType.FRAME:
|
||
this.AddGetFrame(task);
|
||
break;
|
||
case BookBackTaskType.RECOGNIZE:
|
||
this.AddExtractSubtitlesData(task);
|
||
break;
|
||
case BookBackTaskType.MJ_REVERSE || BookBackTaskType.SD_REVERSE:
|
||
this.AddSingleReversePrompt(task);
|
||
break;
|
||
case BookBackTaskType.FLUX_FORGE_IMAGE:
|
||
this.AddFluxForgeImage(task);
|
||
break;
|
||
case BookBackTaskType.FLUX_API_IMAGE:
|
||
this.AddFluxAPIImage(task);
|
||
break;
|
||
case BookBackTaskType.MJ_IMAGE:
|
||
this.AddImageMJImage(task);
|
||
break;
|
||
case BookBackTaskType.SD_IMAGE:
|
||
this.AddSDImage(task);
|
||
break;
|
||
case BookBackTaskType.D3_IMAGE:
|
||
this.AddD3Image(task);
|
||
break;
|
||
default:
|
||
throw new Error('未知的任务类型');
|
||
}
|
||
// 是不是要添加自动任务
|
||
// await this.AddTaskHandle(task, true);
|
||
|
||
// 添加成功后,更新任务状态
|
||
let updateRes = _bookBackTaskListService.UpdateTaskStatus({
|
||
id: task.id,
|
||
status: BookBackTaskStatus.RUNNING
|
||
});
|
||
if (updateRes.code == 0) {
|
||
throw new Error(updateRes.message);
|
||
}
|
||
return successMessage(null, `${task.name}_${task.id} 任务添加调度完成`, 'TaskManager_AddQueue');
|
||
} catch (error) {
|
||
let updateRes = _bookBackTaskListService.UpdateTaskStatus({
|
||
id: task.id,
|
||
status: BookBackTaskStatus.FAIL,
|
||
errorMessage: "任务调度失败,请手动重试"
|
||
});
|
||
if (updateRes.code == 0) {
|
||
return errorMessage(`处理 ${task.type} 类型任务 ${task.name} 失败,失败信息如下:${error.message}`, 'TaskManager_handleTask');
|
||
}
|
||
|
||
return errorMessage(`处理 ${task.type} 类型任务 ${task.name} 失败,失败信息如下:${error.message}`, 'TaskManager_handleTask');
|
||
}
|
||
}
|
||
|
||
|
||
//#endregion
|
||
|
||
async AddTaskHandle(task: TaskModal.Task, isAdd = false) {
|
||
if (!isAdd) {
|
||
return;
|
||
}
|
||
if (task.type == BookBackTaskType.STORYBOARD) {
|
||
await this.basicReverse.AddCutVideoDataTask(task.bookId);
|
||
} else if (task.type == BookBackTaskType.SPLIT) {
|
||
await this.basicReverse.AddSplitAudioDataTask(task.bookId, task.bookTaskId);
|
||
} else if (task.type == BookBackTaskType.AUDIO) {
|
||
await this.basicReverse.AddGetFrameTask(task.bookId, task.bookTaskId);
|
||
} else if (task.type == BookBackTaskType.FRAME) {
|
||
await this.basicReverse.AddExtractSubtitlesDataTask(task.bookId, task.bookTaskId);
|
||
} else {
|
||
throw new Error('不支持的任务类型');
|
||
}
|
||
}
|
||
}
|