// 导入：利用布隆过滤器（BloomFilter）去重 / Excel import with Bloom-filter (BloomFilter) deduplication

package com.youlai.boot.system.utils;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.youlai.boot.common.util.ExcelUtilsA;
import com.youlai.boot.system.mapper.PhoneMapper;
import com.youlai.boot.system.model.dto.PhoneImportDTO;
import com.youlai.boot.system.model.entity.Phone;
import jakarta.annotation.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;@Component
public class UploadExcel {

    @Resource
    private PhoneMapper phoneMapper;

    @Resource
    private JdbcTemplate jdbcTemplate;

    /** Rows handled per slicing task and per dedup/insert round. */
    private static final int BATCH_SIZE = 10000;

    private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    /** Below this row count all existing ids are loaded with one mapper call; at or above it they are paged. */
    private static final long SINGLE_QUERY_THRESHOLD = 500000;

    // NOTE(review): these counters are static and shared by every caller of this bean; upLoad
    // resets them per import, but concurrent uploads will still mix their counts.
    private static final AtomicInteger totalProcessed = new AtomicInteger(0);
    private static final AtomicInteger duplicateCount = new AtomicInteger(0);

    /**
     * Imports phone numbers from an uploaded Excel file and prints timing/summary information.
     * Any failure is printed and an empty (or partial) result is returned instead of throwing.
     *
     * @param file uploaded Excel file
     * @param name value stored as {@code Phone.name} for every inserted number
     * @return numbers reported as already present in the {@code phone} table
     */
    public List<String> upLoad(MultipartFile file, String name) {
        List<String> listCf = new ArrayList<>();
        // Reset so the summary below describes this import rather than a running total.
        totalProcessed.set(0);
        duplicateCount.set(0);
        long startTime = System.currentTimeMillis();
        try {
            System.out.println("(1) :");
            listCf.addAll(importPhoneNumbers(file, name));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("导入完成,耗时: " + (endTime - startTime) / 1000 + "秒");
        System.out.println("处理总数: " + totalProcessed.get() + ", 重复数: " + duplicateCount.get());
        return listCf;
    }

    /**
     * Reads phone ids from the Excel file, filters duplicates, schedules asynchronous inserts of
     * the remaining numbers, and returns the numbers that appear to exist in the {@code phone} table.
     *
     * @param file uploaded Excel file, parsed into {@link PhoneImportDTO} rows
     * @param name value stored as {@code Phone.name} for every inserted number
     * @return numbers the existing-data Bloom filter reports as already present
     * @throws Exception if reading the file, loading existing ids, or a slicing task fails,
     *                   or if the thread is interrupted while waiting for slices
     */
    public List<String> importPhoneNumbers(MultipartFile file, String name) throws Exception {
        List<String> listCf = new ArrayList<>();
        List<PhoneImportDTO> phoneNumbers = ExcelUtilsA.read(file, PhoneImportDTO.class);
        if (phoneNumbers == null || phoneNumbers.isEmpty()) {
            return listCf;
        }
        // Created only after the file has been read, so a read failure cannot leak the pool.
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        try {
            CompletionService<List<String>> completionService = new ExecutorCompletionService<>(executor);
            BloomFilter<String> yuan = yuan();
            List<String> list = phoneNumbers.stream().map(PhoneImportDTO::getId).toList();
            int totalRows = list.size();
            // Slice from index 0. Starting at 1 dropped the first row and, whenever
            // totalRows % BATCH_SIZE == 1, submitted one task fewer than the result loop waited
            // for, blocking take() forever. (Assumes ExcelUtilsA.read already strips the header
            // row — confirm.)
            int submitted = 0;
            for (int start = 0; start < totalRows; start += BATCH_SIZE) {
                int end = Math.min(start + BATCH_SIZE, totalRows);
                completionService.submit(new ExcelProcessor(list, start, end));
                submitted++;
            }
            // Numbers already seen earlier in this file.
            BloomFilter<String> bloomFilter =
                    BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 6_000_000, 0.001);
            int ci = 0;
            // Wait for exactly as many results as tasks were submitted.
            for (int i = 0; i < submitted; i++) {
                List<Phone> uniqueNumbers = new ArrayList<>();
                List<String> batch = completionService.take().get();
                long startTime = System.currentTimeMillis();
                listCf.addAll(processBatch(batch, bloomFilter, name, yuan, uniqueNumbers));
                long entTime = System.currentTimeMillis();
                System.out.println("数据筛选+分段封装耗时: " + (entTime - startTime) + " 毫秒");
                if (!uniqueNumbers.isEmpty()) {
                    long s = System.currentTimeMillis();
                    ci++;
                    asynchronous(uniqueNumbers);
                    long e = System.currentTimeMillis();
                    // Measures scheduling only: the insert itself runs asynchronously.
                    System.out.println("数据插入表数据 第" + ci + "次 耗时: " + (e - s) + " 毫秒");
                }
            }
        } finally {
            // On the normal path every task has already been drained; on failure the remaining
            // tasks only copy slices of an in-memory list, so they are safe to abandon.
            executor.shutdownNow();
        }
        return listCf;
    }

    /**
     * Classifies one batch of numbers against the in-file filter and the existing-data filter.
     * <ul>
     *   <li>already seen in this file: counted in {@code duplicateCount} and skipped;</li>
     *   <li>not seen in this file but reported present in existing data: returned;</li>
     *   <li>otherwise: recorded in both filters and appended to {@code uniqueNumbers} as a new
     *       {@link Phone}.</li>
     * </ul>
     * Bloom filters may report false positives (both are created with fpp 0.001), so a small
     * fraction of genuinely new numbers may be classified as existing or duplicate.
     *
     * @param phoneNumbers   numbers in this batch
     * @param bloomFilter    numbers seen earlier in the current file; updated in place
     * @param name           value stored as {@code Phone.name}
     * @param yuan           numbers believed to exist in the table; updated in place
     * @param uniqueNumbers  output list receiving the new {@link Phone} entities
     * @return numbers reported as already present in the table
     */
    private List<String> processBatch(List<String> phoneNumbers, BloomFilter<String> bloomFilter, String name,
                                      BloomFilter<String> yuan, List<Phone> uniqueNumbers) {
        List<String> listCf = new ArrayList<>();
        LocalDateTime time = LocalDateTime.now();
        for (String phone : phoneNumbers) {
            if (bloomFilter.mightContain(phone)) {
                duplicateCount.incrementAndGet();
            } else if (yuan.mightContain(phone)) {
                listCf.add(phone);
            } else {
                bloomFilter.put(phone);
                yuan.put(phone);
                Phone entity = new Phone();
                entity.setCreateTime(time);
                entity.setId(phone);
                entity.setName(name);
                uniqueNumbers.add(entity);
            }
            totalProcessed.incrementAndGet();
        }
        return listCf;
    }

    /**
     * Inserts the given entities on the common fork-join pool without waiting for completion.
     * Failures are printed because the returned future is otherwise discarded.
     */
    private void asynchronous(List<Phone> uniqueNumbers) {
        CompletableFuture.runAsync(() -> {
            batchInsertToDatabase(uniqueNumbers);
            System.out.println("Task running asynchronously");
        }).exceptionally(ex -> {
            ex.printStackTrace();
            return null;
        });
    }

    /** Saves the entities through {@link PhoneMapper#saveAll}; does nothing for an empty list. */
    private void batchInsertToDatabase(List<Phone> phoneNumbers) {
        if (!phoneNumbers.isEmpty()) {
            phoneMapper.saveAll(phoneNumbers);
        }
    }

    /**
     * Builds a Bloom filter (sized for 60 million ids, fpp 0.001) containing every id currently
     * in the {@code phone} table.
     */
    private BloomFilter<String> yuan() {
        BloomFilter<String> yuanBloomFilter =
                BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 60_000_000, 0.001);
        List<String> listAll = geng(yuanBloomFilter.approximateElementCount());
        // Plain sequential puts: scheduling one async task per id costs far more than the put,
        // and put is idempotent, so no mightContain pre-check is needed.
        for (String id : listAll) {
            if (id != null) {
                yuanBloomFilter.put(id);
            }
        }
        return yuanBloomFilter;
    }

    /**
     * Loads all ids from the {@code phone} table if the table holds more rows than
     * {@code yuanBloomFilterCount}. Small tables are read with one mapper call; larger ones are
     * paged synchronously (page size = 5% of the row count + 20).
     *
     * @param yuanBloomFilterCount number of ids the caller already holds
     * @return the loaded ids (empty if nothing needed loading)
     */
    private List<String> geng(long yuanBloomFilterCount) {
        long startTime = System.currentTimeMillis();
        List<String> listAll = new ArrayList<>();
        long countAll = phoneMapper.countAll();
        if (yuanBloomFilterCount < countAll) {
            if (countAll < SINGLE_QUERY_THRESHOLD) {
                listAll.addAll(phoneMapper.phoneIdList());
            } else {
                int limit = (int) (countAll / 20 + 20);
                long offset = 0;
                while (true) {
                    List<String> records = fetchIdPage(offset, limit);
                    listAll.addAll(records);
                    // A short (or empty) page means the end of the table has been reached.
                    if (records.size() < limit) {
                        break;
                    }
                    offset += limit;
                }
            }
        }
        long entTime = System.currentTimeMillis();
        System.out.println("读取原数据耗时: " + (entTime - startTime) + " 毫秒");
        return listAll;
    }

    /**
     * Fetches one page of ids. Rows are ordered by id so that consecutive OFFSET pages neither
     * overlap nor skip rows; limit and offset are passed as bind parameters.
     */
    private List<String> fetchIdPage(long offset, int limit) {
        return jdbcTemplate.queryForList(
                "SELECT id FROM phone ORDER BY id LIMIT ? OFFSET ?", String.class, limit, offset);
    }

    /** Returns {@code true} if {@code number} is even. */
    public boolean checkOddEven(int number) {
        return number % 2 == 0;
    }

    /**
     * Builds 100 batches of 50,000 sequential test {@link Phone} entities whose ids start at
     * 18645550315.
     * NOTE(review): the batches are built and then discarded; nothing is persisted. Confirm
     * whether a save call was intended here.
     */
    public void zao() {
        LocalDateTime l = LocalDateTime.now();
        long yua = 18645550314L;
        for (int i = 0; i < 100; i++) {
            List<Phone> phoneNumbers = new ArrayList<>();
            for (int j = 0; j < 50000; j++) {
                yua++;
                Phone p = new Phone();
                p.setId(Long.toString(yua));
                p.setName("测试");
                p.setCreateTime(l);
                phoneNumbers.add(p);
            }
        }
    }

    /** Copies rows {@code [startRow, endRow)} of the id list, skipping null ids. */
    static class ExcelProcessor implements Callable<List<String>> {
        private final List<String> phoneNumbers;
        private final int startRow;
        private final int endRow;

        public ExcelProcessor(List<String> phoneNumbers, int startRow, int endRow) {
            this.phoneNumbers = phoneNumbers;
            this.startRow = startRow;
            this.endRow = endRow;
        }

        @Override
        public List<String> call() {
            List<String> numbers = new ArrayList<>();
            for (int i = startRow; i < endRow; i++) {
                String p = phoneNumbers.get(i);
                if (Objects.nonNull(p)) {
                    numbers.add(p);
                }
            }
            return numbers;
        }
    }
}