Integrate Multiple LLMs with KubeMQ to Improve Your Workflow
TL;DR
This article explores the use of KubeMQ as a router to connect and manage various LLMs, promoting advantages such as simplified integration and support for multiple language models.
Integrating multiple language models, such as OpenAI's models and Anthropic's Claude, into an application raises challenges around API communication and efficient request management. A message broker is one solution that simplifies this interaction.
Advantages of Using a Message Broker
1. Simplified Integration
A message broker facilitates communication with various LLM APIs, reducing code complexity and minimizing errors.
2. Multiple Use Cases
It allows the use of different models as needed, such as one for summaries and another for sentiment analysis.
3. Batch Processing and Large-Scale Inference
The broker also enables asynchronous processing, ensuring that requests are handled even during peak demand.
4. Redundancy and Continuity Assurance
In critical situations, a message broker provides service continuity by switching to backup environments in case of connection failures.
5. Handling High Traffic Applications
It distributes requests among various LLM instances, avoiding overloads and ensuring smooth operation for high-traffic applications.
Building an LLM Router with KubeMQ
To set up a router that works with both OpenAI and Claude, we follow a few fundamental steps, starting with setting up KubeMQ.
Prerequisites
Before starting, ensure you have:
Python 3.7 or higher.
Docker installed.
Valid API keys for OpenAI and Anthropic.
A KubeMQ token, available from the KubeMQ website.
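The examples below also rely on a few Python packages. The exact package names and versions may differ in your environment; treat this as an assumed minimal set inferred from the imports used later:

pip install kubemq langchain python-dotenv openai anthropic

The openai and anthropic packages are typically required by the LangChain wrappers used in the router code.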
Setting Up KubeMQ
First, make sure KubeMQ is running. You can start it with Docker:
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN="your_token" \
kubemq/kubemq-community:latest
The KubeMQ ports are:
8080: Access to KubeMQ's REST API.
50000: gRPC connection between client and server.
9090: KubeMQ REST Gateway.
Replace "your_token" with the actual KubeMQ token.
Creating the LLM Router Server
The router acts as an intermediary between clients and LLMs, listening to specific channels and routing queries to the appropriate model.
Code Example: server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
    def __init__(self):
        # LLM clients, configured from environment variables
        self.openai_llm = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="gpt-3.5-turbo"
        )
        self.claude_llm = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3"
        )
        # KubeMQ client, connected over the gRPC port exposed by the container
        self.client = Client(address="localhost:50000")

    def handle_openai_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            # predict() takes a plain string prompt and returns the model's text
            result = self.openai_llm.predict(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def handle_claude_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.claude_llm.predict(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def run(self):
        def on_error(err: str):
            print(f"Error: {err}")

        # Each subscription binds a channel to its handler
        def subscribe_openai():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="openai_requests",
                    on_receive_query_callback=self.handle_openai_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        def subscribe_claude():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="claude_requests",
                    on_receive_query_callback=self.handle_claude_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        # Daemon threads let the process exit cleanly on Ctrl+C
        threading.Thread(target=subscribe_openai, daemon=True).start()
        threading.Thread(target=subscribe_claude, daemon=True).start()
        print("LLM Router running on channels: openai_requests, claude_requests")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down...")

if __name__ == "__main__":
    router = LLMRouter()
    router.run()
Code Explanation
Initialization:
- Loads environment variables for the API keys (a sample .env file is shown after this explanation).
- Initializes clients for OpenAI and Anthropic LLMs.
- Sets up a KubeMQ client.
Query Handling:
- The handle_openai_query and handle_claude_query methods process incoming messages and return appropriate responses.
- Errors are handled appropriately, notifying the client.
Subscription:
- The router subscribes to two channels: openai_requests and claude_requests.
- Uses threads to handle the subscriptions concurrently.
Server Execution:
- The run method starts listening on the subscriptions and keeps the server active until interrupted.
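Since the router reads its credentials with load_dotenv(), a .env file in the same directory is the simplest way to supply them. A minimal sketch; the variable names match the os.getenv calls above, and the values are placeholders:

OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key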
Developing the LLM Client
The client is responsible for sending queries to the router, choosing which model to use.
Code Example: client.py
from kubemq.cq import Client, QueryMessage

class LLMClient:
    def __init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    def send_message(self, message: str, model: str) -> dict:
        # Route the query to the channel that matches the chosen model
        channel = f"{model}_requests"
        response = self.client.send_query_request(QueryMessage(
            channel=channel,
            body=message.encode('utf-8'),
            timeout_in_seconds=30
        ))
        if response.is_error:
            return {"error": response.error}
        else:
            return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
    client = LLMClient()
    models = ["openai", "claude"]
    message = input("Enter your message: ")
    model = input(f"Choose model ({', '.join(models)}): ")
    if model in models:
        response = client.send_message(message, model)
        if "error" in response:
            print(f"Error: {response['error']}")
        else:
            print(f"Response: {response['response']}")
    else:
        print("Invalid model selected")
Code Explanation
Initialization:
- Sets up a KubeMQ client.
Message Sending:
- The send_message method constructs the appropriate channel based on the selected model.
- Sends a query to the router and waits for the response, handling errors as necessary.
User Interaction:
- Prompts the user to enter a message and select a model.
- Displays the response received from the LLM.
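To try the pieces together, start the router in one terminal and the client in another (assuming KubeMQ is still running from the earlier Docker command):

python server.py
python client.py

The client will prompt for a message and a model ("openai" or "claude") and print either the model's response or the error returned by the router.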
Sending and Receiving via REST
For services that prefer RESTful communication, KubeMQ provides REST endpoints.
Sending a Request via REST
Endpoint:
POST http://localhost:9090/send/request
Headers:
Content-Type: application/json
Body:
{
"RequestTypeData": 2,
"ClientID": "LLMRouter-sender",
"Channel": "openai_requests",
"BodyString": "What is the capital of France?",
"Timeout": 30000
}
Payload details:
- RequestTypeData: Specifies the type of request (2 for query).
- ClientID: Identifier of the client sending the request.
- Channel: Channel corresponding to the LLM model (openai_requests or claude_requests).
- BodyString: Message to be sent to the LLM.
- Timeout: Maximum time to wait for a response (in milliseconds).
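As an illustration, the same request can be sent with curl; the endpoint, headers, and payload are exactly those listed above:

curl -X POST http://localhost:9090/send/request \
  -H "Content-Type: application/json" \
  -d '{
    "RequestTypeData": 2,
    "ClientID": "LLMRouter-sender",
    "Channel": "openai_requests",
    "BodyString": "What is the capital of France?",
    "Timeout": 30000
  }'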
Receiving the Response
The response will be a JSON object containing the output from the LLM or an error message.
Conclusion
By utilizing a message broker like KubeMQ, it is possible to develop a scalable and efficient router that interacts with various LLMs. This structure lets clients send queries to different models through a single interface and can be expanded to include additional models or functionality in the future.
Benefits of this approach include:
Simplified Integration: Facilitates interaction with different LLM APIs.
Multi-Model Support: Efficiently routes each request to the appropriate model.
Reliability: Ensures that information is not lost during heavy usage.
Redundancy: Provides mechanisms for maintaining continuous operations.
Scalability: Manages high volumes of traffic efficiently.
This approach demonstrates how message brokers can streamline the development of artificial intelligence solutions.