Integra Varios LLMs con KubeMQ para Mejorar tu Flujo de Trabajo
TL;DR
Integrar múltiples modelos de lenguaje, como OpenAI y Claude de Anthropic, en aplicaciones enfrenta desafíos relacionados con la comunicación entre APIs y la gestión eficiente de solicitudes. La implementación de un broker de mensajes es una solución que simplifica esta interactividad.
Integrar múltiples modelos de lenguaje, como OpenAI y Claude de Anthropic, en aplicaciones enfrenta desafíos relacionados con la comunicación entre APIs y la gestión eficiente de solicitudes. La implementación de un broker de mensajes es una solución que simplifica esta interactividad.
Este artículo explora el uso de KubeMQ como un enrutador para conectar y gestionar diversos LLMs, promoviendo ventajas como integración simplificada y soporte a múltiples modelos de lenguaje.
Ventajas del Uso de un Broker de Mensajes
1. Integración Simplificada
Un broker de mensajes facilita la comunicación con varias APIs de LLM, reduciendo la complejidad del código y minimizando errores.
2. Casos de Uso Múltiples
Permite el uso de diferentes modelos según la necesidad, como uno para resúmenes y otro para análisis de sentimientos.
3. Procesamiento por Lotes e Inferencia a Gran Escala
El broker igualmente viabiliza el procesamiento asincrónico, asegurando que las solicitudes sean tratadas incluso en picos de demanda.
4. Redundancia y Garantía de Continuidad
En situaciones críticas, un broker de mensajes proporciona continuidad al servicio, alternando a entornos de respaldo en caso de fallas de conexión.
5. Tratamiento de Aplicaciones con Alto Tráfico
Distribuye solicitudes entre varias instancias de LLM, evitando sobrecargas y asegurando un funcionamiento suave para aplicaciones de alto tráfico.
Construyendo un Enrutador de LLM con KubeMQ
Para configurar un enrutador que opere con OpenAI y Claude, necesitamos seguir algunos pasos fundamentales involucrando la configuración de KubeMQ.
Requisitos Previos
Antes de iniciar, asegúrate de tener:
Python 3.7 o superior.
Docker instalado.
Claves de API válidas para OpenAI y Anthropic.
Token de KubeMQ, disponible en el sitio de KubeMQ.
Configurando KubeMQ
Primero, es esencial garantizar que KubeMQ esté funcionando a través de Docker:
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN="tu_token" \
kubemq/kubemq-community:latest
Las puertas de KubeMQ son:
8080: Acceso a la API REST de KubeMQ.
50000: Conexión gRPC entre cliente y servidor.
9090: Gateway REST de KubeMQ.
Sustituye "tu_token" por el token real de KubeMQ.
Cree el Servidor del Enrutador de LLM
El enrutador actúa como un intermediario entre clientes y LLMs, escuchando canales específicos y enrutando consultas al modelo apropiado.
Ejemplo de Código: server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
def __init__(self):
self.openai_llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo"
)
self.claude_llm = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3"
)
self.client = Client(address="localhost:50000")
def handle_openai_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.openai_llm(message)
resp
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def handle_claude_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.claude_llm(message)
resp
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def run(self):
def on_error(err: str):
print(f"Error: {err}")
def subscribe_openai():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="openai_requests",
),
cancel=CancellationToken()
)
def subscribe_claude():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="claude_requests",
),
cancel=CancellationToken()
)
threading.Thread(target=subscribe_openai).start()
threading.Thread(target=subscribe_claude).start()
print("LLM Router running on channels: openai_requests, claude_requests")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
if __name__ == "__main__":
router = LLMRouter()
router.run()
Explicación del Código
Inicialización:
- Carga variables ambientales para claves de la API.
- Inicializa clientes para los LLMs de OpenAI y Anthropic.
- Configura un cliente de KubeMQ.
Manejo de Consultas:
- Los métodos
handle_openai_queryyhandle_claude_queryprocesan los mensajes recibidos y devuelven las respuestas adecuadas. - Los errores son tratados adecuadamente, informando al cliente.
- Los métodos
Suscripción:
- El enrutador se suscribe a dos canales:
openai_requestsyclaude_requests. - Usa hilos para manejar las suscripciones de forma concurrente.
- El enrutador se suscribe a dos canales:
Ejecución del Servidor:
- El método
runinicia la escucha de las suscripciones y mantiene el servidor activo hasta que sea interrumpido.
- El método
Desarrollando el Cliente LLM
El cliente es responsable de enviar consultas al enrutador, eligiendo qué modelo utilizar.
Ejemplo de Código: client.py
from kubemq.cq import Client, QueryMessage
import json
class LLMClient:
def __init__(self, address="localhost:50000"):
self.client = Client(address=address)
def send_message(self, message: str, model: str) -> dict:
channel = f"{model}_requests"
resp
channel=channel,
body=message.encode('utf-8'),
timeout_in_sec
))
if response.is_error:
return {"error": response.error}
else:
return {"response": response.body.decode('utf-8')}
if __name__ == "__main__":
client = LLMClient()
models = ["openai", "claude"]
message = input("Ingresa tu mensaje: ")
model = input(f"Elige modelo ({', '.join(models)}): ")
if model in models:
resp model)
if "error" in response:
print(f"Error: {response['error']}")
else:
print(f"Respuesta: {response['response']}")
else:
print("Modelo seleccionado no válido")
Explicación del Código
Inicialización:
- Configura un cliente de KubeMQ.
Envío de Mensajes:
- El método
send_messageconstruye un canal adecuado basado en el modelo seleccionado. - Envía una consulta al enrutador y espera la respuesta, tratando errores según sea necesario.
- El método
Interacción con el Usuario:
- Pide al usuario que ingrese un mensaje y seleccione un modelo.
- Muestra la respuesta recibida del LLM.
Enviando y Recibiendo a través de REST
Para servicios que prefieren comunicación RESTful, KubeMQ proporciona endpoints REST.
Enviando una Solicitud a través de REST
Endpoint:
POST http://localhost:9090/send/request
Headers:
Content-Type: application/json
Cuerpo:
{
"RequestTypeData": 2,
"ClientID": "LLMRouter-sender",
"Channel": "openai_requests",
"BodyString": "¿Cuál es la capital de Francia?",
"Timeout": 30000
}
Detalles de la carga útil:
RequestTypeData: Especifica el tipo de solicitud (2 para consulta).ClientID: Identificador del cliente que envía la solicitud.Channel: Canal correspondiente al modelo LLM (openai_requests o claude_requests).BodyString: Mensaje a ser enviado al LLM.Timeout: Tiempo máximo para esperar una respuesta (en milisegundos).
Recibiendo la Respuesta
La respuesta será un objeto JSON que contiene la salida del LLM o un mensaje de error.
Conclusión
Al utilizar un broker de mensajes como KubeMQ, es posible desarrollar un enrutador escalable y eficiente que interactúa con diversos LLMs. Esta estructura permite que los clientes envíen consultas a diferentes modelos de forma continua y puede ser expandida para incluir modelos o funcionalidades adicionales en el futuro.
Beneficios de este enfoque incluyen:
Integración Simplificada: Facilita la interacción con diferentes APIs de LLM.
Soporte Multimodal: Rotea eficientemente las solicitudes al modelo adecuado.
Confiabilidad: Asegura que la información no se pierda durante el uso intenso.
Redundancia: Proporciona mecanismos para mantener operaciones continuas.
Escalabilidad: Gestiona altos volúmenes de tráfico con eficiencia.
Este enfoque representará un avance significativo en la aplicación de brokers de mensajes para optimizar el desarrollo de soluciones en inteligencia artificial.
Contenido seleccionado y editado con asistencia de IA. Fuentes originales referenciadas arriba.


