
Task Progress and Status Synchronization, General-Purpose Edition: Factory + Strategy + Observer Patterns and Lock Design with Spring Boot + Redisson

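Contents

    • Overview
    • Result
    • Explanation
    • Statuses
    • State Transitions
    • Design
      • AI Job Entity Class
      • AI Job Status Enum
      • AI Model Enum
      • Base Entity Class
      • Simple Dictionary Interface
      • Factory + Strategy Pattern: Interface Design
        • AiJobProcessor
        • AiJobProcessorFactory
      • Observer Pattern
        • AI Job Events
        • MyEventListener
        • MyEventPubLisher
      • RedissonConfig
      • Scheduled Tasks
    • Implementation
      • ReplicateJobProcessor
      • ReplicateApi
      • OkHttpClientUtil (General-Purpose Version)
      • Creating a New Job (Reference)
    • Scheduled Task Implementation
      • IAiJobService
      • AiJobServiceImpl
    • Overall Business Flow
    • Summary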

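Overview

In almost every project it is hard to avoid integrating third-party APIs or running long local jobs. That calls for a design that is stable, handles high concurrency, and is easy to extend, so that job status can be monitored and progress kept up to date. After several years of trial and error, I settled on the design below for synchronizing the status of third-party API jobs.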

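Result

[Demo: status synchronization over WebSocket]

Explanation

The demo is built on WebSocket, which this article does not cover in detail. The focus here is the back-end design.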

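Statuses

- DRAFT(0, "草稿", 0): draft
- SUBMITTED(1, "已提交", 16.67): submitted
- QUEUED(2, "排队中", 33.33): queued
- PROCESSING(3, "生成中", 50.00): generating
- GENERATED(4, "已生成", 66.67): generated
- MIGRATING(5, "迁移中", 83.33): migrating
- SUCCESS(6, "成功", 100.00): success
- FAILED(7, "失败", 0): failed
- TIMEOUT(8, "超时", 0): timed out
- CANCELED(9, "取消", 0): canceled

Each entry is (code, description, overall progress in percent). I have found these statuses to be indispensable; together they cover nearly every state a third-party job can be in. Why a migration step? It is also required: files returned by a third party (images, videos, and so on) sit on storage you do not control and cannot rely on, so they should be copied to your own OSS or storage system.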
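The progress values above look evenly spaced: each of the six steps from DRAFT to SUCCESS adds one sixth of 100%. A minimal sketch of that arithmetic, assuming half-up rounding to two decimals (the class and method names are illustrative and not part of the original code):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Illustrative helper, not from the original project: derives the overall
// progress of a normal-flow status from its code, assuming 6 equal steps.
class ProgressMath {
    static final int LAST_NORMAL_CODE = 6; // SUCCESS

    static BigDecimal progressOf(int code) {
        if (code < 0 || code > LAST_NORMAL_CODE) {
            // FAILED(7), TIMEOUT(8) and CANCELED(9) carry progress 0 in the enum
            return BigDecimal.ZERO.setScale(2);
        }
        return BigDecimal.valueOf(code)
                .multiply(BigDecimal.valueOf(100))
                .divide(BigDecimal.valueOf(LAST_NORMAL_CODE), 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        for (int code = 0; code <= 9; code++) {
            System.out.println(code + " -> " + progressOf(code));
        }
    }
}
```

Hard-coding the values in the enum, as the article does, is just as valid; the formula is only useful if the number of stages may change.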

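State Transitions

- Normal flow (progress increases): draft(0) → submitted(1) → queued(2) → generating(3) → generated(4) → migrating(5) → success(6)
- Abnormal termination:
  - CANCELED: can be triggered deliberately from any non-terminal state
  - TIMEOUT: triggered when the submit, queue, process, or migrate stage times out
  - FAILED: triggered when the submit, process, or migrate stage throws an error
- Terminal states: success(6), failed(7), timeout(8), and canceled(9) are final; nothing follows them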
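The rules above can be written down as a small transition check. This is a sketch under stated assumptions, not the article's code: the enum `JobState` and its methods are hypothetical, and each stage name in the rules ("submit", "queue", "process", "migrate") is mapped to the status of the same name.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical state machine mirroring the transition rules listed above.
enum JobState {
    DRAFT, SUBMITTED, QUEUED, PROCESSING, GENERATED, MIGRATING,
    SUCCESS, FAILED, TIMEOUT, CANCELED;

    private static final Set<JobState> TERMINAL =
            EnumSet.of(SUCCESS, FAILED, TIMEOUT, CANCELED);
    // Assumption: stages map one-to-one onto the statuses with the same name.
    private static final Set<JobState> TIMEOUT_FROM =
            EnumSet.of(SUBMITTED, QUEUED, PROCESSING, MIGRATING);
    private static final Set<JobState> FAILED_FROM =
            EnumSet.of(SUBMITTED, PROCESSING, MIGRATING);

    boolean isTerminal() {
        return TERMINAL.contains(this);
    }

    boolean canMoveTo(JobState next) {
        if (isTerminal()) {
            return false; // terminal states have no outgoing transitions
        }
        switch (next) {
            case CANCELED:
                return true; // allowed from any non-terminal state
            case TIMEOUT:
                return TIMEOUT_FROM.contains(this);
            case FAILED:
                return FAILED_FROM.contains(this);
            default:
                // normal flow: exactly one step forward
                return next.ordinal() == ordinal() + 1;
        }
    }
}

class JobStateDemo {
    public static void main(String[] args) {
        System.out.println(JobState.DRAFT.canMoveTo(JobState.SUBMITTED));
        System.out.println(JobState.SUCCESS.canMoveTo(JobState.CANCELED));
    }
}
```

Guarding every status write with a check like `canMoveTo` keeps a late poll result from moving a finished job backwards.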

Design

AI Job Entity Class

package com.cc672cc.entity.tb;import com.gitee.sunchenbin.mybatis.actable.annotation.*;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import com.cc672cc.entity.BaseEntity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import tk.mybatis.mapper.annotation.KeySql;
import javax.persistence.Id;
import javax.persistence.Table;
import java.math.BigDecimal;
import java.util.Date;/*** AI任务表*/
@Data
@Table(name = "tb_ai_job")
@TableComment("AI任务表")
@Schema(description = "AI任务表")
public class AiJob extends BaseEntity {/*** ID*/@Id@KeySql(useGeneratedKeys = true)@IsAutoIncrement@Column(name = "id", type = MySqlTypeConstant.BIGINT, isKey = true, isNull = false, comment = "ID")@Schema(description = "ID")private Long id;/*** 用户id*/@Column(name = "user_id", type = MySqlTypeConstant.BIGINT, comment = "用户id")@Schema(description = "用户id")private Long userId;/*** 照片点评分析编码*/@Column(name = "photo_review_analysis_code", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "照片点评分析编码")@Schema(description = "照片点评分析编码")private String photoReviewAnalysisCode;/*** 类型* 生文本 生图片 生视频*/@Column(name = "type", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "类型(生文本/生图片/生视频)")@Schema(description = "类型")private String type;/*** 动作*/@Column(name = "action", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "动作")@Schema(description = "动作")private String action;/*** 编码*/@Column(name = "code", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "编码")@Schema(description = "编码")private String code;/*** 渠道*/@Column(name = "channel", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "渠道")@Schema(description = "渠道")private String channel;/*** 平台*/@Column(name = "platform", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "平台")@Schema(description = "平台")private String platform;/*** 模型*/@Column(name = "model", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型")@Schema(description = "模型")private String model;/*** 是否异步* 0否1是*/@Column(name = "asyn", type = MySqlTypeConstant.TINYINT, length = 1, comment = "是否异步(0否1是)")@Schema(description = "是否异步")private Boolean asyn;/*** 模型版本*/@Column(name = "model_version", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型版本")@Schema(description = "模型版本")private String modelVersion;/*** 模型id*/@Column(name = "model_id", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型id")@Schema(description = "模型id")private String modelId;/*** 模型名称*/@Column(name = "model_name", type = 
MySqlTypeConstant.VARCHAR, length = 50, comment = "模型名称")@Schema(description = "模型名称")private String modelName;/*** 输出数量*/@Column(name = "output_count", type = MySqlTypeConstant.INT, comment = "输出数量", defaultValue = "1")@Schema(description = "输出数量")private Integer outputCount = 1;/*** 创建日期*/@Column(name = "create_date", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "创建日期")@Schema(description = "创建日期")private String createDate;/*** 请求时间*/@Column(name = "req_time", type = MySqlTypeConstant.DATETIME, comment = "请求时间")@Schema(description = "请求时间")private Date reqTime;/*** 响应时间*/@Column(name = "resp_time", type = MySqlTypeConstant.DATETIME, comment = "响应时间")@Schema(description = "响应时间")private Date respTime;/*** 耗时* 单位s*/@Column(name = "cost_time", type = MySqlTypeConstant.BIGINT, comment = "耗时(单位s)")@Schema(description = "耗时 单位s")private Long costTime;/*** 三方id*/@Column(name = "out_id", type = MySqlTypeConstant.VARCHAR, length = 100, comment = "三方id")@Schema(description = "三方id")private String outId;/*** 请参json*/@Column(name = "req_json", type = MySqlTypeConstant.TEXT, comment = "请参json")@Schema(description = "请参json")private String reqJson;/*** 反参json*/@Column(name = "resp_json", type = MySqlTypeConstant.TEXT, comment = "反参json")@Schema(description = "反参json")private String respJson;/*** 任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消)*/@Column(name = "job_status", type = MySqlTypeConstant.INT, comment = "任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消)")@Schema(description = "任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消)")private Integer jobStatus;/*** 单元进度* 对应的是每个任务阶段的进度*/@Column(name = "unit_progress", type = MySqlTypeConstant.DECIMAL, length = 5, decimalLength = 2, comment = "单元进度", defaultValue = "0")@Schema(description = "单元进度")private BigDecimal unitProgress;/*** 任务状态描述*/@Column(name = "job_status_desc", type = MySqlTypeConstant.VARCHAR, length = 255, comment = "任务状态描述")@Schema(description = "图片状态描述")private String jobStatusDesc;/*** 
整体进度*/@Column(name = "overall_progress", type = MySqlTypeConstant.DECIMAL, length = 5, decimalLength = 2, comment = "整体进度", defaultValue = "0")@Schema(description = "整体进度")private BigDecimal overallProgress;}

AI Job Status Enum

package com.cc672cc.enums.dict;

import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;

import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * AI job status enum
 *
 * @author CC
 * @date 2019/5/5 14:34
 **/
@Getter
@Schema(description = "AI任务状态 0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消")
public enum AiJobStatusEnum implements IDict {
    DRAFT(0, "草稿", BigDecimal.ZERO),
    SUBMITTED(1, "已提交", new BigDecimal("16.67")),
    QUEUED(2, "排队中", new BigDecimal("33.33")),
    PROCESSING(3, "生成中", new BigDecimal("50.00")),
    GENERATED(4, "已生成", new BigDecimal("66.67")),
    MIGRATING(5, "迁移中", new BigDecimal("83.33")),
    SUCCESS(6, "成功", new BigDecimal("100.00")),
    FAILED(7, "失败", BigDecimal.ZERO),
    TIMEOUT(8, "超时", BigDecimal.ZERO),
    CANCELED(9, "取消", BigDecimal.ZERO);

    private Integer code;
    private String description;
    /**
     * Overall progress (percent)
     */
    private BigDecimal progress;

    // Lazily built lookup tables: code -> description, and code -> enum constant
    public static Map<String, String> cdMap;
    public static Map<Integer, AiJobStatusEnum> map;

    AiJobStatusEnum(int code, String description, BigDecimal progress) {
        this.code = code;
        this.description = description;
        this.progress = progress;
    }

    @Override
    public Map<String, String> dictMap() {
        if (cdMap == null) {
            cdMap = new LinkedHashMap<>();
            AiJobStatusEnum[] values = values();
            for (AiJobStatusEnum value : values) {
                cdMap.put(String.valueOf(value.getCode()), value.getDescription());
            }
        }
        return cdMap;
    }

    public static Map<Integer, AiJobStatusEnum> getMap() {
        if (map == null) {
            map = new LinkedHashMap<>();
            AiJobStatusEnum[] values = values();
            for (AiJobStatusEnum value : values) {
                map.put(value.getCode(), value);
            }
        }
        return map;
    }

    // Note: throws NullPointerException if the code is not a known status
    public static BigDecimal getProgress(Integer code) {
        return getMap().get(code).getProgress();
    }
}

AI Model Enum

package com.cc672cc.enums.dict;import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;import java.util.LinkedHashMap;
import java.util.Map;/*** AI模型枚举** @author CC* @date 2019/5/5 14:34**/
@Getter
@Schema(description = "AI模型枚举")
public enum AiModelEnum implements IDict {TTAPI_MIDJOURNEY_V7("ttapi_midjourney_v7", "TTAPI_MIDJOURNEY_V7", "TTApi Midjourney V7", "Midjourney", "7.0", "TTapi的Midjourney的第七版本", "TTApi", "TTApi", true, 1),REPLICATE_IMAGEUPSCALE_V1("replicate_imageupscale_v1", "REPLICATE_IMAGEUPSCALE_V1", "Replicate ImageUpscale V1", "ImageUpscale", "1.0", "Replicate的图片放大的第一版本", "Replicate", "Replicate", true, 2),PHOTOREVIEW_ANALYSIS_V1("photoReview_analysis_v1", "PHOTOREVIEW_ANALYSIS_V1", "PhotoReview Analysis V1", "PhotoReviewAnalysis", "1.0", "照片点评的第一版本", "PhotoReview", "PhotoReview", true, 1);/*** 模型id*/private String modelId;/*** 模型编码* 自定义的编码 格式 {平台}_{模型}_{版本}*/private String modelCode;private String modelName;private String model;private String modelVersion;private String modelDesc;/*** 平台*/private String platform;/*** 渠道* 指哪个公司的 集团下面的*/private String channel;/*** 是否异步*/private Boolean asyn;/*** 单位积分* 就是每个输出的积分*/private Integer unitPoint;public static Map<String, AiModelEnum> map;public Map<String, String> inmap;AiModelEnum(String modelId, String modelCode, String modelName, String model, String modelVersion, String modelDesc, String platform, String channel, Boolean asyn, Integer unitPoint) {this.modelId = modelId;this.modelCode = modelCode;this.modelName = modelName;this.model = model;this.modelVersion = modelVersion;this.modelDesc = modelDesc;this.platform = platform;this.channel = channel;this.asyn = asyn;this.unitPoint = unitPoint;}@Overridepublic Map<String, String> dictMap() {if (inmap == null) {inmap = new LinkedHashMap<>();AiModelEnum[] values = values();for (AiModelEnum value : values) {inmap.put(value.getModelId(), value.getModelName());}}return inmap;}public static Map<String, AiModelEnum> getMap() {if (map == null) {map = new LinkedHashMap<>();AiModelEnum[] values = values();for (AiModelEnum value : values) {map.put(value.getModelId(), value);}}return map;}}

Base Entity Class

package com.cc672cc.entity;import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.gitee.sunchenbin.mybatis.actable.annotation.Column;
import com.gitee.sunchenbin.mybatis.actable.annotation.ColumnComment;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.util.Date;@Data
@Schema(description = "基础实体类")
public class BaseEntity {/*** 状态(0禁止1正常)*/@Column(type = MySqlTypeConstant.INT, length = 1, defaultValue = "1")@Schema(description = "状态(0禁止1正常)")@ColumnComment("状态(0禁止1正常)")@TableField(fill = FieldFill.INSERT)private Integer status;/*** 删除状态(0否1是)*/@TableLogic@Schema(description = "删除状态(0否1是)")@Column(type = MySqlTypeConstant.INT, length = 1, defaultValue = "0")@ColumnComment("删除状态(0否1是)")@TableField(fill = FieldFill.INSERT)private Integer del;/*** 排序权重*/@Schema(description = "排序权重")@Column(type = MySqlTypeConstant.INT, length = 4, defaultValue = "0")@ColumnComment("排序权重")private Integer sort;/*** 创建人*/@Column(type = MySqlTypeConstant.VARCHAR, length = 64)@ColumnComment("创建人")@TableField(fill = FieldFill.INSERT)private String createBy;/*** 创建时间*/@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Schema(description = "创建时间")@Column(type = MySqlTypeConstant.DATETIME, defaultValue = "CURRENT_TIMESTAMP")@ColumnComment("创建时间")private Date createTime;/*** 更新人*/@Column(type = MySqlTypeConstant.VARCHAR, length = 64)@ColumnComment("更新人")@TableField(fill = FieldFill.UPDATE)private String updateBy;/*** 更新时间*/@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Schema(description = "更新时间")@Column(type = MySqlTypeConstant.DATETIME, defaultValue = "NULL ON UPDATE CURRENT_TIMESTAMP")@ColumnComment("更新时间")private Date updateTime;/*** 版本*/@Column(type = MySqlTypeConstant.VARCHAR, length = 10, defaultValue = "v1")@TableField(fill = FieldFill.INSERT)@ColumnComment("版本")private String version;
}

Simple Dictionary Interface

package com.cc672cc.enums;

import java.util.Map;

public interface IDict {
    Map<String, String> dictMap();
}

Factory + Strategy Pattern: Interface Design

AiJobProcessor
package com.cc672cc.processor;

import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;

public interface AiJobProcessor {
    /**
     * Build the request JSON
     *
     * @param reqVO
     * @param photoReviewAnalysis
     * @return
     */
    String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis);

    /**
     * Process the job
     *
     * @param aiJob AI job
     * @return job status
     */
    Integer process(AiJob aiJob);

    /**
     * Query the AI job status
     *
     * @param aiJob AI job
     * @return job status
     */
    Integer query(AiJob aiJob);

    /**
     * Migrate the AI job
     *
     * @param aiJobId
     * @return
     */
    Integer migrate(Long aiJobId, Integer jobStatus);

    /**
     * Model type supported by this processor (matches the AiJob.model field)
     *
     * @return model type
     */
    String getSupportedModel();

    /**
     * Business timeout (seconds)
     *
     * @return
     */
    Long businessTimeoutS();

    /**
     * Unit progress display flags
     * 0 = hide, 1 = show
     *
     * @return
     */
    Integer[] unitProgressShowFlag();
}
AiJobProcessorFactory
package com.cc672cc.processor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * AI job processor factory (looks up the processor by model at runtime)
 */
@Component
public class AiJobProcessorFactory {
    private final Map<String, AiJobProcessor> processorMap = new HashMap<>();

    // Spring injects every AiJobProcessor implementation
    @Autowired
    public AiJobProcessorFactory(List<AiJobProcessor> processors) {
        for (AiJobProcessor processor : processors) {
            processorMap.put(processor.getSupportedModel(), processor);
        }
    }

    /**
     * Get the processor for a model type
     *
     * @param model model type (AiJob.model)
     * @return processor instance
     * @throws IllegalArgumentException if no processor is registered for the model
     */
    public AiJobProcessor getProcessor(String model) {
        AiJobProcessor processor = processorMap.get(model);
        if (processor == null) {
            throw new IllegalArgumentException("未找到模型[" + model + "]对应的处理器");
        }
        return processor;
    }
}
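To see the factory + strategy idea without Spring, here is a minimal plain-Java sketch. The types `Processor`, `UpscaleProcessor`, `MidjourneyProcessor`, and `ProcessorFactory` are simplified stand-ins invented for illustration; the duplicate-model check is an addition that the article's factory does not have (there, a later registration silently overwrites an earlier one).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Strategy interface: one implementation per model (simplified stand-in).
interface Processor {
    String supportedModel();

    String process(String payload);
}

class UpscaleProcessor implements Processor {
    public String supportedModel() { return "ImageUpscale"; }

    public String process(String payload) { return "upscale:" + payload; }
}

class MidjourneyProcessor implements Processor {
    public String supportedModel() { return "Midjourney"; }

    public String process(String payload) { return "mj:" + payload; }
}

// Factory: indexes the strategies by model name once, then serves lookups.
class ProcessorFactory {
    private final Map<String, Processor> byModel = new HashMap<>();

    ProcessorFactory(List<Processor> processors) {
        for (Processor p : processors) {
            // Assumption: two processors claiming the same model is a configuration error.
            if (byModel.put(p.supportedModel(), p) != null) {
                throw new IllegalStateException("Duplicate processor for model " + p.supportedModel());
            }
        }
    }

    Processor get(String model) {
        Processor p = byModel.get(model);
        if (p == null) {
            throw new IllegalArgumentException("No processor for model [" + model + "]");
        }
        return p;
    }
}

class ProcessorFactoryDemo {
    public static void main(String[] args) {
        ProcessorFactory factory = new ProcessorFactory(
                Arrays.asList(new UpscaleProcessor(), new MidjourneyProcessor()));
        System.out.println(factory.get("ImageUpscale").process("photo-1"));
    }
}
```

Adding a new third-party model then means adding one class; the calling code that does `factory.get(job.getModel())` stays unchanged.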

Observer Pattern

AI Job Events
// Each event class below lives in its own file and needs:
// import org.springframework.context.ApplicationEvent;

public class AiJobMigrateEvent extends ApplicationEvent {
    private AiJob aiJob;

    public AiJobMigrateEvent(Object source) {
        super(source);
    }

    public AiJobMigrateEvent(Object source, AiJob aiJob) {
        super(source);
        this.aiJob = aiJob;
    }

    public AiJob getAiJob() {
        return aiJob;
    }

    public void setAiJob(AiJob aiJob) {
        this.aiJob = aiJob;
    }
}

public class AiJobMsgEvent extends ApplicationEvent {
    private String msg;

    public AiJobMsgEvent(Object source) {
        super(source);
    }

    public AiJobMsgEvent(Object source, String msg) {
        super(source);
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

/**
 * Event: the AI job must be submitted right now
 */
public class AiJobNeedSubmitRightNowEvent extends ApplicationEvent {
    private AiJob aiJob;

    public AiJobNeedSubmitRightNowEvent(Object source) {
        super(source);
    }

    public AiJobNeedSubmitRightNowEvent(Object source, AiJob aiJob) {
        super(source);
        this.aiJob = aiJob;
    }

    public AiJob getAiJob() {
        return aiJob;
    }

    public void setAiJob(AiJob aiJob) {
        this.aiJob = aiJob;
    }
}

public class AiJobStatusRefreshEvent extends ApplicationEvent {
    private AiJob aiJob;

    public AiJobStatusRefreshEvent(Object source) {
        super(source);
    }

    public AiJobStatusRefreshEvent(Object source, AiJob aiJob) {
        super(source);
        this.aiJob = aiJob;
    }

    public AiJob getAiJob() {
        return aiJob;
    }

    public void setAiJob(AiJob aiJob) {
        this.aiJob = aiJob;
    }
}
MyEventListener
@Component
@Slf4j
public class MyEventListener {
    private static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    @Autowired
    private BusinessAsync businessAsync;

    /**
     * Unified listener (logging only)
     *
     * @param applicationEvent
     */
    @EventListener(classes = {LifyMsgEvent.class})
    public void listener(ApplicationEvent applicationEvent) {
        String simpleName = applicationEvent.getClass().getSimpleName();
        log.info("***** listener reception time : {} , simpleName : {} ***** , context : {}", DateUtil.format(DateUtil.date(), TIME_FORMAT), simpleName, JSON.toJSONString(applicationEvent));
    }

    @EventListener
    public void listener(LifyMsgEvent lifyMsgEvent) {
        businessAsync.saveChatContext(lifyMsgEvent);
        businessAsync.saveChatHistory(lifyMsgEvent);
    }

    @EventListener
    public void listener(PhotoReviewMsgEvent photoReviewMsgEvent) {
        businessAsync.websocketMsg(photoReviewMsgEvent);
    }

    // Push AI job messages to the client over WebSocket
    @EventListener
    public void listener(AiJobMsgEvent aiJobMsgEvent) {
        businessAsync.websocketMsg(aiJobMsgEvent);
    }

    @EventListener
    public void listener(TaskMsgEvent taskMsgEvent) {
        businessAsync.websocketMsg(taskMsgEvent);
    }

    // Submit the AI job immediately
    @EventListener
    public void listener(AiJobNeedSubmitRightNowEvent aiJobNeedSubmitRightNowEvent) {
        businessAsync.submitAiJob(aiJobNeedSubmitRightNowEvent);
    }

    @EventListener
    public void listener(TaskNeedSubmitRightNowEvent taskNeedSubmitRightNowEvent) {
        businessAsync.submitTask(taskNeedSubmitRightNowEvent);
    }

    // Refresh the status of one AI job
    @EventListener
    public void listener(AiJobStatusRefreshEvent aiJobStatusRefreshEvent) {
        businessAsync.refreshAiJobStatusDetail(aiJobStatusRefreshEvent);
    }

    @EventListener
    public void listener(TaskStatusRefreshEvent taskStatusRefreshEvent) {
        businessAsync.refreshTaskStatusDetail(taskStatusRefreshEvent);
    }

    // Migrate the output of one AI job
    @EventListener
    public void listener(AiJobMigrateEvent aiJobMigrateEvent) {
        businessAsync.migrateAiJobDetail(aiJobMigrateEvent);
    }

    @EventListener
    public void listener(TaskMigrateEvent taskMigrateEvent) {
        businessAsync.migrateTaskDetail(taskMigrateEvent);
    }
}
MyEventPubLisher
/**
 * My event publisher
 *
 * @author liaoqian
 * @since 2024-01-24
 */
@Component
public class MyEventPubLisher {
    @Autowired
    private IRedisService redisService;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void pushLifyMsgEvent(String msg) {
        applicationEventPublisher.publishEvent(new LifyMsgEvent(this, msg));
    }

    public void pushPhotoReviewMsgEvent(String msg) {
        applicationEventPublisher.publishEvent(new PhotoReviewMsgEvent(this, msg));
    }

    public void pushAiJobMsgEvent(String msg) {
        applicationEventPublisher.publishEvent(new AiJobMsgEvent(this, msg));
    }

    public void pushTaskMsgEvent(String msg) {
        applicationEventPublisher.publishEvent(new TaskMsgEvent(this, msg));
    }

    public void pushAiJobNeedSubmitRightNow(AiJob aiJob) {
        applicationEventPublisher.publishEvent(new AiJobNeedSubmitRightNowEvent(this, aiJob));
    }

    public void pushTaskNeedSubmitRightNow(Task task) {
        applicationEventPublisher.publishEvent(new TaskNeedSubmitRightNowEvent(this, task));
    }

    // The four methods below publish only when no processing key exists in Redis for the job/task
    public void pushAiJobStatusRefreshEvent(AiJob aiJob) {
        String cacheAiJobTaskProcess = RedisPreKey.CACHE_AI_JOB_PROCESS;
        String redisPreKey = cacheAiJobTaskProcess + aiJob.getId();
        if (!redisService.hasKey(redisPreKey)) {
            applicationEventPublisher.publishEvent(new AiJobStatusRefreshEvent(this, aiJob));
        }
    }

    public void pushTaskStatusRefreshEvent(Task task) {
        String cacheTaskTaskProcess = RedisPreKey.CACHE_TASK_PROCESS;
        String redisPreKey = cacheTaskTaskProcess + task.getId();
        if (!redisService.hasKey(redisPreKey)) {
            applicationEventPublisher.publishEvent(new TaskStatusRefreshEvent(this, task));
        }
    }

    public void pushAiJobMigrateEvent(AiJob aiJob) {
        String cacheAiJobTaskProcess = RedisPreKey.CACHE_AI_JOB_PROCESS;
        String redisPreKey = cacheAiJobTaskProcess + aiJob.getId();
        if (!redisService.hasKey(redisPreKey)) {
            applicationEventPublisher.publishEvent(new AiJobMigrateEvent(this, aiJob));
        }
    }

    public void pushTaskMigrateEvent(Task task) {
        String cacheTaskTaskProcess = RedisPreKey.CACHE_TASK_PROCESS;
        String redisPreKey = cacheTaskTaskProcess + task.getId();
        if (!redisService.hasKey(redisPreKey)) {
            applicationEventPublisher.publishEvent(new TaskMigrateEvent(this, task));
        }
    }
}
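The publisher skips refresh and migrate events while a per-job key exists in Redis, which keeps a slow job from being enqueued again on every scheduler tick. The sketch below shows that idea with an in-memory set standing in for the Redis key. `DedupPublisher` and its methods are hypothetical, and it assumes the handler sets the key while it works and clears it afterwards, which this part of the article does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory version of "publish only if the job is not in flight".
class DedupPublisher {
    // Stand-in for the per-job Redis key checked with hasKey(...)
    private final Set<Long> inFlight = ConcurrentHashMap.newKeySet();
    private final List<Consumer<Long>> listeners = new ArrayList<>();

    void subscribe(Consumer<Long> listener) {
        listeners.add(listener);
    }

    // Called by the handler when it starts working on a job (assumption).
    void markInFlight(long jobId) {
        inFlight.add(jobId);
    }

    // Called by the handler when it is done (assumption).
    void clear(long jobId) {
        inFlight.remove(jobId);
    }

    // Returns true if the event was delivered, false if it was skipped.
    boolean publishRefresh(long jobId) {
        if (inFlight.contains(jobId)) {
            return false;
        }
        for (Consumer<Long> listener : listeners) {
            listener.accept(jobId);
        }
        return true;
    }
}

class DedupPublisherDemo {
    public static void main(String[] args) {
        DedupPublisher publisher = new DedupPublisher();
        publisher.subscribe(id -> System.out.println("refresh job " + id));
        publisher.publishRefresh(1L);   // delivered
        publisher.markInFlight(1L);
        publisher.publishRefresh(1L);   // skipped while in flight
    }
}
```

The check followed by the publish is not atomic, in this sketch or in the original, so it only reduces duplicate events. Exclusive processing still has to be enforced by a lock inside the handler.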

RedissonConfig

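import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RedissonClient
 * Mainly used for distributed locks
 *
 * @author cc
 * @date 2020/05/13
 */
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(RedisProperties prop) {
        String address = "redis://%s:%d";
        Config config = new Config();
        // Single-node Redis; host, port, and password come from the Spring Redis properties
        config.useSingleServer()
                .setPassword(prop.getPassword())
                .setAddress(String.format(address, prop.getHost(), prop.getPort()))
                .setDatabase(0);
        return Redisson.create(config);
    }
}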
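The client above is there for distributed locking. A minimal sketch of the usual "try the lock, skip if busy, always unlock" shape follows. It is written against the JDK `java.util.concurrent.locks.Lock` interface, with a local `ReentrantLock` swapped in for a Redisson lock so the example runs on its own; `JobLockRunner` and the key format `ai_job_lock:` are hypothetical and not taken from the article.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical helper: runs work for a job only if that job's lock is free.
class JobLockRunner {
    private final Function<String, Lock> lockProvider;

    JobLockRunner(Function<String, Lock> lockProvider) {
        this.lockProvider = lockProvider;
    }

    // Returns true if the work ran, false if another holder had the lock.
    boolean runExclusively(long jobId, Runnable work) {
        Lock lock = lockProvider.apply("ai_job_lock:" + jobId); // key format is an assumption
        if (!lock.tryLock()) {
            return false; // someone else is processing this job; skip this round
        }
        try {
            work.run();
            return true;
        } finally {
            lock.unlock(); // release even if the work throws
        }
    }

    // Local, single-JVM stand-in for a distributed lock provider.
    static JobLockRunner local() {
        ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();
        return new JobLockRunner(key -> locks.computeIfAbsent(key, k -> new ReentrantLock()));
    }
}

class JobLockRunnerDemo {
    public static void main(String[] args) {
        JobLockRunner runner = JobLockRunner.local();
        boolean ran = runner.runExclusively(42L, () -> System.out.println("processing job 42"));
        System.out.println("ran = " + ran);
    }
}
```

Because Redisson's `RLock` implements `java.util.concurrent.locks.Lock`, the same runner could presumably be built with `new JobLockRunner(redissonClient::getLock)` in a project that has the `RedissonClient` bean; that wiring is an assumption and is not shown in the article.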

Scheduled Tasks

package com.cc672cc.scheduler;

import com.cc672cc.service.IAiJobService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * AI job scheduler
 */
@Slf4j
@Component
public class AiJobScheduler {
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private IAiJobService aiJobService;

    /**
     * Refresh AI job statuses.
     * First run after 10 seconds, then 5 seconds after each run finishes.
     */
    @Scheduled(initialDelay = 10000, fixedDelay = 5000)
    public void refreshAiJobStatus() {
        log.info("refreshAiJobStatus start-->{}", sdf.format(new Date()));
        int refreshCount = aiJobService.refreshAiJobStatus();
        log.info("refreshAiJobStatus  end refreshCount-->{}", refreshCount);
    }

    /**
     * Migrate AI jobs.
     * First run after 10 seconds, then 5 seconds after each run finishes.
     */
    @Scheduled(initialDelay = 10000, fixedDelay = 5000)
    public void migrateAiJob() {
        log.info("migrateAiJob start-->{}", sdf.format(new Date()));
        int migrateCount = aiJobService.migrateAiJob();
        log.info("migrateAiJob  end  migrateCount-->{}", migrateCount);
    }
}

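Implementation

Only one processor is shown here as an example; the concrete implementation depends on your own business logic. The example is ReplicateJobProcessor.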

ReplicateJobProcessor

package com.cc672cc.processor.aijob;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.CodeUtil;
import com.cc672cc.common.utils.DateUtil;
import com.cc672cc.common.utils.MessageUtils;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoBeautify;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.ReplicateImageUpscaleBeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;
import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.processor.AiJobProcessor;
import com.cc672cc.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

@Slf4j
@Component
public class ReplicateJobProcessor implements AiJobProcessor {
    @Autowired
    private IReplicateService replicateService;

    @Autowired
    @Lazy
    private IAiJobService aiJobService;

    @Autowired
    private IQiNiuService qiNiuService;

    @Lazy
    @Autowired
    private IPhotoBeautifyService photoBeautifyService;

    private List<String> enhanceModelList = Arrays.asList("Standard V2", "Low Resolution V2", "CGI", "High Fidelity V2", "Text Refine");
    private List<Integer> upscaleFactorList = Arrays.asList(2, 4, 6);

    @Override
    public Integer[] unitProgressShowFlag() {
        return new Integer[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
    }

    @Override
    public Long businessTimeoutS() {
        return 60 * 60 * 24L;
    }

    @Override
    public String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis) {
        ReplicateImageUpscaleBeautifyPhotoReqVO detailReqVo = JSON.parseObject(JSON.toJSONString(reqVO.getParams()), ReplicateImageUpscaleBeautifyPhotoReqVO.class);
        // Validate the enhance model and the upscale factor against the allowed values
        if (StringUtils.isBlank(detailReqVo.getEnhanceModel())) {
            throw new RuntimeException("增强模型不能为空");
        }
        if (!enhanceModelList.contains(detailReqVo.getEnhanceModel())) {
            throw new RuntimeException("增强模型错误");
        }
        if (detailReqVo.getUpscaleFactor() == null) {
            throw new RuntimeException("放大倍数不能为空");
        }
        if (!upscaleFactorList.contains(detailReqVo.getUpscaleFactor())) {
            throw new RuntimeException("放大倍数错误");
        }
        ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> req = new ReplicateCommonReqVO<>();
        ReplicateImageUpscaleReqVO input = new ReplicateImageUpscaleReqVO();
        input.setImage(photoReviewAnalysis.getOriImgUrl());
        input.setEnhanceModel(detailReqVo.getEnhanceModel());
        input.setUpscaleFactor(detailReqVo.getUpscaleFactor() + "x");
        req.setInput(input);
        return JSON.toJSONString(req);
    }

    @Override
    public Integer process(AiJob aiJob) {
        String reqJson = aiJob.getReqJson();
        if (StringUtils.isBlank(reqJson)) {
            aiJob.setJobStatusDesc("reqJson is blank");
            return AiJobStatusEnum.FAILED.getCode();
        }
        Type type = new TypeReference<ReplicateCommonReqVO<ReplicateImageUpscaleReqVO>>() {}.getType();
        ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> reqVO = JSON.parseObject(reqJson, type);
        ReplicateCommonRespVO<String> respVO = null;
        String message = "";
        try {
            respVO = replicateService.imageUpscale(reqVO);
            message = respVO != null ? respVO.getError() : "";
            // A non-blank third-party id means the job was accepted
            if (respVO != null && StringUtils.isNotBlank(respVO.getId())) {
                String jobId = respVO.getId();
                aiJob.setOutId(jobId);
                return AiJobStatusEnum.SUBMITTED.getCode();
            }
        } catch (Exception e) {
            message = MessageUtils.normalMaxLength(e.getMessage());
        }
        aiJob.setJobStatusDesc(message);
        return AiJobStatusEnum.FAILED.getCode();
    }

    @Override
    public Integer query(AiJob aiJob) {
        String jobId = aiJob.getOutId();
        ReplicateCommonRespVO<String> ajax = replicateService.query(jobId);
        Integer jobStatus = aiJob.getJobStatus();
        String jobStatusDesc = aiJob.getJobStatusDesc();
        String respJson = aiJob.getRespJson();
        Date respTime = aiJob.getRespTime() == null ? new Date() : aiJob.getRespTime();
        Long costTime = aiJob.getCostTime();
        Date reqTime = aiJob.getReqTime();
        if (ajax != null && StringUtils.isNotBlank(ajax.getStatus())) {
            String message = ajax.getError();
            String status = ajax.getStatus();
            String data = ajax.getOutput();
            respTime = new Date();
            respJson = JSON.toJSONString(ajax);
            // Map the remote status onto the local status, one step per poll
            if ("processing".equals(status) && AiJobStatusEnum.SUBMITTED.getCode().equals(jobStatus)) {
                jobStatus = AiJobStatusEnum.QUEUED.getCode();
            } else if ("processing".equals(status) && AiJobStatusEnum.QUEUED.getCode().equals(jobStatus)) {
                jobStatus = AiJobStatusEnum.PROCESSING.getCode();
            } else if ("succeeded".equals(status) && StringUtils.isNotBlank(data)) {
                jobStatus = AiJobStatusEnum.GENERATED.getCode();
            }
        }
        // Elapsed time in seconds since the request; past the business timeout the job is TIMEOUT
        costTime = (respTime.getTime() - reqTime.getTime()) / 1000;
        if (costTime >= businessTimeoutS()) {
            jobStatus = AiJobStatusEnum.TIMEOUT.getCode();
            jobStatusDesc = "任务业务超时";
        }
        aiJob.setCostTime(costTime);
        aiJob.setRespTime(respTime);
        aiJob.setRespJson(respJson);
        aiJob.setJobStatus(jobStatus);
        aiJob.setJobStatusDesc(jobStatusDesc);
        aiJob.setUpdateTime(new Date());
        return jobStatus;
    }

    @Override
    public Integer migrate(Long aiJobId, Integer jobStatus) {
        Date now = new Date();
        try {
            // If the job is GENERATED, move it straight to MIGRATING
            if (AiJobStatusEnum.GENERATED.getCode().equals(jobStatus)) {
                jobStatus = AiJobStatusEnum.MIGRATING.getCode();
                return jobStatus;
            }
            // 1. Load the AiJob
            AiJob aiJob = aiJobService.selectOneById(aiJobId);
            if (aiJob == null) {
                log.error("迁移任务失败:未找到AiJob记录,aiJobId={}", aiJobId);
                return jobStatus;
            }
            // 2. Parse the response JSON
            String respJson = aiJob.getRespJson();
            if (StringUtils.isBlank(respJson)) {
                log.error("迁移任务失败:AiJob[aiJobId={}]的respJson为空", aiJobId);
                return jobStatus;
            }
            Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();
            ReplicateCommonRespVO<String> respVO;
            try {
                respVO = JSON.parseObject(respJson, type);
            } catch (Exception e) {
                log.error("迁移任务失败:AiJob[aiJobId={}]的respJson解析失败,json={}", aiJobId, respJson, e);
                return jobStatus;
            }
            String data = respVO.getOutput();
            if (data == null) {
                log.error("迁移任务失败:AiJob[aiJobId={}]的响应数据data为空", aiJobId);
                return jobStatus;
            }
            String mediaData = data;
            List<String> images = new ArrayList<>();
            images.add(mediaData);
            // 3. Download the images and build the PhotoBeautify list
            List<PhotoBeautify> addList = new ArrayList<>();
            for (int i = 0; i < images.size(); i++) {
                String originalImgUrl = images.get(i);
                long start = System.currentTimeMillis();
                try {
                    // Copy the remote image to Qiniu cloud storage (assumes downloadWebFile handles its own errors)
                    String imageUrl = qiNiuService.downloadWebFile(originalImgUrl);
                    PhotoBeautify photoBeautify = new PhotoBeautify();
                    photoBeautify.setAiJobId(aiJobId);
                    photoBeautify.setStatus(StatusEnum.EFFECTIVE.getCode());
                    photoBeautify.setDel(DelEnum.NOT_DELETED.getCode());
                    photoBeautify.setCode(CodeUtil.getRandomCode(10));
                    photoBeautify.setImgUrl(imageUrl);
                    photoBeautify.setOriginalImgUrl(originalImgUrl);
                    photoBeautify.setImgStatus(4);
                    photoBeautify.setCreateDate(DateUtil.format(now, "yyyy-MM-dd"));
                    photoBeautify.setCreateTime(now);
                    photoBeautify.setPhotoReviewAnalysisCode(aiJob.getPhotoReviewAnalysisCode());
                    photoBeautify.setModelName(aiJob.getModelName());
                    long end = System.currentTimeMillis();
                    // Elapsed time in seconds
                    photoBeautify.setCostTime((end - start) / 1000);
                    photoBeautify.setSort(i);
                    addList.add(photoBeautify);
                } catch (IOException e) {
                    log.error("迁移任务失败:下载图片[url={}]失败,aiJobId={}", originalImgUrl, aiJobId, e);
                    // Either continue with the remaining images or fail right away, depending on business needs
                    return AiJobStatusEnum.FAILED.getCode();
                }
            }
            // 4. Batch insert into the database
            boolean insertResult = photoBeautifyService.batchAdd(addList);
            if (!insertResult) {
                log.error("迁移任务失败:批量插入PhotoBeautify失败,aiJobId={}", aiJobId);
                return AiJobStatusEnum.FAILED.getCode();
            }
            log.info("迁移任务成功,aiJobId={},共迁移图片{}张", aiJobId, images.size());
            return AiJobStatusEnum.SUCCESS.getCode();
        } catch (Exception e) {
            log.error("迁移任务发生未知异常,aiJobId={}", aiJobId, e);
            return jobStatus;
        }
    }

    @Override
    public String getSupportedModel() {
        return "ImageUpscale";
    }
}
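The status mapping inside `query` is easier to reason about, and to unit test, when it is pulled out as a pure function. The sketch below mirrors the branches of `query` as shown above; the class `ReplicateStatusMapper` and its method signature are illustrative and not part of the original code.

```java
// Illustrative extraction of the status-mapping rules used in query(...).
class ReplicateStatusMapper {
    static final int SUBMITTED = 1;
    static final int QUEUED = 2;
    static final int PROCESSING = 3;
    static final int GENERATED = 4;
    static final int TIMEOUT = 8;

    /**
     * @param current      current local status code
     * @param remoteStatus status string returned by the third party (may be null)
     * @param output       output URL returned by the third party (may be null)
     * @param elapsedS     seconds since the request was sent
     * @param timeoutS     business timeout in seconds
     * @return the next local status code
     */
    static int next(int current, String remoteStatus, String output, long elapsedS, long timeoutS) {
        int next = current;
        boolean hasOutput = output != null && !output.trim().isEmpty();
        if ("processing".equals(remoteStatus) && current == SUBMITTED) {
            next = QUEUED;
        } else if ("processing".equals(remoteStatus) && current == QUEUED) {
            next = PROCESSING;
        } else if ("succeeded".equals(remoteStatus) && hasOutput) {
            next = GENERATED;
        }
        // As in the original, the timeout check runs last and overrides the mapping.
        if (elapsedS >= timeoutS) {
            next = TIMEOUT;
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(next(SUBMITTED, "processing", null, 10, 86400));
        System.out.println(next(PROCESSING, "succeeded", "https://example.com/out.png", 10, 86400));
    }
}
```

Note that, like the original, this maps only `processing` and `succeeded`. A remote status that reports failure or cancellation leaves the local status unchanged until the business timeout fires; whether that is intended is not stated in the article.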

ReplicateApi

package com.cc672cc.client;

import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.OkHttpClientUtil;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;
import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.properties.ReplicateProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Type;
import java.util.Map;

@Component
public class ReplicateApi {
    @Autowired
    private ReplicateProperties replicateProperties;

    public ReplicateCommonRespVO<String> imageUpscale(ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> req) {
        String uri = "/v1/models/topazlabs/image-upscale/predictions";
        Map<String, String> headers = Map.of("Authorization", String.format("Bearer %s", replicateProperties.getAppKey()));
        Map<String, Object> body = JSON.parseObject(JSON.toJSONString(req), Map.class);
        // Use a TypeReference to pass the full generic type
        Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();
        return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, "POST", headers,
                OkHttpClientUtil.EMPTY_MAP, body, type);
    }

    public ReplicateCommonRespVO<String> query(String jobId) {
        String uri = "/v1/predictions/" + jobId;
        Map<String, String> headers = Map.of("Authorization", String.format("Bearer %s", replicateProperties.getAppKey()));
        Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();
        return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, "GET", headers,
                OkHttpClientUtil.EMPTY_MAP, OkHttpClientUtil.EMPTY_MAP, type);
    }
}

OkHttpClientUtil (General-Purpose Version)

package com.cc672cc.common.utils;import cn.hutool.http.ContentType;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.cc672cc.common.model.ReturnInfo;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.HashMap;import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 简单封装的okhttpcient工具用于返回同一反参** @author liaoqian* @date 2024-01-17*/
@Slf4j
@SuppressWarnings("all")
public class OkHttpClientUtil {

    private static final String okHttpClientName = "okHttpClientUtil";

    public static final String METHOD_POST = "POST";
    public static final String METHOD_GET = "GET";
    public static final Map<String, Object> EMPTY_MAP = new HashMap<>();

    /** Connect timeout, in seconds. */
    private static final int CONNECT_TIMEOUT_SECONDS = 60;

    /** Read timeout, in seconds. */
    private static final int READ_TIMEOUT_SECONDS = 60;

    /** Timeout for the whole call (connect + write + read), in seconds. */
    private static final int CALL_TIMEOUT_SECONDS = 120;

    /** Write timeout, in seconds. */
    private static final int WRITE_TIMEOUT_SECONDS = 300;

    // Class initialization is already thread-safe, so the shared client
    // needs no double-checked locking.
    private static final OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectTimeout(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .readTimeout(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .callTimeout(CALL_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .writeTimeout(WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .retryOnConnectionFailure(true)
            .build();

    /**
     * @param host      request host
     * @param uri       request URI
     * @param method    HTTP method
     * @param headers   request headers
     * @param paramsObj query parameters (any object, converted to a map)
     * @param bodyObj   request body (any object, converted to a map)
     * @return the response as a JSONObject
     */
    public static JSONObject ajax(String host, String uri, String method, Map<String, String> headers, Object paramsObj, Object bodyObj) {
        Map params = JSON.parseObject(JSON.toJSONString(paramsObj), Map.class);
        Map body = JSON.parseObject(JSON.toJSONString(bodyObj), Map.class);
        return ajax(host, uri, method, headers, params, body);
    }

    /**
     * @param host    request host
     * @param uri     request URI
     * @param method  HTTP method
     * @param headers request headers
     * @param params  query parameters
     * @param body    request body
     * @return the response as a JSONObject, or null when the request failed
     */
    public static JSONObject ajax(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body) {
        Response response = ajaxProcess(host, uri, method, headers, params, body);
        JSONObject jsonObject = null;
        if (response != null) {
            try (ResponseBody responseBody = response.body()) {
                String respContentType = response.header("Content-Type");
                // startsWith also matches values such as "text/event-stream; charset=utf-8"
                if (StringUtils.isNotBlank(respContentType) && respContentType.startsWith("text/event-stream")) {
                    // Server-sent events: concatenate the data of every "text" event
                    StringBuilder sb = new StringBuilder();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        if (line.startsWith("event: text")) {
                            sb.append(extractTextData(reader));
                        }
                    }
                    ReturnInfo returnInfo = new ReturnInfo(sb.toString());
                    jsonObject = new JSONObject(returnInfo);
                    return jsonObject;
                }
                String result = responseBody.string();
                log.info("***** {} ajax result : {} *****", okHttpClientName, result);
                if (JSON.isValid(result)) {
                    jsonObject = new JSONObject(result);
                } else {
                    // Not JSON: wrap the raw text so callers always get an object
                    ReturnInfo returnInfo = new ReturnInfo(result);
                    jsonObject = new JSONObject(returnInfo);
                }
            } catch (Exception e) {
                log.error("***** {} ajax  e : {}  *****", okHttpClientName, e);
            }
        }
        return jsonObject;
    }

    /** Reads the "data: " lines of one SSE event, up to the blank line that ends it. */
    private static String extractTextData(BufferedReader reader) throws IOException {
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
            if (line.startsWith("data: ")) {
                String substring = line.substring("data: ".length());
                substring = substring.replace("\"", "");
                sb.append(substring);
            }
        }
        return sb.toString();
    }

    private static Response ajaxProcess(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body) {
        OkHttpClient client = okHttpClient;
        String url = host + uri;
        Request.Builder builder = new Request.Builder();
        // Request headers
        if (headers != null && !headers.isEmpty()) {
            builder.headers(Headers.of(headers));
        }
        // Query parameters are appended for every method (values are not URL-encoded here)
        if (params != null && !params.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            sb.append("?");
            params.entrySet().stream().forEach(e -> {
                sb.append(e.getKey()).append("=").append(String.valueOf(e.getValue())).append("&");
            });
            sb.delete(sb.length() - 1, sb.length());
            url += sb.toString();
        }
        // GET is the default; a POST with a body overrides it below
        builder.get();
        if (METHOD_POST.equalsIgnoreCase(method)) {
            if (body != null) {
                builder.post(RequestBody.create(MediaType.parse(ContentType.JSON.toString()), JSON.toJSONString(body)));
            }
        }
        Request request = builder.url(url).build();
        Response response = null;
        try {
            response = client.newCall(request).execute();
        } catch (IOException e) {
            log.error("***** {} ajaxProcess  e : {}  *****", okHttpClientName, e);
        }
        return response;
    }

    /** Same as {@link #ajax} above, but deserializes the response into the given type. */
    public static <T> T ajax(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body, Type type) {
        JSONObject ajax = ajax(host, uri, method, headers, params, body);
        if (ajax != null) {
            return JSON.parseObject(ajax.toString(), type);
        } else {
            return null;
        }
    }
}
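
ajaxProcess above joins query parameters into the URL as raw strings, so a value containing `&`, `=` or a space would corrupt the query. The following is a minimal sketch of how the parameters could be encoded first. The class name `QueryStrings`, the method `append` and the example URL are hypothetical and not part of the original utility; only JDK classes are used.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the article's OkHttpClientUtil.
final class QueryStrings {

    private QueryStrings() {
    }

    /** Appends the parameters to the URL, form-encoding every key and value. */
    static String append(String url, Map<String, Object> params) {
        if (params == null || params.isEmpty()) {
            return url;
        }
        String query = params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(String.valueOf(e.getValue())))
                .collect(Collectors.joining("&"));
        return url + "?" + query;
    }

    // URLEncoder produces application/x-www-form-urlencoded output:
    // spaces become '+', reserved characters become %XX escapes.
    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("prompt", "a cat & a dog");
        params.put("count", 2);
        // The '&' inside the prompt is escaped, so it cannot split the query
        System.out.println(append("https://api.example.com/v1/predictions", params));
    }
}
```

If the third-party API expects `%20` rather than `+` for spaces, the encoded value would need one more replacement step; that depends on the provider and is left open here.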

Reference: creating a new task

public AiJobStatusModel beautifyPhoto(BeautifyPhotoReqVO reqVO) {
    UserInfo userInfo = userService.getCurLoginUser(true);
    String photoReviewAnalysisCode = reqVO.getPhotoReviewAnalysisCode();
    PhotoReviewAnalysis photoReviewAnalysis = photoReviewService.selectOneByCode(photoReviewAnalysisCode);
    if (photoReviewAnalysis == null) {
        throw new BusinessException(ExceptionEnum.DATA_NOT_FOUND);
    }
    // Only the owner of the analysis may beautify it
    if (!userInfo.getId().equals(photoReviewAnalysis.getUserId())) {
        throw new BusinessException(ExceptionEnum.PERMISSION_DENIED);
    }
    Boolean beautifyAbility = photoReviewAnalysis.getBeautifyAbility();
    if (Boolean.FALSE.equals(beautifyAbility)) {
        // "This image cannot be beautified yet; see the criteria for enabling beautification"
        throw new RuntimeException("图片暂不支持美化,请查看【开启美化能力依据】");
    }
    Map<String, AiModelEnum> map = AiModelEnum.getMap();
    AiModelEnum aiModelEnum = map.get(reqVO.getModelId());
    if (aiModelEnum == null) {
        // "Model does not exist"
        throw new RuntimeException("模型不存在");
    }
    Long userId = userInfo.getId();
    Date now = new Date();
    String dateFormat = DateUtil.format(now, "yyyy-MM-dd");
    UserPointPerDayCheckModel checkModel = userBenefitsService.checkPointLimitPerDay(userId, dateFormat);
    if (checkModel.getLimit()) {
        // "Daily limit reached, please try again tomorrow"
        throw new RuntimeException("已到达今日次数上限,请明日再来试试吧");
    }

    // Build the job in DRAFT status with zero progress
    String code = CodeUtil.getRandomCode(10);
    AiJob aiJob = new AiJob();
    aiJob.setStatus(StatusEnum.EFFECTIVE.getCode());
    aiJob.setDel(DelEnum.NOT_DELETED.getCode());
    aiJob.setJobStatus(AiJobStatusEnum.DRAFT.getCode());
    aiJob.setUnitProgress(BigDecimal.ZERO);
    aiJob.setOverallProgress(BigDecimal.ZERO);
    aiJob.setUserId(userInfo.getId());
    aiJob.setPhotoReviewAnalysisCode(photoReviewAnalysis.getCode());
    aiJob.setAction(reqVO.getAction());
    aiJob.setType(reqVO.getType());
    aiJob.setCode(code);
    aiJob.setModel(aiModelEnum.getModel());
    aiJob.setModelVersion(aiModelEnum.getModelVersion());
    aiJob.setModelId(aiModelEnum.getModelId());
    aiJob.setModelName(aiModelEnum.getModelName());
    aiJob.setOutputCount(reqVO.getOutputCount());
    aiJob.setPlatform(aiModelEnum.getPlatform());
    aiJob.setChannel(aiModelEnum.getChannel());
    aiJob.setAsyn(aiModelEnum.getAsyn());
    aiJob.setCreateDate(DateUtil.format(now, "yyyy-MM-dd"));
    aiJob.setReqTime(now);
    // The processor for this model knows how to build the third-party request
    String reqJson = processorFactory.getProcessor(aiModelEnum.getModel()).buildReqJson(reqVO, photoReviewAnalysis);
    aiJob.setReqJson(reqJson);
    aiJob.setCreateTime(now);

    Long id = aiJobService.add(aiJob);
    // add() returns null when the insert fails, so guard against unboxing null
    if (id != null && id > 0) {
        // Count this request against the user's daily quota
        userBenefitsService.addPointCountPerDay(userId, dateFormat, aiModelEnum, reqVO.getOutputCount());
        AiJobStatusModel res = BeanHelper.copyProperties(aiJob, AiJobStatusModel.class);
        // Push the initial status to the client
        myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(res));
        // Submit the job right away instead of waiting for the scheduled task
        myEventPubLisher.pushAiJobNeedSubmitRightNow(aiJob);
        return res;
    }
    // "Failed to create the beautification job, please try again later"
    throw new RuntimeException("生成美化图片创建失败,请稍后再试");
}

Scheduled task implementation

IAiJobService

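package com.cc672cc.service;

import com.cc672cc.entity.tb.AiJob;

public interface IAiJobService {

    /**
     * Adds a job.
     *
     * @param aiJob the job to insert
     * @return the job ID, or null when the insert failed
     */
    Long add(AiJob aiJob);

    /**
     * Submits jobs to the third party.
     *
     * @param submitAiJob the job to submit; when null, all DRAFT jobs are submitted
     * @return the number of jobs whose status was updated
     */
    int submitAiJob(AiJob submitAiJob);

    /**
     * Publishes a status-refresh event for every job that is still in progress.
     *
     * @return the number of jobs scheduled for a refresh
     */
    int refreshAiJobStatus();

    /**
     * Refreshes the status of a single job.
     *
     * @param aiJob the AI job
     * @return 1 when the status was updated, otherwise 0
     */
    int refreshAiJobStatusDetail(AiJob aiJob);

    /**
     * Publishes a migrate event for every job waiting for migration.
     *
     * @return the number of jobs scheduled for migration
     */
    int migrateAiJob();

    /**
     * Migrates the results of a single job.
     *
     * @param aiJob the AI job
     * @return 1 when the status was updated, otherwise 0
     */
    int migrateAiJobDetail(AiJob aiJob);

    /**
     * Loads a job by its ID.
     *
     * @param aiJobId the job ID
     * @return the job, or null when it does not exist
     */
    AiJob selectOneById(Long aiJobId);
}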

AiJobServiceImpl

package com.cc672cc.service.impl;

import com.alibaba.fastjson2.JSON;
import com.cc672cc.common.constants.RedisPreKey;
import com.cc672cc.common.model.AiJobStatusModel;
import com.cc672cc.common.utils.BeanHelper;
import com.cc672cc.dp.listenermode.publisher.MyEventPubLisher;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.AiModelEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.mapper.AiJobMapper;
import com.cc672cc.processor.AiJobProcessorFactory;
import com.cc672cc.service.IAiJobService;
import com.cc672cc.service.IPhotoReviewService;
import com.cc672cc.service.IUserBenefitsService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import tk.mybatis.mapper.entity.Example;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class AiJobServiceImpl implements IAiJobService {

    @Autowired
    private AiJobMapper aiJobMapper;

    @Autowired
    private RedissonClient redissonClient;

    @Lazy
    @Autowired
    private AiJobProcessorFactory processorFactory;

    @Lazy
    @Autowired
    private IUserBenefitsService userBenefitsService;

    @Lazy
    @Autowired
    private IPhotoReviewService photoReviewService;

    @Lazy
    @Autowired
    private MyEventPubLisher myEventPubLisher;

    @Override
    public Long add(AiJob aiJob) {
        int insert = aiJobMapper.insert(aiJob);
        // Returns null when the insert fails, so callers must null-check the result
        return insert > 0 ? aiJob.getId() : null;
    }

    /**
     * Applies the new status to the job, persists it and pushes the change to the client.
     *
     * @return true when exactly one row was updated
     */
    private boolean updateStatusAndPush(AiJob job, Integer newStatus, Integer[] unitProgressShowFlag) {
        job.setJobStatus(newStatus);
        job.setOverallProgress(AiJobStatusEnum.getProgress(newStatus));
        // Hide the per-stage progress for statuses whose show flag is 0
        Integer showFlag = unitProgressShowFlag[newStatus];
        if (0 == showFlag) {
            job.setUnitProgress(null);
        }
        int update = aiJobMapper.updateByPrimaryKey(job);
        if (update == 1) {
            AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(job, AiJobStatusModel.class);
            myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));
            return true;
        }
        return false;
    }

    @Override
    public int submitAiJob(AiJob submitAiJob) {
        int res = 0;
        List<AiJob> aiJobs = new ArrayList<>();
        if (submitAiJob != null) {
            aiJobs.add(submitAiJob);
        } else {
            // No job given: pick up every job that is still in DRAFT
            Example example = new Example(AiJob.class);
            Example.Criteria criteria = example.createCriteria();
            criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());
            criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());
            criteria.andIn("jobStatus", List.of(AiJobStatusEnum.DRAFT.getCode()));
            example.orderBy("reqTime").desc();
            aiJobs = aiJobMapper.selectByExample(example);
        }
        String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;
        if (aiJobs != null && !aiJobs.isEmpty()) {
            for (AiJob aiJob : aiJobs) {
                // One distributed lock per job, shared by submit, refresh and migrate
                String lock = redisPreKey + aiJob.getId();
                RLock rLock = redissonClient.getLock(lock);
                try {
                    // Wait up to 5 s for the lock; it is released automatically after 60 s
                    boolean tryLock = rLock.tryLock(5, 60, TimeUnit.SECONDS);
                    if (!tryLock) {
                        continue;
                    }
                    // Reload inside the lock so we work on the latest row
                    AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());
                    String model = newAiJob.getModel();
                    Integer aiJobStatus = processorFactory.getProcessor(model).process(newAiJob);
                    Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();
                    // Only accept statuses that submitting a job can legitimately produce
                    if (List.of(AiJobStatusEnum.SUBMITTED.getCode(),
                            AiJobStatusEnum.QUEUED.getCode(),
                            AiJobStatusEnum.PROCESSING.getCode(),
                            AiJobStatusEnum.GENERATED.getCode(),
                            AiJobStatusEnum.SUCCESS.getCode(),
                            AiJobStatusEnum.FAILED.getCode(),
                            AiJobStatusEnum.CANCELED.getCode(),
                            AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {
                        if (updateStatusAndPush(newAiJob, aiJobStatus, unitProgressShowFlag)) {
                            res++;
                        }
                    }
                } catch (Exception e) {
                    log.error("Failed to submit AI job", e);
                } finally {
                    if (rLock != null && rLock.isHeldByCurrentThread()) {
                        rLock.unlock();
                    }
                }
            }
        }
        return res;
    }

    @Override
    public int refreshAiJobStatus() {
        int res = 0;
        // Jobs that the third party is still working on
        Example example = new Example(AiJob.class);
        Example.Criteria criteria = example.createCriteria();
        criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());
        criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());
        criteria.andIn("jobStatus", List.of(AiJobStatusEnum.SUBMITTED.getCode(),
                AiJobStatusEnum.QUEUED.getCode(),
                AiJobStatusEnum.PROCESSING.getCode()));
        example.orderBy("reqTime").desc();
        List<AiJob> aiJobs = aiJobMapper.selectByExample(example);
        if (aiJobs != null && !aiJobs.isEmpty()) {
            for (AiJob aiJob : aiJobs) {
                // Each job is refreshed by the event listener, not inline
                myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);
                res++;
            }
        }
        return res;
    }

    @Override
    public int refreshAiJobStatusDetail(AiJob aiJob) {
        String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;
        if (aiJob != null && aiJob.getId() != null) {
            // Kept from the original: a refresh event is published again before polling.
            // If the listener for this event calls back into this method, the two would
            // keep triggering each other, so check this against MyEventListener.
            myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);
            String lock = redisPreKey + aiJob.getId();
            RLock rLock = redissonClient.getLock(lock);
            try {
                boolean tryLock = rLock.tryLock(5, 30, TimeUnit.SECONDS);
                if (!tryLock) {
                    return 0;
                }
                AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());
                String model = newAiJob.getModel();
                // Ask the third party for the current status
                Integer aiJobStatus = processorFactory.getProcessor(model).query(newAiJob);
                Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();
                if (List.of(AiJobStatusEnum.QUEUED.getCode(),
                        AiJobStatusEnum.PROCESSING.getCode(),
                        AiJobStatusEnum.GENERATED.getCode(),
                        AiJobStatusEnum.SUCCESS.getCode(),
                        AiJobStatusEnum.FAILED.getCode(),
                        AiJobStatusEnum.CANCELED.getCode(),
                        AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {
                    if (updateStatusAndPush(newAiJob, aiJobStatus, unitProgressShowFlag)) {
                        return 1;
                    }
                }
            } catch (Exception e) {
                log.error("Failed to refresh AI job", e);
            } finally {
                if (rLock != null && rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
        }
        return 0;
    }

    @Override
    public int migrateAiJob() {
        int res = 0;
        // Jobs whose output exists at the third party but is not yet in our own storage
        Example example = new Example(AiJob.class);
        Example.Criteria criteria = example.createCriteria();
        criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());
        criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());
        criteria.andIn("jobStatus", List.of(AiJobStatusEnum.GENERATED.getCode(),
                AiJobStatusEnum.MIGRATING.getCode()));
        example.orderBy("reqTime").desc();
        List<AiJob> aiJobs = aiJobMapper.selectByExample(example);
        if (aiJobs != null && !aiJobs.isEmpty()) {
            for (AiJob aiJob : aiJobs) {
                myEventPubLisher.pushAiJobMigrateEvent(aiJob);
                res++;
            }
        }
        return res;
    }

    @Override
    public int migrateAiJobDetail(AiJob aiJob) {
        String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;
        if (aiJob != null && aiJob.getId() != null) {
            String lock = redisPreKey + aiJob.getId();
            RLock rLock = redissonClient.getLock(lock);
            try {
                // Migration copies files, so the lock lease is much longer (300 s)
                boolean tryLock = rLock.tryLock(5, 300, TimeUnit.SECONDS);
                if (!tryLock) {
                    return 0;
                }
                AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());
                String model = newAiJob.getModel();
                Integer jobStatus = newAiJob.getJobStatus();
                // Already migrated by another worker: just push the final status
                if (AiJobStatusEnum.SUCCESS.getCode().equals(jobStatus)) {
                    AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);
                    myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));
                    return 1;
                }
                Integer aiJobStatus = processorFactory.getProcessor(model).migrate(newAiJob.getId(), jobStatus);
                Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();
                boolean migrationFailed = List.of(AiJobStatusEnum.FAILED.getCode(),
                        AiJobStatusEnum.CANCELED.getCode(),
                        AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus);
                if (migrationFailed) {
                    // Set before the update so the description is persisted with the status.
                    // "Photo migration failed; the daily points have been refunded"
                    newAiJob.setJobStatusDesc("迁移照片过程失败,已返还每日积分");
                }
                if (List.of(AiJobStatusEnum.MIGRATING.getCode(),
                        AiJobStatusEnum.SUCCESS.getCode(),
                        AiJobStatusEnum.FAILED.getCode(),
                        AiJobStatusEnum.CANCELED.getCode(),
                        AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {
                    if (updateStatusAndPush(newAiJob, aiJobStatus, unitProgressShowFlag)) {
                        // The original placed the refund after "return 1", so it only ran when
                        // the update failed. It now runs once the failure has been persisted.
                        if (migrationFailed) {
                            Long userId = newAiJob.getUserId();
                            String modelId = newAiJob.getModelId();
                            AiModelEnum aiModelEnum = AiModelEnum.getMap().get(modelId);
                            Integer outputCount = newAiJob.getOutputCount();
                            userBenefitsService.subtractPointCountPerDay(userId, newAiJob.getCreateDate(), aiModelEnum, outputCount);
                            String photoReviewAnalysisCode = newAiJob.getPhotoReviewAnalysisCode();
                            PhotoReviewAnalysis photoReviewAnalysis = photoReviewService.selectOneByCode(photoReviewAnalysisCode);
                            if (photoReviewAnalysis != null) {
                                photoReviewAnalysis.setBeautifyImage(Boolean.FALSE);
                                photoReviewAnalysis.setUpdateTime(new Date());
                                photoReviewService.updateById(photoReviewAnalysis);
                            }
                        }
                        return 1;
                    }
                }
            } catch (Exception e) {
                log.error("Failed to migrate AI job", e);
            } finally {
                if (rLock != null && rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
        }
        return 0;
    }

    @Override
    public AiJob selectOneById(Long aiJobId) {
        return aiJobMapper.selectByPrimaryKey(aiJobId);
    }
}
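
The submit, refresh and migrate methods above all follow the same locking pattern: try to take a per-job lock with a bounded wait, skip the job when the lock is busy, and release it in `finally` only if the current thread holds it. The sketch below shows that pattern in isolation. It swaps Redisson's distributed `RLock` for a local `java.util.concurrent.locks.ReentrantLock` so it runs without Redis, which means it has no lease time and only protects a single JVM. `PerJobLockDemo` and `withJobLock` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;

// Local stand-in for the per-job Redisson lock; illustration only.
final class PerJobLockDemo {

    // One lock per job ID, created on first use
    private static final ConcurrentHashMap<Long, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    private PerJobLockDemo() {
    }

    /** Runs the work under the job's lock; returns 0 when the lock is not acquired in time. */
    static int withJobLock(long jobId, long waitMillis, IntSupplier work) {
        ReentrantLock lock = LOCKS.computeIfAbsent(jobId, id -> new ReentrantLock());
        try {
            if (!lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                return 0; // another worker is handling this job
            }
            return work.getAsInt();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return 0;
        } finally {
            // Unlock only if this thread actually owns the lock
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        // A second worker grabs job 7 and holds it until released
        Thread holder = new Thread(() -> withJobLock(7L, 100, () -> {
            held.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 1;
        }));
        holder.start();
        held.await();
        System.out.println(withJobLock(7L, 100, () -> 1)); // lock is busy, so the job is skipped (0)
        release.countDown();
        holder.join();
        System.out.println(withJobLock(7L, 100, () -> 1)); // lock is free again (1)
    }
}
```

Skipping a busy job instead of blocking is what lets the scheduled task and the "submit right now" event run side by side: whichever gets the lock first does the work, and the other moves on.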

Overall business flow

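Submit the AI task -> a scheduled task refreshes the status -> a scheduled task migrates the results (with WebSocket pushing every status change to the client in real time)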
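
The flow above, together with the status list defined earlier in the article, can be sketched as a small state machine. This is a simplified illustration, not the article's `AiJobStatusEnum`: the enum name `JobStatus` and its methods are hypothetical, and `canMoveTo` assumes that any non-final status may move forward or into FAILED, TIMEOUT or CANCELED, which is looser than the per-phase rules described in the flow section. The progress formula (code / 6, two decimals) reproduces the percentages listed for each status.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stand-in for the article's AiJobStatusEnum.
enum JobStatus {
    DRAFT(0), SUBMITTED(1), QUEUED(2), PROCESSING(3), GENERATED(4), MIGRATING(5), SUCCESS(6),
    FAILED(7), TIMEOUT(8), CANCELED(9);

    final int code;

    JobStatus(int code) {
        this.code = code;
    }

    /** SUCCESS, FAILED, TIMEOUT and CANCELED are final: nothing follows them. */
    boolean isTerminal() {
        return code >= SUCCESS.code;
    }

    /** Overall progress in percent: code / 6 for the normal flow, 0 for abnormal endings. */
    BigDecimal overallProgress() {
        int steps = code > SUCCESS.code ? 0 : code;
        return BigDecimal.valueOf(steps * 100L).divide(BigDecimal.valueOf(6), 2, RoundingMode.HALF_UP);
    }

    /** Assumed rule: move forward only, or end abnormally from any non-final status. */
    boolean canMoveTo(JobStatus next) {
        if (isTerminal()) {
            return false;
        }
        if (next.code > SUCCESS.code) {
            return true; // FAILED, TIMEOUT, CANCELED
        }
        return next.code > code;
    }

    public static void main(String[] args) {
        for (JobStatus s : values()) {
            System.out.println(s + " " + s.overallProgress() + "% terminal=" + s.isTerminal());
        }
        System.out.println(GENERATED.canMoveTo(MIGRATING)); // forward step is allowed
        System.out.println(SUCCESS.canMoveTo(FAILED));      // final statuses never change
    }
}
```

A guard like `canMoveTo` plays the same role as the status whitelists in `AiJobServiceImpl`: a late or out-of-order answer from the third party cannot drag a job backwards.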

Summary

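This design fits any scenario that integrates with asynchronous third-party APIs (AI generation, video processing, data analysis, and so on). Standardized status management and flow control tame the complexity and unreliability of asynchronous tasks. By my estimate it covers more than 95% of such scenarios, and it has worked very well in practice.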

