Claude no es simplemente otro modelo de lenguaje. Su arquitectura está diseñada para integraciones empresariales serias: uso de herramientas nativo que permite al modelo invocar funciones de su sistema, salidas estructuradas que garantizan respuestas en el formato exacto que necesita, y streaming que mantiene la experiencia del usuario fluida incluso con respuestas largas.
En este artículo compartimos los patrones que utilizamos en Cloudstudio para integrar Claude en aplicaciones de producción. Esta no es una guía de "hola mundo"; es lo que funciona cuando tienes miles de solicitudes por día y el costo importa.
Uso de herramientas: El superpoder de Claude
El uso de herramientas le permite definir herramientas como esquemas JSON que Claude puede invocar durante una conversación. El modelo decide cuándo usar cada herramienta, con qué parámetros y cómo interpretar los resultados. Esto convierte a Claude de un generador de texto en un componente activo de su sistema.
La clave es diseñar herramientas granulares y bien documentadas. Cada herramienta debe hacer una cosa bien, con un esquema claro y una descripción que el modelo pueda entender. Las herramientas que son demasiado amplias confunden al modelo; las herramientas que son demasiado granulares generan llamadas excesivas.
Aquí hay un ejemplo real de cómo definimos las herramientas para el sistema de gestión de pedidos de un cliente:
import anthropic
client = anthropic.Anthropic()
tools = [
{
"name": "lookup_order",
"description": "Busca un pedido por ID de pedido. Devuelve el estado del pedido, artículos, información de envío y detalles de pago. Usa esto cuando el usuario pregunte por un pedido específico.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "El ID del pedido, ej. ORD-2024-1234"
}
},
"required": ["order_id"]
}
},
{
"name": "search_orders",
"description": "Busca pedidos por correo electrónico del cliente, rango de fechas o estado. Devuelve una lista de pedidos coincidentes. Usa esto cuando el usuario quiera encontrar pedidos que coincidan con ciertos criterios.",
"input_schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "Dirección de correo electrónico del cliente"
},
"status": {
"type": "string",
"enum": ["pending", "shipped", "delivered", "cancelled"],
"description": "Filtro de estado del pedido"
},
"date_from": {
"type": "string",
"description": "Fecha de inicio en formato AAAA-MM-DD"
},
"date_to": {
"type": "string",
"description": "Fecha de fin en formato AAAA-MM-DD"
}
},
"required": []
}
}
]
La descripción de la herramienta es crítica: es la señal principal que Claude utiliza para decidir qué herramienta llamar. Hemos descubierto que las descripciones escritas como instrucciones ("Usa esto cuando...") superan a las descripciones que simplemente indican lo que hace la herramienta.
Integración de herramientas moderna con MCP y Agent SDK
Si está integrando Claude en una aplicación que se conecta a muchos sistemas externos, el Protocolo de Contexto de Modelo (MCP) puede simplificar drásticamente su capa de herramientas. MCP es ahora el estándar de la industria para conectar modelos de IA a herramientas y fuentes de datos, respaldado por la Linux Foundation con el apoyo de Anthropic, OpenAI, Google, Microsoft y AWS, y superando los 97 millones de descargas mensuales del SDK. En lugar de definir cada herramienta manualmente como se mostró anteriormente, MCP permite que su aplicación descubra e invoque herramientas de cualquier servidor compatible con MCP a través de un protocolo universal.
Para los equipos que desean una abstracción de mayor nivel para construir agentes impulsados por Claude, el Anthropic Agent SDK proporciona gestión de herramientas lista para producción, guardrails y orquestación multi-agente de forma predeterminada. Y para proyectos de TypeScript que necesitan soportar múltiples proveedores de IA (Claude, GPT-5.4, Gemini 3.1), Vercel AI SDK 6 ofrece un framework agnóstico al proveedor con soporte nativo para MCP, aprobación de herramientas con humano en el bucle y un AI Gateway.
Manejo de resultados de herramientas en el bucle de conversación
Cuando Claude decide usar una herramienta, la API devuelve una respuesta con tool_use. Su aplicación ejecuta la herramienta y envía el resultado de vuelta. Este es el bucle completo:
def run_conversation(user_message: str, tools: list, system_prompt: str) -> str:
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-sonnet-4-6-6",
max_tokens=4096,
system=system_prompt,
tools=tools,
messages=messages,
)
# Si Claude responde con texto y se detiene, hemos terminado
if response.stop_reason == "end_turn":
return next(
block.text for block in response.content
if block.type == "text"
)
# Si Claude quiere usar una herramienta, ejecútala
if response.stop_reason == "tool_use":
# Añade la respuesta de Claude (con bloques tool_use) a los mensajes
messages.append({"role": "assistant", "content": response.content})
# Procesa cada llamada a herramienta
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result),
})
# Envía los resultados de la herramienta de vuelta a Claude
messages.append({"role": "user", "content": tool_results})
def execute_tool(name: str, params: dict) -> dict:
"""Enruta las llamadas a herramientas a las implementaciones reales."""
handlers = {
"lookup_order": order_service.lookup,
"search_orders": order_service.search,
}
handler = handlers.get(name)
if not handler:
return {"error": f"Herramienta desconocida: {name}"}
try:
return handler(**params)
except Exception as e:
return {"error": str(e)}
Un detalle crítico: siempre maneje el caso en el que falla una llamada a la herramienta. Devuelva el error como un resultado de la herramienta; Claude normalmente reconocerá el error y volverá a intentarlo con parámetros diferentes o explicará la situación al usuario. Nunca ignore los errores de las herramientas en silencio.
Salidas estructuradas: Respuestas estructuradas para sistemas reales
Cuando Claude es parte de un flujo de trabajo automatizado, necesita respuestas en un formato predecible. Las salidas estructuradas obligan a la respuesta a seguir un esquema JSON específico. Esto elimina el frágil procesamiento de texto libre y hace que la integración sea robusta.
Utilizamos salidas estructuradas para la clasificación de documentos, la extracción de datos, el análisis de sentimiento y cualquier caso en el que la respuesta alimente a otro componente del sistema. La confiabilidad pasa de ~90% con prompts de texto libre a ~99% con esquemas estrictos.
Así es como extraemos datos estructurados de correos electrónicos de soporte para el sistema de tickets de un cliente:
import anthropic
import json
client = anthropic.Anthropic()
def classify_support_email(email_body: str) -> dict:
response = client.messages.create(
model="claude-sonnet-4-6-6",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"Analiza este correo electrónico de soporte y extrae datos estructurados:\n\n{email_body}"
}],
tool_choice={"type": "tool", "name": "classify_email"},
tools=[{
"name": "classify_email",
"description": "Clasifica y extrae datos de un correo electrónico de soporte.",
"input_schema": {
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["billing", "technical", "account", "feature_request", "complaint"]
},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"]
},
"sentiment": {
"type": "string",
"enum": ["positive", "neutral", "negative", "angry"]
},
"summary": {
"type": "string",
"description": "Resumen de una oración del problema"
},
"entities": {
"type": "object",
"properties": {
"order_ids": {
"type": "array",
"items": {"type": "string"}
},
"product_names": {
"type": "array",
"items": {"type": "string"}
}
}
},
"suggested_action": {
"type": "string",
"description": "Paso siguiente recomendado para el equipo de soporte"
}
},
"required": ["category", "priority", "sentiment", "summary", "suggested_action"]
}
}]
)
# Extrae el resultado estructurado de la llamada a la herramienta
tool_block = next(b for b in response.content if b.type == "tool_use")
return tool_block.input
El truco aquí es usar tool_choice con un nombre de herramienta específico; esto obliga a Claude a llamar a esa herramienta en particular, garantizando una respuesta estructurada. El modelo no puede responder con texto libre. Este patrón es más confiable que pedir JSON en el prompt porque el esquema se aplica a nivel de API.
Streaming: Respuestas en tiempo real para interfaces interactivas
Para aplicaciones de cara al usuario, el streaming no es negociable. Sin él, los usuarios se quedan mirando un indicador de carga durante 5 a 15 segundos. Con el streaming, el primer token aparece en menos de un segundo y la respuesta se construye progresivamente.
Implementamos el streaming utilizando Eventos Enviados por el Servidor (SSE), que funciona de forma nativa con navegadores y frameworks modernos:
import anthropic
from flask import Response, stream_with_context
client = anthropic.Anthropic()
def stream_response(user_message: str, conversation_history: list):
"""Transmite la respuesta de Claude como Eventos Enviados por el Servidor (SSE)."""
def generate():
with client.messages.stream(
model="claude-sonnet-4-6-6",
max_tokens=4096,
messages=conversation_history + [
{"role": "user", "content": user_message}
],
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
elif event.type == "message_stop":
# Envía estadísticas de uso al final
usage = stream.get_final_message().usage
yield f"data: {json.dumps({'done': True, 'input_tokens': usage.input_tokens, 'output_tokens': usage.output_tokens})}\n\n"
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)
En el frontend, consumir el flujo es sencillo:
async function streamChat(message) {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop(); // Mantiene el fragmento incompleto en el búfer
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.text) {
appendToResponse(data.text);
}
if (data.done) {
showTokenUsage(data.input_tokens, data.output_tokens);
}
}
}
}
}
Un detalle importante: el streaming con uso de herramientas requiere manejar eventos content_block_start para detectar cuándo Claude comienza una llamada a una herramienta. La entrada de la herramienta llega como deltas que debe acumular y procesar como JSON una vez que el bloque esté completo. Envolvemos esto en una máquina de estados que rastrea si el bloque actual es texto o uso de herramienta.
Gestión de costos en producción
El costo de los tokens es predecible si se diseña para ello. Utilizamos el caché de prompts para reducir el costo de los prompts de sistema repetidos, max_tokens para limitar las respuestas y seleccionamos el modelo por tarea: Haiku para clasificación rápida, Sonnet para razonamiento general, Opus para tareas que requieren la máxima calidad.
Monitoreamos el costo por solicitud, por usuario y por función. Establecemos alertas cuando el costo se desvía de la línea base. Y diseñamos respaldos: si el modelo principal no responde a tiempo, degradamos a un modelo más rápido en lugar de fallar.
Selección de modelo por tarea
No todas las solicitudes necesitan el mismo modelo. Enrutamos según la complejidad de la tarea:
from enum import Enum
from dataclasses import dataclass
class ModelTier(Enum):
FAST = "claude-haiku-4-5-5"
BALANCED = "claude-sonnet-4-6-6"
MAX = "claude-opus-4-6-6"
@dataclass
class TaskConfig:
model: ModelTier
max_tokens: int
cache_system_prompt: bool = True
# Enruta las tareas al modelo correcto
TASK_ROUTING = {
"classify_intent": TaskConfig(ModelTier.FAST, max_tokens=128),
"extract_entities": TaskConfig(ModelTier.FAST, max_tokens=512),
"summarize_document": TaskConfig(ModelTier.BALANCED, max_tokens=2048),
"generate_report": TaskConfig(ModelTier.BALANCED, max_tokens=4096),
"complex_analysis": TaskConfig(ModelTier.MAX, max_tokens=4096),
"code_review": TaskConfig(ModelTier.MAX, max_tokens=4096),
}
def get_model_for_task(task_type: str) -> TaskConfig:
return TASK_ROUTING.get(
task_type,
TaskConfig(ModelTier.BALANCED, max_tokens=2048) # por defecto
)
Caché de prompts para la reducción de costos
Los prompts de sistema que se repiten en las solicitudes son costosos. Con el caché de prompts, Anthropic almacena el prefijo del prompt y cobra un 90% menos por los tokens almacenados en caché. Para un prompt de sistema de 2,000 tokens repetido en 1,000 solicitudes diarias, eso representa un ahorro significativo:
def create_cached_request(user_message: str, task_type: str):
config = get_model_for_task(task_type)
system_blocks = [
{
"type": "text",
"text": SYSTEM_PROMPTS[task_type], # Prompt de sistema largo y detallado
"cache_control": {"type": "ephemeral"}
}
]
response = client.messages.create(
model=config.model.value,
max_tokens=config.max_tokens,
system=system_blocks,
messages=[{"role": "user", "content": user_message}],
)
# Registra el rendimiento del caché
logger.info(
"Estadísticas de caché",
extra={
"cache_read_tokens": response.usage.cache_read_input_tokens,
"cache_creation_tokens": response.usage.cache_creation_input_tokens,
"input_tokens": response.usage.input_tokens,
}
)
return response
En nuestros sistemas de producción, el caché de prompts suele alcanzar una tasa de aciertos del 70-85% después del calentamiento, lo que se traduce en una reducción de costos de aproximadamente el 60% en los tokens de entrada.
Manejo de errores y reintentos
En producción, las llamadas a la API fallan. Se alcanzan los límites de tasa. Las redes agotan el tiempo de espera. Su integración debe manejar todo esto con elegancia sin perder el contexto del usuario ni generar acciones duplicadas.
Usamos retroceso exponencial con jitter para los reintentos, y distinguimos entre errores reintentables y no reintentables:
import time
import random
import anthropic
def call_claude_with_retries(
messages: list,
model: str = "claude-sonnet-4-6-6",
max_retries: int = 3,
**kwargs
) -> anthropic.types.Message:
"""Llama a Claude con retroceso exponencial y lógica de reintento inteligente."""
client = anthropic.Anthropic()
for attempt in range(max_retries + 1):
try:
return client.messages.create(
model=model,
messages=messages,
**kwargs,
)
except anthropic.RateLimitError as e:
if attempt == max_retries:
raise
# Usa el encabezado retry-after si está disponible, de lo contrario, retroceso exponencial
retry_after = float(e.response.headers.get("retry-after", 0))
wait = max(retry_after, (2 ** attempt) + random.uniform(0, 1))
logger.warning(f"Límite de tasa alcanzado, reintentando en {wait:.1f}s (intento {attempt + 1})")
time.sleep(wait)
except anthropic.APIStatusError as e:
if e.status_code >= 500:
# Los errores del servidor son reintentables
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
logger