springboot3.4.1集成pulsar
目录
1、部署环境
2、部署pulsar
2.1、2-configmap-dev.yaml
2.2、3-zookeeper-dev.yaml
2.3、4-bookkeeper-dev.yaml
2.4、5-broker-dev.yaml
2.5、启动脚本ng.sh
3、部署pulsar-manager
3.1、6-pulsar-manager-dev.yaml
3.2、进入pulsar-manager-746bd74d85-gnk7c容器执行下面的命令
3.3、登陆系统输入配置
3.4、pulsar-manager效果
4、部署spring-boot程序连接pulsar
4.1、cloud-log-pulsar主要代码
4.1.1、pom.xml
4.1.2、application.yml
4.1.3、PulsarConfig
4.1.4、PulsarConsumer
4.1.5、PulsarInfo
4.1.6、PulsarProducer
4.1.7、ThreadConfig
4.1.8、LogPulsarServiceImpl
4.1.9、LogPulsarController
4.2、测试效果
4.3、Dockerfile
4.4、cloud-log-pulsar.yaml
1、部署环境
名称 | 版本 |
Kubernetes | v1.22.10 |
Kubesphere | v3.4.1 |
pulsar-manager | apachepulsar/pulsar-manager:v0.4.0 |
zookeeper | 3.8 |
bookkeeper | apachepulsar/pulsar:3.2.0 |
broker | apachepulsar/pulsar-all:3.2.0 |
spring-boot | 3.4.1 |
2、部署pulsar
2.1、2-configmap-dev.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: pulsar-config
namespace: dev
labels:
app: pulsar
component: configmap
data:
# ZooKeeper 配置
zoo.cfg: |
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=0.0.0.0:2888:3888# BookKeeper 配置(修复磁盘阈值问题)
bookkeeper.conf: |
# 基础配置
bookiePort=3181
httpServerPort=8000
bookieIPAddress=0.0.0.0
advertisedAddress=bk-0.bk-headless.dev.svc.cluster.local
# 存储路径
journalDirectory=/data/bookkeeper/journal
ledgerDirectories=/data/bookkeeper/ledgers
indexDirectories=/data/bookkeeper/index
# ZooKeeper 连接
zkServers=zk-headless.dev.svc.cluster.local:2181
metadataServiceUri=zk+null://zk-headless.dev.svc.cluster.local:2181/ledgers
zkLedgersRootPath=/ledgers
# 单节点适配
allowLoopback=true
enableLocalTransport=true
ensembleSize=1
writeQuorumSize=1
ackQuorumSize=1
cookieValidationEnabled=false
# 修复磁盘阈值配置(确保阈值在 0-1 之间且 warn <= lwm <= enforce)
diskUsageWarnThreshold=0.90
diskUsageLwmThreshold=0.92
diskUsageEnforceThreshold=0.95
# 资源优化(大幅减少内存使用)
dbStorage_writeCacheMaxSizeMb=32
dbStorage_readAheadCacheMaxSizeMb=16
journalMaxSizeMB=128
journalBufferedWritesThreshold=256
journalFlushWhenQueueEmpty=false
journalAdaptiveGroupWrites=false
journalMaxGroupWaitMSec=1
journalSyncData=false
journalMaxBackups=1
# 内存优化
useHeapForLedugerIndex=true
maxFrameSize=1048576
nettyMaxFrameSizeBytes=1048576# Broker 配置(优化内存使用)
broker.conf: |
# 元数据存储
metadataStoreUrl=zk:zk-headless.dev.svc.cluster.local:2181
configurationStoreUrl=zk:zk-headless.dev.svc.cluster.local:2181
# BookKeeper 适配
managedLedgerDefaultEnsembleSize=1
managedLedgerDefaultAckQuorum=1
managedLedgerDefaultWriteQuorum=1
managedLedgerDefaultWriteQuorumSize=1
managedLedgerDefaultAckQuorumSize=1
bookkeeperClientTimeoutMillis=60000
# 网络配置
brokerServicePort=6650
webServicePort=8080
clusterName=dev-pulsar-cluster
bindAddress=0.0.0.0
# 使用完全限定的域名
advertisedAddress=pulsar-broker.dev.svc.cluster.local
# 服务URL配置
webServiceUrl=http://pulsar-broker.dev.svc.cluster.local:8080
brokerServiceUrl=pulsar://pulsar-broker.dev.svc.cluster.local:6650
# 自动创建
allowAutoNamespaceCreation=true
allowAutoTopicCreation=true
allowAutoTopicCreationType=non-partitioned
# 开发环境优化
brokerDeleteInactiveTopicsEnabled=true
brokerDeduplicationEnabled=true
backlogQuotaDefaultRetentionPolicy=producer_request_hold
loadBalancerEnabled=false
# 内存优化
managedLedgerCacheSizeMB=64
managedLedgerCursorBackloggedThreshold=1000
managedLedgerCursorMaxEntries=50000
managedLedgerMaxEntriesPerLedger=5000
managedLedgerMinLedgerRolloverTimeMinutes=10
managedLedgerMaxLedgerRolloverTimeMinutes=60
managedLedgerOffloadAutoTriggerSizeThresholdBytes=10485760
# 资源与日志
PULSAR_MEM=-Xms256m -Xmx512m# JVM 参数配置(优化内存使用)
jvm.options: |
-XX:+UseG1GC
-XX:MaxGCPauseMillis=10
-XX:ParallelGCThreads=2
-XX:ConcGCThreads=2
-XX:G1NewSizePercent=30
-XX:+DisableExplicitGC
-XX:+ExitOnOutOfMemoryError
-XX:+PerfDisableSharedMem
# 将 GC 日志重定向到标准输出,避免文件权限问题
-Xlog:gc*:stdout:time,level,tags
# 限制元空间大小
-XX:MaxMetaspaceSize=128m
-XX:MetaspaceSize=64m
# 减少线程栈大小
-Xss256k# 日志配置
log4j2.yaml: |
Configuration:
status: error
Appenders:
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Loggers:
Root:
level: info
AppenderRef:
- ref: Console
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pulsar-log-config
namespace: dev
data:
log4j2.yaml: |
Configuration:
status: error
Appenders:
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Loggers:
Root:
level: info
AppenderRef:
- ref: Console
2.2、3-zookeeper-dev.yaml
apiVersion: v1
kind: Service
metadata:
name: zk-headless
namespace: dev
labels:
app: pulsar
component: zookeeper
spec:
selector:
app: pulsar
component: zookeeper
clusterIP: None
ports:
- name: client
port: 2181
targetPort: 2181
- name: follower
port: 2888
- name: leader
port: 3888---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zk
namespace: dev
labels:
app: pulsar
component: zookeeper
spec:
serviceName: zk-headless
replicas: 1
selector:
matchLabels:
app: pulsar
component: zookeeper
template:
metadata:
labels:
app: pulsar
component: zookeeper
spec:
initContainers:
- name: fix-permissions
image: busybox:1.35
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -c
- |
mkdir -p /data/zookeeper;
chown -R 1000:1000 /data/zookeeper;
volumeMounts:
- name: zk-data
mountPath: /data/zookeepercontainers:
- name: zookeeper
image: zookeeper:3.8
imagePullPolicy: IfNotPresent
command: ["/bin/sh", "-c"]
args:
- |
echo "1" > /data/zookeeper/myid;
exec /docker-entrypoint.sh zkServer.sh start-foreground;
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: follower
- containerPort: 3888
name: leader
volumeMounts:
- name: zk-config
mountPath: /conf/zoo.cfg
subPath: zoo.cfg
- name: zk-data
mountPath: /data/zookeeper
resources:
requests:
cpu: 200m
memory: 512Mi
limits:
cpu: 500m
memory: 1Gi
env:
- name: ZOO_4LW_COMMANDS_WHITELIST
value: "srvr,ruok,stat"
#livenessProbe:
#exec:
#command:
#- sh
#- -c
#- "echo ruok | nc localhost 2181 | grep imok"
#initialDelaySeconds: 30
#periodSeconds: 10
#timeoutSeconds: 5
#readinessProbe:
#exec:
#command:
#- sh
#- -c
#- "echo ruok | nc localhost 2181 | grep imok"
#initialDelaySeconds: 5
#periodSeconds: 5
#timeoutSeconds: 5
volumes:
- name: zk-config
configMap:
name: pulsar-config
items:
- key: zoo.cfg
path: zoo.cfg
volumeClaimTemplates:
- metadata:
name: zk-data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "nfs-client"
resources:
requests:
storage: 5Gi
2.3、4-bookkeeper-dev.yaml
apiVersion: v1
kind: Service
metadata:
name: bk-headless
namespace: dev
labels:
app: pulsar
component: bookkeeper
spec:
selector:
app: pulsar
component: bookkeeper
ports:
- port: 3181
name: bookie
targetPort: 3181
- port: 8000
name: http
targetPort: 8000
clusterIP: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: bk
namespace: dev
labels:
app: pulsar
component: bookkeeper
spec:
serviceName: bk-headless
replicas: 1
selector:
matchLabels:
app: pulsar
component: bookkeeper
template:
metadata:
labels:
app: pulsar
component: bookkeeper
spec:
# 移除安全上下文,让容器以默认用户运行
initContainers:
# 等待 ZooKeeper 就绪
- name: wait-zk
image: busybox:1.35
command: ["/bin/sh"]
args:
- -c
- |
until nc -z zk-headless.dev.svc.cluster.local 2181; do
echo "Waiting for ZooKeeper..."
sleep 2
done
echo "ZooKeeper is ready"# 准备目录和权限
- name: prepare-dirs
image: busybox:1.35
command: ["/bin/sh"]
args:
- -c
- |
# 创建所有必要的目录
mkdir -p /data/bookkeeper/journal
mkdir -p /data/bookkeeper/ledgers
mkdir -p /data/bookkeeper/index
mkdir -p /pulsar/logs
# 设置正确的权限
chmod -R 777 /data/bookkeeper
chmod -R 777 /pulsar/logs
echo "Directories prepared with correct permissions"
volumeMounts:
- name: bk-journal
mountPath: /data/bookkeeper/journal
- name: bk-ledgers
mountPath: /data/bookkeeper/ledgers
- name: bk-index
mountPath: /data/bookkeeper/index
- name: bk-logs
mountPath: /pulsar/logs
resources:
requests:
cpu: 50m
memory: 32Mi
limits:
cpu: 100m
memory: 64Mi# 初始化 BookKeeper 元数据(使用正确的方法)
- name: init-bk
image: apachepulsar/pulsar:3.2.0
command: ["/bin/sh"]
args:
- -c
- |
# 设置环境变量(使用更少内存)
export JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export BOOKIE_MEM="-Xms128m -Xmx256m"
# 等待 ZooKeeper 完全就绪
sleep 5
echo "Checking if BookKeeper metadata exists..."
# 检查是否已初始化
if /pulsar/bin/bookkeeper shell whatisinstanceid >/dev/null 2>&1; then
echo "BookKeeper metadata already exists"
else
echo "Initializing BookKeeper metadata"
# 使用正确的 initnewcluster 命令格式(移除 -force 参数)
/pulsar/bin/bookkeeper shell initnewcluster
echo "BookKeeper metadata initialized"
fi
env:
- name: JAVA_HOME
value: "/usr/lib/jvm/temurin-17-jdk-amd64"
- name: BOOKIE_MEM
value: "-Xms128m -Xmx256m"
# 添加 ZooKeeper 连接配置
- name: BK_zkServers
value: "zk-headless.dev.svc.cluster.local:2181"
- name: BK_metadataServiceUri
value: "zk+null://zk-headless.dev.svc.cluster.local:2181/ledgers"
volumeMounts:
- name: bk-journal
mountPath: /data/bookkeeper/journal
- name: bk-ledgers
mountPath: /data/bookkeeper/ledgers
- name: bk-index
mountPath: /data/bookkeeper/index
# 挂载配置文件
- name: bk-config
mountPath: /pulsar/conf/bookkeeper.conf
subPath: bookkeeper.conf
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 200m
memory: 384Micontainers:
- name: bookkeeper
image: apachepulsar/pulsar:3.2.0
command: ["/bin/sh"]
args:
- -c
- |
# 设置环境变量(使用更少内存)
export JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export BOOKIE_MEM="-Xms256m -Xmx512m"
# 等待 ZooKeeper 完全就绪
sleep 5
# 启动 BookKeeper
exec /pulsar/bin/bookkeeper bookie
ports:
- containerPort: 3181
name: bookie
- containerPort: 8000
name: http
volumeMounts:
- name: bk-config
mountPath: /pulsar/conf/bookkeeper.conf
subPath: bookkeeper.conf
- name: jvm-options
mountPath: /pulsar/conf/jvm.options
subPath: jvm.options
- name: log4j-config
mountPath: /pulsar/conf/log4j2.yaml
subPath: log4j2.yaml
- name: bk-journal
mountPath: /data/bookkeeper/journal
- name: bk-ledgers
mountPath: /data/bookkeeper/ledgers
- name: bk-index
mountPath: /data/bookkeeper/index
- name: bk-logs
mountPath: /pulsar/logs
env:
- name: JAVA_HOME
value: "/usr/lib/jvm/temurin-17-jdk-amd64"
- name: BOOKIE_MEM
value: "-Xms256m -Xmx512m"
- name: BOOKIE_EXTRA_OPTS
value: "-Dbookkeeper.bookie.cookieValidationEnabled=false -Dbookkeeper.bookie.allowEmptyDirs=true -Dlog4j.configurationFile=/pulsar/conf/log4j2.yaml"
# 添加 ZooKeeper 连接配置
- name: BK_zkServers
value: "zk-headless.dev.svc.cluster.local:2181"
- name: BK_metadataServiceUri
value: "zk+null://zk-headless.dev.svc.cluster.local:2181/ledgers"
livenessProbe:
tcpSocket:
port: 3181
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
tcpSocket:
port: 3181
initialDelaySeconds: 5
periodSeconds: 10
resources:
requests:
cpu: 250m
memory: 768Mi
limits:
cpu: 500m
memory: 1Givolumes:
- name: bk-config
configMap:
name: pulsar-config
items:
- key: bookkeeper.conf
path: bookkeeper.conf
- name: jvm-options
configMap:
name: pulsar-config
items:
- key: jvm.options
path: jvm.options
- name: log4j-config
configMap:
name: pulsar-log-config
items:
- key: log4j2.yaml
path: log4j2.yaml
- name: bk-index
emptyDir: {}
- name: bk-logs
emptyDir: {}volumeClaimTemplates:
- metadata:
name: bk-journal
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: "nfs-client"
resources:
requests:
storage: 5Gi
- metadata:
name: bk-ledgers
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: "nfs-client"
resources:
requests:
storage: 10Gi
2.4、5-broker-dev.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pulsar-broker
namespace: dev
labels:
app: pulsar
component: broker
spec:
replicas: 1
selector:
matchLabels:
app: pulsar
component: broker
template:
metadata:
labels:
app: pulsar
component: broker
spec:
# 添加主机别名,确保Pod能解析自己的服务名
hostAliases:
- ip: "127.0.0.1"
hostnames:
- "pulsar-broker"
- "pulsar-broker.dev.svc.cluster.local"
initContainers:
# 等待 ZooKeeper 就绪
- name: wait-zk
image: busybox:1.35
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -c
- |
until nc -z zk-headless.dev.svc.cluster.local 2181; do
echo "Waiting for ZooKeeper...";
sleep 3;
done;
echo "ZooKeeper is ready"
resources:
requests:
cpu: 50m
memory: 32Mi
limits:
cpu: 100m
memory: 64Mi# 等待 BookKeeper 就绪
- name: wait-bk
image: busybox:1.35
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -c
- |
until nc -z bk-headless.dev.svc.cluster.local 3181; do
echo "Waiting for BookKeeper...";
sleep 3;
done;
echo "BookKeeper is ready"
resources:
requests:
cpu: 50m
memory: 32Mi
limits:
cpu: 100m
memory: 64Mi# 准备目录和设置权限
- name: prepare-dirs
image: busybox:1.35
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -c
- |
mkdir -p /pulsar/logs
chmod -R 777 /pulsar/logs
echo "Directories prepared"
volumeMounts:
- name: broker-logs
mountPath: /pulsar/logs
resources:
requests:
cpu: 50m
memory: 32Mi
limits:
cpu: 100m
memory: 64Micontainers:
- name: broker
image: apachepulsar/pulsar-all:3.2.0
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -c
- |
# 设置环境变量
export JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export PULSAR_MEM="-Xms512m -Xmx1g"
export PULSAR_EXTRA_OPTS="-Dpulsar.log.root.level=info"
# 获取当前Pod的主机名和IP
HOSTNAME=$(hostname)
POD_IP=$(hostname -i)
echo "Current Pod hostname: $HOSTNAME"
echo "Current Pod IP: $POD_IP"
# 检查是否需要初始化元数据
if [ ! -f /pulsar/.cluster_initialized ]; then
echo "Attempting cluster initialization..."
# 使用更简单的方法检查是否已初始化
if /pulsar/bin/pulsar-shell -e "admin clusters list" --fail-on-error 2>/dev/null | grep -q "dev-pulsar-cluster"; then
echo "Cluster already initialized"
else
echo "Initializing cluster metadata..."
/pulsar/bin/pulsar initialize-cluster-metadata \
--cluster dev-pulsar-cluster \
--zookeeper zk-headless.dev.svc.cluster.local:2181 \
--configuration-store zk-headless.dev.svc.cluster.local:2181 \
--web-service-url http://pulsar-broker.dev.svc.cluster.local:8080 \
--broker-service-url pulsar://pulsar-broker.dev.svc.cluster.local:6650 \
|| echo "Cluster initialization may have failed or already completed"
fi
touch /pulsar/.cluster_initialized
fi
# 等待服务完全就绪
sleep 5
# 启动 Broker,使用更详细的日志
echo "Starting Pulsar Broker with extra options: $PULSAR_EXTRA_OPTS"
exec /pulsar/bin/pulsar broker
ports:
- containerPort: 6650
name: pulsar
- containerPort: 8080
name: http
volumeMounts:
- name: broker-config
mountPath: /pulsar/conf/broker.conf
subPath: broker.conf
- name: jvm-options
mountPath: /pulsar/conf/jvm.options
subPath: jvm.options
- name: log4j-config
mountPath: /pulsar/conf/log4j2.yaml
subPath: log4j2.yaml
- name: broker-logs
mountPath: /pulsar/logs
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1000m
memory: 2Gi
env:
- name: PULSAR_MEM
value: "-Xms512m -Xmx1g"
- name: JAVA_HOME
value: "/usr/lib/jvm/temurin-17-jdk-amd64"
- name: PULSAR_EXTRA_OPTS
value: "-Dlog4j.configurationFile=/pulsar/conf/log4j2.yaml -Dpulsar.log.root.level=info"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
# 使用完全限定的域名(FQDN)
- name: PULSAR_PREFIX_advertisedAddress
value: "pulsar-broker.dev.svc.cluster.local"
- name: PULSAR_PREFIX_bindAddress
value: "0.0.0.0"
- name: PULSAR_PREFIX_clusterName
value: "dev-pulsar-cluster"
- name: PULSAR_PREFIX_zookeeperServers
value: "zk-headless.dev.svc.cluster.local:2181"
- name: PULSAR_PREFIX_configurationStoreServers
value: "zk-headless.dev.svc.cluster.local:2181"
- name: PULSAR_PREFIX_webServicePort
value: "8080"
- name: PULSAR_PREFIX_brokerServicePort
value: "6650"
# 使用完全限定的域名(FQDN)
- name: PULSAR_PREFIX_serviceUrl
value: "http://pulsar-broker.dev.svc.cluster.local:8080"
- name: PULSAR_PREFIX_brokerServiceUrl
value: "pulsar://pulsar-broker.dev.svc.cluster.local:6650"
# 简化advertisedListeners配置
- name: PULSAR_PREFIX_advertisedListeners
value: "pulsar://pulsar-broker.dev.svc.cluster.local:6650"
- name: PULSAR_PREFIX_loadBalancerOverrideBrokerNicSpeedGbps
value: "10"
# 启用健康检查
livenessProbe:
httpGet:
path: /admin/v2/brokers/health
port: 8080
initialDelaySeconds: 120
periodSeconds: 30
timeoutSeconds: 10
readinessProbe:
httpGet:
path: /admin/v2/brokers/health
port: 8080
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 10volumes:
- name: broker-config
configMap:
name: pulsar-config
items:
- key: broker.conf
path: broker.conf
- name: jvm-options
configMap:
name: pulsar-config
items:
- key: jvm.options
path: jvm.options
- name: log4j-config
configMap:
name: pulsar-log-config
items:
- key: log4j2.yaml
path: log4j2.yaml
- name: broker-logs
emptyDir: {}
2.5、启动脚本ng.sh
# 1. 删除 BookKeeper StatefulSet 和 Pod
kubectl delete statefulset bk -n dev
kubectl delete pod bk-0 -n dev --grace-period=0 --force # 强制删除# 2. 删除 BookKeeper 持久化存储(清除残留文件)
kubectl delete pvc bk-journal-bk-0 bk-ledgers-bk-0 -n dev# 3. 清除 ZooKeeper 中 BookKeeper 元数据(关键!)
kubectl exec -it zk-0 -n dev -- zkCli.sh deleteall /ledgers
kubectl exec -it zk-0 -n dev -- zkCli.sh deleteall /bookkeeperkubectl delete pvc bk-journal-bk-0 bk-ledgers-bk-0 zk-data-zk-0 -n dev
sleep 10
kubectl delete -f .
sleep 30
kubectl apply -f .
3、部署pulsar-manager
3.1、6-pulsar-manager-dev.yaml
# Persistent Volume Claim
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pulsar-manager-db-pvc
namespace: dev
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
---
# Database Initialization Script ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: pulsar-manager-db-init
namespace: dev
data:
init.sql: |
CREATE DATABASE IF NOT EXISTS pulsar_manager;
\c pulsar_manager;CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL,
broker varchar(1024) NOT NULL,
bookie varchar(1024) NOT NULL,
token varchar(1024),
CONSTRAINT PK_name PRIMARY KEY (name),
UNIQUE (broker)
);CREATE TABLE IF NOT EXISTS topics_stats (
topic_stats_id BIGSERIAL PRIMARY KEY,
environment varchar(255) NOT NULL,
cluster varchar(255) NOT NULL,
broker varchar(255) NOT NULL,
tenant varchar(255) NOT NULL,
namespace varchar(255) NOT NULL,
bundle varchar(255) NOT NULL,
persistent varchar(36) NOT NULL,
topic varchar(255) NOT NULL,
producer_count BIGINT,
subscription_count BIGINT,
msg_rate_in double precision,
msg_throughput_in double precision,
msg_rate_out double precision,
msg_throughput_out double precision,
average_msg_size double precision,
storage_size double precision,
time_stamp BIGINT
);CREATE INDEX IF NOT EXISTS ix_topics_stats_timestamp ON topics_stats(time_stamp);
CREATE TABLE IF NOT EXISTS publishers_stats (
publisher_stats_id BIGSERIAL PRIMARY KEY,
producer_id BIGINT,
topic_stats_id BIGINT NOT NULL,
producer_name varchar(255) NOT NULL,
msg_rate_in double precision,
msg_throughput_in double precision,
average_msg_size double precision,
address varchar(255),
connected_since varchar(128),
client_version varchar(36),
metadata text,
time_stamp BIGINT,
CONSTRAINT fk_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) REFERENCES topics_stats(topic_stats_id)
);CREATE TABLE IF NOT EXISTS replications_stats (
replication_stats_id BIGSERIAL PRIMARY KEY,
topic_stats_id BIGINT NOT NULL,
cluster varchar(255) NOT NULL,
connected BOOLEAN,
msg_rate_in double precision,
msg_rate_out double precision,
msg_rate_expired double precision,
msg_throughput_in double precision,
msg_throughput_out double precision,
msg_rate_redeliver double precision,
replication_backlog BIGINT,
replication_delay_in_seconds BIGINT,
inbound_connection varchar(255),
inbound_connected_since varchar(255),
outbound_connection varchar(255),
outbound_connected_since varchar(255),
time_stamp BIGINT,
CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) REFERENCES topics_stats(topic_stats_id)
);CREATE TABLE IF NOT EXISTS subscriptions_stats (
subscription_stats_id BIGSERIAL PRIMARY KEY,
topic_stats_id BIGINT NOT NULL,
subscription varchar(255) NULL,
msg_backlog BIGINT,
msg_rate_expired double precision,
msg_rate_out double precision,
msg_throughput_out double precision,
msg_rate_redeliver double precision,
number_of_entries_since_first_not_acked_message BIGINT,
total_non_contiguous_deleted_messages_range BIGINT,
subscription_type varchar(16),
blocked_subscription_on_unacked_msgs BOOLEAN,
time_stamp BIGINT,
UNIQUE (topic_stats_id, subscription),
CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) REFERENCES topics_stats(topic_stats_id)
);CREATE TABLE IF NOT EXISTS consumers_stats (
consumer_stats_id BIGSERIAL PRIMARY KEY,
consumer varchar(255) NOT NULL,
topic_stats_id BIGINT NOT NULL,
replication_stats_id BIGINT,
subscription_stats_id BIGINT,
address varchar(255),
available_permits BIGINT,
connected_since varchar(255),
msg_rate_out double precision,
msg_throughput_out double precision,
msg_rate_redeliver double precision,
client_version varchar(36),
time_stamp BIGINT,
metadata text
);CREATE TABLE IF NOT EXISTS tokens (
token_id BIGSERIAL PRIMARY KEY,
role varchar(256) NOT NULL,
description varchar(128),
token varchar(1024) NOT NULL,
UNIQUE (role)
);CREATE TABLE IF NOT EXISTS users (
user_id BIGSERIAL PRIMARY KEY,
access_token varchar(256),
name varchar(256) NOT NULL,
description varchar(128),
email varchar(256),
phone_number varchar(48),
location varchar(256),
company varchar(256),
expire BIGINT NOT NULL,
password varchar(256),
UNIQUE (name)
);INSERT INTO users (name, email, password, expire, description)
VALUES (
'admin',
'admin@example.com',
'$2a$10$rL.4h3xaePf6E5e6N.8Qv.Lp6gYk0XJZJZJZJZJZJZJZJZJZJ2',
4102444800000,
'default superuser'
) ON CONFLICT (name) DO NOTHING;CREATE TABLE IF NOT EXISTS tenants (
tenant_id BIGSERIAL PRIMARY KEY,
tenant varchar(255) NOT NULL,
admin_roles varchar(255),
allowed_clusters varchar(255),
environment_name varchar(255),
UNIQUE(tenant)
);CREATE TABLE IF NOT EXISTS namespaces (
namespace_id BIGSERIAL PRIMARY KEY,
tenant varchar(255) NOT NULL,
namespace varchar(255) NOT NULL,
UNIQUE(tenant, namespace)
);
init.sh: |
#!/bin/bash
set -e# Wait for PostgreSQL to be ready
until psql -h localhost -U $POSTGRES_USER -d $POSTGRES_DB -c '\q'; do
echo "Waiting for database to become available..."
sleep 2
done# Execute initialization script
echo "Executing database initialization..."
psql -h localhost -U $POSTGRES_USER -d $POSTGRES_DB -f /scripts/init.sql
---
# Database Service
apiVersion: v1
kind: Service
metadata:
name: pulsar-manager-db
namespace: dev
labels:
app: pulsar-manager-db
component: database
spec:
selector:
app: pulsar-manager-db
ports:
- port: 5432
targetPort: 5432
name: postgres
type: ClusterIP
---
# Database Deployment with forced initialization
apiVersion: apps/v1
kind: Deployment
metadata:
name: pulsar-manager-db
namespace: dev
labels:
app: pulsar-manager-db
component: database
spec:
replicas: 1
selector:
matchLabels:
app: pulsar-manager-db
template:
metadata:
labels:
app: pulsar-manager-db
spec:
containers:
- name: postgres
image: postgres:13
imagePullPolicy: IfNotPresent
ports:
- containerPort: 5432
name: postgres
env:
- name: POSTGRES_DB
value: "pulsar_manager"
- name: POSTGRES_USER
value: "pulsar"
- name: POSTGRES_PASSWORD
value: "pulsar"
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
- name: init-scripts
mountPath: /scripts
readOnly: true
- name: init-script-volume
mountPath: /docker-entrypoint-initdb.d
readOnly: true
lifecycle:
postStart:
exec:
command: ["/bin/bash", "/scripts/init.sh"]
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 250m
memory: 512Mi
volumes:
- name: postgres-data
persistentVolumeClaim:
claimName: pulsar-manager-db-pvc
- name: init-scripts
configMap:
name: pulsar-manager-db-init
defaultMode: 0755
items:
- key: init.sh
path: init.sh
- key: init.sql
path: init.sql
- name: init-script-volume
configMap:
name: pulsar-manager-db-init
defaultMode: 0644
items:
- key: init.sql
path: init.sql
---
# Pulsar Manager Service
apiVersion: v1
kind: Service
metadata:
name: pulsar-manager
namespace: dev
labels:
app: pulsar-manager
component: web-ui
spec:
selector:
app: pulsar-manager
ports:
- port: 9527
targetPort: 9527
name: web-ui
- port: 7750
targetPort: 7750
name: backend-api
type: ClusterIP
---
# Pulsar Manager Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: pulsar-manager
namespace: dev
labels:
app: pulsar-manager
component: web-ui
spec:
replicas: 1
selector:
matchLabels:
app: pulsar-manager
template:
metadata:
labels:
app: pulsar-manager
spec:
initContainers:
- name: wait-for-db
image: busybox:1.35
command: ['sh', '-c', 'until nc -z pulsar-manager-db 5432; do echo "Waiting for database..."; sleep 2; done;']
containers:
- name: pulsar-manager
image: apachepulsar/pulsar-manager:v0.4.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9527
name: web-ui
- containerPort: 7750
name: backend-api
env:
- name: DRIVER_CLASS_NAME
value: "org.postgresql.Driver"
- name: URL
value: "jdbc:postgresql://pulsar-manager-db:5432/pulsar_manager"
- name: USERNAME
value: "pulsar"
- name: PASSWORD
value: "pulsar"
- name: PULSAR_MANAGER_BROKER_HTTP_SERVICE_URL
value: "http://pulsar-broker.dev.svc.cluster.local:8080"
- name: PULSAR_MANAGER_BROKER_HTTP_SERVICE_URL_TLS
value: "https://pulsar-broker.dev.svc.cluster.local:8443"
- name: DEFAULT_USER_NAME
value: "admin"
- name: DEFAULT_USER_PASSWORD
value: "apachepulsar"
- name: REDIRECT_HOST
value: "http://localhost"
- name: REDIRECT_PORT
value: "9527"
- name: JVM_OPTS
value: "-Xms512m -Xmx1g -XX:MaxMetaspaceSize=256m"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 250m
memory: 1Gi
limits:
cpu: 500m
memory: 2Gi
---
# External Access Service
apiVersion: v1
kind: Service
metadata:
name: pulsar-manager-external
namespace: dev
labels:
app: pulsar-manager
component: web-ui-external
spec:
selector:
app: pulsar-manager
ports:
- port: 9527
targetPort: 9527
name: web-ui
nodePort: 30001
type: NodePort
数据库初始化效果
3.2、进入pulsar-manager-746bd74d85-gnk7c容器执行下面的命令
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
-H "Content-Type: application/json" \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "pulsar", "password": "pulsar", "description": "test", "email": "username@test.org"}'
3.3、登陆系统输入配置
名称输入自定义
Service URL:http://pulsar-broker.dev.svc.cluster.local:8080
Bookie URL:http://bk-0.bk-headless.dev.svc.cluster.local:3181
3.4、pulsar-manager效果
4、部署spring-boot程序连接pulsar
4.1、cloud-log-pulsar主要代码
4.1.1、pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example.cloud</groupId><parent><groupId>com.example.cloud</groupId><artifactId>micro-service</artifactId><version>1.0</version></parent><artifactId>cloud-log-pulsar</artifactId><name>cloud-log-pulsar</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><pulsar-client.version>4.0.5</pulsar-client.version></properties><dependencyManagement><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-bom</artifactId><version>${pulsar-client.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId></dependency><dependency><groupId>com.example.cloud</groupId><artifactId>cloud-service-common</artifactId><version>${project.version}</version></dependency><dependency><groupId>com.example.cloud</groupId><artifactId>cloud-log-sdk</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId></dependency><!-- <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-consul-discovery</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-consul-config</artifactId></dependency>--><!-- <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency>--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><!-- 资源文件配置(过滤占位符+二进制文件不过滤) --><!-- 配置资源文件打包规则 --><resources><!-- 1. 处理带变量的配置文件(如 application.yml) --><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>**/*.yml</include><include>**/*.properties</include><include>**/*.xml</include> <!-- 包含 logback.xml 等 --></includes></resource><!-- 2. 处理证书文件(不进行变量替换,避免破坏证书) --><resource><directory>src/main/resources</directory><filtering>false</filtering> <!-- 关键:证书文件禁用变量替换 --><includes><include>certs/**/*</include> <!-- 包含 certs 下所有文件 --></includes></resource></resources><plugins><!-- ========================== Spring Boot 打包插件 ========================== --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><!-- 必须指定主类(确保打包后可直接运行) --><mainClass>com.example.cloud.CloudLogPulsarApplication</mainClass><layout>ZIP</layout> <!-- Spring Boot 可执行JAR的标准布局 --></configuration><executions><execution><goals><goal>repackage</goal> <!-- 生成可执行JAR(覆盖默认JAR) --></goals></execution></executions></plugin><!-- ========================== 依赖拷贝插件(拷贝到lib目录) ========================== --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>3.6.1</version><executions><execution><phase>prepare-package</phase> <!-- 打包前执行 --><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory> <!-- 依赖输出目录 --><excludeTransitive>false</excludeTransitive> <!-- 包含间接依赖 --><stripVersion>false</stripVersion> <!-- 保留依赖版本号 --><includeScope>runtime</includeScope> <!-- 仅拷贝运行时依赖 --></configuration></execution></executions></plugin><!-- ========================== 资源拷贝插件(拷贝配置+脚本) ========================== --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>3.3.1</version><executions><!-- 1. 拷贝配置文件到config目录 --><execution><id>copy-config</id><phase>package</phase><goals><goal>copy-resources</goal></goals><configuration><encoding>UTF-8</encoding> <!-- 统一编码 --><outputDirectory>${project.build.directory}/config</outputDirectory><resources><resource><directory>src/main/resources/</directory></resource></resources></configuration></execution><!-- 2. 拷贝脚本文件(如.sh)到target目录 --><execution><id>copy-scripts</id><phase>package</phase><goals><goal>copy-resources</goal></goals><configuration><encoding>UTF-8</encoding><outputDirectory>${project.build.directory}</outputDirectory><resources><resource><directory>bin/</directory> <!-- 脚本文件目录(需项目中存在) --><!-- 移除 fileMode:maven-resources-plugin 不支持该参数 --></resource></resources></configuration></execution></executions></plugin><!-- ========================== 脚本权限设置插件(给.sh添加执行权限) ========================== --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-antrun-plugin</artifactId><version>${maven-antrun-plugin.version}</version><executions><execution><id>chmod-scripts</id><phase>package</phase> <!-- 拷贝脚本后执行 --><goals><goal>run</goal></goals><configuration><target><!-- 给target目录下的.sh文件添加执行权限(755:所有者读写执行,其他读执行) --><chmod file="${project.build.directory}/*.sh" perm="755" /></target></configuration></execution></executions></plugin></plugins></build> </project>
4.1.2、application.yml
server:port: 8087tomcat:max-http-form-post-size: 200MB
spring:config:pulsar:#service_url: pulsar://localhost:6650service_url: pulsar://pulsar-broker.dev.svc.cluster.local:6650topic: my-topicsubscription_name: my-subscription
4.1.3、PulsarConfig
package com.example.cloud.config;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Autowiredprivate PulsarInfo pulsarInfo;@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl(pulsarInfo.getServiceUrl())//.connectionTimeout(1, TimeUnit.MINUTES)//.operationTimeout(1, TimeUnit.MINUTES)//.enableBusyWait(true)//.ioThreads(3)//.enableTransaction(true)//.keepAliveInterval(60,TimeUnit.SECONDS).build();}
}
4.1.4、PulsarConsumer
package com.example.cloud.config;import cn.hutool.json.JSONUtil;
import com.example.cloud.audit.AuditOperationDto;
import com.example.cloud.util.ByteUtil;
import jakarta.annotation.PreDestroy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class PulsarConsumer implements InitializingBean {private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumer.class);private final PulsarInfo pulsarInfo;private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient,PulsarInfo pulsarInfo) {this.pulsarClient = pulsarClient;this.pulsarInfo = pulsarInfo;}@PreDestroypublic void close() throws PulsarClientException {consumer.close();}private void startConsumer() {new Thread(() -> {while (true) {Message<byte[]> msg = null;try {msg = consumer.receive();} catch (PulsarClientException e) {LOG.error("消息获取失败",e);}if (msg != null) {try {AuditOperationDto auditOperationDto = ByteUtil.jdkDeserialize(msg.getData(),AuditOperationDto.class);LOG.info("接收pulsar消息: " + JSONUtil.toJsonStr(auditOperationDto));consumer.acknowledge(msg);} catch (PulsarClientException e) {LOG.error("消息消费失败",e);consumer.negativeAcknowledge(msg);} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(e);}}}}).start();}@Overridepublic void afterPropertiesSet() {try {this.consumer = pulsarClient.newConsumer().topic(pulsarInfo.getTopic()).subscriptionName(pulsarInfo.getSubscriptionName()).subscribe();startConsumer();} catch (PulsarClientException e) {LOG.error("初始化失败",e);throw new RuntimeException(e);}}
}
4.1.5、PulsarInfo
package com.example.cloud.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "spring.config.pulsar")
public class PulsarInfo {private static final Logger LOG = LoggerFactory.getLogger(PulsarInfo.class);public PulsarInfo(){LOG.info(this.toString());}private String serviceUrl;private String subscriptionName;private String topic;public String getServiceUrl() {return serviceUrl;}public void setServiceUrl(String serviceUrl) {this.serviceUrl = serviceUrl;}public String getSubscriptionName() {return subscriptionName;}public void setSubscriptionName(String subscriptionName) {this.subscriptionName = subscriptionName;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}@Overridepublic String toString() {return "PulsarInfo{" +"serviceUrl='" + serviceUrl + '\'' +", subscriptionName='" + subscriptionName + '\'' +", topic='" + topic + '\'' +'}';}
}
4.1.6、PulsarProducer
package com.example.cloud.config;
import com.example.cloud.util.ByteUtil;
import jakarta.annotation.PreDestroy;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class PulsarProducer<T> implements InitializingBean {private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);private final PulsarClient pulsarClient;private Producer<byte[]> producer;private final PulsarInfo pulsarInfo;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient,PulsarInfo pulsarInfo) {this.pulsarClient = pulsarClient;this.pulsarInfo = pulsarInfo;}public MessageId sendMessage(T message) {try {return producer.send(ByteUtil.jdkSerialize(message));} catch (PulsarClientException e) {LOG.error("发送消息异常", e);throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void afterPropertiesSet() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic(pulsarInfo.getTopic()).create();} catch (PulsarClientException e) {LOG.error("初始化异常", e);throw new RuntimeException(e);}}@PreDestroypublic void close() throws PulsarClientException {this.producer.close();}
}
4.1.7、ThreadConfig
package com.example.cloud.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
@Component
@Configuration
@EnableAsync
public class ThreadConfig {@Beanpublic ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(5);threadPoolTaskExecutor.setMaxPoolSize(10);threadPoolTaskExecutor.setQueueCapacity(6000);threadPoolTaskExecutor.setKeepAliveSeconds(60);threadPoolTaskExecutor.setThreadNamePrefix("log-pulsar-task-");threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}
}
4.1.8、LogPulsarServiceImpl
package com.example.cloud.service.impl;import cn.hutool.json.JSONUtil;
import com.example.cloud.audit.AuditOperationDto;
import com.example.cloud.config.PulsarProducer;
import com.example.cloud.service.LogPulsarService;
import com.example.cloud.util.MyRequestUtil;
import com.example.cloud.util.Result;
import jakarta.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Map;@Service
public class LogPulsarServiceImpl implements LogPulsarService {private static final Logger LOG = LoggerFactory.getLogger(LogPulsarServiceImpl.class);@Autowiredprivate PulsarProducer pulsarProducer;@Overridepublic Object receive(AuditOperationDto auditOperationDto, HttpServletRequest request) {LOG.info("接收到IP: {} ----> 的审计日志: {}",MyRequestUtil.getClientIP(request),JSONUtil.toJsonStr(auditOperationDto));return pulsarProducer.sendMessage(auditOperationDto);}@Overridepublic Object send(Map<String, Object> map, HttpServletRequest request) {map.put("ip", MyRequestUtil.getClientIP(request));return pulsarProducer.sendMessage(map);}
}
4.1.9、LogPulsarController
package com.example.cloud.controller;import com.example.cloud.audit.AuditOperationDto;
import com.example.cloud.service.LogPulsarService;
import com.example.cloud.util.Result;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/log")
public class LogPulsarController {@Autowiredprivate LogPulsarService logPulsarService;@PostMapping(value = "/receive")public Result receive(@RequestBody AuditOperationDto auditOperationDto, HttpServletRequest request) {try {return Result.ok(logPulsarService.receive(auditOperationDto,request));} catch (Exception e) {return Result.err(e.getMessage());}}
}
4.2、测试效果
进入容器执行下面的命令
curl -v -X POST "http://localhost:8087/log/receive" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-d '{
"id": "audit_20240918_123456",
"description": "管理员修改用户权限",
"operationType": "UPDATE",
"methodName": "com.example.user.service.UserService.updatePermission",
"clientIp": "10.0.0.5",
"result": {
"code": 200,
"message": "权限修改成功",
"data": {"userId": "1001", "newRole": "ADMIN"}
},
"throwable": "",
"browserName": "Chrome 128.0",
"osName": "Windows 11",
"duration": 280
}'
4.3、Dockerfile
FROM openjdk:17-jdk-oracle
WORKDIR /opt/cloud-log-pulsar
COPY --chown=root:root /target/lib ./lib
COPY --chown=root:root /target/config ./config
COPY --chown=root:root /target/cloud-log-pulsar.jar ./cloud-log-pulsar.jar
ENV TZ=Asia/Shanghai
EXPOSE 8087
EXPOSE 9527
ENV JAVA_OPTS="\-Xms1024m -Xmx1024m \-Xss1m \-Xshare:off \-XX:ReservedCodeCacheSize=50m \-XX:+TieredCompilation -XX:TieredStopAtLevel=1 \-XX:MaxDirectMemorySize=100m \-XX:+UseG1GC \-XX:+UseStringDeduplication \-XX:+HeapDumpOnOutOfMemoryError \-XX:HeapDumpPath=/opt/cloud-log-pulsar/logs/heapdump.hprof"
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \CMD wget -q --spider http://localhost:8087/actuator/health || exit 1
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS \-Dloader.path=/opt/cloud-log-pulsar/config,/opt/cloud-log-pulsar/lib \-jar /opt/cloud-log-pulsar/cloud-log-pulsar.jar"]
4.4、cloud-log-pulsar.yaml
apiVersion: v1
kind: Service
metadata:name: cloud-log-pulsarnamespace: devlabels:service: cloud-log-pulsar
spec:ports:- name: httpport: 8087targetPort: 8087protocol: TCP- name: debugport: 9527targetPort: 9527protocol: TCPtype: ClusterIPselector:app: cloud-log-pulsar
---
apiVersion: apps/v1
kind: Deployment
metadata:name: cloud-log-pulsarnamespace: devlabels:app: cloud-log-pulsar
spec:replicas: 1selector:matchLabels:app: cloud-log-pulsartemplate:metadata:labels:app: cloud-log-pulsarspec:restartPolicy: Alwayscontainers:- name: cloud-log-pulsarimagePullPolicy: IfNotPresentimage: cloud-log-pulsarenv:- name: "JAVA_OPT"value: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9527"ports:- containerPort: 8087name: httpprotocol: TCP- containerPort: 9527name: debugprotocol: TCPresources:requests:memory: 1Gilimits:memory: 1GivolumeMounts:- name: volume-localtimemountPath: /etc/localtimevolumes:- name: volume-localtimehostPath:path: /etc/localtimetype: ''
5、pulsar操作说明
一、基础测试:验证 Broker 启动状态
首先确认 Broker 本身已成功启动(无协议处理器等启动错误),这是所有测试的前提。
检查 Pod 运行状态
先确认 Broker Pod 处于
Running
状态,无重启或异常退出:# 查看 dev 命名空间下的 Pulsar Broker Pod 状态 kubectl get pods -n dev -l app=pulsar,component=broker
正常结果:Pod 状态为
Running
,RESTARTS
为 0。异常处理:若状态为
CrashLoopBackOff
,先查看 Pod 事件(kubectl describe pod <pod-name> -n dev
),重点看「Events」部分是否有资源不足、配置错误等提示。查看 Broker 启动日志(关键验证)
通过日志确认 Broker 无核心错误,且服务已成功启动:
# 实时查看 Broker 日志(替换 <pod-name> 为实际 Pod 名称) kubectl logs -f <pod-name> -n dev需重点验证的日志信息:
协议处理器加载成功:无
No protocol handler is found for protocol 'pulsar'
错误,且出现类似:INFO org.apache.pulsar.broker.protocol.ProtocolHandlers - Loaded protocol handler for: pulsarBroker 启动成功:日志末尾出现 Broker 启动完成的标志:
INFO org.apache.pulsar.broker.PulsarService - Broker service started successfully依赖组件连接正常:确认与 ZooKeeper、BookKeeper 连接成功(无
Connection refused
或Session expired
错误),例如:INFO org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase - ZooKeeper client is connected now. INFO org.apache.pulsar.bookkeeper.BookKeeperClientFactory - Created BookKeeper client二、连通性测试:验证服务可访问性
确认 Broker 的网络端口(Pulsar 协议 6650、HTTP 管理端口 8080)可正常访问,分为「Pod 内部」和「集群内其他组件」两个维度。
Pod 内部端口监听验证
进入 Broker Pod 内部,检查 6650(Pulsar 协议)和 8080(HTTP)端口是否已正常监听:
# 进入 Broker Pod 终端 kubectl exec -it <pod-name> -n dev -- /bin/bash # 检查端口监听(两种命令均可) netstat -tulpn | grep -E "6650|8080" # 或 ss -tulpn | grep -E "6650|8080"
正常结果:显示
LISTEN状态,例如:
tcp 0 0 0.0.0.0:6650 0.0.0.0:* LISTEN 1/java tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN 1/java集群内跨 Pod 连通性测试
在 K8s 集群内的其他 Pod(如临时 busybox Pod)测试能否访问 Broker 的端口,模拟生产者 / 消费者的连接场景:
# 启动一个临时 busybox Pod 用于测试 kubectl run -it --rm busybox-test --image=busybox:1.35 -n dev -- /bin/sh # 在 busybox 内测试 6650 端口(Pulsar 协议) nc -zv <pulsar-broker-pod-ip> 6650 # 测试 8080 端口(HTTP 管理接口) nc -zv <pulsar-broker-pod-ip> 8080
正常结果:显示
succeeded
或connected
;异常处理:若失败,检查 Broker 的
bindAddress
是否为0.0.0.0
(允许外部访问),以及 K8s 网络策略是否阻止了端口访问。HTTP 管理接口验证
通过 Broker 的 HTTP 接口(8080 端口)验证服务状态,这是 Pulsar Admin 工具的基础:
# 在 Broker Pod 内或集群内其他可访问的 Pod 中执行 curl http://<pulsar-broker-pod-ip>:8080/admin/v2/clusters
正常结果:返回集群列表(如
["pulsar-cluster"]
),状态码 200;异常处理:若返回 404 或连接拒绝,检查
webServicePort
配置及端口监听状态。三、核心功能测试:验证消息生产与消费
这是最关键的测试环节,直接验证 Pulsar Broker 的核心能力 —— 接收和投递消息。可通过 Pulsar 官方提供的
pulsar-client
工具快速测试。准备:获取 Broker 访问地址
先记录 Broker 的 Pod IP(后续命令需替换):
# 查看 Broker Pod IP kubectl get pods -n dev -l app=pulsar,component=broker -o jsonpath='{.items[0].status.podIP}'假设获取到的 IP 为
10.233.115.50
,则 Pulsar 协议地址为pulsar://10.233.115.50:6650
。测试步骤:生产 - 消费消息
需要两个终端窗口(或使用后台进程),分别运行生产者和消费者。
步骤 1:启动消费者(接收消息)
在第一个终端进入 Broker Pod,启动消费者监听指定 Topic(如
persistent://public/default/test-topic
):# 进入 Broker Pod kubectl exec -it pulsar-broker-55685cf95-6jv9c -n dev -- /bin/bash # 启动消费者(监听 test-topic) pulsar-client consume -s "test-subscription" persistent://public/default/test-topic -n 0 # 参数说明:-s 订阅名称;-n 0 表示持续消费(不限制消息数量)消费者启动后会等待接收消息,显示类似:
Subscribed to topic on consumer: Consumer
。步骤 2:启动生产者(发送消息)
在第二个终端进入 Broker Pod,启动生产者发送测试消息:
# 进入 Broker Pod kubectl exec -it pulsar-broker-764b774dd8-5mb7c -n dev -- /bin/bash # 启动生产者(向 test-topic 发送 5 条消息) pulsar-client produce persistent://public/default/test-topic -m "北京天安门" -n 5 # 参数说明:-m 消息内容;-n 发送消息数量 pulsar-client produce persistent://public/default/test-topic -m "deploy k8s standalone pulsar is tohard" -n 500
生产者正常结果:显示
Successfully produced 5 messages
;消费者正常结果:立即接收到 5 条消息,日志显示每条消息的内容和 ID,例如:
Received message: 'Hello Pulsar! Message 1' id: 1:0:-1:0:1 Received message: 'Hello Pulsar! Message 1' id: 1:1:-1:0:2 ...验证 Topic 自动创建功能
之前的配置中设置了
allowAutoTopicCreation=true
,测试 Topic 未提前创建时能否自动生成:# 1. 先确认 test-auto-topic 不存在 pulsar-admin topics list public/default # 2. 直接向未创建的 Topic 发送消息 pulsar-client produce persistent://public/default/test-auto-topic -m "Auto-created topic test" -n 1 # 3. 再次查看 Topic 列表,确认 test-auto-topic 已存在 pulsar-admin topics list public/default
正常结果:
test-auto-topic
出现在列表中,且消息能正常生产 / 消费。四、管理功能测试:通过 Pulsar Admin 验证元数据
Pulsar Admin 是官方管理工具,可验证集群、Broker、Topic 等元数据状态,确保管理功能正常。
验证集群配置
# 进入 Broker Pod,执行 pulsar-admin 命令 kubectl exec -it <pod-name> -n dev -- /bin/bash # 查看集群列表(应包含配置的 pulsar-cluster) pulsar-admin clusters list # 查看集群详情(验证 ZooKeeper 地址等配置) pulsar-admin clusters get pulsar-cluster
正常结果:集群列表包含
pulsar-cluster
,详情中brokerServiceUrl
为pulsar://<pod-ip>:6650
。验证 Broker 状态
# 查看当前运行的 Broker 列表 pulsar-admin brokers list pulsar-cluster # 检查 Broker 健康状态(核心验证) pulsar-admin brokers healthcheck
正常结果:
healthcheck
返回OK
,表示 Broker 服务正常。验证 Topic 元数据
# 查看 test-topic 的详情(分区、订阅等信息) pulsar-admin topics get-metadata persistent://public/default/test-topic # 查看 test-subscription 的订阅状态 pulsar-admin topics subscriptions persistent://public/default/test-topic
正常结果:显示 Topic 的分区数(默认 1)、存储大小,以及
test-subscription
订阅存在。五、进阶测试(可选,按需验证)
若需验证更复杂的场景,可进行以下测试:
持久化消息测试
验证消息重启 Broker 后不丢失:
生产 1 条消息到
test-topic
,但不消费;重启 Broker Pod:
kubectl delete pod <pod-name> -n dev
;待 Pod 重启后,启动消费者,确认能接收到重启前的消息。
负载均衡测试(多 Broker 场景)
若部署了多个 Broker 副本,测试消息是否能均衡分发:
扩容 Broker 到 2 个:
kubectl scale deployment pulsar-broker --replicas=2 -n dev
;启动多个生产者发送大量消息(如 1000 条);
通过
pulsar-admin topics stats
查看每个 Broker 处理的消息量,确认负载均衡。权限测试(若配置了认证授权)
若后续配置了 Token 或 OAuth2 认证,测试权限控制:
# 使用无效 Token 尝试发送消息,应被拒绝 pulsar-client --auth-token "invalid-token" produce persistent://public/default/test-topic -m "test" -n 1六、常见问题排查
若测试失败,按以下优先级排查:
日志优先:重新查看 Broker 日志,搜索
ERROR
关键词,重点关注协议加载、ZooKeeper/BookKeeper 连接、权限等错误;配置复查:确认
advertisedListeners
中的 IP 正确(Pod IP),bindAddress
为0.0.0.0
;网络排查:使用
ping
或traceroute
确认 Broker 与 ZooKeeper、BookKeeper 的网络连通性;资源排查:查看 Pod 资源使用情况(
kubectl top pod <pod-name> -n dev
),确认未触发内存 / CPU 限制被 K8s 杀死。通过以上测试,可全面验证 Pulsar Broker 从启动到核心功能的可用性,确保服务能正常承接生产环境的消息业务。