当前位置: 首页 > news >正文

【Java工具】Java-sftp线程池上传

定时使用线程池对指定目录下的文件进行多线程上传。
1.SftpConnectionPool 工具类

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;/*** 简单的 SFTP 连接池:每个连接包含 Session + ChannelSftp*/
public class SftpConnectionPool {private static final Logger logger = LoggerFactory.getLogger(SftpConnectionPool.class);public static class SftpConnection {private final Session session;private final ChannelSftp channel;public SftpConnection(Session session, ChannelSftp channel) {this.session = session;this.channel = channel;}public ChannelSftp getChannel() {return channel;}public Session getSession() {return session;}public boolean isConnected() {return session != null && session.isConnected() && channel != null && channel.isConnected();}public void closeQuietly() {try {if (channel != null && channel.isConnected()) channel.disconnect();} catch (Exception ignore) {}try {if (session != null && session.isConnected()) session.disconnect();} catch (Exception ignore) {}}}private final BlockingQueue<SftpConnection> pool;private final String host;private final int port;private final String user;private final String password;private final int size;private final int connectTimeoutMs = 30000;public SftpConnectionPool(int size) throws JSchException {this.host = YmlUtil.getYmlValueNonNull("sftp.host");this.port = Integer.parseInt(YmlUtil.getYmlValueNonNull("sftp.port"));this.user = YmlUtil.getYmlValueNonNull("sftp.username");this.password = YmlUtil.getYmlValueNonNull("sftp.password");this.size =  size;this.pool = new ArrayBlockingQueue<>(size);for (int i = 0; i < size; i++) {pool.offer(createConnection());}}private SftpConnection createConnection() throws JSchException {JSch jsch = new JSch();Session session = jsch.getSession(user, host, port);session.setPassword(password);session.setConfig("StrictHostKeyChecking", "no");session.connect(connectTimeoutMs);ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");channel.connect(connectTimeoutMs);return new SftpConnection(session, channel);}/*** 借用连接(会阻塞直到有连接)*/public SftpConnection borrow() throws InterruptedException {return pool.take();}/*** 归还连接。如果连接不可用,会尝试替换成新连接放回池中*/public void release(SftpConnection conn) {if (conn == null) return;try {if (conn.isConnected()) {pool.offer(conn, 5, TimeUnit.SECONDS);return;}} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 若不可用:销毁并替换invalidate(conn);}/*** 标记失效:关闭旧连接并尝试创建新的连接放回池中*/public void invalidate(SftpConnection conn) {try {if (conn != null) conn.closeQuietly();} catch (Exception ignore) {}try {SftpConnection newConn = createConnection();pool.offer(newConn, 5, TimeUnit.SECONDS);} catch (Exception e) {logger.error("重建 SFTP 连接失败: {}", e.getMessage());// 如果重建失败,不阻塞(池会逐渐变小),上层会在下一次 borrow 中阻塞等待或报错}}/*** 关闭池内所有连接*/public void closeAll() {SftpConnection conn;while ((conn = pool.poll()) != null) {try {conn.closeQuietly();} catch (Exception ignore) {}}}
}

2.代码调用

public void start() {if (!getEnable()) {return;}// 初始化线程池与连接池(只做一次)if (executor == null) {executor = Executors.newFixedThreadPool(poolSize);}if (sftpPool == null) {try {sftpPool = new SftpConnectionPool(poolSize);} catch (Exception e) {logger.error("初始化 SFTP 连接池失败: {}", e.getMessage(), e);}}task = new TimerTask() {@Overridepublic void run() {if (runningFlag) {logger.info("上传任务正在执行,本次任务不执行");return;}runningFlag = true;try {logger.info("上传任务开始执行");dstPath = null;upload();} catch (Exception e) {logger.error("打包上传任务失败, error: {}", e.getMessage());} finally {runningFlag = false;}}};//TODO 更新上传间隔timer.scheduleAtFixedRate(task, 0, uploadInterval * 1000);}public void close() {if (!getEnable()) {return;}logger.info("上传资源释放开始");if (task != null) {task.cancel();timer.cancel();}logger.info("上传资源定时任务关闭");// 如果需要删除 dstPath.w,使用连接池中的连接操作(保留你原有 close() 的行为)if (!StringUtils.isEmpty(dstPath) && sftpPool != null) {SftpConnectionPool.SftpConnection conn = null;try {conn = sftpPool.borrow();ChannelSftp ch = conn.getChannel();try {ch.rm(dstPath + ".w");} catch (Exception ignore) {}} catch (Exception e) {logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage());} finally {if (conn != null) sftpPool.release(conn);}}// 关闭线程池与连接池if (executor != null) {executor.shutdown();try {if (!executor.awaitTermination(1, TimeUnit.MINUTES)) executor.shutdownNow();} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}if (sftpPool != null) {sftpPool.closeAll();}logger.info("上传资源释放完成");}
http://www.dtcms.com/a/326647.html

相关文章:

  • ADK[5]调用外部工具流程
  • (附源码)基于Spring Boot的4S店信息管理系统 的设计与实现
  • 每日算法刷题Day61:8.11:leetcode 堆11道题,用时2h30min
  • 【功能测试】软件集成测试思路策略与经验总结
  • HTML应用指南:利用GET请求获取全国vivo体验店门店位置信息
  • 字节后端面经
  • 内网依赖管理新思路:Nexus与CPolar的协同实践
  • Linux-FTP服务器搭建
  • 【图像算法 - 12】OpenCV-Python 入门指南:图像视频处理与可视化(代码实战 + 视频教程 + 人脸识别项目讲解)
  • DHCP服务配置与管理实战指南
  • CRMEB多商户系统(Java)v2.0更新预告:区域管理+预约商品,激活本地商业新活力!
  • NTC热敏电阻、压敏电阻和保险丝工作原理
  • FFmpeg - 基本 API大全(视频编解码相关的)
  • python每日一题练习 两个数组的交集 非常简单
  • GCN: 图卷积网络,概念以及代码实现
  • 【LeetCode刷题集】--排序(三)
  • Protocol Buffers (protobuf) API 接口完全指南
  • maven项目打包成sdk后在别的项目使用
  • 从0开始的中后台管理系统-5(部门管理以及菜单管理页面功能实现)
  • 【科研绘图系列】R语言绘制散点图折线图误差棒组合图
  • 指派问题-匈牙利算法
  • 2025牛客多校第八场 根号-2进制 个人题解
  • HTTPS应用层协议-CA签名与证书
  • Vue 3 快速入门 第六章
  • MaixPy简介
  • Projects
  • 进程管理是什么
  • DeepSeek生成的高精度大数计算器
  • 自制网页并爬取,处理异常值(第十九节课内容总结)
  • .NET/C# webapi框架下给swagger的api文档中显示注释(可下载源码)