当前位置: 首页 > news >正文

SpringBoot+shardingsphere实现按月分表功能

SpringBoot+shardingsphere实现按月分表功能

文章目录


前言

ShardingSphere 是一套开源的分布式数据库中间件解决方案,旨在简化数据库分片、读写分离、分布式事务等复杂场景的管理。它由 Apache 软件基金会支持,广泛应用于需要处理大规模数据的系统中


一、ShardingSphere 是什么?

主要是为了防止一张表的数据量过大而设计的,数据库本身就支持,但是由于自行设计需要满足跨表查询,事务一致性,分页聚合等很多的复杂场景,还需要很多的配套监控,设计,扩容等方案,所以总体来说是一个任务量很大的任务,故而这里采用ShardingSphere 来实现。

二、使用步骤

1.引入库

<!-- 分库分表 -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.2.0</version>
        </dependency>

2.环境配置+Mysql表

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) NOT NULL,
  `password` varchar(255) NOT NULL,
  `gender` tinyint(4) NOT NULL COMMENT '0:男 1:女',
  `createTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updateTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1890651990057906179 DEFAULT CHARSET=utf8mb4;
# 配置服务器端口
server:
  port: 9999

# Spring框架下的ShardingSphere配置
spring:
  shardingsphere:
    # 模式配置,设置为独立模式
    mode:
      type: Standalone
    # 数据源配置
    datasource:
      # 定义数据源名称
      names: ds0
      # 数据源ds0的具体配置
      ds0:
        # 数据源类型为HikariCP
        type: com.zaxxer.hikari.HikariDataSource
        # 数据库驱动类名称
        driver-class-name: com.mysql.cj.jdbc.Driver
        # 数据库连接URL,包含时区设置
        jdbc-url: jdbc:mysql://localhost:3306/sharding_db?serverTimezone=Asia/Shanghai
        # 数据库用户名
        username: root
        # 数据库密码
        password: root
    # 规则配置
    rules:
      # 分片规则配置
      sharding:
        # 定义分片的表
        tables:
          user:
            # 只配置基础表,其他表会动态创建
            actual-data-nodes: ds0.user,ds0.user_202401,ds0.user_202402,ds0.user_202403,ds0.user_202404,ds0.user_202405
            table-strategy:
              standard:
                sharding-column: createtime
                sharding-algorithm-name: user_inline
            # 添加主键生成策略
            key-generate-strategy:
              column: id
              key-generator-name: snowflake
        sharding-algorithms:
          user_inline:
            type: CLASS_BASED
            props:
              strategy: standard
              algorithmClassName: com.hhh.sharding.standa.UserShardingAlgorithm
        # 配置主键生成器
        key-generators:
          snowflake:
            type: SNOWFLAKE
            props:
              worker-id: 123
        # 添加默认分片策略
        default-sharding-column: gender
    # 属性配置
    props:
      # 是否显示SQL语句
      sql-show: true

# MyBatis-Plus配置
mybatis-plus:
  configuration:
    # 不将下划线转换为驼峰命名
    map-underscore-to-camel-case: false
    # 使用标准输出日志实现
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  global-config:
    enable-sql-runner: true

这里有一个注意事项,那就是id一定要使用bigint使用雪花策略算法来实现,至于为什么这样呢,是为了防止分表的主键id一致的情况,这里首先推荐就是使用mybatisPlus来实现,因为他天然支持雪花算法

3.分表代码实现

主要是两个文件一个是自己实现分表算法的UserShardingAlgorithm文件

package com.hhh.sharding.standa;

import com.baomidou.mybatisplus.extension.toolkit.SqlRunner;
import com.hhh.sharding.domain.User;
import com.hhh.sharding.service.UserService;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;

@Component
@Slf4j
public class DynamicShardingManager {

    @Resource
    private DataSource dataSource;

    @Resource
    private UserService userService;


    private static final String LOGIC_TABLE_NAME = "user";

    private static final String DATABASE_NAME = "sharding_db"; // 配置文件中的数据库名称

    @PostConstruct
    public void initialize() {
        log.info("初始化动态分表配置...");
        updateShardingTableNodes();
    }


    /**
     * 获取所有用户相关的表名
     * 此方法旨在动态地收集所有用户表的表名,以支持可能存在的不同性别用户表
     * 如果无法获取动态表名或列表为空,则默认返回包含单一的默认用户表名"user"
     *
     * @return 包含所有用户表名的集合
     */
    private Set<String> fetchAllUserTableNames() {
        //获取所有动态化表名
        Set<String> tableNames = new HashSet<>();
        try {
            // 获取用户列表
            List<User> users = userService.list();
            // 如果用户列表不为空,则映射每个用户到对应的表名,并收集到集合中
            if (users != null) {
                tableNames = users.stream()
                        .map(user -> "user_" + user.getGender())
                        .collect(Collectors.toSet());
            }
            // 确保至少包含默认表
            tableNames.add("user");
        } catch (Exception e) {
            // 记录获取表名时发生的错误
            log.error("获取所有动态化表名失败", e);
            // 发生异常时至少返回默认表
            tableNames.add("user");
        }
        // 返回收集到的表名集合
        return tableNames;
    }


    /**
     * 动态更新分片表节点配置
     * 
     * 本方法旨在根据当前的用户表名称,动态地更新分片表的节点配置
     * 它首先获取所有用户表的名称,然后构建新的分片表节点配置,并尝试更新到数据库的元数据中
     */
    private void updateShardingTableNodes() {
        try {
            // 获取所有用户表的名称
            Set<String> tableNames = fetchAllUserTableNames();
            if (tableNames.isEmpty()) {
                // 如果未获取到任何表名,则使用默认的表配置
                log.warn("未获取到任何表名,将使用默认表配置");
                tableNames.add("user");
            }
    
            // 确保包含所有可能的表
            tableNames.add("user");
            tableNames.add("user_0");
            tableNames.add("user_1");
    
            // 构建新的分片表节点配置
            String newActualDataNodes = tableNames.stream()
                    .distinct()
                    .map(tableName -> "ds0." + tableName)
                    .collect(Collectors.joining(","));
            log.info("动态分表 actual-data-nodes 配置: {}", newActualDataNodes);
    
            // 获取 ContextManager 实例
            ContextManager contextManager = getContextManager();
            if (contextManager == null) {
                log.error("获取 ContextManager 失败");
                return;
            }
    
            // 获取 MetaDataContexts 实例
            var metaDataContexts = contextManager.getMetaDataContexts();
            if (metaDataContexts == null) {
                log.error("获取 MetaDataContexts 失败");
                return;
            }
    
            // 获取 MetaData 实例
            var metaData = metaDataContexts.getMetaData();
            if (metaData == null) {
                log.error("获取 MetaData 失败");
                return;
            }
    
            // 检查数据库是否存在
            var databases = metaData.getDatabases();
            if (databases == null || !databases.containsKey(DATABASE_NAME)) {
                log.error("数据库 {} 不存在", DATABASE_NAME);
                return;
            }
    
            // 获取 ShardingSphere 的规则元数据
            ShardingSphereRuleMetaData ruleMetaData = databases.get(DATABASE_NAME).getRuleMetaData();
            if (ruleMetaData == null) {
                log.error("获取规则元数据失败");
                return;
            }
    
            // 查找 ShardingRule
            Optional<ShardingRule> shardingRule = ruleMetaData.findSingleRule(ShardingRule.class);
            if (shardingRule.isPresent()) {
                // 获取分片规则配置
                ShardingRuleConfiguration ruleConfig = (ShardingRuleConfiguration) shardingRule.get().getConfiguration();
                if (ruleConfig.getTables() == null || ruleConfig.getTables().isEmpty()) {
                    log.error("分片规则配置为空");
                    return;
                }
    
                // 更新分片表规则配置
                List<ShardingTableRuleConfiguration> updatedRules = ruleConfig.getTables()
                        .stream()
                        .map(oldTableRule -> {
                            if (LOGIC_TABLE_NAME.equals(oldTableRule.getLogicTable())) {
                                ShardingTableRuleConfiguration newTableRuleConfig = new ShardingTableRuleConfiguration(LOGIC_TABLE_NAME, newActualDataNodes);
                                newTableRuleConfig.setDatabaseShardingStrategy(oldTableRule.getDatabaseShardingStrategy());
                                newTableRuleConfig.setTableShardingStrategy(oldTableRule.getTableShardingStrategy());
                                newTableRuleConfig.setKeyGenerateStrategy(oldTableRule.getKeyGenerateStrategy());
                                newTableRuleConfig.setAuditStrategy(oldTableRule.getAuditStrategy());
                                return newTableRuleConfig;
                            }
                            return oldTableRule;
                        })
                        .collect(Collectors.toList());
                ruleConfig.setTables(updatedRules);
                
                // 尝试更新分片规则配置
                try {
                    contextManager.alterRuleConfiguration(DATABASE_NAME, Collections.singleton(ruleConfig));
                    contextManager.reloadDatabase(DATABASE_NAME);
                    log.info("动态分表规则更新成功!");
                } catch (Exception e) {
                    log.error("更新分片规则失败", e);
                }
            } else {
                log.error("未找到 ShardingSphere 的分片规则配置,动态分表更新失败。");
            }
        } catch (Exception e) {
            log.error("更新分片规则时发生异常", e);
        }
    }

    /**
     * 获取 ShardingSphere ContextManager
     */
    private ContextManager getContextManager() {
        try {
            if (dataSource == null) {
                log.error("数据源未注入");
                return null;
            }
            
            var connection = dataSource.getConnection();
            if (connection == null) {
                log.error("获取数据库连接失败");
                return null;
            }

            ShardingSphereConnection shardingConnection = connection.unwrap(ShardingSphereConnection.class);
            if (shardingConnection == null) {
                log.error("无法获取 ShardingSphereConnection");
                connection.close();
                return null;
            }

            ContextManager contextManager = shardingConnection.getContextManager();
            connection.close();
            return contextManager;
        } catch (SQLException e) {
            log.error("获取 ShardingSphere ContextManager 失败", e);
            return null;
        }
    }

    /**
     * 根据用户信息创建用户表
     * 表名基于用户创建时间生成,格式为:LOGIC_TABLE_NAME_YYYYMM
     * 如果表已存在,则不进行创建操作
     * 
     * @param user 用户对象,包含用户创建时间等信息
     */
    public void createUserTable(User user) {
        // 获取用户创建时间
        Date createTime = user.getCreatetime();
        // 创建日期格式化对象,用于生成表名
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        // 生成完整的表名
        String tableName = LOGIC_TABLE_NAME + "_" + dateFormat.format(createTime);
        
        try {
            // 首先检查表是否已存在
            String checkTableSql = "SHOW TABLES LIKE '" + tableName + "'";
            List<Map<String, Object>> tables = SqlRunner.db().selectList(checkTableSql);
            
            // 如果表存在,记录日志并结束方法
            if (tables != null && !tables.isEmpty()) {
                log.info("表 {} 已经存在,无需创建", tableName);
                return;
            }
            
            // 创建表
            String createTableSql = "CREATE TABLE IF NOT EXISTS " + tableName + " LIKE user";
            log.info("开始创建表,SQL: {}", createTableSql);
            
            SqlRunner.db().update(createTableSql);
            log.info("表 {} 创建成功", tableName);
            
            // 更新分片配置
            updateShardingTableNodes();
        } catch (Exception e) {
            log.error("创建分表 {} 失败: {}", tableName, e.getMessage(), e);
            // 检查异常消息,如果表已存在,则记录日志并结束方法
            if (e.getMessage() != null && e.getMessage().contains("already exists")) {
                log.info("表 {} 已经存在,继续处理", tableName);
                return;
            }
            // 如果异常与表已存在无关,则抛出运行时异常
            throw new RuntimeException("创建分表失败: " + e.getMessage(), e);
        }
    }

}
package com.hhh.sharding.standa;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Properties;

@Slf4j
public class UserShardingAlgorithm implements StandardShardingAlgorithm<Date> {
    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMM");

    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Date> preciseShardingValue) {
        Date createTime = preciseShardingValue.getValue();
        String logicTableName = preciseShardingValue.getLogicTableName();
        
        log.info("分片算法执行 - 可用目标表: {}, 分片值: {}, 逻辑表名: {}", 
                availableTargetNames, createTime, logicTableName);
        
        if (createTime == null) {
            log.info("createTime为空,返回逻辑表名: {}", logicTableName);
            return logicTableName;
        }
        
        // 根据 createTime 动态生成分表名
        String suffix = DATE_FORMAT.format(createTime);
        String realTableName = "user_" + suffix;
        log.info("计算得到的实际表名: {}", realTableName);
        
        if (availableTargetNames.contains(realTableName)) {
            log.info("找到匹配的目标表: {}", realTableName);
            return realTableName;
        } else {
            log.warn("未找到匹配的目标表,返回逻辑表名: {}", logicTableName);
            return logicTableName;
        }
    }

    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
        return new ArrayList<>();
    }

    @Override
    public Properties getProps() {
        return new Properties();
    }

    @Override
    public void init(Properties properties) {
        // 可以添加初始化逻辑
    }
}

 4.测试用例

package com.hhh.sharding.controller;


import cn.hutool.core.util.RandomUtil;
import com.hhh.sharding.domain.User;
import com.hhh.sharding.service.UserService;
import com.hhh.sharding.standa.DynamicShardingManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.text.SimpleDateFormat;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserService userService;

    @Resource
    private DynamicShardingManager dynamicShardingManager;

    @GetMapping("/add")
    public Boolean user() {
        // 创建一些2024年的随机日期
        Date[] dates = {
            getDate("2024-01-15"),
            getDate("2024-02-20"),
            getDate("2024-03-10"),
            getDate("2024-04-05"),
            getDate("2024-05-25")
        };
        
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setUsername(generateRandomUsername());
            user.setPassword("123456");
            user.setGender(RandomUtil.randomInt(2));
            // 随机选择一个2024年的日期
            Date randomDate = dates[RandomUtil.randomInt(dates.length)];
            user.setCreatetime(randomDate);
            user.setUpdatetime(randomDate);
            //这里每一次新增数据的时候去判断是否要创建出来当月的数据表,这张表一定要在    
            //application.yml中的actual-data-nodes中去添加
            dynamicShardingManager.createUserTable(user);
            userService.save(user);
        }
        return true;
    }

    private Date getDate(String dateStr) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd").parse(dateStr);
        } catch (Exception e) {
            return new Date();
        }
    }

    // 生成10位随机数字的用户名
    private String generateRandomUsername() {
        return RandomUtil.randomNumbers(10);  // 生成10位数字
    }

    @GetMapping("/all")
    public List<User> all() {
        return userService.list();
    }
}

 5.测试结果

新增数据

 查询数据

 数据库情况

 数据库表数据展示


总结

由于公司有一个需求那就是按月来分表展示数据,看了好多人的博客都没有效果,最终三天得以解决这个功能,故而写下此博客,希望可以真正的帮助到你

相关文章:

  • python实现YouTube关键词爬虫(2025/02/11)
  • 【Spring】Spring MVC入门(二)
  • Linux第107步_Linux之PCF8563实验
  • java05(类、泛型、JVM、线程)---java八股
  • 前端与浏览器安全知识详解
  • 程序员升级进阶之路
  • spring security 超详细使用教程(接入springboot、前后端分离)
  • [创业之路-299]:图解金融体系结构
  • Linux 文件系统:恢复已删除文件的挑战
  • 【大模型】阿里云百炼平台对接DeepSeek-R1大模型使用详解
  • 【深度强化学习】Actor-Critic 算法
  • 一个根据输入内容过滤下拉选的组件
  • 网络编程-
  • 设计模式Python版 命令模式(下)
  • Keysight E5071C (Agilent) 网络分析仪的特性和规格
  • DeepSeek 本地部署(电脑安装)
  • 笔试题笔记#6 模拟三道题和总结知识
  • CTF-web:java-h2 堆叠注入rce -- N1ctf Junior EasyDB
  • 消息中间件深度剖析:以 RabbitMQ 和 Kafka 为核心
  • vue2和vue3响应式区别最通俗易懂的理解
  • 从《缶翁的世界》看吴昌硕等湖州籍书画家对海派的影响
  • 一女游客在稻城亚丁景区因高反去世,急救两个多小时未能恢复生命体征
  • 多图|多款先进预警机亮相雷达展,专家:中国预警机已达世界先进水平
  • 《五行令》《攻守占》,2个月后国博见
  • 老字号“逆生长”,上海制造的出海“蜜”钥
  • 清雪车司机未拉手刹下车导致溜车被撞亡,事故调查报告发布