Integre LLMs Usando KubeMQ para Otimizar Fluxos de Trabalho
TL;DR
O uso de múltiplos modelos de linguagem, como OpenAI e Claude, em aplicações de inteligência artificial (IA) apresenta desafios de comunicação e gerenciamento. Integrar essas tecnologias utilizando um broker de mensagens como o KubeMQ torna essa tarefa mais eficiente.
O uso de múltiplos modelos de linguagem, como OpenAI e Claude, em aplicações de inteligência artificial (IA) apresenta desafios de comunicação e gerenciamento. Integrar essas tecnologias utilizando um broker de mensagens como o KubeMQ torna essa tarefa mais eficiente.
Este artigo analisa como o KubeMQ pode servir como um roteador para diferentes LLMs (Modelos de Linguagem de Grande Escala), oferecendo vantagens como simplicidade na integração e suporte a múltiplos modelos simultaneamente.
Benefícios do Uso de um Broker de Mensagens
1. Integração Simplificada
Um broker de mensagens facilita a comunicação entre diversas APIs de LLM, reduzindo a complexidade do código e minimizando erros potenciais.
2. Versatilidade de Casos de Uso
Com um broker, é possível alternar entre diferentes modelos de acordo com a necessidade, utilizando um para resumos e outro para análise de sentimentos, por exemplo.
3. Processamento em Lote
O broker possibilita o tratamento assíncrono de solicitações, garantindo que pedidos sejam processados mesmo em períodos de alta demanda, aumentando a eficiência operacional.
4. Redundância e Continuidade
Em cenários críticos, um broker assegura a continuidade ao redirecionar solicitações para ambientes de backup em caso de falhas.
5. Tratamento de Alto Tráfego
Um broker de mensagens distribui as solicitações entre várias instâncias de LLM, prevenindo sobrecargas e garantindo que aplicações intensivas operem suavemente.
Configurando um Roteador de LLM com KubeMQ
A configuração de um roteador que opere com OpenAI e Claude requer seguir várias etapas relacionadas ao KubeMQ.
Pré-Requisitos
- Python 3.7 ou superior
- Docker instalado
- Chaves de API para OpenAI e Claude
- Token do KubeMQ disponível no site oficial do KubeMQ
Configurando o KubeMQ
Para iniciar o KubeMQ via Docker, execute o seguinte comando:
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN=\"seu_token\" \
kubemq\/kubemq-community:latest
As portas necessárias do KubeMQ incluem:
- 8080: Acesso à API REST.
- 50000: Conexão gRPC entre cliente e servidor.
- 9090: Gateway REST.
Não se esqueça de substituir "seu_token" pelo valor real do token obtido.
Criando o Servidor do Roteador
O roteador atua como intermediário, escutando canais específicos e roteando consultas para o modelo correto.
Exemplo de Código: server.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
def __init__(self):
self.openai_llm = ChatOpenAI(
api_key=os.getenv(\"OPENAI_API_KEY\"),
model_name=\"gpt-3.5-turbo\"
)
self.claude_llm = Anthropic(
api_key=os.getenv(\"ANTHROPIC_API_KEY\"),
model=\"claude-3\"
)
self.client = Client(address=\"localhost:50000\")
def handle_openai_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.openai_llm(message)
resp
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def handle_claude_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.claude_llm(message)
resp
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def run(self):
def on_error(err: str):
print(f\"Error: {err}\")
def subscribe_openai():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel=\"openai_requests\",
),
cancel=CancellationToken()
)
def subscribe_claude():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel=\"claude_requests\",
),
cancel=CancellationToken()
)
threading.Thread(target=subscribe_openai).start()
threading.Thread(target=subscribe_claude).start()
print(\"LLM Router rodando em: openai_requests, claude_requests\")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print(\"Encerrando...\")
if __name__ == \"__main__\":
router = LLMRouter()
router.run()
Explicação do Código
- Inicialização:
- Carrega variáveis ambientais para as chaves das APIs.
- Inicializa clientes para os LLMs com OpenAI e Claude.
- Configura um cliente do KubeMQ.
- Processamento de Consultas:
- Os métodos
handle_openai_queryehandle_claude_querygerenciam as mensagens recebidas e retornam as respostas adequadas. - Erros são tratados com retorno informando ao cliente.
- Os métodos
- Assinatura:
- O roteador se inscreve em dois canais:
openai_requestseclaude_requests. - Threads gerenciam assinaturas de forma concorrente.
- O roteador se inscreve em dois canais:
- Execução do Servidor:
- O método
runinicia a escuta das assinaturas e mantém o servidor ativo até interrupção.
- O método
Desenvolvendo o Cliente LLM
O cliente para envio de consultas ao roteador possibilita ao usuário selecionar o modelo desejado.
Exemplo de Código: client.py
from kubemq.cq import Client, QueryMessage
import json
class LLMClient:
def __init__(self, address=\"localhost:50000\"):
self.client = Client(address=address)
def send_message(self, message: str, model: str) -> dict:
channel = f\"{model}_requests\"
resp
channel=channel,
body=message.encode('utf-8'),
timeout_in_sec
))
if response.is_error:
return {\"error\": response.error}
else:
return {\"response\": response.body.decode('utf-8')}
if __name__ == \"__main__\":
client = LLMClient()
models = [\"openai\", \"claude\"]
message = input(\"Digite sua mensagem: \")
model = input(f\"Escolha o modelo ({', '.join(models)}): \")
if model in models:
resp model)
if \"error\" in response:
print(f\"Erro: {response['error']}\")
else:
print(f\"Resposta: {response['response']}\")
else:
print(\"Modelo inválido selecionado\")
Explicação do Código
- Inicialização:
- Configura um cliente do KubeMQ.
- Envio de Mensagens:
- O método
send_messageconstrói um canal baseado no modelo escolhido e envia uma consulta. - Os erros são tratados e a resposta é retornada ao usuário.
- O método
- Interação com o Usuário:
- Pede ao usuário para inserir uma mensagem e escolher um modelo.
- Apresenta a resposta retornada do LLM.
Enviando e Recebendo via REST
Para serviços que utilizam a abordagem RESTful, o KubeMQ oferece endpoints REST.
Enviando uma Solicitação via REST
Utilize o seguinte endpoint:
POST http:\/\/localhost:9090\/send\/request
Headers:
Content-Type: application\/json
Corpo:
{
\"RequestTypeData\": 2,
\"ClientID\": \"LLMRouter-sender\",
\"Channel\": \"openai_requests\",
\"BodyString\": \"Qual é a capital da França?\",
\"Timeout\": 30000
}
Detalhes do payload:
RequestTypeData: Define o tipo de solicitação (2 para consulta).ClientID: Identificador do cliente que envia a solicitação.Channel: Canal para o modelo LLM (openai_requests ou claude_requests).BodyString: Mensagem enviada ao LLM.Timeout: Tempo máximo para resposta (em milissegundos).
Recebendo a Resposta
A resposta será um objeto JSON que contém a saída do LLM ou uma mensagem de erro.
Conclusão
O emprego de um broker de mensagens como o KubeMQ permite o desenvolvimento de um roteador eficaz e escalável, capaz de interagir com diversos LLMs. Esta estrutura assegura que clientes possam enviar consultas continuamente e propicia futuras ampliações com novos modelos e funcionalidades.
Entre as vantagens, destacam-se:
- Integração Simplificada: Facilita a interação com diversas APIs de LLM.
- Suporte a Múltiplos Modelos: Roteia eficazmente para o modelo adequado.
- Confiabilidade: Assegura que informações não sejam perdidas durante uso intensivo.
- Redundância: Possibilita operações contínuas mesmo em caso de falhas.
- Escalabilidade: Gerencia altos volumes de tráfego de forma eficiente.
Esse avanço representa uma significativa evolução no uso de brokers de mensagens, otimizando o desenvolvimento de soluções em inteligência artificial.
Conteudo selecionado e editado com assistencia de IA. Fontes originais referenciadas acima.


