【Java工具】Java-sftp线程池上传
定时使用线程池对指定目录下的文件进行多线程上传。
1.SftpConnectionPool 工具类
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;/*** 简单的 SFTP 连接池:每个连接包含 Session + ChannelSftp*/
public class SftpConnectionPool {private static final Logger logger = LoggerFactory.getLogger(SftpConnectionPool.class);public static class SftpConnection {private final Session session;private final ChannelSftp channel;public SftpConnection(Session session, ChannelSftp channel) {this.session = session;this.channel = channel;}public ChannelSftp getChannel() {return channel;}public Session getSession() {return session;}public boolean isConnected() {return session != null && session.isConnected() && channel != null && channel.isConnected();}public void closeQuietly() {try {if (channel != null && channel.isConnected()) channel.disconnect();} catch (Exception ignore) {}try {if (session != null && session.isConnected()) session.disconnect();} catch (Exception ignore) {}}}private final BlockingQueue<SftpConnection> pool;private final String host;private final int port;private final String user;private final String password;private final int size;private final int connectTimeoutMs = 30000;public SftpConnectionPool(int size) throws JSchException {this.host = YmlUtil.getYmlValueNonNull("sftp.host");this.port = Integer.parseInt(YmlUtil.getYmlValueNonNull("sftp.port"));this.user = YmlUtil.getYmlValueNonNull("sftp.username");this.password = YmlUtil.getYmlValueNonNull("sftp.password");this.size = size;this.pool = new ArrayBlockingQueue<>(size);for (int i = 0; i < size; i++) {pool.offer(createConnection());}}private SftpConnection createConnection() throws JSchException {JSch jsch = new JSch();Session session = jsch.getSession(user, host, port);session.setPassword(password);session.setConfig("StrictHostKeyChecking", "no");session.connect(connectTimeoutMs);ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");channel.connect(connectTimeoutMs);return new SftpConnection(session, channel);}/*** 借用连接(会阻塞直到有连接)*/public SftpConnection borrow() throws InterruptedException {return pool.take();}/*** 归还连接。如果连接不可用,会尝试替换成新连接放回池中*/public void release(SftpConnection conn) {if (conn == null) return;try {if (conn.isConnected()) {pool.offer(conn, 5, TimeUnit.SECONDS);return;}} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 若不可用:销毁并替换invalidate(conn);}/*** 标记失效:关闭旧连接并尝试创建新的连接放回池中*/public void invalidate(SftpConnection conn) {try {if (conn != null) conn.closeQuietly();} catch (Exception ignore) {}try {SftpConnection newConn = createConnection();pool.offer(newConn, 5, TimeUnit.SECONDS);} catch (Exception e) {logger.error("重建 SFTP 连接失败: {}", e.getMessage());// 如果重建失败,不阻塞(池会逐渐变小),上层会在下一次 borrow 中阻塞等待或报错}}/*** 关闭池内所有连接*/public void closeAll() {SftpConnection conn;while ((conn = pool.poll()) != null) {try {conn.closeQuietly();} catch (Exception ignore) {}}}
}
2.代码调用
public void start() {if (!getEnable()) {return;}// 初始化线程池与连接池(只做一次)if (executor == null) {executor = Executors.newFixedThreadPool(poolSize);}if (sftpPool == null) {try {sftpPool = new SftpConnectionPool(poolSize);} catch (Exception e) {logger.error("初始化 SFTP 连接池失败: {}", e.getMessage(), e);}}task = new TimerTask() {@Overridepublic void run() {if (runningFlag) {logger.info("上传任务正在执行,本次任务不执行");return;}runningFlag = true;try {logger.info("上传任务开始执行");dstPath = null;upload();} catch (Exception e) {logger.error("打包上传任务失败, error: {}", e.getMessage());} finally {runningFlag = false;}}};//TODO 更新上传间隔timer.scheduleAtFixedRate(task, 0, uploadInterval * 1000);}public void close() {if (!getEnable()) {return;}logger.info("上传资源释放开始");if (task != null) {task.cancel();timer.cancel();}logger.info("上传资源定时任务关闭");// 如果需要删除 dstPath.w,使用连接池中的连接操作(保留你原有 close() 的行为)if (!StringUtils.isEmpty(dstPath) && sftpPool != null) {SftpConnectionPool.SftpConnection conn = null;try {conn = sftpPool.borrow();ChannelSftp ch = conn.getChannel();try {ch.rm(dstPath + ".w");} catch (Exception ignore) {}} catch (Exception e) {logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage());} finally {if (conn != null) sftpPool.release(conn);}}// 关闭线程池与连接池if (executor != null) {executor.shutdown();try {if (!executor.awaitTermination(1, TimeUnit.MINUTES)) executor.shutdownNow();} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}if (sftpPool != null) {sftpPool.closeAll();}logger.info("上传资源释放完成");}