当前位置: 首页 > news >正文

湖仓一体部署

数据存储 hadoop-3.4.1

 下载并安装软件

上传安装软件到服务器

解压安装:tar -xzvf hadoop-3.4.1.tar.gz  -C /opt/module/

Hadoop配置

  1. 配置hadoop环境变量:
    sudo vim /etc/profile.d/myprofile.sh#HADOOP_HOME
    export HADOOP_HOME=/opt/module/hadoop-3.4.1
    export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    应用变更: source /etc/profile
  2. 配置hadoop使用的JAVA环境
    sudo vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh# 配置Hadoop使用的JAVA路径
    export JAVA_HOME=/opt/module/jdk-17.0.10
  3. 配置core-site.xml
    vim $HADOOP_HOME/etc/hadoop/core-site.xml
     
    <configuration><property><name>fs.defaultFS</name><value>hdfs://mydoris:9000</value><description>NameNode的地址</description></property><property><name>hadoop.tmp.dir</name><value>/opt/data/hadoop/tmp</value><description>hadoop数据存储目录</description></property>
    </configuration>
  4. 配置hdfs-site.xml
    vim  $HADOOP_HOME/etc/hadoop/hdfs-site.xml
    <configuration><property><name>dfs.replication</name><value>1</value>  <!-- 单机只需1副本 --></property><property><name>dfs.namenode.name.dir</name><value>file://${hadoop.tmp.dir}/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value></property>
    </configuration>
  5. 配置 yarn-site.xml

    vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
    <configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 原默认值8030与doris冲突,改为 18030 --><property><name>yarn.resourcemanager.scheduler.address</name><value>0.0.0.0:18030</value></property><!-- 原默认值8040与doris冲突,改为 18040 --><property><name>yarn.nodemanager.localizer.address</name><value>0.0.0.0:18040</value></property><property><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value></property>
    </configuration>
    1. 配置mapred-site.xml
      vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
      <configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property>
      </configuration>
  6. 解决Hadoop 3.4与JDK 17冲突问题
    # 编辑 Hadoop 环境配置文件
    cd /opt/module/hadoop-3.4.1
    vim etc/hadoop/hadoop-env.sh# 文件末尾 添加以下内容:
    # Fix for JDK 17+ module access restrictions
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.lang=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.lang.reflect=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.net=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.util=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.nio=ALL-UNNAMED"
    export HADOOP_OPTS="$HADOOP_OPTS --add-opens=java.base/java.util.concurrent=ALL-UNNAMED"# 若已经启动过hdfs,需要清理并重启hadoop
    # 停止
    ./sbin/stop-yarn.sh
    ./sbin/stop-dfs.sh# 删除临时数据(避免 clusterID 冲突)
    rm -rf /tmp/hadoop-*# 重新格式化(使用 JDK 17)
    hdfs namenode -format# 启动
    ./sbin/start-dfs.sh
    ./sbin/start-yarn.sh

启动Hadoop服务

格式化 NameNode(首次)

./bin/hdfs namenode -format

启动 HDFS + YARN

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

验证服务

jps

查看启动日志

日志路径:/opt/module/hadoop-3.4.1/logs/

访问 WEB UI

HDFS: http://mydoris:9870


YARN: http://mydoris:8088

测试 HDFS 读写

./bin/hadoop fs -mkdir /t001
./bin/hadoop fs -put ~/.bashrc /t001/
./bin/hadoop fs -ls /t001

启停脚本

  1. 创建脚本文件:vim /opt/script/xhadoop
    #!/bin/bashif [ $# -lt 1 ]
    thenecho "No Args Input..."exit ;
    ficase $1 in
    "start")ssh mydoris "${HADOOP_HOME}/sbin/start-dfs.sh"ssh mydoris "${HADOOP_HOME}/sbin/start-yarn.sh";;
    "stop")ssh mydoris "${HADOOP_HOME}/sbin/stop-dfs.sh"ssh mydoris "${HADOOP_HOME}/sbin/stop-yarn.sh"
    ;;
    *)echo "Input Args Error..."
    ;;
    esac

  2. 修改脚本权限   chmod 777 /opt/script/xhadoop
  3. 添加一个链接 sudo ln -s -f /opt/script/xhadoop /bin/xhadoop
  4. 使用脚本
    xhadoop start
    xhadoop stop

数据湖 Iceberg 1.6.1

HIve Catalog

MySQL 8.0.43

  1. 在线安装MySQL
    # 更新软件包列表
    sudo apt update
    # 查看可使用的安装包
    sudo apt search mysql-server
    # 安装最新版本
    sudo apt install -y mysql-server
    # 安装指定版本:
    sudo apt install -y mysql-server-8.0
  2. 检查MySQL状态
    sudo systemctl status mysql
    # 解压文件
    tar -xJf mysql-8.0.43-linux-glibc2.17-x86_64.tar.xz
    mv mysql-8.0.43-linux-glibc2.17-x86_64 /opt/module/
    # 创建软链接
    sudo ln -s /opt/module/mysql-8.0.43-linux-glibc2.17-x86_64 /usr/local/mysql
  3. 设置开机启动
    # 启动MySQL服务(默认安装后会自动启动,不需要再执行)
    sudo systemctl start mysql
    # 设置开机自启动:
    sudo systemctl enable mysql

  4. 修改密码、权限、允许远程访问
    1. 登录mysql,在默认安装时如果没有让我们设置密码,则直接回车就能登录成功
      sudo mysql -u root -p

    2. 设置密码:ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'Admin1234';
    3. 刷新缓存:flush privileges;

    4. use mysql

    5. 检查用户可在哪台主机访问数据库
      select host, user from user;
    6. 修改root可在任意位置访问:update user set host = '%' where user = 'root';

    7. 配置mysqld.cnf
       

      sudo vim /etc/mysql/mysql.conf.d/mysqld.cnf
      bind-address           = 0.0.0.0
      mysqlx-bind-address    = 0.0.0.0

    8. 重启MySQL重新加载一下配置:sudo systemctl restart mysql

    9. 检查端口:netstat -an | grep 3306

    10. 创建 Hive Metastore 数据库和用户
       

      mysql -u root -pCREATE DATABASE hive_metastore;
      CREATE USER 'hive'@'%' IDENTIFIED BY 'Admin1234';
      GRANT ALL PRIVILEGES ON hive_metastore.* TO 'hive'@'%';
      FLUSH PRIVILEGES;
      EXIT;SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = 'hive_metastore';

hive-3.1.3

  1. 上传 apache-hive-3.1.3-bin.tar.gz 到服务器
  2. 解压安装
    tar -xzf apache-hive-3.1.3-bin.tar.gz
    mv apache-hive-3.1.3-bin /opt/module
    mv /opt/module/apache-hive-3.1.3-bin/ /opt/modulehive-3.1.3

  3. 设置hive环境变量
    sudo vim /etc/profile.d/myprofile.sh#设置 hive 环境变量
    export HIVE_HOME=/opt/module/hive-3.1.3
    export PATH=$PATH:$HIVE_HOME/bin# 使配置生效
    source /etc/profile

     
  4. 添加依赖JAR
    # MySQL 驱动(必须)
    将 mysql-connector-java-8.0.33.jar 放到 $HIVE_HOME/lib/
    # Iceberg Hive Runtime(关键!让 HMS 识别 Iceberg 表)
    将 iceberg-hive-runtime-1.6.1.jar 放到 $HIVE_HOME/lib/

  5. 配置 hive-site.xml
    vim $HIVE_HOME/conf/hive-site.xml
    <?xml version="1.0" encoding="UTF-8"?><configuration><!-- Mysql MetaStore -->
    <property><name>hive.metastore.db.type</name><value>mysql</value>
    </property>
    <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://mydoris:3306/hive_metastore?useSSL=false</value>
    </property>
    <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property><name>javax.jdo.option.ConnectionUserName</name><value>hive</value>
    </property>
    <property><name>javax.jdo.option.ConnectionPassword</name><value>Admin1234</value>
    </property><!-- Hive Metastore Service --><property><name>hive.metastore.uris</name><value>thrift://mydoris:9083</value></property><property><name>hive.metastore.schema.verification</name><value>false</value></property>
    <!-- Iceberg 支持 --><property><name>metastore.storage.schema.reader.impl</name><value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value></property><!-- 重要:指定 warehouse 基于 HDFS --><property><name>hive.metastore.warehouse.dir</name><value>/iceberg/warehouse</value>  <!-- HDFS 路径 --></property></configuration>

  6. 初始化 Metastore Schema
    $HIVE_HOME/bin/schematool -dbType mysql -initSchema \-userName hive -passWord Admin1234 \-url "jdbc:mysql://mydoris:3306/hive_metastore?useSSL=false&allowPublicKeyRetrieval=true"

  7. 启动 Hive Metastore
    $HIVE_HOME/bin/hive --service metastore

  8. 验证
    netstat -tuln | grep 9083

  9.  
启停脚本
  1. 创建脚本文件:vim /opt/script/xhive
  2. 脚本内容
    #!/bin/bash# Hive Metastore 启停脚本 for Hive 3.1.3
    # 支持 start / stop / statusexport HIVE_HOME=${HIVE_HOME:-/opt/module/hive-3.1.3}
    export HADOOP_HOME=${HADOOP_HOME:-/opt/module/hadoop-3.4.1}LOG_DIR="$HIVE_HOME/logs"
    PID_FILE="$LOG_DIR/metastore.pid"
    PORT=${METASTORE_PORT:-9083}
    HOST=${METASTORE_HOST:-localhost}mkdir -p "$LOG_DIR"start() {if [ -f "$PID_FILE" ] && kill -0 $(cat "$PID_FILE") 2>/dev/null; thenecho "Hive Metastore is already running (PID: $(cat $PID_FILE))"exit 1fiecho "Starting Hive Metastore on $HOST:$PORT ..."nohup "$HIVE_HOME/bin/hive" --service metastore \> "$LOG_DIR/metastore.log" 2>&1 &echo $! > "$PID_FILE"sleep 2if kill -0 $(cat "$PID_FILE") 2>/dev/null; thenecho "Hive Metastore started successfully (PID: $(cat $PID_FILE))"elseecho "Failed to start Hive Metastore. Check $LOG_DIR/metastore.log"rm -f "$PID_FILE"exit 1fi
    }stop() {if [ ! -f "$PID_FILE" ]; thenecho "Not running."return 0fiPID=$(cat "$PID_FILE")if ! kill -0 "$PID" 2>/dev/null; thenecho "Stale PID."rm -f "$PID_FILE"return 0fiecho "Stopping Metastore (PID: $PID)..."kill "$PID"        # SIGTERM - 优雅sleep 8if kill -0 "$PID" 2>/dev/null; thenkill -9 "$PID"   # SIGKILL - 强制firm -f "$PID_FILE"echo "Stopped."
    }status() {if [ -f "$PID_FILE" ]; thenPID=$(cat "$PID_FILE")if kill -0 "$PID" 2>/dev/null; thenecho "Hive Metastore is running (PID: $PID)"echo "Log: tail -f $LOG_DIR/metastore.log"return 0elseecho "Hive Metastore is NOT running (stale PID file)."rm -f "$PID_FILE"return 1fielseecho "Hive Metastore is NOT running."return 1fi
    }case "$1" instart)start;;stop)stop;;restart)stopsleep 2start;;status)status;;*)echo "Usage: $0 {start|stop|restart|status}"exit 1;;
    esac

  3. 修改脚本权限 chmod 777 /opt/script/xhive
  4. 添加一个链接 : sudo ln -s -f /opt/script/xhive /bin/xhive
  5. 使用脚本
    xhive start
    xhive stop
    xhive restart
    xhive status

 消息队列 kafka_2.13-3.8.1

安装配置

  1. 将压缩包 kafka_2.13-3.8.1.tgz 上传到服务器
  2. 解压安装:tar -zxvf /opt/software/kafka_2.13-3.8.1.tgz -C /opt/module/
  3. 添加环境变量
    sudo vim /etc/profile.d/myprofile.sh
    # 设置kafka环境变量
    export KAFKA_HOME=/opt/module/kafka_2.13-3.8.1
    export PATH=$PATH:$KAFKA_HOME/bin# 重新加载配置
    source /etc/profile

配置 KRaft 模式

  1. 创建数据目录 mkdir -p /opt/data/kafka/kraft-logs
  2. 编辑配置文件
    cp $KAFKA_HOME/config/kraft/server.properties $KAFKA_HOME/config/kraft/server.properties.bak
    vim $KAFKA_HOME/config/kraft/server.propertieslog.dirs=/opt/data/kafka/kraft-logs

  3. 生成 Cluster ID(唯一标识)
    KAFKA_CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
    echo $KAFKA_CLUSTER_ID  # 保存这个值,例如:pHsOvoSPR8-tocrVrTTrQw

  4. 格式化数据目录
    $KAFKA_HOME/bin/kafka-storage.sh format \-t $KAFKA_CLUSTER_ID \-c $KAFKA_HOME/config/kraft/server.properties

  5. 验证 meta.properties 是否生成
    cat /opt/data/kafka/kraft-logs/meta.properties

  6. 启动 Kafka
    
    $KAFKA_HOME/bin/kafka-server-start.sh \$KAFKA_HOME/config/kraft/server.properties

功能测试

# 创建主题    
./bin/kafka-topics.sh --create --topic user_events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# 查看主题    
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查询主题    
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic user_events
# 删除主题    
./bin/kafka-topics.sh --delete --topic user_events --bootstrap-server localhost:9092

启停脚本

#!/bin/bash
CONFIG=$KAFKA_HOME/config/kraft/server.properties
LOG_DIR=/opt/data/kafka/kraft-logs
PID_FILE=$LOG_DIR/kafka.pidstart() {if [ -f "$PID_FILE" ] && kill -0 $(cat $PID_FILE) 2>/dev/null; thenecho "Kafka already running (PID: $(cat $PID_FILE))"exit 1finohup $KAFKA_HOME/bin/kafka-server-start.sh $CONFIG \> $LOG_DIR/kafka.log 2>&1 &echo $! > $PID_FILEecho "Started Kafka (PID: $(cat $PID_FILE))"
}stop() {if [ ! -f "$PID_FILE" ]; thenecho "Kafka not running"returnfikill $(cat $PID_FILE)sleep 5rm -f $PID_FILEecho "Kafka stopped"
}case $1 instart) start ;;stop)  stop ;;restart) stop; sleep 2; start ;;*) echo "Usage: $0 {start|stop|restart}" ;;
esac

xkafka start

xkafka stop

xkafka restart
 

OLAP doris 3.0.8

修改环境变量

  1. 修改系统最大打开文件句柄数(需重新登录生效)
    sudo vim /etc/security/limits.conf
    * soft nofile 1000000
    * hard nofile 1000000
  2. 修改虚拟内存区域

    sudo vim /etc/sysctl.conf
    vm.max_map_count=2000000
    # 应用更改
    sudo sysctl -p
    # 检查
    cat /proc/sys/vm/max_map_count
    
  3. 检查主机是否支持 AVX2 指令集

    cat /proc/cpuinfo | grep avx2
    # 若有输出表示机器支持 AVX2 指令集,使用apache-doris-2.1.8.1-bin-x64.tar.gz
    # 否则使用apache-doris-2.1.8.1-bin-x64-noavx2.tar.gz
    

准备安装介质

从Apache Doris - Download | Easily deploy Doris anywhere - Apache Doris下载doris安装介质

apache-doris-3.0.8-bin-x64.tar

解压安装

  1. 将压缩包apache-doris-3.0.8-bin-x64.tar.gz上传到服务器
  2. 解压安装
    tar -zxvf apache-doris-3.0.8-bin-x64.tar.gz -C /opt/module/
    mv /opt/module/apache-doris-3.0.8-bin-x64/ /opt/module/doris-3.0.8
    

Doris 配置

  1. 配置环境变量
    sudo vim /etc/profile.d/myprofile.sh
    #设置DORIS环境变量
    export DORIS_HOME=/opt/module/doris-3.0.8
    export PATH=$DORIS_HOME/fe/bin:$DORIS_HOME/be/bin:$PATH
    # 使配置生效
    source /etc/profile
  2. 配置FE元数据目录(仅FE主机)
    # 选择独立于 BE 数据的硬盘,创建 FE 的元数据目录
    mkdir -p /opt/data/doris/meta
    vim $DORIS_HOME/fe/conf/fe.conf
    meta_dir = /opt/data/doris/meta

iceberg配置

vim $DORIS_HOME/fe/conf/fe.conf
# iceberg 配置
catalog.iceberg_hive.type=iceberg
catalog.iceberg_hive.hive.metastore.uris=thrift://mydoris:9083
catalog.iceberg_hive.hadoop_conf_path=/opt/module/hadoop-3.4.1/etc/hadoop

启停脚本

后台启停

$DORIS_HOME/fe/bin/start_fe.sh --daemon

$DORIS_HOME/be/bin/start_be.sh --daemon

$DORIS_HOME/fe/bin/stop_fe.sh --daemon

$DORIS_HOME/be/bin/stop_be.sh --daemon

前台启停

$DORIS_HOME/fe/bin/start_fe.sh

$DORIS_HOME/be/bin/start_be.sh

脚本启停

xdoris start

xdoris stop

  1. 创建脚本文件:vim /opt/script/xdoris
    #!/bin/bash# 定义节点列表和路径
    NODES=("mydoris")
    DORIS_HOME="/opt/module/doris-3.0.8"
    JAVA_HOME="/opt/module/jdk-17.0.10"
    # 检查参数是否有效
    case $1 in"start"|"stop");;*)echo "Invalid Args!"echo "Usage: $(basename $0) start|stop"exit 1;;
    esac
    # 定义启动和停止函数
    start_node() {local NODE=$1echo "Starting BE on $NODE..."ssh -o ConnectTimeout=5 $NODE "export JAVA_HOME=$JAVA_HOME && cd $DORIS_HOME && ./be/bin/start_be.sh --daemon"if [ $? -ne 0 ]; thenecho "Failed to start BE on $NODE."return 1elseecho "Successfully started BE on $NODE."fi
    }stop_node() {local NODE=$1echo "Stopping BE on $NODE..."ssh -o ConnectTimeout=5 $NODE "export JAVA_HOME=$JAVA_HOME && cd $DORIS_HOME && ./be/bin/stop_be.sh"if [ $? -ne 0 ]; thenecho "Failed to stop BE on $NODE."return 1elseecho "Successfully stopped BE on $NODE."fi
    }# 主逻辑
    case $1 in"start")# 启动 FEecho "Starting FE"ssh -o ConnectTimeout=5 mydoris "export JAVA_HOME=$JAVA_HOME && cd $DORIS_HOME && ./fe/bin/start_fe.sh --daemon"if [ $? -ne 0 ]; thenecho "Failed to start FE on mydoris."exit 1fi# 启动每个 BE 节点for NODE in "${NODES[@]}"; dostart_node $NODE &donewaitecho "Cluster started successfully.";;"stop")# 停止每个 BE 节点for NODE in "${NODES[@]}"; dostop_node $NODEdone# 停止 FEecho "Stopping FE on mydoris..."ssh -o ConnectTimeout=5 mydoris "export JAVA_HOME=$JAVA_HOME && cd $DORIS_HOME && ./fe/bin/stop_fe.sh"if [ $? -ne 0 ]; thenecho "Failed to stop FE on mydoris."exit 1fiecho "Cluster stopped successfully.";;
    esac
    
  2. 修改脚本权限
    chmod 777 /opt/script/xdoris
  3. 添加一个链接 
    sudo ln -s -f /opt/script/xdoris /bin/xdoris
  4. 使用脚本
    xdoris start
    xdoris stop

注册 BE 节点(仅一次)

  1. 配置dbeaver连接
  2. 使用SQL命令添加BE(仅需执行一次)
    ALTER SYSTEM ADD BACKEND "mydoris:9050";

登录

  1. 打开页面:http://mydoris:8030/

  2. 用户 root 密码 空

  3. 检查BE注册情况
  4. 查询iceberg表数据
    -- 查看 Catalog
    SHOW CATALOGS; 
    -- 切换 catalog
    SWITCH iceberg_hive;
    -- 查看 database
    SHOW DATABASES;
    -- 切换 database
    USE default_db;
    -- 查看表
    show tables;
    -- 查看数据
    SELECT * FROM iceberg_hive.default_db.user_events;

修改root 密码

默认root密码为空,需修改为非空

mysql -h mydoris -P 9030 -uroot
SET PASSWORD FOR 'root' = PASSWORD('Admin1234');

计算引擎 Flink 1.18.1

安装配置

  1. 将压缩包 flink-1.18.1-bin-scala_2.12.gz上传到服务器
  2. 解压安装
    tar -zxvf flink-1.18.1-bin-scala_2.12.tgz -C /opt/module/
     
  3. 配置环境变量
    sudo vim /etc/profile.d/myprofile.sh
    #设置 flink 环境变量
    export FLINK_HOME=/opt/module/flink-1.18.1
    export PATH=$FLINK_HOME/bin:$PATH# 配置生效
    source /etc/profile
  4. 配置flink-conf
    vim $FLINK_HOME/conf/flink-conf.yaml
    # flink web ui 可在外部访问
    rest.bind-address: 0.0.0.0
    rest.address: mydoris
    # 设置taskmanager有4个作业Slot
    taskmanager.numberOfTaskSlots: 4
    

  5. 启动集群:$FLINK_HOME/bin/start-cluster.sh
  6. 访问Flink Web UI
    http://mydoris:8081

启停脚本

	#!/bin/bash# 检查是否传入参数if [ $# -lt 1 ]; thenecho "No Args Input..."echo "Usage: $0 {start|stop}"exit 1fi# 定义环境变量JAVA_HOME="/opt/module/jdk-17.0.10"FLINK_HOME=${FLINK_HOME:-"/opt/module/flink-1.18.1"} # 如果未设置 FLINK_HOME,则使用默认路径# 获取命令参数case $1 in"start")echo "============== 启动 Flink 集群 ==================="# 检查 FLINK_HOME 是否有效if [ ! -d "$FLINK_HOME" ]; thenecho "Error: FLINK_HOME ($FLINK_HOME) 路径不存在,请检查配置。"exit 1fi# 启动 Flink 集群ssh mydoris "export JAVA_HOME=$JAVA_HOME && ${FLINK_HOME}/bin/start-cluster.sh";;"stop")echo "============== 关闭 Flink 集群 ==================="# 检查 FLINK_HOME 是否有效if [ ! -d "$FLINK_HOME" ]; thenecho "Error: FLINK_HOME ($FLINK_HOME) 路径不存在,请检查配置。"exit 1fi# 停止 Flink 集群ssh mydoris "export JAVA_HOME=$JAVA_HOME && ${FLINK_HOME}/bin/stop-cluster.sh";;*)echo "Input Args Error... Usage: $0 {start|stop}"exit 1;;esac

场景练习

Iceberg 建表

  1. 创建 SQL 初始化文件
  2. cat > $FLINK_HOME/conf/init-iceberg.sql <<'EOF'
    -- 创建 Iceberg catalog
    CREATE CATALOG iceberg_hms WITH ('type' = 'iceberg','catalog-type' = 'hive','uri' = 'thrift://mydoris:9083','warehouse' = 'hdfs://mydoris:9000/user/hive/warehouse','clients' = '5'
    );
    EOF

  3. 打开客户端
    $FLINK_HOME/bin/sql-client.sh -i $FLINK_HOME/conf/init-iceberg.sql   


     

    # 客户端启动有问题,可临时使用日志
    # 临时开启 SQL Client 的 DEBUG 日志
    # 备份原日志配置
    cp $FLINK_HOME/conf/log4j-cli.properties $FLINK_HOME/conf/log4j-cli.properties.bak
    # 添加 DEBUG 配置
    cat >> $FLINK_HOME/conf/log4j-cli.properties <<EOFlogger.iceberg.name = org.apache.iceberg
    logger.iceberg.level = DEBUG
    logger.client.name = org.apache.flink.table.client
    logger.client.level = DEBUG
    EOF
    # 启动 SQL Client 并查看日志
    cat  /opt/module/flink-1.18.1/log/flink-ph-sql-client-mydoris.log
    

  4. 操作数据表
    -- 如果不是 iceberg_hms,则需切换或加前缀
    SHOW CATALOGS;
    SHOW CURRENT CATALOG;
    USE CATALOG iceberg_hms;
    --
    SHOW DATABASES;
    -- 创建数据库(如果 default_db 不存在)
    CREATE DATABASE IF NOT EXISTS default_db;
    -- 切换到该库
    USE default_db;
    -- 在 Iceberg 中建表
    CREATE TABLE default_db.user_events (id BIGINT,event_time TIMESTAMP(3),user_id STRING,event_type STRING
    ) PARTITIONED BY (event_time)
    WITH ('write.format.default' = 'parquet','write.target-file-size-bytes' = '536870912',  -- 512MB'format-version' = '2'
    );
    -- 看有哪些表
    SHOW TABLES;
    -- 描述表
    DESCRIBE user_events;
    -- 插入数据
    INSERT INTO user_events VALUES (1, NOW(), 'user2', 'click');
    -- 查询数据
    SELECT * FROM user_events;

  5. 在mysql通过查询元数据看iceberg中有哪些表

    SELECT t.TBL_NAME FROM TBLS t JOIN TABLE_PARAMS p ON t.TBL_ID = p.TBL_ID WHERE p.PARAM_KEY = 'table_type' AND p.PARAM_VALUE = 'ICEBERG';

flink CDC

  1. 解压安装 flink-cdc-3.0.0-bin.tar.gz
    tar -xzvf flink-cdc-3.0.0-bin.tar.gz
     
  2. 将连接器上载到lib
    ph@mydoris:/opt/module/flink-cdc-3.0.0/lib$ ls *pip*con*
    flink-cdc-pipeline-connector-doris-3.0.0.jar 
    flink-cdc-pipeline-connector-mysql-3.0.0.jar
    

     
  3. 开启 checkpoint
    vim $FLINK_HOME/conf/flink-conf.yaml# 每15一次
    execution.checkpointing.interval: 15000# 重启集群
    $FLINK_HOME/bin/stop-cluster.sh
    $FLINK_HOME/bin/start-cluster.sh

  4. 在mysql数据库准备同步数据
    -- 创建数据库
    CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
    CREATE TABLE `orders` (
    `id` INT NOT NULL,
    `price` DECIMAL(10,2) NOT NULL,
    PRIMARY KEY (`id`)
    );-- 插入数据
    INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
    INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
    CREATE TABLE `shipments` (
    `id` INT NOT NULL,
    `city` VARCHAR(255) NOT NULL,
    PRIMARY KEY (`id`)
    );-- 插入数据
    INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
    INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
    CREATE TABLE `products` (
    `id` INT NOT NULL,
    `product` VARCHAR(255) NOT NULL,
    PRIMARY KEY (`id`)
    );-- 插入数据
    INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
    INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
    INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

  5. 在doris中创建数据库
    create database app_db;
  6. 创建整库同步任务配置文件
    mysql-to-doris.yaml
    ################################################################################
    # Description: Sync MySQL all tables to Doris
    ################################################################################
    source:type: mysqlhostname: mydorisport: 3306username: rootpassword: Admin1234tables: app_db.\.*server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: mydoris:8030username: rootpassword: "Admin1234"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2
    

  7. 通过命令行提交任务到 Flink Standalone cluster
    
    ph@mydoris:/opt/module/flink-cdc-3.0.0/yaml$ /opt/module/flink-cdc-3.0.0/bin/flink-cdc.sh mysql-to-doris.yaml
    Pipeline has been submitted to cluster.
    Job ID: daf31e7e87299e00dcd398ae217874d8
    Job Description: Sync MySQL Database to Doris
    

     
  8. 查看作业
  9. 查看表已经同步

    http://www.dtcms.com/a/606942.html

    相关文章:

  • 地接做的网站企业为什么做企业网站和推广
  • STM32 HAL库原子操作编译问题解决指南
  • 珠海门户网站建设多少钱网站做前端
  • 建设银行征信中心网站石景山广州网站建设
  • 08.引用
  • 网站综合查询工具wordpress群聊
  • 服务器里面如何做网站怎么提交网址让百度收录
  • 福州做网站的公成都it培训机构
  • 蓝桥java蜗牛
  • 05.判断和循环
  • wordpress 制作手机站万网域名注册后如何做网站教学
  • DiT block学习
  • 武安市住房和城乡规划建设局网站wordpress下拉框插件
  • 广东东信润建设有限公司网站搜索关键词排名查询
  • fastapi 中的db.add db.comit db.flush db.refresh都是什么意思 有顺序吗
  • 网站301设置上海百度推广排名
  • 免费做店招哪个网站好新站网站推广该如何做
  • 宝安三网合一网站建设佛山外贸网站建设机构
  • 打工人日报#20251113
  • 高压直流270V电源:无人机地面起动概述
  • 华建建设集团网站网站运营方法
  • java 全景图切片处理,前端用pannellum框架加载
  • 嵌入式开发核心题全解析
  • 营销型企业网站系统模板下载重庆宣传片制作
  • IDEA多java版本切换
  • phpcms校园网站厦门网站制作软件
  • 网站推广有什么方法wordpress电影广告插件
  • 广州的一起做网站自己做的网站打不开怎么搞
  • 2025 多场景运营:用 PageAdmin+QuickSSO 搭建站群,1 套认证管 N 个站点
  • AI解锁物流:文档抽取重塑供应链效率