
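The project needs to process a batch of tables. None of them is very large and all of them carry longitude and latitude, but none has river-basin information, so the basin each row belongs to has to be computed from its coordinates. It is a fairly simple project, developed by following DeepSeek's suggestions. AI is really handy.

Installing the Alibaba AI personal edition in IDEA

A step-by-step tutorial on using the full-power DeepSeek in IDEA

Code implementation

1. Controller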

/**
 * Batch basin-processing job
 */
@Tag(name = "Batch basin-processing job")
@ApiSupport(order = 2)
@RequestMapping("/job")
@RestController
public class SysBatchJobController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobOperator jobOperator;

    @Autowired
    @Qualifier("updateWaterCodeJob")
    private Job updateWaterCodeJob;

    // Multi-threaded, paged data update
    @GetMapping("/asyncJob")
    public void asyncJob() throws Exception {
        // The current timestamp is passed as the "time" job parameter
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution run = jobLauncher.run(updateWaterCodeJob, jobParameters);
        run.getId();
    }
}

2. Batch-processing table

/**
 * Business tables that need batch processing
 */
@Builder
@AllArgsConstructor
@Data
@TableName(value = "ads_t_sys_batch_update_table")
public class SysBatchUpdateTable extends BaseEntity implements Serializable {

    private static final long serialVersionUID = -7367871287146067724L;

    @TableId(type = IdType.ASSIGN_UUID)
    private String pkId;

    /** Name of the table to update */
    @TableField(value = "table_name")
    private String tableName;

    /** ID of the database that holds the table to update */
    @TableField(value = "data_source_id")
    private String dataSourceId;

    /** Primary-key column of the table */
    @TableField(value = "key_id")
    private String keyId;

    /** Basin-code column of the table */
    @TableField(value = "water_code_column")
    private String waterCodeColumn;

    /** Longitude column of the table */
    @TableField(value = "lon_column")
    private String lonColumn;

    /** Latitude column of the table */
    @TableField(value = "lat_column")
    private String latColumn;

    public SysBatchUpdateTable() {
    }
}

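3. Mapper. Passing the parameters is cumbersome, so consider building them dynamically into the SQL.

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdatacd.panorama.system.domain.po.SysBatchUpdateTable;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;
import java.util.Map;

@Mapper
public interface UpdateTableMapper extends BaseMapper<SysBatchUpdateTable> {

    /**
     * Page through the rows of a table by table name.
     * The batch reader calls this statement by its ID and supplies the other parameters in a map.
     *
     * @param tableName table name
     * @return one map per row
     */
    List<Map<String, Object>> selectUpdateTableByPage(String tableName);

    /**
     * Update one row.
     *
     * @param tableName       table name
     * @param waterCodeColumn basin-code column of the table
     * @param keyId           primary-key column of the table
     * @param waterCode       basin code
     * @param pkId            primary-key value of the row
     */
    void updateWaterCode(@Param("tableName") String tableName,
                         @Param("waterCodeColumn") String waterCodeColumn,
                         @Param("keyId") String keyId,
                         @Param("waterCode") String waterCode,
                         @Param("pkId") String pkId);
}

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdatacd.panorama.system.mapper.UpdateTableMapper">

    <!-- Dynamic paged query. The difference between # and $ is used to build the parameters
         the update needs: ${} inlines a table or column name, #{} binds the name as a value. -->
    <select id="selectUpdateTableByPage" resultType="java.util.HashMap">
        <!-- If a paged query SQL is available, use it directly -->
        SELECT
            ${keyId} as pkId,
            #{keyId} as keyId,
            ${waterCodeColumn} as waterCode,
            #{waterCodeColumn} as waterCodeColumn,
            ${lonColumn} as lon,
            ${latColumn} as lat,
            #{tableName} as tableName
        FROM ${tableName} a
        where ${waterCodeColumn} is null
        ORDER BY ${keyId} <!-- keep the page order stable -->
        LIMIT #{_pagesize} OFFSET #{_skiprows}
    </select>

    <!-- Dynamic update -->
    <update id="updateWaterCode">
        UPDATE ${tableName}
        SET ${waterCodeColumn} = #{waterCode}
        WHERE ${keyId} = #{pkId} <!-- the primary-key column comes from keyId -->
    </update>
</mapper>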
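Because `${...}` placeholders are spliced into the SQL text rather than bound as parameters, the table and column names stored in ads_t_sys_batch_update_table end up verbatim in the statement. Below is a minimal sketch of checking such identifiers before a statement is built. The class name `SqlIdentifiers`, the allowed pattern and the sample names `t_station`, `water_code` and `id` are assumptions made for illustration; they are not part of the original project.

```java
import java.util.regex.Pattern;

// Hypothetical helper: validates identifiers that will be spliced into SQL text.
final class SqlIdentifiers {

    // Assumption: identifiers consist of letters, digits and underscores,
    // optionally qualified with one schema prefix.
    private static final Pattern SAFE =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?");

    private SqlIdentifiers() {
    }

    // Returns the identifier unchanged if it matches the pattern, otherwise throws.
    static String requireSafe(String identifier) {
        if (identifier == null || !SAFE.matcher(identifier).matches()) {
            throw new IllegalArgumentException("Unsafe SQL identifier: " + identifier);
        }
        return identifier;
    }

    // Builds the same UPDATE shape as the mapper: identifiers are inlined,
    // values stay as named parameters.
    static String buildUpdateSql(String tableName, String waterCodeColumn, String keyId) {
        return "UPDATE " + requireSafe(tableName)
                + " SET " + requireSafe(waterCodeColumn) + " = :waterCode"
                + " WHERE " + requireSafe(keyId) + " = :pkId";
    }

    public static void main(String[] args) {
        System.out.println(buildUpdateSql("t_station", "water_code", "id"));
    }
}
```

A check of this kind could run once per table, for example where the configuration rows are loaded, so that a bad row in the configuration table fails early instead of producing malformed SQL.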

4. Configuration file

spring:
  batch:
    job:
      enabled: false   # do not run the job at application startup
    jdbc:
      initialize-schema: always
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

Add the batch parameter rewriteBatchedStatements=true to the data source URL:

url: jdbc:mysql://localhost:3306/xxxx?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true

5. Main configuration class (partitioned by table)

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;

// 1. Main configuration class (partitioned by table)
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Main job
     */
    @Bean("updateWaterCodeJob")
    public Job updateWaterCodeJob(@Qualifier("masterStep") Step masterStep) {
        return jobBuilderFactory.get("updateWaterCodeJob")
                .start(masterStep)
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(@Qualifier("updateBatchTableData") Step updateBatchTableData,
                           @Qualifier("multiTablePartitioner") MultiTablePartitioner multiTablePartitioner) {
        return stepBuilderFactory.get("masterStep")
                // The partitioner splits by table name: one table, one partition
                .partitioner(updateBatchTableData.getName(), multiTablePartitioner)
                .step(updateBatchTableData)
                // Already partitioned by table; concurrency is usually set to the number of cores
                .gridSize(10)
                .taskExecutor(taskExecutor())
                .listener(new BatchJobListener())
                .build();
    }

    // Thread pool configuration (core pool size = number of tables)
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("table-processor-");
        return executor;
    }

    /**
     * Step that updates the data page by page
     */
    @Bean("updateBatchTableData")
    public Step updateBatchTableData(
            @Qualifier("dynamicTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
            @Qualifier("batchUpdateWriter") BatchUpdateWriter batchUpdateWriter,
            @Qualifier("tableProcessor") TableProcessor tableProcessor) {
        return stepBuilderFactory.get("updateBatchTableData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(tableProcessor)
                .writer(batchUpdateWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }

    /**
     * Reads the rows to update page by page
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> dynamicTableReader(
            @Value("#{stepExecutionContext['keyId']}") String keyId,                     // primary-key column of the table to update
            @Value("#{stepExecutionContext['waterCodeColumn']}") String waterCodeColumn, // basin-code column to update
            @Value("#{stepExecutionContext['lonColumn']}") String lonColumn,             // longitude column
            @Value("#{stepExecutionContext['latColumn']}") String latColumn,             // latitude column
            @Value("#{stepExecutionContext['tableName']}") String tableName              // name of the table to update
    ) {
        MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("com.bigdatacd.panorama.system.mapper.UpdateTableMapper.selectUpdateTableByPage");
        Map<String, Object> param = new HashMap<>();
        param.put("keyId", keyId);
        param.put("waterCodeColumn", waterCodeColumn);
        param.put("lonColumn", lonColumn);
        param.put("latColumn", latColumn);
        param.put("tableName", tableName);
        reader.setParameterValues(param);
        reader.setPageSize(2000);
        return reader;
    }
}

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

// Batch-update writer
@Component("batchUpdateWriter")
@StepScope
public class BatchUpdateWriter implements ItemWriter<Map<String, Object>> {

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Map<String, Object>> items) {
        // Build the dynamic UPDATE statement; every item in a chunk belongs to the same table,
        // so the table and column names are taken from the first item
        StringBuilder sb = new StringBuilder();
        sb.append("UPDATE ");
        sb.append((String) items.get(0).get("tableName"));
        sb.append(" SET ");
        sb.append((String) items.get(0).get("waterCodeColumn"));
        sb.append(" = :waterCode");
        sb.append(" WHERE ");
        sb.append((String) items.get(0).get("keyId"));
        sb.append(" = :pkId");

        jdbcTemplate.batchUpdate(sb.toString(), items.stream()
                .map(item -> new MapSqlParameterSource()
                        .addValue("waterCode", item.get("waterCode"))
                        .addValue("tableName", item.get("tableName"))
                        .addValue("waterCodeColumn", item.get("waterCodeColumn"))
                        .addValue("keyId", item.get("keyId"))
                        .addValue("pkId", item.get("pkId")))
                .toArray(SqlParameterSource[]::new));
    }
}

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MultiTablePartitioner implements Partitioner {

    private final DataSource dataSource;

    public MultiTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sql = "SELECT key_id as keyId,water_code_column as waterCodeColumn,lon_column as lonColumn,"
                + "lat_column as latColumn,page_sql as pageSql,table_name as tableName "
                + "FROM ads_t_sys_batch_update_table where deleted = 0 and data_status = '0'";
        List<Map<String, Object>> tables = jdbcTemplate.queryForList(sql);
        log.info("Query: {}", sql);

        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            Map<String, Object> table = tables.get(i);
            ExecutionContext ctx = new ExecutionContext();
            // Put the parameters into the context; the dynamic batch-update SQL reads them from there
            ctx.putString("keyId", String.valueOf(table.get("keyId")));
            ctx.putString("waterCodeColumn", String.valueOf(table.get("waterCodeColumn")));
            ctx.putString("lonColumn", String.valueOf(table.get("lonColumn")));
            ctx.putString("latColumn", String.valueOf(table.get("latColumn")));
            //ctx.putString("pageSql", String.valueOf(table.get("pageSql")));
            ctx.putString("tableName", String.valueOf(table.get("tableName")));
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}

import com.bigdatacd.panorama.common.utils.GeoJsonUtil;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import java.util.Map;

// Determines the basin that contains each row's longitude/latitude
@Component("tableProcessor")
public class TableProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        // Look up the basin for the row's coordinates
        item.put("waterCode", GeoJsonUtil.getWaterCode(
                Double.parseDouble(item.get("lon").toString()),
                Double.parseDouble(item.get("lat").toString())));
        return item;
    }
}

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener
 */
public class BatchJobListener implements JobExecutionListener {

    private long beginTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  beforeJob...... " + beginTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  afterJob...... " + endTime);
        System.out.println(jobExecution.getJobInstance().getJobName() + " total elapsed time: [" + (endTime - beginTime) + "] ms");
    }
}
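One interaction between the reader and the writer above is worth checking. The reader pages with LIMIT/OFFSET over rows whose basin column is NULL, while the writer fills that column, so processed rows drop out of the result set before the next page is fetched. The plain-Java simulation below sketches that effect and one possible alternative, paging by the last key seen. It does not use Spring Batch or MyBatis; the class `PagingDriftDemo`, its methods and the `"basin"` value are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation only: a TreeMap stands in for a table (key = primary key, value = basin code or null).
final class PagingDriftDemo {

    private PagingDriftDemo() {
    }

    static TreeMap<Integer, String> newTable(int rows) {
        TreeMap<Integer, String> table = new TreeMap<>();
        for (int id = 1; id <= rows; id++) {
            table.put(id, null);
        }
        return table;
    }

    // Models: SELECT id WHERE code IS NULL ORDER BY id LIMIT pageSize OFFSET offset
    static List<Integer> pageByOffset(TreeMap<Integer, String> table, int pageSize, int offset) {
        List<Integer> page = new ArrayList<>();
        int skipped = 0;
        for (Map.Entry<Integer, String> e : table.entrySet()) {
            if (e.getValue() != null) {
                continue;
            }
            if (skipped < offset) {
                skipped++;
                continue;
            }
            page.add(e.getKey());
            if (page.size() == pageSize) {
                break;
            }
        }
        return page;
    }

    // Models: SELECT id WHERE code IS NULL AND id > lastId ORDER BY id LIMIT pageSize
    static List<Integer> pageAfterKey(TreeMap<Integer, String> table, int pageSize, int lastId) {
        List<Integer> page = new ArrayList<>();
        for (Map.Entry<Integer, String> e : table.tailMap(lastId, false).entrySet()) {
            if (e.getValue() == null) {
                page.add(e.getKey());
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    // Reads page 0, 1, 2, ... by offset and fills the code after each page; returns rows updated.
    static int runWithOffset(int rows, int pageSize) {
        TreeMap<Integer, String> table = newTable(rows);
        int updated = 0;
        for (int pageNo = 0; ; pageNo++) {
            List<Integer> page = pageByOffset(table, pageSize, pageNo * pageSize);
            if (page.isEmpty()) {
                break;
            }
            for (Integer id : page) {
                table.put(id, "basin");
                updated++;
            }
        }
        return updated;
    }

    // Same loop, but each page starts after the last key of the previous page.
    static int runWithKey(int rows, int pageSize) {
        TreeMap<Integer, String> table = newTable(rows);
        int updated = 0;
        int lastId = 0;
        while (true) {
            List<Integer> page = pageAfterKey(table, pageSize, lastId);
            if (page.isEmpty()) {
                break;
            }
            for (Integer id : page) {
                table.put(id, "basin");
                updated++;
            }
            lastId = page.get(page.size() - 1);
        }
        return updated;
    }

    public static void main(String[] args) {
        System.out.println("offset paging updated " + runWithOffset(10, 2) + " of 10 rows");
        System.out.println("keyed paging updated " + runWithKey(10, 2) + " of 10 rows");
    }
}
```

With 10 rows and a page size of 2, the offset variant updates 6 rows and the keyed variant updates all 10. If the real job shows the same pattern, running it again until no NULL rows remain is one workaround; changing the query to page by key is another, which would need a reader that tracks the last key.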

6. Utility class for computing the basin from longitude and latitude

import lombok.extern.slf4j.Slf4j;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.*;
import org.opengis.feature.Feature;
import org.opengis.feature.Property;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description: GeoJSON utility class
 * @author: Mr.xulong
 * @date: 2023-01-09 14:39
 */
@Slf4j
public class GeoJsonUtil {

    /*
    public static void main(String[] args) {
        try {
            FeatureCollection featureCollection = getFeatureCollection("sichuanliuyu.json");
            double x = 106.955085;
            double y = 32.09546061139062;
            System.out.println(JSON.toJSONString(properties(x, y, featureCollection)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    */

    private static String geoJsonFilePath = "sichuanliuyu.json";

    private GeoJsonUtil() {
    }

    /**
     * Loads the collection of region features.
     *
     * @return the feature collection, or null if the file cannot be read
     */
    public static FeatureCollection getFeatureCollection() {
        // Read the GeoJSON file from the classpath
        try (InputStream resourceAsStream = GeoJsonUtil.class.getResourceAsStream("/json/" + geoJsonFilePath)) {
            FeatureJSON featureJSON = new FeatureJSON();
            return featureJSON.readFeatureCollection(resourceAsStream);
        } catch (IOException e) {
            log.error("Failed to read GeoJSON file {}", geoJsonFilePath, e);
        }
        return null;
    }

    /**
     * Checks whether any region in the collection contains the point.
     */
    public static boolean contains(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                if (isContains(longitude, latitude, next)) {
                    return true;
                }
            }
        } finally {
            features.close();
        }
        return false;
    }

    /**
     * Checks whether any region in the collection contains the point and, if so,
     * returns the required properties of that region.
     */
    public static Map<String, Object> properties(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                // If the point lies inside the polygon, return the required properties
                if (isContains(longitude, latitude, next)) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    /**
     * Same as above, using the default GeoJSON file.
     */
    public static Map<String, Object> properties(double longitude, double latitude) {
        return properties(longitude, latitude, getFeatureCollection());
    }

    /**
     * Returns the basin code (FID) of the region containing the point, or null if none does.
     */
    public static String getWaterCode(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                // If the point lies inside the polygon, return its code
                if (isContains(longitude, latitude, next)) {
                    return String.valueOf(next.getProperty("FID").getValue());
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    private static boolean isContains(double longitude, double latitude, Feature feature) {
        // Get the boundary geometry
        Property geometry = feature.getProperty("geometry");
        Object value = geometry.getValue();
        // Create a point for the coordinates
        GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
        Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
        boolean contains = false;
        // Distinguish between a multi-polygon and a single polygon
        if (value instanceof MultiPolygon) {
            MultiPolygon multiPolygon = (MultiPolygon) value;
            contains = multiPolygon.contains(point);
        } else if (value instanceof Polygon) {
            Polygon polygon = (Polygon) value;
            contains = polygon.contains(point);
        }
        return contains;
    }
}
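getWaterCode above reads the GeoJSON file again on every call, and the processor calls it once per row. Below is a minimal sketch of the same point-in-polygon lookup with the polygons built once and reused. To keep it runnable on its own it uses `java.awt.geom.Path2D` from the JDK in place of GeoTools and JTS, which is a swapped-in technique rather than the original one. The class `BasinIndex`, the basin codes `A01` and `B02` and the rectangle coordinates are invented for illustration.

```java
import java.awt.geom.Path2D;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for GeoJsonUtil: outlines are built once and reused for every lookup.
final class BasinIndex {

    private final Map<String, Path2D> basins = new LinkedHashMap<>();

    // Registers a basin outline given as [longitude, latitude] pairs forming one simple ring.
    void addBasin(String waterCode, double[][] ring) {
        Path2D.Double path = new Path2D.Double();
        path.moveTo(ring[0][0], ring[0][1]);
        for (int i = 1; i < ring.length; i++) {
            path.lineTo(ring[i][0], ring[i][1]);
        }
        path.closePath();
        basins.put(waterCode, path);
    }

    // Returns the code of the first basin containing the point, or null if none does.
    String findWaterCode(double longitude, double latitude) {
        for (Map.Entry<String, Path2D> e : basins.entrySet()) {
            if (e.getValue().contains(longitude, latitude)) {
                return e.getKey();
            }
        }
        return null;
    }

    // Two made-up rectangular "basins" sharing one edge.
    static BasinIndex sample() {
        BasinIndex index = new BasinIndex();
        index.addBasin("A01", new double[][] {{100, 30}, {105, 30}, {105, 35}, {100, 35}});
        index.addBasin("B02", new double[][] {{105, 30}, {112, 30}, {112, 35}, {105, 35}});
        return index;
    }

    public static void main(String[] args) {
        BasinIndex index = sample();
        System.out.println(index.findWaterCode(102.0, 32.0));
        System.out.println(index.findWaterCode(110.0, 32.0));
        System.out.println(index.findWaterCode(0.0, 0.0));
    }
}
```

In the GeoTools version the corresponding change would presumably be to load the FeatureCollection once, for example into a static field, and have getWaterCode iterate over that cached collection.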

7. Map dependencies

<geotools.version>27-SNAPSHOT</geotools.version>

<!-- Map -->
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-shapefile</artifactId>
    <version>${geotools.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-main</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-geojson</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-swing</artifactId>
    <version>${geotools.version}</version>
</dependency>

<repositories>
    <repository>
        <id>osgeo</id>
        <name>OSGeo Release Repository</name>
        <url>https://repo.osgeo.org/repository/release/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
    <repository>
        <id>osgeo-snapshot</id>
        <name>OSGeo Snapshot Repository</name>
        <url>https://repo.osgeo.org/repository/snapshot/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
</repositories>

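Reference Git project: springbatch. This is a Spring Batch example built with Spring Boot. It first imports 300,000 rows from a file into the t_cust_temp table using multiple threads, then imports the t_cust_temp data into the t_cust_info table using partitioning. It can be used as soon as it is downloaded.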
