当前位置: 首页 > news >正文

借助 KubeMQ 简化多 LLM 集成

    将多个大语言模型(LLM),如 OpenAI 和 Anthropic 的 Claude 集成到应用程序中是一项具有挑战性的任务。处理不同 API 和通信协议的复杂性,以及确保请求高效路由,都会带来诸多难题。

    然而,使用消息代理和路由器可以成为解决这些问题的优雅方案,能处理这些痛点并提供多项关键优势。在本文中,我们将探讨如何实现这一点,并提供代码示例,指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器,当然要集成 DeepSeek 也类似。

使用消息代理作为 LLM 路由器的关键优势

1. 简化集成

使用消息代理作为路由器可抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。

2. 多模型应用场景

消息代理可促进多个 LLM 或专用于不同任务(如一个模型用于摘要,另一个用于情感分析)的模型间通信,能高效地将请求路由到相应模型,使应用程序可在不增加额外开销的情况下充分发挥各模型的优势。

3. 批量处理与大规模推理

对于需要批量处理或大规模推理任务的应用,消息代理可通过在 LLM 忙碌或不可用时排队请求来实现异步处理。这确保了即使在高负载下也不会丢失数据或请求,提供可靠的处理能力。

4. 冗余与故障转移保障

在关键业务场景中,消息代理可确保无缝故障转移至其他环境。例如,如果连接提供 OpenAI 模型的云提供商失败,KubeMQ 可自动切换至其他提供商。这种冗余性可确保 AI 操作不间断,维护服务可靠性和客户满意度。

5. 高流量应用处理

消息代理可将传入请求分发至多个 LLM 实例或副本,防止过载并确保顺畅运行。这种负载均衡对于高流量应用至关重要,使其能够有效扩展而不影响性能。

使用 KubeMQ 构建 LLM 路由器:集成 OpenAI 和 Claude

现在,我将指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器。KubeMQ 是一款领先、开源的消息代理和消息队列平台,我们将利用其优势并提供代码示例,介绍如何设置消息代理、构建服务器端路由器以及创建用于发送查询的客户端。所有代码示例均可在 KubeMQ 的 GitHub(https://github.com/kubemq-io/kubemq-llm-router) 仓库中找到。

先决条件

在开始之前,请确保满足以下条件:

  • • 已安装 Python 3.7 或更高版本。

  • • 机器上已安装 Docker。

  • • 拥有 OpenAI 和 Anthropic 的有效 API 密钥。

  • • 拥有 KubeMQ 密钥(可从 KubeMQ 网站获取)。

  • • 已安装 kubemq-cq Python 包。

  • • 创建包含 API 密钥的 .env 文件:

OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

设置 KubeMQ

首先,我们需要确保 KubeMQ 正常运行。我们将使用 Docker 进行部署:

docker run -d --rm \-p 8080:8080 \-p 50000:50000 \-p 9090:9090 \-e KUBEMQ_TOKEN="your_token" \kubemq/kubemq-community:latest

端口说明:

  • • 8080:暴露 KubeMQ REST API

  • • 50000:打开用于客户端 - 服务器通信的 gRPC 端口

  • • 9090:暴露 KubeMQ REST 网关
    注意:将 your_token 替换为实际的 KubeMQ 密钥。

创建 LLM 路由器服务器

LLM 路由器作为客户端和 LLM 之间的中间件。它监听特定通道上的查询并将它们路由至相应的 LLM。

server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threadingload_dotenv()classLLMRouter:def__init__(self):self.openai_llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"),model_name="gpt-3.5-turbo")self.claude_llm = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"),model="claude-3")self.client = Client(address="localhost:50000")defhandle_openai_query(self, request: QueryMessageReceived):try:message = request.body.decode('utf-8')result = self.openai_llm(message)response = QueryResponseMessage(query_received=request,is_executed=True,body=result.encode('utf-8'))self.client.send_response_message(response)except Exception as e:self.client.send_response_message(QueryResponseMessage(query_received=request,is_executed=False,error=str(e)))defhandle_claude_query(self, request: QueryMessageReceived):try:message = request.body.decode('utf-8')result = self.claude_llm(message)response = QueryResponseMessage(query_received=request,is_executed=True,body=result.encode('utf-8'))self.client.send_response_message(response)except Exception as e:self.client.send_response_message(QueryResponseMessage(query_received=request,is_executed=False,error=str(e)))defrun(self):defon_error(err: str):print(f"Error: {err}")defsubscribe_openai():self.client.subscribe_to_queries(subscription=QueriesSubscription(channel="openai_requests",on_receive_query_callback=self.handle_openai_query,on_error_callback=on_error,),cancel=CancellationToken())defsubscribe_claude():self.client.subscribe_to_queries(subscription=QueriesSubscription(channel="claude_requests",on_receive_query_callback=self.handle_claude_query,on_error_callback=on_error,),cancel=CancellationToken())threading.Thread(target=subscribe_openai).start()threading.Thread(target=subscribe_claude).start()print("LLM Router running on channels: openai_requests, claude_requests")try:whileTrue:time.sleep(1)except KeyboardInterrupt:print("Shutting down...")if __name__ == "__main__":router = LLMRouter()router.run()

解释:

  • • 初始化

    • • 加载 API 密钥的环境变量。

    • • 初始化 OpenAI 和 Anthropic LLM 的客户端。

    • • 设置 KubeMQ 客户端。

  • • 处理查询

    • • handle_openai_query 和 handle_claude_query 解码传入消息,将其传递给相应的 LLM,并发送回响应。

    • • 捕获错误并发送回 is_executed 标志设置为 False 的响应。

  • • 订阅

    • • 路由器订阅两个通道:openai_requests 和 claude_requests

    • • 使用线程并行处理订阅。

  • • 运行服务器

    • • run 方法启动订阅并保持服务器运行,直到收到中断信号。

开发 LLM 客户端

客户端向 LLM 路由器发送查询,指定要使用的模型。

client.py
from kubemq.cq import Client, QueryMessage
import jsonclassLLMClient:def__init__(self, address="localhost:50000"):self.client = Client(address=address)defsend_message(self, message: str, model: str) -> dict:channel = f"{model}_requests"response = self.client.send_query_request(QueryMessage(channel=channel,body=message.encode('utf-8'),timeout_in_seconds=30))if response.is_error:return {"error": response.error}else:return {"response": response.body.decode('utf-8')}if __name__ == "__main__":client = LLMClient()models = ["openai", "claude"]message = input("Enter your message: ")model = input(f"Choose model ({'/'.join(models)}): ")if model in models:response = client.send_message(message, model)if"error"in response:print(f"Error: {response['error']}")else:print(f"Response: {response['response']}")else:print("Invalid model selected")

解释:

  • • 初始化

    • • 设置 KubeMQ 客户端。

  • • 发送消息

    • • send_message 方法根据选定的模型构建相应的通道。

    • • 向路由器发送查询消息并等待响应。

    • • 处理错误并解码响应体。

  • • 用户交互

    • • 提示用户输入消息并选择模型。

    • • 打印来自 LLM 的响应。

通过 REST 进行发送和接收

对于偏好或需要 RESTful 通信的服务或客户端,KubeMQ 提供了 REST 端点。

通过 REST 发送请求

端点:

POST http://localhost:9090/send/request

头部:

Content-Type: application/json

主体:

{"RequestTypeData": 2,"ClientID": "LLMRouter-sender","Channel": "openai_requests","BodyString": "What is the capital of France?","Timeout": 30000
}

载荷说明:

  • • RequestTypeData:指定请求类型(2 表示查询)。

  • • ClientID:发送请求的客户端的标识符。

  • • Channel:对应 LLM 模型的通道(openai_requests 或 claude_requests)。

  • • BodyString:要发送给 LLM 的消息。

  • • Timeout:等待响应的时间(以毫秒为单位)。

接收响应

响应将是一个包含 LLM 输出或错误消息的 JSON 对象。


综上,通过利用消息代理(KubeMQ),我们构建了一个可扩展且高效的路由器,能够与多个 LLM 交互。这种设置使客户端能够无缝地向不同模型发送查询,并且可以扩展以包含更多模型或功能。这种方法的优势包括:

  1. 1. 简化集成:抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。

  2. 2. 多模型支持:高效地将请求路由至专用于不同任务的相应模型。

  3. 3. 可靠性:确保即使在 LLM 忙碌或不可用时也不会丢失数据。

  4. 4. 冗余性:提供故障转移机制以维持不间断的运营。

  5. 5. 可扩展性:通过在多个 LLM 实例上分发请求来处理高流量。

http://www.dtcms.com/a/262836.html

相关文章:

  • 深度学习专栏总结
  • 生信分析之流式数据分析:Flowjo 软件核心功能全解析
  • Openssl升级
  • 使用 LoRA 微调大模型:关键参数与最佳实践全解析
  • 深度解析基于贝叶斯的垃圾邮件分类
  • 数字孪生技术为UI前端注入灵魂:实现产品全生命周期的可视化管理
  • 银河麒麟系统上利用WPS的SDK进行WORD的二次开发
  • linux docker 客户端操作数据卷
  • Excel转pdf实现动态数据绑定
  • [附源码+数据库+毕业论文]基于Spring+MyBatis+MySQL+Maven+jsp实现的校园服务平台管理系统,推荐!
  • 【甲方安全建设】敏感数据检测工具 Earlybird 安装使用详细教程
  • 6月30日作业
  • AR 学习:开启未来学习新视界​
  • 深入解析TCP:可靠传输的核心机制与实现逻辑
  • 7,FreeRTOS列表与列表项的插入删除
  • docker安装MySQL,创建MySQL容器
  • 认识 Spring AI
  • 根据OS自动加载不同的native库和本地jar包
  • Linux驱动学习day11(定时器)
  • 百度文库智能PPT月访问量超3400万,用户规模翻倍增长
  • demo01:基于 SpringMVC 的用户管理系统
  • AlpineLinux安装部署MongoDB
  • Clickhouse源码分析-TTL执行流程
  • 杂谈-架构时代演进
  • C语言常用转换函数实现原理
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | ThemeClock(主题时钟)
  • Windows环境下Docker容器化的安装与设置指南
  • 【第二章:机器学习与神经网络概述】04.回归算法理论与实践 -(1)线性回归模型
  • AWS WebRTC:通过shell分析并发启动master后产生的日志文件
  • 御控助力打造物联网实训室,赋能职业教育高质量发展