当前位置: 首页 > news >正文

Java 导入时利用布隆过滤器（BloomFilter）去重

导入时利用布隆过滤器（BloomFilter）去重

在这里插入图片描述

package com.youlai.boot.system.utils;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.youlai.boot.common.util.ExcelUtilsA;
import com.youlai.boot.system.mapper.PhoneMapper;
import com.youlai.boot.system.model.dto.PhoneImportDTO;
import com.youlai.boot.system.model.entity.Phone;
import jakarta.annotation.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;@Component
public class UploadExcel {@Resourceprivate PhoneMapper phoneMapper;@Resourceprivate JdbcTemplate jdbcTemplate;// 配置参数private static final int BATCH_SIZE = 10000;private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;// 使用线程安全的计数器private static final AtomicInteger totalProcessed = new AtomicInteger(0);private static final AtomicInteger duplicateCount = new AtomicInteger(0);public List<String> upLoad(MultipartFile file, String name) {List<String> listCf = new ArrayList<>();long startTime = System.currentTimeMillis();try {System.out.println("(1) :");listCf.addAll(importPhoneNumbers(file,name)) ;} catch (Exception e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("导入完成,耗时: " + (endTime - startTime) / 1000 + "秒");System.out.println("处理总数: " + totalProcessed.get() + ", 重复数: " + duplicateCount.get());return listCf;}public List<String> importPhoneNumbers(MultipartFile file,String name) throws Exception {List<String> listCf =new ArrayList<>();// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);// 使用CompletionService管理任务结果CompletionService<List<String>> completionService =new ExecutorCompletionService<>(executor);List<PhoneImportDTO> phoneNumbers =  ExcelUtilsA.read(file,PhoneImportDTO.class);if(!phoneNumbers.isEmpty()){try {BloomFilter<String> yuan =  yuan();List<String> list =phoneNumbers.stream().map(PhoneImportDTO::getId).toList();int totalRows = list.size();// 分片处理Excel,每个任务处理一部分行for (int start = 1; start < totalRows; start += BATCH_SIZE) {int end = Math.min(start + BATCH_SIZE, totalRows);completionService.submit(new ExcelProcessor(list, start, end));}// 创建布隆过滤器 (实际生产环境应使用分布式布隆过滤器如RedisBloom)BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()),6_000_000,0.001);int ci = 0;for (int i = 0; i < (totalRows + BATCH_SIZE - 1) / BATCH_SIZE; i++) {// 格式化当前cell对应数据为StringList<Phone> uniqueNumbers = new ArrayList<>();List<String> 
batch = completionService.take().get();long startTime = System.currentTimeMillis();listCf.addAll(processBatch(batch, bloomFilter,name,yuan,uniqueNumbers));long entTime = System.currentTimeMillis();System.out.println("数据筛选+分段封装耗时: " + (entTime - startTime) + " 毫秒");// 批量插入数据库if (!uniqueNumbers.isEmpty()) {long s = System.currentTimeMillis();ci++;asynchronous(uniqueNumbers);long e = System.currentTimeMillis();System.out.println("数据插入表数据 第"+ci+"次  耗时: " + ( e - s) + " 毫秒");
//                        batchInsertToDatabase(uniqueNumbers);}}} finally {executor.shutdown();executor.awaitTermination(1, TimeUnit.HOURS);}}// 关闭线程池executor.shutdown();return listCf;}
//    cash/applyOutOfflineCashprivate List<String> processBatch(List<String> phoneNumbers, BloomFilter<String> bloomFilter,String name,BloomFilter<String> yuan,List<Phone> uniqueNumbers) {List<String> listCf = new ArrayList<>();LocalDateTime time = LocalDateTime.now();for (String phone : phoneNumbers) {if (!bloomFilter.mightContain(phone)) {if(!yuan.mightContain(phone)){Phone s = new Phone();bloomFilter.put(phone);s.setCreateTime(time);s.setId(phone);s.setName(name);uniqueNumbers.add(s);yuan.put(phone);}else {// 重复手机号listCf.add(phone);}} else {duplicateCount.incrementAndGet();}totalProcessed.incrementAndGet();}return listCf;}// 异步插入private void asynchronous(List<Phone> uniqueNumbers){CompletableFuture.runAsync(() -> {batchInsertToDatabase(uniqueNumbers);System.out.println("Task running asynchronously");});}/*** JDBC 批量插入或JPA批量处理* @param phoneNumbers*/private void batchInsertToDatabase(List<Phone> phoneNumbers) {// 使用JDBC批量插入或JPA批量处理// 这里简化为伪代码if(!phoneNumbers.isEmpty()){phoneMapper.saveAll(phoneNumbers);}}/*** 获取表中全部数据 大数据表分段查询* @return*/private BloomFilter<String> yuan(){BloomFilter<String> yuanBloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()),6_000_000_0,0.001);// 获取表信息策略List<String> listAll = geng(yuanBloomFilter.approximateElementCount());if(!listAll.isEmpty()){// 处理每个数据项的异步任务List<CompletableFuture<Void>> futures = listAll.stream().map(data -> CompletableFuture.runAsync(() -> {if(!yuanBloomFilter.mightContain(data)){yuanBloomFilter.put(data);}})).collect(Collectors.toList());CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // 等待所有任务完成}return yuanBloomFilter;}// 更新策略private List<String> geng(long yuanBloomFilterCount){long startTime = System.currentTimeMillis();List<String> listAll = new ArrayList<>();long countAll = phoneMapper.countAll();if(yuanBloomFilterCount<countAll){int countLimit = 1000;if(checkOddEven((int)countAll)){countLimit = (int)countAll/20+20;}else {countLimit =(int)countAll/20+20;}if 
(countAll<500000){listAll.addAll(phoneMapper.phoneIdList());}else {int offset = 0;int limit = countLimit;while (true) {List<String> records =new ArrayList<>();
//                    List<String> records = jdbcTemplate.queryForList("SELECT id FROM phone LIMIT " + limit + " OFFSET " + offset, String.class);yibuList(offset,limit,records);if (records.isEmpty()) break;listAll.addAll(records);// 处理recordsoffset += limit;}}}long entTime = System.currentTimeMillis();System.out.println("读取原数据耗时: " + (entTime - startTime) + " 毫秒");return listAll;}// 异步读取多线程读取 非阻private void yibuList(int offset,int limit,List<String> listAll){CompletableFuture.runAsync(() -> {listAll.addAll(jdbcTemplate.queryForList("SELECT id FROM phone LIMIT " + limit + " OFFSET " + offset, String.class));System.out.println("Task running asynchronously");});}// 取模5数判断public boolean checkOddEven(int number) {if (number % 2 == 0) {return true;}return false;}// 造数据public void zao(){LocalDateTime l =  LocalDateTime.now();Long yua =  18645550314L;for(int i=0; i<100; i++){List<Phone> phoneNumbers = new ArrayList<>();for(int j = 0; j<50000; j++){yua++;Phone p = new Phone();p.setId(yua.toString());p.setName("测试");p.setCreateTime(l);phoneNumbers.add(p);}
//            batchInsertToDatabase(phoneNumbers);}}static class ExcelProcessor implements Callable<List<String>> {private final List<String> phoneNumbers;private final int startRow;private final int endRow;public ExcelProcessor(List<String> phoneNumbers, int startRow, int endRow) {this.phoneNumbers = phoneNumbers;this.startRow = startRow;this.endRow = endRow;}@Overridepublic List<String> call() {List<String> numbers = new ArrayList<>();for (int i = startRow; i < endRow; i++) {String p = phoneNumbers.get(i);if(Objects.nonNull(p)){numbers.add(p);}}return numbers;}}
}
http://www.dtcms.com/a/274805.html

相关文章:

  • SSE事件流简单示例
  • Paimon 写入磁盘文件名字生成机制
  • 2025年NSSCTF-青海民族大学 2025 新生赛WP
  • 学习C++、QT---20(C++的常用的4种信号与槽、自定义信号与槽的讲解)
  • 动力系统模拟与推导-AI云计算数值分析和代码验证
  • BLE低功耗设计:从广播模式到连接参数优化的全链路分析与真题解析
  • Django母婴商城项目实践(一)
  • 【JMeter】接口加密
  • 蜗轮丝杆升降机拆装图
  • 在多个DHCP服务器的网络环境中选择指定的DHCP服务
  • Windows GNU Radio避坑
  • 深入探究编程拷贝
  • mysql的性能优化:组提交、数据页复用、全表扫描优化、刷脏页
  • Vue 表单开发避坑指南:从响应式数据到动态规则的实践总结
  • Go 编译报错排查:vendor/golang.org/x/crypto/cryptobyte/asn1 no Go source files
  • Java外包怎么选?这几点不注意,项目可能血亏!
  • day21——特殊文件:XML、Properties、以及日志框架
  • Linux中geoserver中文乱码
  • 离线环境二进制安装docker
  • uniapp获取状态栏高度,胶囊按钮的高度,底部安全区域的高度,自定义导航栏
  • [实战]调频三角波和锯齿波信号生成(完整C代码)
  • hbuilderx打包的应用上传苹果应用商店最简方法
  • 字节豆包又一个新功能,超级实用,4 种玩法,你肯定用得上!(建议收藏)
  • Uniapp视频聊天软件内容监控插件开发指南
  • OA系统中的搜索功能方案:简单搜索vs高级搜索
  • 2-Git提交本地项目到远程仓库
  • 问有几条病狗?
  • 【linux网络】深入理解 TCP/UDP:从基础端口号到可靠传输机制全解析
  • 机器学习-06(Optimization-自动调整学习率)
  • consul 的安装与服务发现