当前位置: 首页 > news >正文

RabbitMQ 自动化脚本安装方案

以下是 RocketMQ 自动化脚本安装方案,支持多种部署方式:


📦 RocketMQ 一键安装脚本

脚本功能

  • ✅ 支持单机版部署

  • ✅ 支持多个版本选择

  • ✅ 自动安装依赖和JDK

  • ✅ 配置NameServer和Broker

  • ✅ 集成控制台Dashboard

  • ✅ 性能优化配置

  • ✅ 系统服务配置


🔧 完整安装脚本

#!/bin/bash
# ====================================================================================
# 文件名称: install_rabbitmq.sh
# 创建作者: XiaoLan
# 创建时间: $(date +%Y-%m-%d)
# 描述信息: 在CentOS 7系统上自动化安装RabbitMQ服务
# ====================================================================================# -----------------------------------
# 1. 颜色定义模块
# -----------------------------------
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly BLUE='\033[0;34m'
readonly PURPLE='\033[0;35m'
readonly CYAN='\033[0;36m'
readonly RESET='\033[0m' # 重置颜色# -----------------------------------
# 2. 输出函数模块
# -----------------------------------
function log_info() {echo -e "${GREEN}[INFO]$(date '+%Y-%m-%d %H:%M:%S')${RESET} $1"
}function log_warn() {echo -e "${YELLOW}[WARN]$(date '+%Y-%m-%d %H:%M:%S')${RESET} $1"
}function log_error() {echo -e "${RED}[ERROR]$(date '+%Y-%m-%d %H:%M:%S')${RESET} $1"
}function log_success() {echo -e "${CYAN}[SUCCESS]$(date '+%Y-%m-%d %H:%M:%S')${RESET} $1"
}# -----------------------------------
# 3. 变量配置模块
# -----------------------------------
readonly SCRIPT_NAME="RabbitMQ自动安装脚本"
readonly SCRIPT_VERSION="1.0.0"
readonly INSTALL_LOG="/var/log/rabbitmq_install.log"
readonly BACKUP_DIR="/opt/backup"
readonly WORK_DIR="/tmp/rabbitmq_install"# RabbitMQ相关配置
readonly RABBITMQ_VERSION="3.8.9"
readonly ERLANG_VERSION="23.2.7"
readonly RABBITMQ_USER="rabbitmq"
readonly RABBITMQ_HOME="/opt/rabbitmq"# 系统配置
readonly SYSTEM_USER=$(whoami)
readonly OS_VERSION=$(cat /etc/centos-release)# -----------------------------------
# 4. 显示横幅模块
# -----------------------------------
function show_banner() {clearecho -e "${CYAN}"echo "  _____       _     _   __  __       _     _ _ _   "echo " |  __ \     | |   | | |  \/  |     | |   (_) | |  "echo " | |__) |__ _| |__ | |_| \  / | __ _| |__  _| | |  "echo " |  _  // _\` | '_ \| __| |\/| |/ _\` | '_ \| | | |  "echo " | | \ \ (_| | |_) | |_| |  | | (_| | |_) | | | |  "echo " |_|  \_\__,_|_.__/ \__|_|  |_|\__,_|_.__/|_|_|_|  "echo -e "${RESET}"echo -e "${YELLOW}====================================================${RESET}"echo -e "${PURPLE} 脚本名称: ${SCRIPT_NAME}${RESET}"echo -e "${PURPLE} 脚本版本: ${SCRIPT_VERSION}${RESET}"echo -e "${PURPLE} 操作系统: ${OS_VERSION}${RESET}"echo -e "${PURPLE} 执行用户: ${SYSTEM_USER}${RESET}"echo -e "${YELLOW}====================================================${RESET}"echo ""
}# -----------------------------------
# 5. 显示安装摘要模块
# -----------------------------------
function show_install_summary() {echo -e "${BLUE}================ 安装摘要 ================${RESET}"echo -e "${GREEN}操作系统版本:${RESET} ${OS_VERSION}"echo -e "${GREEN}RabbitMQ版本:${RESET} ${RABBITMQ_VERSION}"echo -e "${GREEN}Erlang版本:${RESET} ${ERLANG_VERSION}"echo -e "${GREEN}安装日志路径:${RESET} ${INSTALL_LOG}"echo -e "${GREEN}备份目录路径:${RESET} ${BACKUP_DIR}"echo -e "${GREEN}工作目录路径:${RESET} ${WORK_DIR}"echo -e "${BLUE}=========================================${RESET}"echo ""
}# -----------------------------------
# 6. 环境检查模块
# -----------------------------------
function check_environment() {log_info "开始检查系统环境..."# 检查是否为root用户if [[ $EUID -ne 0 ]]; thenlog_error "此脚本需要root权限运行,请使用sudo执行"exit 1fi# 检查操作系统版本if ! grep -q "CentOS Linux release 7" /etc/centos-release; thenlog_error "此脚本仅支持CentOS 7系统"exit 1fi# 检查网络连接if ! ping -c 3 8.8.8.8 &> /dev/null; thenlog_error "网络连接异常,请检查网络配置"exit 1filog_success "系统环境检查通过"
}# -----------------------------------
# 7. 创建目录结构模块
# -----------------------------------
function create_directories() {log_info "创建必要的目录结构..."# 创建备份目录if [[ ! -d ${BACKUP_DIR} ]]; thenmkdir -p ${BACKUP_DIR}log_info "创建备份目录: ${BACKUP_DIR}"fi# 创建工作目录if [[ ! -d ${WORK_DIR} ]]; thenmkdir -p ${WORK_DIR}log_info "创建工作目录: ${WORK_DIR}"fi# 创建日志目录local log_dir=$(dirname ${INSTALL_LOG})if [[ ! -d ${log_dir} ]]; thenmkdir -p ${log_dir}log_info "创建日志目录: ${log_dir}"filog_success "目录结构创建完成"
}# -----------------------------------
# 8. 安装依赖模块
# -----------------------------------
function install_dependencies_old() {log_info "开始安装系统依赖..."# 更新系统软件包yum clean all >> ${INSTALL_LOG} 2>&1yum makecache >> ${INSTALL_LOG} 2>&1# 安装必要依赖local dependencies=("wget" "curl" "ca-certificates" "gnupg" "socat")for dep in "${dependencies[@]}"; doif ! rpm -q ${dep} &> /dev/null; thenlog_info "正在安装依赖: ${dep}"yum install -y ${dep} >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "依赖安装失败: ${dep}"exit 1fielselog_info "依赖已存在: ${dep}"fidonelog_success "系统依赖安装完成"
}# -----------------------------------
# 8. 安装依赖模块 (修复版本)
# -----------------------------------
function install_dependencies() {log_info "开始安装系统依赖..."# 更新系统软件包缓存log_info "更新系统软件包缓存..."yum clean all >> ${INSTALL_LOG} 2>&1yum makecache >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_warn "系统软件包缓存更新失败,尝试继续安装依赖..."fi# 安装必要依赖local dependencies=("wget" "curl" "ca-certificates" "socat")for dep in "${dependencies[@]}"; doif rpm -q ${dep} &> /dev/null; thenlog_info "依赖已存在: ${dep}"continuefilog_info "正在安装依赖: ${dep}"yum install -y ${dep} >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_warn "依赖安装失败: ${dep},将继续安装其他依赖"elselog_info "依赖安装成功: ${dep}"fidone# 单独处理GPG相关组件install_gnupg_packagelog_success "系统依赖安装完成"
}# -----------------------------------
# GPG组件安装函数
# -----------------------------------
function install_gnupg_package() {log_info "处理GPG相关组件安装..."# 检查系统中是否已有GPG工具if command -v gpg &> /dev/null || command -v gpg2 &> /dev/null; thenlog_info "GPG工具已存在"return 0fi# 尝试安装gnupglog_info "尝试安装gnupg..."yum install -y gnupg >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_info "gnupg安装成功"return 0fi# 尝试安装gnupg2log_info "尝试安装gnupg2..."yum install -y gnupg2 >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_info "gnupg2安装成功"# 如果安装了gnupg2但没有gnupg命令,创建符号链接if command -v gpg2 &> /dev/null && ! command -v gpg &> /dev/null; thenln -s $(which gpg2) /usr/local/bin/gpg 2>/dev/null && log_info "创建gpg符号链接成功" || log_warn "创建gpg符号链接失败"fireturn 0fi# 尝试从基础仓库安装log_info "尝试从base仓库安装gnupg..."yum install -y --disablerepo=* --enablerepo=base gnupg >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_info "从base仓库安装gnupg成功"return 0fi# 最后尝试安装gpgme(提供基本GPG功能)log_info "尝试安装gpgme..."yum install -y gpgme >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_info "gpgme安装成功,提供基本GPG功能"return 0fi# 如果所有方法都失败,记录警告但继续执行log_warn "所有GPG组件安装尝试都失败,RabbitMQ安装可能缺少GPG验证功能"
}# -----------------------------------
# 9. 安装Erlang模块
# -----------------------------------
function install_erlang_old() {log_info "开始安装Erlang/OTP ${ERLANG_VERSION}..."# 添加Erlang仓库if [[ ! -f /etc/yum.repos.d/rabbitmq_erlang.repo ]]; thencat > /etc/yum.repos.d/rabbitmq_erlang.repo << EOF
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=0
gpgcheck=0
enabled=1
EOFlog_info "已添加Erlang仓库配置"fi# 安装Erlangyum install -y erlang-${ERLANG_VERSION} >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "Erlang安装失败"exit 1fi# 验证安装local erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')if [[ "${erlang_version}" == "${ERLANG_VERSION%.*}"* ]]; thenlog_success "Erlang ${erlang_version} 安装成功"elselog_error "Erlang版本验证失败"exit 1fi
}# -----------------------------------
# Erlang手动安装函数 (增强版 - 多重下载源)
# -----------------------------------
function install_erlang_manual() {log_info "开始手动安装Erlang..."cd ${WORK_DIR}# 多个Erlang RPM包下载链接(按优先级排序)local download_urls=("https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.11/erlang-23.3.4.11-1.el7.x86_64.rpm""https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.3.4.11-1~centos~7_amd64.rpm""https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.4.9/erlang-22.3.4.9-1.el7.x86_64.rpm""https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_22.3.4.9-1~centos~7_amd64.rpm")local filename=""# 尝试从多个源下载for url in "${download_urls[@]}"; dofilename=$(basename ${url})log_info "尝试下载Erlang RPM包: ${filename}"# 使用多种下载方式尝试if command -v wget &> /dev/null; thenwget --timeout=60 --tries=3 ${url} -O ${filename} >> ${INSTALL_LOG} 2>&1elif command -v curl &> /dev/null; thencurl -L --connect-timeout 60 -o ${filename} ${url} >> ${INSTALL_LOG} 2>&1elselog_warn "系统中未找到wget或curl命令"continuefiif [[ $? -eq 0 && -f ${filename} && -s ${filename} ]]; thenlog_success "Erlang RPM包下载成功: ${filename}"breakelselog_warn "Erlang RPM包下载失败: ${url}"rm -f ${filename}filename=""fidone# 如果所有下载都失败if [[ -z "${filename}" || ! -f ${filename} ]]; thenlog_error "所有Erlang RPM包下载都失败"log_info "尝试使用EPEL仓库安装Erlang..."install_erlang_from_epelreturnfi# 验证下载的文件if [[ ! -s ${filename} ]]; thenlog_error "下载的RPM包文件为空"rm -f ${filename}exit 1fi# 显示文件信息log_info "下载的RPM包信息:"rpm -qip ${filename} >> ${INSTALL_LOG} 2>&1 || log_warn "无法获取RPM包信息"# 开始安装log_info "开始安装Erlang RPM包..."# 使用--nodeps忽略依赖关系,因为这是手动安装rpm -ivh --nodeps ${filename} >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 && -n "$(command -v erl 2>/dev/null)" ]]; thenlocal erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_success "Erlang ${erlang_version} 手动安装成功"rm -f ${filename}return 0elselog_error "Erlang RPM安装失败"# 显示安装错误详情rpm -ivh --test ${filename} >> ${INSTALL_LOG} 2>&1 2>&1rm -f ${filename}exit 1fi
}# -----------------------------------
# 从EPEL仓库安装Erlang
# -----------------------------------
function install_erlang_from_epel() {log_info "尝试从EPEL仓库安装Erlang..."# 安装EPEL仓库if ! rpm -q epel-release &> /dev/null; thenlog_info "安装EPEL仓库..."yum install -y epel-release >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "EPEL仓库安装失败"exit 1fifi# 更新缓存yum clean all >> ${INSTALL_LOG} 2>&1yum makecache >> ${INSTALL_LOG} 2>&1# 尝试安装Erlanglog_info "从EPEL仓库安装Erlang..."yum install -y erlang >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 && -n "$(command -v erl 2>/dev/null)" ]]; thenlocal erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_success "Erlang ${erlang_version} 从EPEL仓库安装成功"return 0elselog_error "从EPEL仓库安装Erlang失败"exit 1fi
}# -----------------------------------
# 9. 安装Erlang模块 (增强修复版)
# -----------------------------------
function install_erlang() {log_info "开始安装Erlang/OTP..."# 首先检查是否已经安装了Erlangif command -v erl &> /dev/null; thenlocal existing_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_info "检测到已安装的Erlang版本: ${existing_version}"return 0fi# 方法1: 使用RabbitMQ官方仓库log_info "方法1: 使用RabbitMQ官方仓库安装Erlang..."install_erlang_rabbitmq_repo# 检查安装结果if command -v erl &> /dev/null; thenlocal erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_success "Erlang ${erlang_version} 安装成功"return 0fi# 方法2: 手动下载RPM包log_warn "方法1失败,尝试方法2: 手动下载RPM包..."install_erlang_manual# 检查安装结果if command -v erl &> /dev/null; thenlocal erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_success "Erlang ${erlang_version} 安装成功"return 0fi# 方法3: 使用EPEL仓库log_warn "方法2失败,尝试方法3: 使用EPEL仓库..."install_erlang_from_epel# 最终检查if command -v erl &> /dev/null; thenlocal erlang_version=$(erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().' -noshell 2>/dev/null | tr -d '"')log_success "Erlang ${erlang_version} 安装成功"return 0elselog_error "所有Erlang安装方法都失败"exit 1fi
}# -----------------------------------
# 使用RabbitMQ官方仓库安装Erlang
# -----------------------------------
function install_erlang_rabbitmq_repo() {log_info "配置RabbitMQ Erlang仓库..."# 删除旧的仓库配置if [[ -f /etc/yum.repos.d/rabbitmq_erlang.repo ]]; thenrm -f /etc/yum.repos.d/rabbitmq_erlang.repofi# 添加新的仓库配置(禁用GPG检查)cat > /etc/yum.repos.d/rabbitmq_erlang.repo << EOF
[rabbitmq_erlang]
name=RabbitMQ Erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=0
gpgcheck=0
enabled=1
EOFlog_info "已添加Erlang仓库配置(已禁用GPG检查)"# 清理并更新缓存log_info "清理和更新YUM缓存..."yum clean all >> ${INSTALL_LOG} 2>&1yum makecache >> ${INSTALL_LOG} 2>&1# 尝试安装Erlanglog_info "正在安装Erlang..."yum install -y erlang >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_success "Erlang通过YUM安装成功"return 0elselog_warn "通过YUM安装Erlang失败"return 1fi
}# -----------------------------------
# 10. 安装RabbitMQ模块
# -----------------------------------
function install_rabbitmq_old() {log_info "开始安装RabbitMQ ${RABBITMQ_VERSION}..."# 添加RabbitMQ仓库if [[ ! -f /etc/yum.repos.d/rabbitmq.repo ]]; thencat > /etc/yum.repos.d/rabbitmq.repo << EOF
[bintray-rabbitmq-server]
name=bintray-rabbitmq-server
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOFlog_info "已添加RabbitMQ仓库配置"fi# 安装RabbitMQyum install -y rabbitmq-server-${RABBITMQ_VERSION} >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "RabbitMQ安装失败"exit 1filog_success "RabbitMQ安装完成"
}# -----------------------------------
# 10. 安装RabbitMQ模块 (最终修复版本)
# -----------------------------------
function install_rabbitmq() {log_info "开始安装RabbitMQ..."# 导入 RabbitMQ 签名密钥log_info "导入RabbitMQ签名密钥..."rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc >> ${INSTALL_LOG} 2>&1local import_result=$?if [[ ${import_result} -ne 0 ]]; thenlog_warn "导入RabbitMQ GPG密钥失败,将继续安装(可能会有警告)"elselog_info "成功导入RabbitMQ GPG密钥"fi# 添加新的RabbitMQ仓库 (使用官方Package Cloud仓库)log_info "配置RabbitMQ官方仓库..."if [[ -f /etc/yum.repos.d/rabbitmq.repo ]]; thenrm -f /etc/yum.repos.d/rabbitmq.repoficat > /etc/yum.repos.d/rabbitmq.repo << 'EOF'
[rabbitmq_server]
name=RabbitMQ Server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
enabled=1
gpgcheck=0
repo_gpgcheck=0
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOF# 同时添加Erlang仓库(确保版本匹配)if [[ ! -f /etc/yum.repos.d/rabbitmq_erlang.repo ]]; thencat > /etc/yum.repos.d/rabbitmq_erlang.repo << 'EOF'
[rabbitmq_erlang]
name=RabbitMQ Erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
enabled=1
gpgcheck=0
repo_gpgcheck=0
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOFfilog_info "已添加RabbitMQ和Erlang仓库配置"# 清理并更新缓存log_info "清理和更新YUM缓存..."yum clean all >> ${INSTALL_LOG} 2>&1yum makecache >> ${INSTALL_LOG} 2>&1# 查看可用的RabbitMQ版本log_info "检查可用的RabbitMQ版本..."yum list available rabbitmq-server --showduplicates >> ${INSTALL_LOG} 2>&1# 先尝试安装最新版本log_info "正在安装RabbitMQ服务器..."yum install -y rabbitmq-server >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_success "RabbitMQ安装完成"return 0fi# 如果标准安装失败,尝试指定版本安装log_warn "标准安装失败,尝试指定版本安装..."local versions_to_try=("3.8.9" "3.9.13" "3.10.7")for version in "${versions_to_try[@]}"; dolog_info "尝试安装RabbitMQ版本: ${version}"yum install -y rabbitmq-server-${version} >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_success "RabbitMQ ${version} 安装完成"return 0fidone# 如果YUM安装全部失败,尝试手动下载安装log_warn "YUM安装失败,尝试手动下载安装..."install_rabbitmq_manual
}# -----------------------------------
# RabbitMQ手动安装函数
# -----------------------------------
function install_rabbitmq_manual() {log_info "开始手动安装RabbitMQ..."cd ${WORK_DIR}# 尝试下载几个不同版本local download_urls=("https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm""https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.13/rabbitmq-server-3.9.13-1.el7.noarch.rpm""https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.7/rabbitmq-server-3.10.7-1.el7.noarch.rpm")for url in "${download_urls[@]}"; dolocal filename=$(basename ${url})log_info "尝试下载: ${filename}"wget --timeout=30 ${url} -O ${filename} >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 && -f ${filename} ]]; thenlog_info "下载成功,开始安装..."# 尝试安装(忽略依赖关系)rpm -ivh --nodeps ${filename} >> ${INSTALL_LOG} 2>&1if [[ $? -eq 0 ]]; thenlog_success "RabbitMQ手动安装成功: ${filename}"return 0elselog_warn "安装失败: ${filename}"rm -f ${filename}fielselog_warn "下载失败: ${url}"rm -f ${filename}fidone# 如果所有方法都失败log_error "所有RabbitMQ安装方法都失败"log_info "请检查网络连接或手动下载安装包"log_info "安装日志位置: ${INSTALL_LOG}"exit 1
}
# -------------------------# -----------------------------------
# 11. 配置RabbitMQ模块
# -----------------------------------
function configure_rabbitmq() {log_info "开始配置RabbitMQ..."# 启用管理插件rabbitmq-plugins enable rabbitmq_management >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "启用管理插件失败"exit 1filog_info "已启用RabbitMQ管理插件"# 设置开机自启systemctl enable rabbitmq-server >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "设置开机自启失败"exit 1filog_info "已设置RabbitMQ开机自启"log_success "RabbitMQ配置完成"
}# -----------------------------------
# 12. 启动服务模块
# -----------------------------------
function start_rabbitmq_service_old() {log_info "启动RabbitMQ服务..."# 启动服务systemctl start rabbitmq-server >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "启动RabbitMQ服务失败"exit 1fi# 检查服务状态sleep 5if systemctl is-active --quiet rabbitmq-server; thenlog_success "RabbitMQ服务启动成功"elselog_error "RabbitMQ服务启动失败"exit 1fi
}# -----------------------------------
# 12. 启动服务模块 (增强版)
# -----------------------------------
function start_rabbitmq_service() {log_info "启动RabbitMQ服务..."# 检查服务文件是否存在if [[ ! -f /usr/lib/systemd/system/rabbitmq-server.service ]]; thenlog_error "RabbitMQ服务文件不存在,安装可能失败"exit 1fi# 重新加载systemd配置systemctl daemon-reload >> ${INSTALL_LOG} 2>&1# 启动服务systemctl start rabbitmq-server >> ${INSTALL_LOG} 2>&1# 等待服务启动local max_attempts=5local attempt=1while [[ ${attempt} -le ${max_attempts} ]]; doif systemctl is-active --quiet rabbitmq-server; thenlog_success "RabbitMQ服务启动成功"return 0filog_info "等待RabbitMQ服务启动... (尝试 ${attempt}/${max_attempts})"sleep 5((attempt++))done# 如果启动失败,显示错误信息log_error "RabbitMQ服务启动失败"log_info "检查服务状态: systemctl status rabbitmq-server"log_info "查看服务日志: journalctl -u rabbitmq-server"# 尝试手动诊断diagnose_rabbitmq_issueexit 1
}# -----------------------------------
# RabbitMQ问题诊断函数
# -----------------------------------
function diagnose_rabbitmq_issue() {log_info "开始诊断RabbitMQ问题..."# 检查Erlang是否安装if ! command -v erl &> /dev/null; thenlog_error "Erlang未正确安装"return 1fi# 检查RabbitMQ二进制文件if ! command -v rabbitmqctl &> /dev/null; thenlog_error "rabbitmqctl命令不存在"return 1fi# 检查端口占用if netstat -tlnp | grep -q ":5672 "; thenlog_warn "端口5672已被占用"fi# 检查文件权限local rabbitmq_dirs=("/var/lib/rabbitmq" "/var/log/rabbitmq")for dir in "${rabbitmq_dirs[@]}"; doif [[ -d ${dir} ]]; thenlocal owner=$(stat -c '%U' ${dir})if [[ ${owner} != "rabbitmq" ]]; thenlog_warn "目录 ${dir} 的所有者不是rabbitmq用户"fifidonelog_info "诊断完成"
}# -----------------------------------
# 13. 创建管理用户模块
# -----------------------------------
function create_admin_user() {log_info "创建RabbitMQ管理用户..."# 设置默认用户名和密码(生产环境应修改)local admin_user="admin"local admin_pass="admin123"# 创建管理用户rabbitmqctl add_user ${admin_user} ${admin_pass} >> ${INSTALL_LOG} 2>&1if [[ $? -ne 0 ]]; thenlog_error "创建管理用户失败"exit 1fi# 设置用户权限rabbitmqctl set_user_tags ${admin_user} administrator >> ${INSTALL_LOG} 2>&1rabbitmqctl set_permissions -p / ${admin_user} ".*" ".*" ".*" >> ${INSTALL_LOG} 2>&1log_warn "默认管理用户已创建: ${admin_user}/${admin_pass},生产环境请立即修改密码"log_success "管理用户创建完成"
}# -----------------------------------
# 14. 防火墙配置模块
# -----------------------------------
function configure_firewall() {log_info "配置防火墙规则..."# 检查firewalld状态if systemctl is-active --quiet firewalld; then# 开放RabbitMQ端口firewall-cmd --permanent --add-port=5672/tcp >> ${INSTALL_LOG} 2>&1firewall-cmd --permanent --add-port=15672/tcp >> ${INSTALL_LOG} 2>&1firewall-cmd --reload >> ${INSTALL_LOG} 2>&1log_info "已开放RabbitMQ端口: 5672, 15672"elselog_warn "Firewalld未运行,跳过防火墙配置"filog_success "防火墙配置完成"
}# -----------------------------------
# 15. 安装后验证模块
# -----------------------------------
function post_install_check() {log_info "执行安装后验证..."# 验证服务状态if systemctl is-active --quiet rabbitmq-server; thenlog_info "RabbitMQ服务运行正常"elselog_error "RabbitMQ服务未运行"exit 1fi# 验证端口监听if netstat -tlnp | grep -q ":5672 "; thenlog_info "AMQP端口(5672)监听正常"elselog_warn "AMQP端口(5672)未监听"fiif netstat -tlnp | grep -q ":15672 "; thenlog_info "管理界面端口(15672)监听正常"elselog_warn "管理界面端口(15672)未监听"filog_success "安装后验证完成"
}# -----------------------------------
# 16. 显示完成信息模块
# -----------------------------------
function show_completion_info() {echo -e "${BLUE}"echo "=============================================="echo "        RabbitMQ安装完成!"echo "=============================================="echo -e "${RESET}"echo -e "${GREEN}服务状态:${RESET} $(systemctl is-active rabbitmq-server)"echo -e "${GREEN}服务开机自启:${RESET} $(systemctl is-enabled rabbitmq-server)"echo -e "${GREEN}管理界面:${RESET} http://$(hostname -I | awk '{print $1}'):15672"echo -e "${GREEN}默认用户:${RESET} admin / admin123"echo -e "${GREEN}安装日志:${RESET} ${INSTALL_LOG}"echo ""echo -e "${YELLOW}注意事项:${RESET}"echo "1. 请及时修改默认管理用户密码"echo "2. 根据需要配置集群或高可用"echo "3. 检查防火墙设置是否符合安全要求"echo ""
}# -----------------------------------
# 17. 主函数模块
# -----------------------------------
function main() {# 显示横幅show_banner# 显示安装摘要show_install_summary# 确认安装echo -e "${YELLOW}请确认是否继续安装? (y/N)${RESET}"read -r confirmif [[ ! "${confirm}" =~ ^[Yy]$ ]]; thenlog_info "用户取消安装"exit 0fi# 开始安装流程{check_environmentcreate_directoriesinstall_dependenciesinstall_erlanginstall_rabbitmqconfigure_rabbitmqstart_rabbitmq_servicecreate_admin_userconfigure_firewallpost_install_check} 2>&1 | tee -a ${INSTALL_LOG}# 显示完成信息show_completion_info
}# -----------------------------------
# 18. 脚本入口点
# -----------------------------------
# 检查是否有参数传入
case "${1}" in--help|-h)echo "用法: $0 [选项]"echo "选项:"echo "  -h, --help     显示帮助信息"echo "  -v, --version  显示版本信息"echo ""echo "描述:"echo "  此脚本用于在CentOS 7系统上自动化安装RabbitMQ服务"exit 0;;--version|-v)echo "${SCRIPT_NAME} 版本 ${SCRIPT_VERSION}"exit 0;;*)main;;
esac

🔍 安装执行过程


🔍 管理界面访问

🐍 生产者代码(Java)

添加Maven依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>
package com.xl.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;/*** 模拟发送500条消息的生产者代码*/
public class RabbitMQProducerBatch {private static final String EXCHANGE_NAME = "batch_message_exchange";private static final String QUEUE_NAME = "batch_message_queue";private static final String ROUTING_KEY = "batch.routing.key";private static final int TOTAL_MESSAGES = 1000;private static AtomicInteger successCount = new AtomicInteger(0);private static AtomicInteger failCount = new AtomicInteger(0);private static ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();// 配置连接信息 - 请根据实际情况修改factory.setHost("192.168.198.120"); // RabbitMQ 服务器地址factory.setPort(5672);factory.setUsername("admin"); // 用户名factory.setPassword("admin123"); // 密码factory.setVirtualHost("/");// 设置连接超时和时间间隔factory.setConnectionTimeout(30000);factory.setRequestedHeartbeat(60);Connection connection = null;Channel channel = null;try {System.out.println("【生产者】开始连接RabbitMQ服务器...");long startTime = System.currentTimeMillis();// 建立连接connection = factory.newConnection();channel = connection.createChannel();System.out.println("【生产者】连接成功!耗时: " + (System.currentTimeMillis() - startTime) + "ms");// 1. 声明交换机和队列System.out.println("【生产者】声明交换机和队列...");channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);  // 持久化交换机channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);System.out.println("【生产者】交换机和队列声明完成");// 2. 开启发布确认模式System.out.println("【生产者】开启发布确认模式...");channel.confirmSelect();// 3. 添加确认监听器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("【确认成功】DeliveryTag: " + deliveryTag + ", Multiple: " + multiple);handleConfirm(deliveryTag, multiple, true);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("【确认失败】DeliveryTag: " + deliveryTag + ", Multiple: " + multiple);handleConfirm(deliveryTag, multiple, false);}});// 4. 批量发送500条消息System.out.println("【生产者】开始批量发送 " + TOTAL_MESSAGES + " 条消息...");System.out.println("===============================================");for (int i = 1; i <= TOTAL_MESSAGES; i++) {String message = "第 " + i + " 条测试消息 - 时间戳: " + System.currentTimeMillis();// 存储未确认的消息long nextPublishSeqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(nextPublishSeqNo, message);// 发送消息channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));// 打印发送进度if (i % 50 == 0) {System.out.println("【发送进度】已发送 " + i + "/" + TOTAL_MESSAGES + " 条消息");}// 模拟生产间隔if (i % 100 == 0) {Thread.sleep(100); // 每100条休息100ms}}System.out.println("===============================================");System.out.println("【生产者】所有消息发送完成,等待确认...");// 5. 等待所有消息被确认(最多等待30秒)boolean allConfirmed = channel.waitForConfirms(30000);if (allConfirmed) {System.out.println("【生产者】所有消息确认完成!");} else {System.err.println("【生产者】等待确认超时,可能还有未确认的消息");}// 6. 统计结果long endTime = System.currentTimeMillis();long totalTime = endTime - startTime;System.out.println("\n=============== 发送统计报告 ===============");System.out.println("总消息数: " + TOTAL_MESSAGES);System.out.println("成功确认: " + successCount.get());System.out.println("失败确认: " + failCount.get());System.out.println("未确认消息: " + outstandingConfirms.size());System.out.println("总耗时: " + totalTime + "ms");System.out.println("平均吞吐量: " + (TOTAL_MESSAGES * 1000.0 / totalTime) + " 条/秒");System.out.println("===============================================");// 7. 如果有未确认的消息,打印详情if (!outstandingConfirms.isEmpty()) {System.err.println("\n【警告】以下消息未确认:");for (String unconfirmedMsg : outstandingConfirms.values()) {System.err.println("未确认: " + unconfirmedMsg);}}} catch (IOException | TimeoutException | InterruptedException e) {System.err.println("【生产者错误】发生异常: " + e.getMessage());e.printStackTrace();// 打印连接信息用于调试System.err.println("连接信息: " + factory.getHost() + ":" + factory.getPort());} finally {// 8. 关闭连接try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}System.out.println("【生产者】连接已关闭");} catch (IOException | TimeoutException e) {System.err.println("关闭连接时发生错误: " + e.getMessage());}}}/*** 处理消息确认结果*/private static void handleConfirm(long deliveryTag, boolean multiple, boolean ack) {if (multiple) {// 批量确认ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);int batchSize = confirmed.size();if (ack) {successCount.addAndGet(batchSize);System.out.println("【批量确认成功】数量: " + batchSize + ", 截至DeliveryTag: " + deliveryTag);} else {failCount.addAndGet(batchSize);System.err.println("【批量确认失败】数量: " + batchSize + ", 截至DeliveryTag: " + deliveryTag);}confirmed.clear();} else {// 单条确认String message = outstandingConfirms.remove(deliveryTag);if (ack) {successCount.incrementAndGet();if (successCount.get() % 50 == 0) {System.out.println("【单条确认成功】累计: " + successCount.get() + ", 内容: " +(message != null ? message.substring(0, Math.min(20, message.length())) + "..." : "N/A"));}} else {failCount.incrementAndGet();System.err.println("【单条确认失败】DeliveryTag: " + deliveryTag + ", 内容: " + message);}}// 实时显示进度int totalProcessed = successCount.get() + failCount.get();if (totalProcessed % 50 == 0) {System.out.println("【确认进度】已确认: " + totalProcessed + "/" + TOTAL_MESSAGES +" (成功: " + successCount.get() + ", 失败: " + failCount.get() + ")");}}
}

🐍 消费者代码(Java)

package com.xl.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;/*** 批量消费*/
public class RabbitMQConsumerBatch {private static final String EXCHANGE_NAME = "batch_message_exchange";private static final String QUEUE_NAME = "batch_message_queue";private static final String ROUTING_KEY = "batch.routing.key";private static AtomicInteger messageCount = new AtomicInteger(0);private static long startTime = 0;public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.198.120"); // RabbitMQ 服务器地址factory.setPort(5672);factory.setUsername("admin"); // 用户名factory.setPassword("admin123"); // 密码try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机和队列(确保存在)channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);// 设置QoS,一次处理10条消息channel.basicQos(10);System.out.println("【消费者】等待接收批量消息...");startTime = System.currentTimeMillis();DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");long deliveryTag = delivery.getEnvelope().getDeliveryTag();int currentCount = messageCount.incrementAndGet();// 模拟消息处理try {// 随机模拟处理时间 10-100msThread.sleep((long) (Math.random() * 90 + 10));// 随机模拟1%的失败率if (Math.random() < 0.01) {System.err.println("【消费者】消息处理失败,重新入队: " + deliveryTag);channel.basicNack(deliveryTag, false, true);} else {channel.basicAck(deliveryTag, false);// 每10条打印一次进度if (currentCount % 10 == 0) {long currentTime = System.currentTimeMillis();long elapsed = currentTime - startTime;double rate = currentCount * 1000.0 / elapsed;System.out.println("【消费进度】已处理: " + currentCount + " 条, " +"速率: " + String.format("%.2f", rate) + " 条/秒");}}} catch (InterruptedException e) {System.err.println("处理消息时被中断: " + e.getMessage());channel.basicNack(deliveryTag, false, true);} catch (IOException e) {System.err.println("确认消息时发生错误: " + e.getMessage());}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {System.out.println("消费者被取消: " + consumerTag);});// 添加关闭钩子,显示最终统计Runtime.getRuntime().addShutdownHook(new Thread(() -> {long endTime = System.currentTimeMillis();long totalTime = endTime - startTime;int finalCount = messageCount.get();System.out.println("\n=============== 消费统计报告 ===============");System.out.println("总消费消息数: " + finalCount);System.out.println("总耗时: " + totalTime + "ms");System.out.println("平均消费速率: " + String.format("%.2f", finalCount * 1000.0 / totalTime) + " 条/秒");System.out.println("===============================================");}));System.out.println("按任意键退出消费者...");System.in.read();channel.close();connection.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}
}

🔍 消息代码解释

这个Java RabbitMQ代码实现了一个完整的消息队列批量处理系统,包含以下核心功能:

1. 生产者核心功能

🔄 批量消息发送

  • 自动发送1000条测试消息到RabbitMQ

  • 每条消息包含序号和时间戳,便于追踪和去重

  • 支持大规模连续消息发送

消息可靠性保证

channel.confirmSelect(); // 开启发布确认
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // 持久化交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化队列
  • 发布确认机制:确保消息成功到达Broker

  • 消息持久化:防止服务器重启时数据丢失

  • 事务性保证:明确的成功/失败状态追踪

📊 实时监控与统计系统

private static AtomicInteger successCount = new AtomicInteger(0);
private static AtomicInteger failCount = new AtomicInteger(0);
private static ConcurrentNavigableMap<Long, String> outstandingConfirms;
  • 实时显示发送进度(每50条打印一次)

  • 追踪未确认消息状态

  • 计算吞吐量和性能指标

🎯 智能确认机制

channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) // 成功确认public void handleNack(long deliveryTag, boolean multiple) // 失败确认
});
  • 批量确认:提高确认效率,减少网络开销

  • 单条确认:精确追踪每条消息状态

  • 异步处理:不阻塞主发送线程

2. 消费者核心功能

🚀 高效消息消费

channel.basicQos(10); // 流量控制
DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消息处理逻辑
};
  • QoS控制:一次最多处理10条消息,避免过载

  • 手动确认:确保消息被正确处理后才确认

  • 并发安全:原子计数器保证统计准确性

模拟真实业务场景

// 随机模拟处理时间 10-100ms
Thread.sleep((long) (Math.random() * 90 + 10));
​
// 随机模拟1%的失败率
if (Math.random() < 0.01) {channel.basicNack(deliveryTag, false, true); // 重新入队
}
  • 随机处理延迟:模拟真实业务处理时间

  • 故障模拟:1%的随机失败率,测试重试机制

  • 重试逻辑:失败消息重新入队处理

📈 性能监控与报告

Runtime.getRuntime().addShutdownHook(new Thread(() -> {// 生成消费统计报告
}));
  • 实时消费进度显示

  • 消费速率计算

  • 关闭时自动生成统计报告

3. 系统级特性

🛡️ 健壮性设计

try {// 业务逻辑
} catch (Exception e) {// 异常处理
} finally {// 资源清理
}
  • 完整异常处理:网络异常、超时等情况的处理

  • 资源自动释放:连接和通道的正确关闭

  • 连接超时控制:30秒连接超时设置

🔧 配置灵活性

factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setConnectionTimeout(30000);
  • 可配置的连接参数

  • 支持不同的认证方式

  • 灵活的网络设置

4. 业务价值

🎯 适用场景

  1. 压力测试 - 验证RabbitMQ集群性能

  2. 可靠性验证 - 测试消息不丢失机制

  3. 性能基准 - 建立系统性能基线

  4. 容错测试 - 验证异常情况下的系统行为

📊 输出价值

  • 可靠性报告:明确每条消息的最终状态

  • 性能指标:吞吐量、延迟等关键metrics

  • 问题诊断:快速定位消息丢失原因

  • 容量规划:为生产环境部署提供数据支撑

5. 技术亮点

🏆 企业级特性

  • 生产就绪:包含企业应用需要的所有可靠性特性

  • 高可用性:支持网络波动和服务器异常

  • 可观测性:完整的日志和监控输出

  • 可扩展性:轻松调整消息数量和处理逻辑

🔍 监控维度

维度生产者消费者
数量统计发送/成功/失败数量消费数量
性能指标发送吞吐量消费速率
状态追踪未确认消息处理失败消息
时间统计总耗时平均处理时间

这个系统不仅是一个消息发送工具,更是一个完整的消息中间件验证框架,适合用于生产环境的性能测试、可靠性验证和容量规划。


🔍 代码执行结果

🔍 生产者

🔍 消费者


🔍 界面数据展示


🚀 快速安装方式

方法1:一键脚本安装

# 下载脚本
wget -O install-rocketmq.sh https://raw.githubusercontent.com/example/install-rocketmq/master/install-rocketmq.sh
​
# 添加执行权限
chmod +x install-rocketmq.sh
​
# 运行安装(默认单机版)
sudo ./install-rocketmq.sh

方法2:Docker快速安装

# 使用Docker Compose安装RocketMQ
mkdir -p /opt/rocketmq && cd /opt/rocketmq
​
cat > docker-compose.yml << EOF
version: '3.8'
services:namesrv:image: apache/rocketmq:4.9.7container_name: rocketmq-namesrvports:- "9876:9876"volumes:- ./logs:/home/rocketmq/logscommand: sh mqnamesrv
​broker:image: apache/rocketmq:4.9.7container_name: rocketmq-brokerports:- "10911:10911"- "10909:10909"volumes:- ./logs:/home/rocketmq/logs- ./store:/home/rocketmq/storeenvironment:- NAMESRV_ADDR=namesrv:9876command: sh mqbroker -c /home/rocketmq/conf/broker.confdepends_on:- namesrv
​dashboard:image: apacherocketmq/rocketmq-dashboard:latestcontainer_name: rocketmq-dashboardports:- "8080:8080"environment:- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876depends_on:- namesrv- broker
EOF
​
docker-compose up -d

⚙️ 常用管理命令

# 服务管理
sudo systemctl start rocketmq-namesrv      # 启动NameServer
sudo systemctl start rocketmq-broker       # 启动Broker
sudo systemctl start rocketmq-dashboard    # 启动Dashboard
sudo systemctl stop rocketmq-broker        # 停止Broker
sudo systemctl status rocketmq-namesrv     # 查看状态
​
# 使用管理脚本
rocketmq-start        # 启动所有服务
rocketmq-stop         # 停止所有服务
rocketmq-status       # 查看状态
rocketmq-test         # 运行测试
rocketmq-monitor      # 性能监控
​
# RocketMQ管理工具
cd /opt/rocketmq/bin
./mqadmin clusterList -n 127.0.0.1:9876                    # 查看集群列表
./mqadmin topicList -n 127.0.0.1:9876                     # 查看主题列表
./mqadmin updateTopic -n 127.0.0.1:9876 -t TestTopic      # 创建主题
./mqadmin consumerProgress -n 127.0.0.1:9876              # 查看消费者进度

📁 重要目录和文件

/opt/rocketmq/                    # 安装目录
├── bin/                         # 执行脚本
├── conf/                        # 配置文件
│   ├── namesrv.properties       # NameServer配置
│   └── broker.conf             # Broker配置
├── lib/                         # 依赖库
/data/rocketmq/                  # 数据目录
├── store/                       # 消息存储
│   ├── commitlog/              # 提交日志
│   ├── consumequeue/           # 消费队列
│   └── index/                  # 索引文件
/var/log/rocketmq/               # 日志目录
├── namesrv.log                 # NameServer日志
└── broker.log                  # Broker日志

🔧 常用配置修改

调整JVM内存参数

sudo vi /opt/rocketmq/bin/runserver.sh
# 修改NameServer内存:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
​
sudo vi /opt/rocketmq/bin/runbroker.sh  
# 修改Broker内存:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"

配置Broker存储路径

sudo vi /opt/rocketmq/conf/broker.conf
# 修改存储路径:
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog

调整消息大小限制

sudo vi /opt/rocketmq/conf/broker.conf
# 修改最大消息大小:
maxMessageSize=1048576  # 1MB

配置集群节点

# 在多节点集群中,修改namesrvAddr
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876

🔍 安装验证和测试

# 检查服务状态
sudo systemctl status rocketmq-namesrv
sudo systemctl status rocketmq-broker# 检查端口监听
netstat -tulpn | grep -E "(9876|10911|10909|8080)"
ss -tulpn | grep -E "(9876|10911|10909|8080)"# 运行健康检查
rocketmq-status
rocketmq-test# 使用管理工具测试
cd /opt/rocketmq/bin
./mqadmin clusterList -n 127.0.0.1:9876
./mqadmin topicList -n 127.0.0.1:9876# 创建测试主题
./mqadmin updateTopic -n 127.0.0.1:9876 -t TestTopic -c DefaultCluster# 发送测试消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer# 消费测试消息  
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

这个脚本提供了完整的RocketMQ安装方案,支持单机部署,集成了Dashboard监控,适合高并发消息处理场景!

http://www.dtcms.com/a/512193.html

相关文章:

  • 前端三驾马车(HTML/CSS/JS)核心概念深度解析
  • Debug——主机无法访问虚拟机中Docker开启的容器
  • 网站配色与布局凡客诚品公司介绍
  • 怎么用企业网站做营销商城小程序多少钱
  • 大模型推理中的 Prefill/Decode 分离技术的一些思考
  • PCIe 枚举设备 学习
  • Linux外设驱动模块加载底层原理深度剖析
  • NAS文件远程同步攻略:群晖CloudSync进阶,告别U盘拷贝时代!
  • 关于网站建设请示校园二手交易网站值得做吗
  • 【C语言实战(39)】C语言排序算法实战:冒泡、选择与插入的对决
  • spark组件-spark core(批处理)-rdd血缘
  • 8 款企业微信 SCRM 工具功能对比分析
  • 手机网站建设的方法推广运营平台
  • stack、queue与priority_queue的用法解析与模拟实现
  • 【C++基本功】OOA OOD OOP面向对象彻底详解
  • 切换/获取root权限
  • 爬虫 beautifulSoup 方法
  • 深入BERT内核:用数学解密掩码语言模型的工作原理
  • 在webos中,在桌面上添加应用
  • 【Spring Security】授权(一)
  • 数据结构八大排序:快速排序-挖坑法(递归与非递归)及其优化
  • Docker 中卷、容器、镜像的区别
  • 学习React-21-受控组件非受控组件
  • 银行测试学习计划
  • 电商自建站中企动力网站建设公司
  • 怎么搜 织梦的网站唐山海港经济开发区人才网
  • Qt打包工具Enigma Virtual Box
  • 【同步/异步 日志系统】--- 介绍
  • 【软考备考】 数据与文件的加解密种类详解和使用场景
  • GitLab 版本控制与管理指南