新冠(covid19)完整测序流程(java调用docker容器方式实现,算法为nextclade和pangolin)
1,配置docker环境,链接如下
Docker: Accelerated Container Application Development https://www.docker.com/
2,通过docker-compose生成相关容器,链接如下:
docker-compose(一键部署配置文件)_docker compose 配置文件-CSDN博客 https://blog.csdn.net/weixin_62108279/article/details/143897925?spm=1011.2415.3001.5331
3,下载nextclade算法镜像
本地部署nextclade作为进化枝分配、突变检出和序列质量分析(数据集为新冠的数据集,可根据不同的数据集进行不同病毒的相关分析)_nextclade安装下载-CSDN博客 https://blog.csdn.net/weixin_62108279/article/details/145966789?spm=1011.2415.3001.5331
4,下载pangolin算法镜像
本地部署pangolin获取谱系,从而达到预测新冠的流行趋势-CSDN博客 https://blog.csdn.net/weixin_62108279/article/details/146055003?spm=1011.2415.3001.5331
5,添加java 调用docker工具类
Java调用docker API (通过本地镜像生成容器获取运算结果)-CSDN博客 https://blog.csdn.net/weixin_62108279/article/details/146328209?spm=1011.2415.3001.5331
6,整合RabbitMQ消息发送,从而实现算法的调用
逻辑:序列添加成功后,发送消息到任务模块,任务模块监听到消息,通过docker工具类调用算法获取结果进行封装
以下为监听器代码示例:
package org.springblade.task.listeners;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.DockerConstantIn;
import org.springblade.common.constant.RabbitMqConstant;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springblade.sample.entity.Resultmp;
import org.springblade.sample.entity.SampleCovid19;
import org.springblade.sample.feign.ISampleFeign;
import org.springblade.system.entity.DictBiz;
import org.springblade.system.feign.IDictBizClient;
import org.springblade.task.entity.TaskCovid19;
import org.springblade.task.service.ITaskCovid19Service;
import org.springblade.task.utils.DockerUtil;
import org.springblade.task.utils.JacksonUtil;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class TmListener {
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private ITaskCovid19Service taskCovid19Service;
@Resource
private ISampleFeign sampleFeign;
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = RabbitMqConstant.TOPIC_SAMPLE_COVID19_DEFAULT_QUEUE), exchange = @Exchange(name = RabbitMqConstant.TOPIC_SAMPLE_COVID19_DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC), key = {RabbitMqConstant.SAMPLE_COVID19_DEFAULT_KEY}))
private void topicSampleCovid19Default(Message msg) throws JsonProcessingException {
Long userId = AuthUtil.getUserId();
// 将字节数组转换为字符串
String json = new String(msg.getBody(), StandardCharsets.UTF_8);
Map<String, Object> messageBodyMap = objectMapper.readValue(json, objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class));
String messageId = msg.getMessageProperties().getMessageId();
log.info("消息接收成功,唯一id:{}", messageId);
String taskName = (String) messageBodyMap.get("taskName");
Long genomeId = (Long) messageBodyMap.get("genomeId");
Long sampleId = (Long) messageBodyMap.get("sampleId");
String pangolinInput = (String) messageBodyMap.get("pangolinInputFileName");
String nextcladeInput = (String) messageBodyMap.get("nextcladeInputFileName");
Long taskId = (Long) messageBodyMap.get("taskId");
log.info("开始调用算法!");
//编写调用算法逻辑
DockerUtil.DockerConnectionNextstrain(taskName, nextcladeInput);
DockerUtil.DockerConnectionPangolin(taskName, pangolinInput);
//判断文件是否存在
Path nextcladePath = Paths.get(DockerConstantIn.NEXTCLADE_OUTPUT_PATH + taskName + DockerConstantIn.NEXTCLADE_SUFFIX);
Path pangolinPath = Paths.get(DockerConstantIn.PANGOLIN_OUTPUT_PATH + taskName + DockerConstantIn.PANGOLIN_SUFFIX);
try {
if (Files.exists(nextcladePath) && Files.exists(pangolinPath)) {
//获取结果整合入库
Map<String, Object> nextclademap = objectMapper.readValue(new File(nextcladePath.toString()), Map.class);
Map<String, String> pangolinMap = JacksonUtil.readCSV(pangolinPath.toString());
//获取pangolin lineage
String lineage = pangolinMap.get("lineage");
//获取nextclade results
List<Map<String, Object>> list = (List<Map<String, Object>>) nextclademap.get("results");
Map<String, Object> results = list.get(0);
//获取clade分型
String clade = (String) results.get("clade");
//获取序列覆盖度
Double coverage = (Double) results.get("coverage");
BigDecimal overage = new BigDecimal(coverage.toString()).multiply(new BigDecimal("100")).setScale(2, RoundingMode.HALF_UP);
//获取序列质量
Map<String, Object> qc = (Map<String, Object>) results.get("qc");
Map<String, Object> missingData = (Map<String, Object>) qc.get("missingData");
String missingDataStatus = (String) missingData.get("status");
Map<String, Object> mixedSites = (Map<String, Object>) qc.get("mixedSites");
String mixedSitesStatus = (String) mixedSites.get("status");
Map<String, Object> privateMutations = (Map<String, Object>) qc.get("privateMutations");
String privateMutationsStatus = (String) privateMutations.get("status");
Map<String, Object> snpClusters = (Map<String, Object>) qc.get("snpClusters");
Double snpClustersScore = (Double) snpClusters.get("score");
Map<String, Object> frameShifts = (Map<String, Object>) qc.get("frameShifts");
String frameShiftsStatus = (String) frameShifts.get("status");
Map<String, Object> stopCodons = (Map<String, Object>) qc.get("stopCodons");
String stopCodonsStatus = (String) stopCodons.get("status");
Double overallScore = (Double) qc.get("overallScore");
BigDecimal qcOverallScore = new BigDecimal(overallScore.toString()).setScale(6, RoundingMode.HALF_UP);
String overallStatus = (String) qc.get("overallStatus");
//获取nextclade lineage
Map<String, Object> customNodeAttributes = (Map<String, Object>) results.get("customNodeAttributes");
String nextcladePango = (String) customNodeAttributes.get("Nextclade_pango");
//获取nextclade Unaliased
String partiallyAliased = (String) customNodeAttributes.get("partiallyAliased");
//获取 "total_substitutions_exclude_utr" IS '核苷酸变异数量
Integer totalSubstitutions = (Integer) results.get("totalSubstitutions");
//获取 non-ACGTN
Integer totalNonACGTNs = (Integer) results.get("totalNonACGTNs");
//获取Ns
Integer totalMissing = (Integer) results.get("totalMissing");
//获取Gaps
Integer totalDeletions = (Integer) results.get("totalDeletions");
//获取Ins.
Integer totalInsertions = (Integer) results.get("totalInsertions");
//获取FS
Integer totalFrameShifts = (Integer) results.get("totalFrameShifts");
//获取SC
Integer totalAminoacidInsertions = (Integer) results.get("totalAminoacidInsertions");
//创建结果对象进行封装插入
Resultmp resultmp = new Resultmp();
resultmp.setSampleId(sampleId);
resultmp.setGenomeId(genomeId);
resultmp.setEndDate(new Date());
resultmp.setLineage(lineage);
resultmp.setClade(clade);
resultmp.setOverage(overage);
resultmp.setQcOverallStatus(overallStatus);
resultmp.setQcOverallScore(qcOverallScore);
resultmp.setQcMissingDataStatus(missingDataStatus);
resultmp.setQcMixedSitesStatus(mixedSitesStatus);
resultmp.setQcPrivateMutationsStatus(privateMutationsStatus);
if (snpClustersScore != null) {
if (snpClustersScore <= 30) {
resultmp.setQcSnpClustersStatus("good");
} else if (snpClustersScore <= 99) {
resultmp.setQcSnpClustersStatus("mediocre");
} else {
resultmp.setQcSnpClustersStatus("bad");
}
}
resultmp.setQcFrameShiftsStatus(frameShiftsStatus);
resultmp.setQcStopCodonsStatus(stopCodonsStatus);
resultmp.setPangoLineage(nextcladePango);
resultmp.setUnaliased(partiallyAliased);
resultmp.setTotalSubstitutionsExcludeUtr(totalSubstitutions.toString());
resultmp.setMut(totalSubstitutions.toString());
resultmp.setNonAcgtn(totalNonACGTNs.toString());
resultmp.setNs(totalMissing.toString());
resultmp.setCov(overage.toString());
resultmp.setGaps(totalDeletions.toString());
resultmp.setIns(totalInsertions.toString());
resultmp.setFs(totalFrameShifts.toString());
resultmp.setSc(totalAminoacidInsertions.toString());
SampleCovid19 covid19 = sampleFeign.SampleCovid19GetById(sampleId);
if (overage.doubleValue() >= 96) {
covid19.setIsValid(1);
resultmp.setIsResult(Short.valueOf("1"));
} else {
resultmp.setIsResult(Short.valueOf("0"));
covid19.setIsValid(0);
}
resultmp.setAce2Binding("0");
resultmp.setImmuneEscape("0");
resultmp.setUpdateUser(userId);
resultmp.setCreateUser(userId);
resultmp.setIsResult(Short.valueOf("1"));
resultmp.setSequenceState(Short.valueOf("1"));
sampleFeign.sampleCovid19ResultSave(resultmp);
log.info("nextclade算法结果:{}", nextcladePath);
log.info("pangolin算法结果:{}", nextcladePath);
//更改任务状态
this.updateTaskCovid19(taskId, 1, lineage, nextcladePango);
} else {
log.error("NEXTCLADE PANGOLIN结果获取失败:{}", taskName);
//更改任务状态
this.updateTaskCovid19(taskId, 2, null, null);
}
} catch (Exception e) {
log.error("NEXTCLADE PANGOLIN 数据结果获取失败:{} :{}", taskName, e.getMessage());
//更改任务状态
this.updateTaskCovid19(taskId, 2, null, null);
} finally {
this.deleteFile(nextcladePath);
this.deleteFile(Paths.get(DockerConstantIn.NEXTCLADE_INPUT_PATH + nextcladeInput));
this.deleteFile(Paths.get(DockerConstantIn.PANGOLIN_INPUT_PATH + pangolinInput));
this.deleteFile(pangolinPath);
}
}
/**
* @param taskId 任务id
* @param taskState 任务状态
* @param lineage PANGOLIN谱系
* @param nextcladePango NEXTCLADE 谱系
*/
private void updateTaskCovid19(Long taskId, Integer taskState, String lineage, String nextcladePango) {
LambdaUpdateWrapper<TaskCovid19> taskCovid19LambdaUpdateWrapper = new LambdaUpdateWrapper<>();
taskCovid19LambdaUpdateWrapper.eq(TaskCovid19::getId, taskId);
taskCovid19LambdaUpdateWrapper.set(TaskCovid19::getTaskState, taskState);
taskCovid19LambdaUpdateWrapper.set(TaskCovid19::getAnalysisTime, LocalDateTime.now());
taskCovid19LambdaUpdateWrapper.set(StringUtil.isNotBlank(lineage), TaskCovid19::getPangolinLineage, lineage);
taskCovid19LambdaUpdateWrapper.set(StringUtil.isNotBlank(nextcladePango), TaskCovid19::getNextcladeClade, nextcladePango);
taskCovid19Service.update(taskCovid19LambdaUpdateWrapper);
}
/**
* 删除指定文件
*
* @param path 文件路径
*/
private void deleteFile(Path path) {
try {
Files.delete(path);
System.out.println("文件删除成功:" + path);
} catch (IOException e) {
System.out.println("文件删除失败:" + e.getMessage());
}
}
}