译者 | 核子可乐
审校 | 重楼
将多个大语言模型集成至应用程序当中往往是项艰巨的挑战,各类不同API及通信协议的协同处理,以及如何确保请求路由的复杂性难题往往令人望而生畏。

好在可以使用消息代理与路由机制更优雅地解决此类问题,在解决痛点的同时实现多个关键优势。
本文将向大家介绍具体操作步骤。这里以KubeMQ为例,配合代码示例来指导大家逐步建立一套可与OpenAI及Anthropic Claude交互的路由体系。
使用消息代理作为大模型路由工具的主要优势
1. 简化集成
通过使用消息代理作为路由机制,我们可以将不同大模型API交互所涉及的复杂性抽象出来,从而简化客户端代码并降低出错几率。
2. 多模型用例
消息代理能够实现多模型或专门用于不同任务的模型间的通信(如一个模型用于摘要,另一模型用于情绪分析)。其可以确保请求被有效路由至适当模型,使得应用程序能够利用各模型的优势且无需额外开销。
3. 批处理与大规模推理
对于需要批处理或大规模推理任务的应用程序,消息代理通过在大模型繁忙或不可用时建立请求队列,从而实现异步处理。这将确保不会丢失任何数据或请求,即使是在繁重的工作负载下也能提供可靠的处理响应。
4. 冗余与回退保证
对于特别关注正常运行时间的用例,消息代理可确保无缝回退至替代环境。例如,如果与提供OpenAI模型的某家云服务商发生连接失败,KubeMQ可自动切换至另一服务商。这样的冗余设计保证AI不间断操作,有助于增强服务可靠性与客户满意度。
5. 处理高流量应用程序
消息代理能够将传入的请求分发至多个大模型实例或副本,防止过载并确保平衡运行。这种负载均衡设计对于高流量应用程序至关重要,可使其在不影响性能的前提下有效扩展。
使用KubeMQ建立大模型路由机制:集成OpenAI与Claude
现在,我们将分步了解如何使用KubeMQ设置能够与OpenAI和Anthropic Claude交互的路由机制。
全部示例代码均保存在KubeMQ的GitHub repo当中(https://github.com/kubemq-io/kubemq-llm-router)。
准备工作
在开始之前,请确保你已安装以下内容:
- Python 3.7或更高版本。
 - 本地安装Docker。
 - 拥有有效的OpenAI和Anthropic API密钥。
 - KubeMQ令牌(可从KubeMQ官网处获取)。
 - kubemq-cq Python包:
 
Plain Text pip install kubemq-cq
- .env文件中包含你的AIP密钥:
 
Plain Text OPENAI_API_KEY=your_openai_api_key ANTHROPIC_API_KEY=your_anthropic_api_key
设置KubeMQ
首先,我们需要确保KubeMQ能够正常运行。这里使用Docker进行部署:
复制Shell docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your_token" \ kubemq/kubemq-community:latest
端口说明:
- 8080 – 公开KubeMQ REST API
 - 50000 – 打开 gRPC端口以进行实施意见-服务器通信
 - 9090 – 公开KubeMQ REST网关
 
注意: 将 your_token部分替换为你的真实KubeMQ令牌。
创建大模型路由服务器
大模型路由将充当客户端与大模型之间的中介,负责监听特定渠道的查询并将其路由至适当的大模型。
server.py
复制Python
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
 def __init__(self):
 self.openai_llm = ChatOpenAI(
 api_key=os.getenv("OPENAI_API_KEY"),
 model_name="gpt-3.5-turbo"
 )
 self.claude_llm = Anthropic(
 api_key=os.getenv("ANTHROPIC_API_KEY"),
 model="claude-3"
 )
 self.client = Client(address="localhost:50000")
 def handle_openai_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.openai_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))
 def handle_claude_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.claude_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))
 def run(self):
 def on_error(err: str):
 print(f"Error: {err}")
 def subscribe_openai():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="openai_requests",
 on_receive_query_callback=self.handle_openai_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )
 def subscribe_claude():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="claude_requests",
 on_receive_query_callback=self.handle_claude_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )
 threading.Thread(target=subscribe_openai).start()
 threading.Thread(target=subscribe_claude).start()
 print("LLM Router running on channels: openai_requests, claude_requests")
 try:
 while True:
 time.sleep(1)
 except KeyboardInterrupt:
 print("Shutting down...")
if __name__ == "__main__":
 router = LLMRouter()
 router.run()说明:
- 初始化。
 
A.为API密钥加载环境变量。
B.初始化OpenAI和Anthropic大模型的客户端。
C.设置KubeMQ客户端。
- 处理查询。
 
A.handle_openai_query和 handle_claude_query负责解码传入消息,将其传递给相应大模型,而后发回响应。
B.捕捉错误并将 is_executed 标记设置为 False。
- 订阅。
 
A.此路由将订阅两个小道:openai_requests 和 claude_requests。
B.使用线程并行处理订阅。
- 运行服务器。
 
A.run方法启动订阅并保持服务器运行,直至中断。
开发大模型客户端
客户端向大模型路由发送查询,指定要使用的模型。
client.py
复制Python
from kubemq.cq import Client, QueryMessage
import json
class LLMClient:
 def __init__(self, address="localhost:50000"):
 self.client = Client(address=address)
 def send_message(self, message: str, model: str) -> dict:
 channel = f"{model}_requests"
 response = self.client.send_query_request(QueryMessage(
 channel=channel,
 body=message.encode('utf-8'),
 timeout_in_seconds=30
 ))
 if response.is_error:
 return {"error": response.error}
 else:
 return {"response": response.body.decode('utf-8')}
if __name__ == "__main__":
 client = LLMClient()
 models = ["openai", "claude"]
 message = input("Enter your message: ")
 model = input(f"Choose model ({'/'.join(models)}): ")
 if model in models:
 response = client.send_message(message, model)
 if "error" in response:
 print(f"Error: {response['error']}")
 else:
 print(f"Response: {response['response']}")
 else:
 print("Invalid model selected")说明:
- 初始化。
 
A.设置KubeMQ客户端。
- 发送消息。
 
A.send_message 方法根据所选模型构建适当通道。
B.向路由发送查询消息并等待响应。
C.处理错误并解码响应主体。
- 用户交互。
 
A.提示用户输入消息并选择模型。
B.从大模型处输出响应。
通过REST发送和接收
对于倾向或需要RESTful通信的服务或客户端,KubeMQ亦可提供REST端点。
通过REST发送请求
端点:
复制Plain Text POST http://localhost:9090/send/request
标头:
复制Plain Text Content-Type: application/json
实体:
复制JSON
{
 "RequestTypeData": 2,
 "ClientID": "LLMRouter-sender",
 "Channel": "openai_requests",
 "BodyString": "What is the capital of France?",
 "Timeout": 30000
}负载细节:
- RequestTypeData – 指定请求类型(查询为2)。
 - ClientID – 发送请求的客户端标识符。
 - Channel – 与大模型(openai_requests或claude_requests)对应的通道。
 - BodyString – 要发送至大模型的消息。
 - Timeout – 等待响应的时间(单位为毫秒)。
 
接收响应
响应是一个包含大模型输出或错误消息的JSON对象。
总结
在消息代理(KubeMQ)的帮助下,我们建立起可扩展且高效的路由机制,能够与多个大模型进行交互。此设置允许客户端无缝向不同模型发送查询,并可扩展以引入更多模型或功能。
这种方法的好处包括:
- 简化集成。大家可以将与不同大模型API交互与涉及的复杂性抽象出来,简化客户端代码并降低出错几率。
 - 多模型支持。有效将请求路由至专门用于不同任务的适当模型。
 - 可靠性。确保在大模型繁忙或不可用时,数据不致丢失。
 - 冗余。提供后备机制以保持不间断操作。
 - 可扩展性。通过在多个大模型实例间分配请求以应对高流量需求。