当前位置: 首页 > news >正文

使用Kafka和kafkajs构建示例项目

设置Kafka

按照Docker搭建kafka环境文档安装Kafka

创建主题

docker run --rm -it --net=host lensesio/fast-data-dev bash
kafka-topics --create --topic notification-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

新建项目

mkdir kafka-sample-project
cd kafka-sample-project/
npm init -y
npm install kafkajs typescript ts-node

创建配置文件

tsconfig.json

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "esModuleInterop": true,
    "strict": true,
    "sourceMap": true,
    "removeComments": true,
    "forceConsistentCasingInFileNames": true,
    "skipLibCheck": true
  },
  "include": ["*.ts"],
  "exclude": ["node_modules", "dist"]
}

package.json

{
  "name": "kafka-sample-project",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start:producer": "ts-node producer.ts",
    "start:consumer": "ts-node consumer.ts"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "description": "",
  "dependencies": {
    "kafkajs": "^2.2.4",
    "typescript": "^5.8.2"
  },
  "devDependencies": {
    "ts-node": "^10.9.2"
  }
}

config.ts

/**
 * Kafka配置文件
 * 集中管理Kafka相关的配置参数
 */

// Kafka连接配置
export const kafkaConfig = {
  brokers: ['localhost:9092'],
  clientId: {
    producer: 'sample-producer',
    consumer: 'sample-consumer'
  },
  consumerGroup: 'notification-group',
};

// 主题配置
export const topics = {
  email: 'email-topic',
  sms: 'sms-topic'
};

// 消息类型定义
export interface EmailPayload {
  to: string;
  from: string;
  subject: string;
  body: string;
}

export interface SmsPayload {
  phoneNumber: string;
  message: string;
}

export type NotificationPayload = EmailPayload | SmsPayload;

创建Kafka生产者

import { Kafka, Producer } from 'kafkajs';
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';

// 创建Kafka实例
const kafka = new Kafka({
  clientId: kafkaConfig.clientId.producer,
  brokers: kafkaConfig.brokers,
});

const producer: Producer = kafka.producer();

const runProducer = async (): Promise<void> => {
  await producer.connect();

  const sendMessage = async (topic: string, message: string): Promise<void> => {
    await producer.send({
      topic,
      messages: [{ value: message }],
    });
  };

  const sendNotification = async <T extends EmailPayload | SmsPayload>(topic: string, payload: T): Promise<void> => {
    const message = JSON.stringify(payload);
    await sendMessage(topic, message);
    console.log(`Message sent to ${topic}: ${message}`);
  };

  try {
    // 发送邮件通知示例
    const emailPayload: EmailPayload = {
      to: 'receiver@example.com',
      from: 'sender@example.com',
      subject: 'Sample Email',
      body: 'This is a sample email notification',
    };
    await sendNotification(topics.email, emailPayload);

    // 发送短信通知示例
    const smsPayload: SmsPayload = {
      phoneNumber: '1234567890',
      message: 'This is a sample SMS notification',
    };
    await sendNotification(topics.sms, smsPayload);
  } catch (error) {
    console.error('Error sending notifications:', error);
    throw error; // 重新抛出错误以便在外层处理
  }

  await producer.disconnect();
};

runProducer()
  .then(() => {
    console.log('Producer completed successfully');
  })
  .catch((error) => {
    console.error('Failed to run kafka producer', error);
    process.exit(1);
  });

创建Kafka消费者

import { type EachMessagePayload, Kafka, Consumer } from 'kafkajs';
// 从正确的配置文件路径导入
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';

// 创建Kafka实例
const kafka = new Kafka({
  clientId: kafkaConfig.clientId.consumer,
  brokers: kafkaConfig.brokers,
});

const consumer: Consumer = kafka.consumer({ groupId: kafkaConfig.consumerGroup });

/**
 * 处理接收到的消息
 */
const handleMessage = async ({ topic, partition, message }: EachMessagePayload): Promise<void> => {
  try {
    // 检查 message.value 是否为 null
    const messageContent = message.value?.toString() ?? 'No message content';
    console.log(`Received message from topic '${topic}': ${messageContent}`);
    
    // 尝试解析消息内容
    let parsedMessage: EmailPayload | SmsPayload | null = null;
    try {
      if (message.value) {
        parsedMessage = JSON.parse(messageContent);
      }
    } catch (parseError) {
      console.error(`Failed to parse message from topic ${topic}:`, parseError);
      // 解析失败时不提交偏移量,让消息可以重新处理
      return;
    }

    // 根据主题处理不同类型的消息
    if (topic === topics.email) {
      // 处理邮件通知
      console.log('Handling email notification:', messageContent);
      // 这里可以添加实际的邮件处理逻辑
    } else if (topic === topics.sms) {
      // 处理短信通知
      console.log('Handling SMS notification:', messageContent);
      // 这里可以添加实际的短信处理逻辑
    } else {
      console.log('Unknown topic:', topic);
    }
    
    // 只有在成功处理消息后才提交偏移量
    await consumer.commitOffsets([{ topic, partition, offset: message.offset }]);
  } catch (error) {
    console.error(`Error processing message from topic ${topic}:`, error);
    // 处理失败时不提交偏移量,让消息可以重新处理
  }
};

/**
 * 运行消费者服务
 */
const runConsumer = async (): Promise<void> => {
  try {
    await consumer.connect();
    
    // 订阅主题
    await consumer.subscribe({ topic: topics.email });
    await consumer.subscribe({ topic: topics.sms });

    console.log(`Consumer subscribed to topics: ${topics.email}, ${topics.sms}`);

    await consumer.run({
      eachMessage: handleMessage,
    });
  } catch (error) {
    console.error('Error running consumer:', error);
    throw error;
  }
};

// 添加优雅关闭处理
const gracefulShutdown = async (): Promise<void> => {
  try {
    console.log('Shutting down consumer...');
    await consumer.disconnect();
    console.log('Consumer disconnected');
    process.exit(0);
  } catch (error) {
    console.error('Error during shutdown:', error);
    process.exit(1);
  }
};

// 监听进程终止信号
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);

runConsumer()
  .then(() => {
    console.log('Consumer is running...');
  })
  .catch((error) => {
    console.error('Failed to run kafka consumer', error);
    process.exit(1);
  });

运行项目

新建2个终端,运行命令:

npm run start:producer
npm run start:consumer

可以看到终端输出正常

详细代码:https://github.com/wan88888/kafka-sample-project

http://www.dtcms.com/a/108653.html

相关文章:

  • 前端面试题(三):axios有哪些常用的方法
  • Linux上位机开发实践(从用板子到自己做板子)
  • 针对 SQL 查询中 IN 子句性能优化 以及 等值 JOIN 和不等值 JOIN 对比 的详细解决方案、代码示例及表格总结
  • Webpack vs Vite:现代前端构建工具的巅峰对决与选型指南
  • Linux学习七——进程回收
  • 一文详解QT环境搭建:Windows平台Qt安装配置指南
  • react 15-16-17-18各版本的核心区别、底层原理及演进逻辑的深度解析--react18
  • 电脑异常关机导致oracle监听器启动后自动停止
  • 蓝桥杯 web 请到下一步
  • Spread使用 配合report使用前篇
  • python爬虫基础讲解
  • 【调用通义千问实现手写文字识别】
  • 04-08手写持久层框架——核心配置和映射配置文件解析
  • 从零构建大语言模型全栈开发指南:第四部分:工程实践与部署-4.2.1视觉-语言模型(VLM)架构设计(CLIP与Flamingo模式)
  • HarmonyOS 基础组件和基础布局的介绍
  • Nyquist插件基础:LISP语法-条件语句
  • 数据量管理系统
  • 光学关键尺寸量测设备市场报告:2024年全球市场销售额达到了14.75亿美元
  • 鸿蒙NEXT开发土司工具类(ArkTs)
  • 前端中rem,vh,vw
  • 网约车APP评价系统从0到1
  • 红宝书第二十六讲:详解Web Workers:专用、共享、Service Worker
  • PyTorch中Linear全连接层
  • 视频设备轨迹回放平台EasyCVR如何搭建公共娱乐场所远程视频监控系统
  • 铁路语义分割数据下载RailSem19: A Dataset for Semantic Rail Scene Understanding
  • 使用Android 原生LocationManager获取经纬度
  • 教育软件 UI 设计:打造吸睛又实用的学习入口
  • SELinux
  • Leetcode-100 二分查找常见操作总结
  • 数据点燃创新引擎:数据驱动的产品开发如何重塑未来?