当前位置: 首页 > news >正文

foreach 块并行加速

 实例1

1. 任务分块(chunking)

我们手动把 1:nrow(pair_list_df) 切分为 N 块,每块是一个线程要处理的任务:

  • 每个线程一次处理一个“任务块”而不是一个“任务点”,极大减少调度开销。

  • 保证线程之间处理量均衡,避免有的线程闲了、有的线程忙到最后。

library(dplyr)
library(stringr)
library(foreach)
library(doParallel)# 设置并行核心数
n_cores <- parallel::detectCores() - 1
cl <- makeCluster(n_cores)
registerDoParallel(cl)# 预处理 pair list
pair_list_df <- pair_list %>%str_split_fixed("__", 2) %>%as.data.frame(stringsAsFactors = FALSE) %>%filter(V1 %in% mRNA_id & V2 %in% miRNA_id)rm(pair_list)# 按核心数将任务切块
block_indices <- split(1:nrow(pair_list_df), cut(1:nrow(pair_list_df), n_cores, labels = FALSE))# 并行计算
results <- foreach(block = block_indices, .combine = rbind,.packages = c("dplyr")) %dopar% {block_result <- lapply(block, function(i) {tryCatch({mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]x <- as.numeric(Esn_transcript_TPM[mRNA_name, ])y <- as.numeric(miRNAs_expressed_TPM[miRNA_name, ])if (all(is.na(x)) || all(is.na(y)) || sd(x, na.rm = TRUE) == 0 || sd(y, na.rm = TRUE) == 0) {return(data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA))}test <- cor.test(x, y, method = "pearson")data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = test$estimate, pvalue = test$p.value)}, error = function(e) {mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA)})})do.call(rbind, block_result)
}# 添加结果回原始表
pair_list_df$cor <- results$cor
pair_list_df$pvalue <- results$pvalue# 可选:输出完整 results 表,包括 row、mRNA、miRNA、cor、pvalue
# View(results) 或 write.csv(results, "correlation_results.csv", row.names = FALSE)# 关闭并行线程
stopCluster(cl)

实例2 从基因gtf文件提取,将所有mRNA的位置分为100个2%的滑动窗口

library(dplyr)
library(tibble)# 可选:如果有染色体长度表 chr_len(命名向量),可用于上边界裁剪
# 例:chr_len <- c("NC_066509.1"=100000000, "NC_066510.1"=95000000, ...)
# 没有就注释掉下一行,以及下面代码中的 pmin(., chr_len[chr]) 那部分
# chr_len <- NULLsliding_windows <- tibble()   # 直接用数据框累积# 固定上下游参数
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN * 0.02))  # 40
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01))  # 20for (i in seq_len(nrow(mRNA_gtfdata))) {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])# ========= 1) gene body =========len     <- e - s + 1Lwin_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)# 1..100 相对编号(按区域起止映射;如需按转录方向,可把 frac 换成 strand-aware 的)b_centers <- (b_starts + b_ends) / 2b_frac    <- (b_centers - s) / lenb_rank    <- pmax(1L, pmin(100L, floor(b_frac * 100) + 1L))df_body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)sliding_windows <- bind_rows(sliding_windows, df_body)}# ========= 2) upstream 2kb =========if (strand == "+") {us <- s - UPDN_LENue <- s - 1L} else {us <- e + 1Lue <- e + UPDN_LEN}# 下边界裁到 1,上边界如有 chr_len 可再裁一次us <- max(1L, us)# 若有 chr_len: ue <- if (is.null(chr_len)) ue else min(ue, as.integer(chr_len[chr]))if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2u_frac    <- (u_centers - us) / (ue - us + 1L)u_rank    <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))df_up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)sliding_windows <- bind_rows(sliding_windows, df_up)}# ========= 3) downstream 2kb =========if (strand == "+") {ds <- e + 1Lde <- e + UPDN_LEN} else {ds <- s - UPDN_LENde <- s - 1L}ds <- max(1L, ds)# 若有 chr_len: de <- if (is.null(chr_len)) de else min(de, as.integer(chr_len[chr]))if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac    <- (d_centers - ds) / (de - ds + 1L)d_rank    <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))df_down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)sliding_windows <- bind_rows(sliding_windows, df_down)}print(i)
}# 结果预览
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)####  多线程版本library(doParallel)
library(foreach)
library(dplyr)
library(tibble)# 可选:染色体长度(如果有就填,避免越界;没有就留 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL# 上/下游固定参数
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN * 0.02))  # 40 bp
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01))  # 20 bp# 并行环境
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)sliding_windows <- foreach(i = seq_len(nrow(mRNA_gtfdata)),.combine = dplyr::bind_rows,.packages = c("dplyr","tibble")) %dopar% {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])out_list <- list()# 1) gene body:窗=2%len,步=1%len;rank 随转录方向(5'->3')递增len     <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2# strand-aware:正链从 s->e,负链从 e->sfrac <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank <- pmax(1L, pmin(100L, floor(frac * 100) + 1L))out_list$body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)}}# 小工具:裁边(避免 <1;如提供 chr_len 再裁上界)clamp <- function(x, chr) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}# 2) upstream 2kb(按链向定义)if (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us, chr); ue <- clamp(ue, chr)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2# 远端->近端 映射到 1..100u_frac <- (u_centers - us) / (ue - us + 1L)u_rank <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))out_list$up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)}# 3) downstream 2kb(按链向定义)if (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds, chr); de <- clamp(de, chr)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac <- (d_centers - ds) / (de - ds + 1L)d_rank <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))out_list$down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)}dplyr::bind_rows(out_list)}stopCluster(cl)# 结果查看
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)####  多线程版本块并行写法
#  任务分块(chunking)
library(doParallel)
library(foreach)
library(dplyr)
library(tibble)## 并行环境 ---------------------------------------------------------------
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)## 可选:染色体长度(命名向量;没有就设为 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL## 上/下游固定参数 --------------------------------------------------------
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN*0.02))  # 40bp
UPDN_STEP <- max(1L, floor(UPDN_LEN*0.01))  # 20bp## 将任务切块:每核一块(行号均分到 n_cores 份) --------------------------
idx_all <- seq_len(nrow(mRNA_gtfdata))
block_indices <- split(idx_all, cut(idx_all, n_cores, labels = FALSE))## 开撸:块并行,每块内部用 for 循环拼成一个 data.frame 再返回 --------------
sliding_windows <- foreach(blk = block_indices,.combine = dplyr::bind_rows,.multicombine = TRUE,.maxcombine = n_cores,.packages = c("dplyr","tibble")
) %dopar% {out_block <- vector("list", length(blk))  # 先预分配,块内少拷贝k <- 0Lfor (i in blk) {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])res_list <- list()## 1) body:窗=2%len,步=1%len;rank 随转录方向(5'->3')len <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2frac    <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank  <- pmax(1L, pmin(100L, floor(frac*100) + 1L))res_list$body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)}}## clamp:裁边clamp <- function(x) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}## 2) upstream 2kbif (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us); ue <- clamp(ue)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers<- (u_starts + u_ends) / 2u_frac   <- (u_centers - us) / (ue - us + 1L)u_rank   <- pmax(1L, pmin(100L, floor(u_frac*100) + 1L))res_list$up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)}## 3) downstream 2kbif (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds); de <- clamp(de)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers<- (d_starts + d_ends) / 2d_frac   <- (d_centers - ds) / (de - ds + 1L)d_rank   <- pmax(1L, pmin(100L, floor(d_frac*100) + 1L))res_list$down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)}k <- k + 1Lout_block[[k]] <- dplyr::bind_rows(res_list)}# 把该块内所有基因的窗口拼成一个 data.frame 返回dplyr::bind_rows(out_block[seq_len(k)])
}stopCluster(cl)# 看看结果
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)

http://www.dtcms.com/a/325868.html

相关文章:

  • 澳洲增高营养品排行榜
  • 小波卷积YYDS!小波变换+CNN创新结合
  • 无人机航拍数据集|第11期 无人机人员行为目标检测YOLO数据集1868张yolov11/yolov8/yolov5可训练
  • 【bug】diff-gaussian-rasterization Windows下编译 bug 解决
  • STM32 HAL库驱动0.96寸OLED屏幕
  • 【学习】DCMM认证从“跟风“到“生存法则“的进化
  • EI检索-学术会议 | 人工智能、虚拟现实、可视化
  • react中父子数据流动和事件互相调用(和vue做比较)
  • 小杰python高级(three day)——matplotlib库
  • 关于微信小程序的笔记
  • 告别“焊武帝”时代!30-65W零外围A+C快充协议正式上线
  • Cherryusb UAC例程对接STM32内置ADC和PWM播放音乐和录音(下)=>UAC+STM32 ADC+PWM实现录音和播放
  • TradingAgents-CN: 基于多智能体的中文金融交易决策框架
  • Apache Ignite超时管理核心组件解析
  • XX生产线MES系统具体实施方案
  • 第2节 大模型分布式推理架构设计原则
  • react+echarts实现图表展示的两种方法
  • uni-app app端安卓和ios如何申请麦克风权限,唤起提醒弹框
  • 初识影刀:将多个相同格式EXCEL中内容汇总到一个EXCEL文件中去
  • HRM分层推理模型在医疗AI上的应用探析
  • LeetCode算法日记 - Day 8: 串联所有单词的子串、最小覆盖子串
  • 学习嵌入式-IMX6ULL学习——中断
  • 防火墙组网方式总结
  • 阿里发布数字人模型echomimic_v3,在视频合成的基础上支持prompt输入~
  • 计算机组成原理2-5:C语言中的数据类型及转换
  • LangChain 框架 Parser 讲解
  • LeetCode 2438.二的幂数组中查询范围内的乘积:模拟(前缀和可选)
  • 十二、Linux Shell脚本:正则表达式
  • Linux线程——线程控制及理解
  • SDRAM详细分析——01 SDRAM基础