借助 KubeMQ 简化多 LLM 集成
将多个大语言模型(LLM),如 OpenAI 和 Anthropic 的 Claude 集成到应用程序中是一项具有挑战性的任务。处理不同 API 和通信协议的复杂性,以及确保请求高效路由,都会带来诸多难题。
然而,使用消息代理和路由器可以成为解决这些问题的优雅方案,能处理这些痛点并提供多项关键优势。在本文中,我们将探讨如何实现这一点,并提供代码示例,指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器,当然要集成 DeepSeek 也类似。
使用消息代理作为 LLM 路由器的关键优势
1. 简化集成
使用消息代理作为路由器可抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。
2. 多模型应用场景
消息代理可促进多个 LLM 或专用于不同任务(如一个模型用于摘要,另一个用于情感分析)的模型间通信,能高效地将请求路由到相应模型,使应用程序可在不增加额外开销的情况下充分发挥各模型的优势。
3. 批量处理与大规模推理
对于需要批量处理或大规模推理任务的应用,消息代理可通过在 LLM 忙碌或不可用时排队请求来实现异步处理。这确保了即使在高负载下也不会丢失数据或请求,提供可靠的处理能力。
4. 冗余与故障转移保障
在关键业务场景中,消息代理可确保无缝故障转移至其他环境。例如,如果连接提供 OpenAI 模型的云提供商失败,KubeMQ 可自动切换至其他提供商。这种冗余性可确保 AI 操作不间断,维护服务可靠性和客户满意度。
5. 高流量应用处理
消息代理可将传入请求分发至多个 LLM 实例或副本,防止过载并确保顺畅运行。这种负载均衡对于高流量应用至关重要,使其能够有效扩展而不影响性能。
使用 KubeMQ 构建 LLM 路由器:集成 OpenAI 和 Claude
现在,我将指导大家如何使用 KubeMQ 构建一个与 OpenAI 和 Anthropic Claude 交互的路由器。KubeMQ 是一款领先、开源的消息代理和消息队列平台,我们将利用其优势并提供代码示例,介绍如何设置消息代理、构建服务器端路由器以及创建用于发送查询的客户端。所有代码示例均可在 KubeMQ 的 GitHub(https://github.com/kubemq-io/kubemq-llm-router) 仓库中找到。
先决条件
在开始之前,请确保满足以下条件:
-
• 已安装 Python 3.7 或更高版本。
-
• 机器上已安装 Docker。
-
• 拥有 OpenAI 和 Anthropic 的有效 API 密钥。
-
• 拥有 KubeMQ 密钥(可从 KubeMQ 网站获取)。
-
• 已安装 kubemq-cq Python 包。
-
• 创建包含 API 密钥的 .env 文件:
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
设置 KubeMQ
首先,我们需要确保 KubeMQ 正常运行。我们将使用 Docker 进行部署:
docker run -d --rm \-p 8080:8080 \-p 50000:50000 \-p 9090:9090 \-e KUBEMQ_TOKEN="your_token" \kubemq/kubemq-community:latest
端口说明:
-
• 8080:暴露 KubeMQ REST API
-
• 50000:打开用于客户端 - 服务器通信的 gRPC 端口
-
• 9090:暴露 KubeMQ REST 网关
注意:将your_token
替换为实际的 KubeMQ 密钥。
创建 LLM 路由器服务器
LLM 路由器作为客户端和 LLM 之间的中间件。它监听特定通道上的查询并将它们路由至相应的 LLM。
server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threadingload_dotenv()classLLMRouter:def__init__(self):self.openai_llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"),model_name="gpt-3.5-turbo")self.claude_llm = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"),model="claude-3")self.client = Client(address="localhost:50000")defhandle_openai_query(self, request: QueryMessageReceived):try:message = request.body.decode('utf-8')result = self.openai_llm(message)response = QueryResponseMessage(query_received=request,is_executed=True,body=result.encode('utf-8'))self.client.send_response_message(response)except Exception as e:self.client.send_response_message(QueryResponseMessage(query_received=request,is_executed=False,error=str(e)))defhandle_claude_query(self, request: QueryMessageReceived):try:message = request.body.decode('utf-8')result = self.claude_llm(message)response = QueryResponseMessage(query_received=request,is_executed=True,body=result.encode('utf-8'))self.client.send_response_message(response)except Exception as e:self.client.send_response_message(QueryResponseMessage(query_received=request,is_executed=False,error=str(e)))defrun(self):defon_error(err: str):print(f"Error: {err}")defsubscribe_openai():self.client.subscribe_to_queries(subscription=QueriesSubscription(channel="openai_requests",on_receive_query_callback=self.handle_openai_query,on_error_callback=on_error,),cancel=CancellationToken())defsubscribe_claude():self.client.subscribe_to_queries(subscription=QueriesSubscription(channel="claude_requests",on_receive_query_callback=self.handle_claude_query,on_error_callback=on_error,),cancel=CancellationToken())threading.Thread(target=subscribe_openai).start()threading.Thread(target=subscribe_claude).start()print("LLM Router running on channels: openai_requests, claude_requests")try:whileTrue:time.sleep(1)except KeyboardInterrupt:print("Shutting down...")if __name__ == "__main__":router = LLMRouter()router.run()
解释:
-
• 初始化。
-
• 加载 API 密钥的环境变量。
-
• 初始化 OpenAI 和 Anthropic LLM 的客户端。
-
• 设置 KubeMQ 客户端。
-
-
• 处理查询。
-
•
handle_openai_query
和handle_claude_query
解码传入消息,将其传递给相应的 LLM,并发送回响应。 -
• 捕获错误并发送回
is_executed
标志设置为False
的响应。
-
-
• 订阅。
-
• 路由器订阅两个通道:
openai_requests
和claude_requests
。 -
• 使用线程并行处理订阅。
-
-
• 运行服务器。
-
•
run
方法启动订阅并保持服务器运行,直到收到中断信号。
-
开发 LLM 客户端
客户端向 LLM 路由器发送查询,指定要使用的模型。
client.py
from kubemq.cq import Client, QueryMessage
import jsonclassLLMClient:def__init__(self, address="localhost:50000"):self.client = Client(address=address)defsend_message(self, message: str, model: str) -> dict:channel = f"{model}_requests"response = self.client.send_query_request(QueryMessage(channel=channel,body=message.encode('utf-8'),timeout_in_seconds=30))if response.is_error:return {"error": response.error}else:return {"response": response.body.decode('utf-8')}if __name__ == "__main__":client = LLMClient()models = ["openai", "claude"]message = input("Enter your message: ")model = input(f"Choose model ({'/'.join(models)}): ")if model in models:response = client.send_message(message, model)if"error"in response:print(f"Error: {response['error']}")else:print(f"Response: {response['response']}")else:print("Invalid model selected")
解释:
-
• 初始化。
-
• 设置 KubeMQ 客户端。
-
-
• 发送消息。
-
•
send_message
方法根据选定的模型构建相应的通道。 -
• 向路由器发送查询消息并等待响应。
-
• 处理错误并解码响应体。
-
-
• 用户交互。
-
• 提示用户输入消息并选择模型。
-
• 打印来自 LLM 的响应。
-
通过 REST 进行发送和接收
对于偏好或需要 RESTful 通信的服务或客户端,KubeMQ 提供了 REST 端点。
通过 REST 发送请求
端点:
POST http://localhost:9090/send/request
头部:
Content-Type: application/json
主体:
{"RequestTypeData": 2,"ClientID": "LLMRouter-sender","Channel": "openai_requests","BodyString": "What is the capital of France?","Timeout": 30000
}
载荷说明:
-
•
RequestTypeData
:指定请求类型(2 表示查询)。 -
•
ClientID
:发送请求的客户端的标识符。 -
•
Channel
:对应 LLM 模型的通道(openai_requests 或 claude_requests)。 -
•
BodyString
:要发送给 LLM 的消息。 -
•
Timeout
:等待响应的时间(以毫秒为单位)。
接收响应
响应将是一个包含 LLM 输出或错误消息的 JSON 对象。
综上,通过利用消息代理(KubeMQ),我们构建了一个可扩展且高效的路由器,能够与多个 LLM 交互。这种设置使客户端能够无缝地向不同模型发送查询,并且可以扩展以包含更多模型或功能。这种方法的优势包括:
-
1. 简化集成:抽象掉直接与不同 LLM API 交互的复杂性,简化客户端代码并降低错误发生概率。
-
2. 多模型支持:高效地将请求路由至专用于不同任务的相应模型。
-
3. 可靠性:确保即使在 LLM 忙碌或不可用时也不会丢失数据。
-
4. 冗余性:提供故障转移机制以维持不间断的运营。
-
5. 可扩展性:通过在多个 LLM 实例上分发请求来处理高流量。