当前位置: 首页 > news >正文

Starrocks 命令 Alter table DISTRIBUTED 重分布数据的实现

背景

在前文Starrocks 写入报错 primary key memory usage exceeds the limit中,可以通过ALTER TABLE xxxx DISTRIBUTED BY HASH(xx) BUCKETS 50;来改变数据的分布状态,具体的执行过程是怎么样的呢?

分析

首先对应的g4文件中为 alterTableStatement ,这里最终的调用是 AlterJobExecutor.visitAlterTableStatement:

if (statement.hasSchemaChangeOp()) {
   Locker locker = new Locker();
   locker.lockTableWithIntensiveDbLock(db, table.getId(), LockType.WRITE);
   try {
       SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
       assert table instanceof OlapTable;
       schemaChangeHandler.process(statement.getAlterClauseList(), db, (OlapTable) table);
   } catch (UserException e) {
       throw new AlterJobException(e.getMessage());
   } finally {
       locker.unLockTableWithIntensiveDbLock(db, table, LockType.WRITE);
   }

   isSynchronous = false;

schemaChangeHandler.process会创建OptimizeJobV2 实例去优化对象,数据链路如下:

SchemaChangeHandler.process
      ||
      \/
analyzeAndCreateJob
      ||
      \/
createOptimizeTableJob
      ||
      \/
OptimizeJobV2Builder.build()
      ||
      \/
new OptimizeJobV2()

SchemaChangeHandler.process 会把当前的OptimizeJobV2 job 放入要执行的队列中,之后SchemaChangeHandler 以 alter_scheduler_interval_millisecond (10000ms)的轮询间隔从队列中取出要执行的任务,并调用run方法.run方法如下:

public synchronized void run() {
        if (isTimeout()) {
            cancelImpl("Timeout");
            return;
        }

        // create connectcontext
        createConnectContextIfNeeded();

        try {
            while (true) {
                JobState prevState = jobState;
                switch (prevState) {
                    case PENDING:
                        runPendingJob();
                        break;
                    case WAITING_TXN:
                        runWaitingTxnJob();
                        break;
                    case RUNNING:
                        runRunningJob();
                        break;
                    case FINISHED_REWRITING:
                        runFinishedRewritingJob();
                        break;
                    default:
                        break;
                }
                if (jobState == prevState) {
                    break;
                } // else: handle the new state
            }
        } catch (AlterCancelException e) {
            cancelImpl(e.getMessage());
        }
    }
  • PENDING
    创建完任务初始状态就是PENDING,所以调用 runPendingJob() 方法,这里有几个关键点是
  1. 创建该Alter语句涉及到的所有的分区
  2. 检查改任务所涉及到表的状态,必须该表的tablet都为健康状态才可以进行下一步,否则设置该表的状态为WAITING_STABLE,并直接跳过该任务
  3. 会获取到在一个事务的ID
  4. 改变该作业的状态为WAITING_TXN
  • WAITING_TXN
    如果任务所涉到的表为正常状态,则会进入runWaitingTxnJob()方法,这里的几个关键点是
  1. 会等待在该任务对应的事务之前的事务都运行完才会执行该任务
  2. 每个分区建立一个任务,并把分区写入一个临时分区中
  3. 改变该作业的状态为RUNNING
  • RUNNING
    如果任务正常运行的话,则会进入runRunningJob()方法,这里的几个关键点是
  1. 等待所有的写入临时分区的任务完成
  2. 锁住该表所在库以及该表,并且是排他锁,所以读取该库的操作也是不可行的
  3. 替换临时分区到对应的分区上去
  4. 改变该作业的状态为FINISHED

相关文章:

  • 2025年全国铁路线路及站点(矢量shp数据)
  • C++多线程
  • 设计模式之代理模式:原理、实现与应用
  • IntelliJ IDEA新建文件配置作者信息、日期和描述等
  • FSC森林认证证书应用场景
  • python 数据可视化matplotib库安装与使用
  • 如何破解集运企业的劳动密集型困局,提高人效?
  • 10、STL中的unordered_map使用方法
  • Docker 离线安装教程
  • 2025-gazebo配置on vmware,wsl
  • 【练习】PAT 乙 1081 检查密码
  • 【Golang那些事】go1.22和1.23 更新重点及测评
  • nginx性能优化有哪些方式?
  • OpenNJet:下一代云原生应用引擎,支持动态配置与高效管理,简化运维任务,提升应用灵活性与安全性。
  • Sring Boot整合Minio实现图片上传功能
  • Web3 环境下用户数据隐私保护的技术方案分析
  • 【初学者】谈谈DeepSeek使用的算法?
  • 合法C标识符查(信息学奥赛一本通-1134)
  • 告别“人工智障”!给小米音箱“开个挂”?(接入各类AI大模型,让小爱同学秒变全屋智能AI中枢!)
  • 深入解析 Linux 声卡驱动:从架构到实战
  • GDP逼近五千亿,向海图强,对接京津,沧州剑指沿海经济强市
  • 习近平出席中国-拉美和加勒比国家共同体论坛第四届部长级会议开幕式并发表重要讲话
  • 挖掘机4月销量同比增17.6%,出口增幅创近两年新高
  • 80后莆田市文旅局长马骏登台与杨宗纬合唱,“演唱会秒变旅游推介会”
  • 射箭世界杯上海站摘得两银,中国队新周期冲击韩国缩小差距
  • 四姑娘山一游客疑因高反身亡,镇卫生院:送到时已很严重