当前位置: 首页 > news >正文

OpenSIPS 邂逅 Kafka:构建高效 VoIP 消息处理架构

  • 使用场景
  • 使用步骤
    • 引入模块
    • 组装&发送数据
    • 消费数据
    • 故障转移

使用场景

在这里插入图片描述

  1. 异步日志处理:将 OpenSIPS 中的 SIP 信令日志、通话记录(CDR)等数据发送到 Kafka 队列中。

  2. 事件通知与监控:利用 OpenSIPS 的 event_interface 模块将 SIP 事件(如呼叫建立、断开、注册等)推送到 Kafka

OpenSIPS中事件接口有以下类型:

  • EVENT_DATAGRAM - Publish JSON-RPC notifications using UDP, stable
  • EVENT_FLATSTORE - Text/File backend for events, stable
  • EVENT_KAFKA - Publish JSON-RPC notifications/generic messages to Apache Kafka , stable
  • EVENT_STREAM - Publish JSON-RPC notifications using TCP, stable
  • EVENT_ROUTE - Route triggering based on events, stable
  • EVENT_ROUTING - Event-based routing, stable
  • EVENT_RABBITMQ - Publish JSON-RPC notifications using AMQP over TCP , stable
  • EVENT_VIRTUAL - Aggregator of event backends (failover & balancing), stable
  • EVENT_XMLRPC - Event XMLRPC client module , stable
  1. 分布式消息队列集成:在复杂的 VoIP 架构中,OpenSIPS 可以通过 Kafka 与其他服务(如计费系统、CRM 系统)解耦

  2. 计费与数据分析:OpenSIPS 生成的 CDR(Call Detail Records)可以通过 Kafka 推送至后端计费系统

  3. 故障隔离与重试机制:在 OpenSIPS 调用外部服务时(如鉴权、计费接口),如果目标服务不可用,可以将请求暂存到 Kafka

  4. 微服务架构下的通信桥梁:在基于微服务的 VoIP 架构中,OpenSIPS 作为 SIP 边界网关,可通过 Kafka 与其他微服务(如认证服务、媒体控制服务)进行异步通信

  5. 消息广播与事件驱动架构:OpenSIPS 可将特定的 SIP 事件广播到 Kafka 的多个主题,供不同的下游服务消费

  6. 性能优化与流量削峰:在高并发场景下,Kafka 可以作为缓冲层,缓解 OpenSIPS 与后端系统之间的流量压力

  7. 自定义业务逻辑扩展:通过 Kafka 与外部业务逻辑模块解耦,可以在不影响 OpenSIPS 核心逻辑的前提下,灵活扩展新的业务功能

使用步骤

在这里插入图片描述

引入模块

loadmodule "event_kafka.so"
modparam("event_kafka", "broker_id", "[k1]127.0.0.1:9092/opensips?g.linger.ms=100&t.acks=all")

链接语法:'kafka:' brokers '/' topic ['?' properties]
properties语法:'g.'|'t.' property '=' value ['&' 'g.'|'t.' property '=' value] ...
可以设置的proroperty参考官方说明

组装&发送数据

        $json(sql_obj) := "{}";$json(sql_obj/table) = 'acc';$json(sql_obj/method) = $param(method);$json(sql_obj/fromTag) = $param(from_tag);$json(sql_obj/toTag) = $param(to_tag);$json(sql_obj/callid) = $param(callid);$json(sql_obj/sipCode) = $param(sip_code);$json(sql_obj/sipReason) = $param(sip_reason);$json(sql_obj/time) = $param(time);$json(sql_obj/duration) = $param(duration);$json(sql_obj/msDuration) = $param(ms_duration);$json(sql_obj/setuptime) = $param(setuptime);$json(sql_obj/created) = $param(created);kafka_publish("k1", $json(sql_obj), $ci, "kafka_report");

可异步监听回执

route[kafka_report] {xlog("[$avp(kafka_id)] status=$avp(kafka_status) key=$avp(kafka_key) msg=$avp(kafka_msg)\n");...
}

消费数据

$ bin/kafka-console-consumer.sh   --bootstrap-server 127.0.0.1:9092   --topic opensips --from-beginning{ "status": 1, "dlg_id": "17091151056627", "callid": "1-14809@127.0.0.1", "from_uri": "sip:20250610@127.0.0.1:5060", "from_tag": "1", "to_uri": "sip:tt061013776167200@127.0.0.1:5900", "caller_sock": "127.0.0.1:5900", "caller_contact": "sip:20250610@127.0.0.1:5060", "start_time": 0, "timeout": 0, "caller_in": "20250610", "callee_in": "tt061013776167200", "caller_gateway": "46", "callee_gateway": "9", "src_ip": "127.0.0.1", "dst_ip": "127.0.0.1" }
{ "status": 5, "dlg_id": "17091151056627", "to_uri": "sip:tt061013776167200@127.0.0.1:5900", "to_tag": "1", "callee_contact": "sip:127.0.0.1:5080;transport=UDP", "start_time": "1749459233", "timeout": "1749462833", "caller_in": "20250610", "callee_in": "tt061013776167200", "caller_gateway": "46", "callee_gateway": "9", "src_ip": "127.0.0.1", "dst_ip": "127.0.0.1" }
{ "table": "acc", "method": "INVITE", "fromTag": "1", "toTag": "1", "callid": "1-14809@127.0.0.1", "sipCode": "200", "sipReason": "OK", "time": 1749459233, "duration": 4, "msDuration": 3411, "setuptime": 8, "created": 1749459225, "srcIp": "127.0.0.1", "dstIp": "127.0.0.1", "caller": "20250610", "callee": "331213776167200", "callStartTime": "1749459233.433263", "callEndTime": "1749459236.844957", "callerIn": "20250610", "calleeIn": "tt061013776167200", "callerOut": "20241213ob16701", "calleeOut": "calleeout330613776167200", "callergateway": "46", "calleegateway": "9", "calllevel": "0", "routinglevel": "0", "calleraccount": "1", "calleeaccount": "2", "callerCallid": "1-14809@127.0.0.1", "calleeCallid": "", "area": "", "endSide": "1", "endCode": "9201", "endReason": "caller hang up", "realDuration": "3411", "through_jt": "1", "callerproductid": "", "calleeproductid": "", "routing_path": "9-200-0;", "node_addr": "127.0.0.1:5900", "multi_gw": "", "s_timeout": "", "event_time": "1749459236.847516" }
{ "status": 6, "dlg_id": "17091151056627", "callid": "1-14809@127.0.0.1" }

故障转移

引入event_virtual/event_flatstore,将事件消息通过队列传递,并且支持故障转移

异常信息 ->> EVENT ->> KAFKA(故障链路 ->> EVENT_VIRTUAL ->> EVENT_FLATSTORE)

loadmodule "event_flatstore.so"
loadmodule "event_kafka.so"
loadmodule "event_virtual.so"startup_route {subscribe_event("E_MY_EVENT", "virtual:FAILOVER kafka:127.0.0.1:9092/opensipsfailover flatstore:/var/log/myevents");
}

相关文章:

  • 营销型网站建设xywlcn如何联系百度人工客服电话
  • 合肥网站维护网络营销学什么内容
  • 网站制作 连云港百度指数分析数据
  • APP加网站建设预算多少钱阿里云域名查询和注册
  • 成都网站制作公司有哪些seo网站优化方案案例
  • 百度怎么制作网站教程最新国内新闻事件今天
  • UAVAI-YOLO:无人机航拍图像的小目标检测模型
  • 深度优化OSS上传性能:多线程分片上传 vs 断点续传实战对比
  • ntext 数据类型不能选为 DISTINCT,因为它不可比
  • 解析云计算虚拟化基石:KVM、QEMU与Libvirt的协同
  • ✨从零搭建 Ubuntu22.04 + Python3.11 + PyTorch2.5.1 GPU Docker 镜像并上传 Docker Hub
  • C# WinForm跨平台串口通讯实现
  • RFID馆员工作站DW312-A|全国已经规模化应用
  • linux实时同步工具sersync
  • 利用 Python 脚本批量查找并删除指定 IP 的 AWS Lightsail 实例
  • FunASR搭建语音识别服务和VAD检测
  • 第23篇:OpenEuler 24.03系统下的备份与还原技术详解
  • 从牛顿流体到弹性固体:旋转流变仪的高精度流变特性测定与工业应用
  • WebRTC(九):JitterBuffer
  • web布局16
  • Android 开发问题:bluetoothLeScanner.startScan(scanCallback); 扫描不到设备
  • 使用 PyAEDT 设计参数化对数周期偶极子天线 LPDA
  • OSS与NAS混合云存储架构:非结构化数据统一管理实战
  • 【Java高频面试问题】数据库篇
  • Drag-and-Drop LLMs: Zero-Shot Prompt-to-Weights
  • Windows10的任务栏时间显示秒 笔记250624