当前位置: 首页 > news >正文

数据批处理(队列方式)

数据批处理(队列方式)

public class DataProcessor {

    private static final int THREAD_COUNT = 4;
    private static final int QUEUE_SIZE = 10;

    private LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);

    public DataProcessor() {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(() -> {
                try {
                    while (true) {
                        Data data = queue.take();
                        processData(data);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    public void addData(Data data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processData(Data data) {
        // Process data here
        System.out.println("Processing data: " + data);
    }

    public static void main(String[] args) {
        DataProcessor processor = new DataProcessor();

        // Add data to be processed
        for (int i = 0; i < 20; i++) {
            Data data = new Data("Data " + i);
            processor.addData(data);
        }
    }

    static class  Data {
        private String value;

        public Data(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

}

参考网址:
一:https://blog.csdn.net/zhizhengguan/article/details/86551270
二:https://blog.csdn.net/qq_41128049/article/details/134442487

数据批处理入库(线程池方式)

    public void reSync() {
        // 超过半小时状态为1的数据状态重置0
        Date date = DateUtil.getAddMinuteDate(new Date(), -30);
        baseMapper.resetTimeoutStatus(date);
        // 查询推送失败且失败次数小于6次的
        List<CardRecordSync> list = baseMapper.listFail();
        if (list.isEmpty()) {
            return;
        }
        log.info("待处理补推记录数size={}", list.size());
        List<List<CardRecordSync>> partition = Lists.partition(list, 100);
        for (List<CardRecordSync> syncs : partition) {
            executor.execute(() -> {
                List<Long> ids = syncs.stream().map(CardRecordSync::getId).collect(Collectors.toList());
                // 状态变更为处理中
                EntityWrapper<CardRecordSync> wrapper = new EntityWrapper<>();
                wrapper.in("ID", ids);
                CardRecordSync po = new CardRecordSync();
                po.setStatus(CardRecordSyncStatus.PUSHING.getStatus());
                baseMapper.update(po, wrapper);

                syncs.forEach(record -> {
//                    boolean flag = thirdService.cardRecordSync(record.getSyncParam());
                    DataSyncResult dataSyncResult = thirdService.cardRecordSync(record.getSyncParam());
                    boolean flag = dataSyncResult.isDataSyncSuc();
                    int status = CardRecordSyncStatus.FAIL.getStatus();
                    if (flag) {
                        status = CardRecordSyncStatus.SUCCESS.getStatus();
                    }
                    baseMapper.updateStatusDescById(dataSyncResult.getDataSyncSucDesc(),status, record.getId());
                });
            });
        }
    }

相关文章:

  • 高并发场景下如何实现消息精准一次消费?实战Java幂等性设计
  • 如何阅读webpack-bundle-analyzer分析生成的图
  • MySQL regexp 命令
  • C++基础(VScode环境安装)
  • MyBatis 的配置对象 Configuration 作用详解
  • 【QT】QScrollBar设置样式:圆角、隐藏箭头、上边距等
  • Qt配置OpenGL相机踩的坑
  • 蓝桥杯 C++ b组 统计子矩阵双指针+一维前缀和
  • 【2025深夜随笔】简单认识一下Android Studio
  • Redis 缓存穿透、缓存击穿与缓存雪崩详解:问题、解决方案与最佳实践
  • C语言一维数组
  • SD模型进阶学习全攻略(三)
  • 深入理解Mesa:Linux图形渲染背后的开源力量
  • OSPF总结
  • 正则表达式快速入门
  • MyBatis 中SQL 映射文件是如何与 Mapper 接口关联起来的? MyBatis 如何知道应该调用哪个 SQL 语句?
  • 高校数字素养通识教育解决方案
  • 饮食调治痉挛性斜颈,开启健康生活
  • 【python运行Janus-Pro-1B文生图功能】
  • 可视化图解算法:链表指定区间反转
  • 图忆|上海车展40年:中国人的梦中情车有哪些变化(上)
  • 海南机场拟超23亿元收购美兰空港控股权,进一步聚焦机场主业
  • 全文丨中华人民共和国传染病防治法
  • 小核酸药物企业瑞博生物递表港交所,去年亏损2.81亿元
  • 中国科学院院士张泽民已任重庆医科大学校长
  • 住房和城乡建设部办公厅主任李晓龙已任部总工程师