Cada sistema RAG que he construido para clientes este año comenzó de la misma manera: "Tenemos un prototipo, pero no funciona bien en producción". El pipeline es siempre el mismo: consulta del usuario, embedding, búsqueda vectorial, top-K, LLM. Funciona en demostraciones con 50 documentos. Luego cargas 10,000 y todo se rompe.
El problema no es que RAG no funcione. El problema es que la arquitectura simplista oculta tres problemas fundamentales que solo aparecen a escala.
El pipeline RAG simplista y por qué se rompe
El flujo estándar de RAG parece engañosamente simple: tomar la consulta del usuario, generar un embedding, buscar los K vectores más similares, concatenar esos fragmentos y pasarlos al LLM. En un notebook con un conjunto de datos curado, esto obtiene resultados impresionantes. Pero la producción es una historia diferente.
El primer problema es que la similitud de coseno entre embeddings no equivale a la relevancia real. El segundo problema es que el top-K se vuelve cada vez más ruidoso a medida que crece tu corpus. El tercer problema es que los fragmentos de tamaño fijo rompen el contexto en los límites de oraciones y secciones.
Recorreré la arquitectura de tres capas que utilizamos en Cloudstudio para resolver cada uno de estos problemas, con el código real que ejecutamos en producción.
Capa 1: Fragmentación inteligente (Smart chunking)
En lugar de cortar cada N tokens, utilizamos una fragmentación recursiva que respeta la estructura del documento: los encabezados, párrafos, bloques de código y tablas se tratan como unidades atómicas. Cada fragmento lleva metadatos del padre: fuente, fecha, jerarquía de secciones y su posición en el documento.
Añadimos una ventana de superposición del 10-15% entre fragmentos consecutivos para que nunca se pierda el contexto en los límites.
Aquí está el fragmentador recursivo que utilizamos. Intenta dividir primero en el límite más significativo (encabezados), luego recurre a párrafos, oraciones y, finalmente, a límites estrictos de tokens:
import re
from dataclasses import dataclass, field
@dataclass
class Chunk:
text: str
metadata: dict = field(default_factory=dict)
token_count: int = 0
chunk_id: str = ""
parent_id: str | None = None
class RecursiveChunker:
"""Split documents respecting structural boundaries."""
def __init__(
self,
max_tokens: int = 512,
overlap_tokens: int = 64,
tokenizer=None,
):
self.max_tokens = max_tokens
self.overlap_tokens = overlap_tokens
self.tokenizer = tokenizer or self._simple_tokenizer
# Separators ordered by priority — try the most meaningful split first
self.separators = [
r'\n#{1,3}\s', # Markdown headings
r'\n\n', # Double newline (paragraph break)
r'\n', # Single newline
r'(?<=[.!?])\s+', # Sentence boundary
r'\s+', # Word boundary (last resort)
]
def chunk_document(self, text: str, source_metadata: dict) -> list[Chunk]:
"""Chunk a document with overlap and metadata propagation."""
raw_sections = self._recursive_split(text, separator_idx=0)
chunks = []
doc_id = source_metadata.get("document_id", "unknown")
for i, section_text in enumerate(raw_sections):
chunk = Chunk(
text=section_text.strip(),
token_count=len(self.tokenizer(section_text)),
chunk_id=f"{doc_id}_chunk_{i:04d}",
metadata={
**source_metadata,
"chunk_index": i,
"total_chunks": len(raw_sections),
"heading_hierarchy": self._extract_headings(section_text),
},
)
chunks.append(chunk)
# Add overlap between consecutive chunks
chunks = self._add_overlap(chunks)
return chunks
def _recursive_split(self, text: str, separator_idx: int) -> list[str]:
"""Try to split with the current separator; if chunks are too big, recurse with the next one."""
if separator_idx >= len(self.separators):
# Last resort: hard cut at token limit
return self._hard_split(text)
pattern = self.separators[separator_idx]
parts = re.split(pattern, text)
result = []
current = ""
for part in parts:
if len(self.tokenizer(current + part)) <= self.max_tokens:
current += part
else:
if current:
result.append(current)
# If this single part is too large, split it with the next separator
if len(self.tokenizer(part)) > self.max_tokens:
result.extend(self._recursive_split(part, separator_idx + 1))
else:
current = part
if current:
result.append(current)
return result
def _add_overlap(self, chunks: list[Chunk]) -> list[Chunk]:
"""Add overlap text from the previous chunk to maintain context at boundaries."""
for i in range(1, len(chunks)):
prev_tokens = self.tokenizer(chunks[i - 1].text)
overlap_text = self._detokenize(prev_tokens[-self.overlap_tokens:])
chunks[i].text = overlap_text + "\n" + chunks[i].text
chunks[i].token_count = len(self.tokenizer(chunks[i].text))
chunks[i].metadata["has_overlap"] = True
return chunks
def _extract_headings(self, text: str) -> list[str]:
return re.findall(r'^#{1,3}\s+(.+)$', text, re.MULTILINE)
def _hard_split(self, text: str) -> list[str]:
tokens = self.tokenizer(text)
return [
self._detokenize(tokens[i:i + self.max_tokens])
for i in range(0, len(tokens), self.max_tokens - self.overlap_tokens)
]
@staticmethod
def _simple_tokenizer(text: str) -> list[str]:
return text.split()
@staticmethod
def _detokenize(tokens: list[str]) -> str:
return " ".join(tokens)
La clave es la prioridad del separador. Un fragmento de 512 tokens que termina en un límite de encabezado es dramáticamente más útil que uno que corta a mitad de una oración. Hemos medido esto: fragmentar en límites estructurales mejora la precisión de la recuperación entre un 15% y un 25% en comparación con la división de tamaño fijo, con un costo adicional de cero.
Generación de embeddings
Una vez que tienes fragmentos limpios, necesitas generar sus embeddings. Utilizamos un pipeline por lotes (batch) que maneja límites de velocidad, reintentos y propagación de metadatos:
import numpy as np
from openai import OpenAI
import time
class EmbeddingPipeline:
"""Generate embeddings with batching and rate-limit handling."""
def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 100):
self.client = OpenAI()
self.model = model
self.batch_size = batch_size
self.dimensions = 1536 # For text-embedding-3-small
def embed_chunks(self, chunks: list[Chunk]) -> list[dict]:
"""Embed all chunks, returning vectors with metadata."""
results = []
for i in range(0, len(chunks), self.batch_size):
batch = chunks[i:i + self.batch_size]
texts = [chunk.text for chunk in batch]
embeddings = self._embed_with_retry(texts)
for chunk, embedding in zip(batch, embeddings):
results.append({
"id": chunk.chunk_id,
"values": embedding,
"metadata": {
**chunk.metadata,
"text": chunk.text,
"token_count": chunk.token_count,
}
})
return results
def _embed_with_retry(self, texts: list[str], max_retries: int = 3) -> list[list[float]]:
for attempt in range(max_retries):
try:
response = self.client.embeddings.create(
model=self.model,
input=texts,
dimensions=self.dimensions,
)
return [item.embedding for item in response.data]
except Exception as e:
if "rate_limit" in str(e).lower() and attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
raise
def embed_query(self, query: str) -> list[float]:
"""Embed a single query for search."""
response = self.client.embeddings.create(
model=self.model,
input=query,
dimensions=self.dimensions,
)
return response.data[0].embedding
Dos elecciones importantes aquí: usamos text-embedding-3-small en lugar de la variante grande porque la diferencia de calidad es marginal para la mayoría de las tareas de recuperación, pero el costo es 5 veces menor. Y establecemos dimensiones explícitas; esto nos permite reducir el tamaño del vector más adelante si el almacenamiento se convierte en una restricción sin tener que volver a generar los embeddings de todo el corpus.
Alternativas de modelos de embedding: Aunque text-embedding-3-small es nuestra opción predeterminada para despliegues sensibles al costo, vale la pena evaluar varios modelos más nuevos con tus datos. Cohere embed-v4 ofrece un sólido soporte multilingüe en más de 100 idiomas y destaca en embeddings orientados a la búsqueda. Voyage-3 es particularmente impresionante para contenido técnico y de código con una ventana de contexto de 32K tokens. Gemini Embedding 2 de Google proporciona una calidad competitiva con límites de velocidad generosos. Para una calidad de recuperación máxima, text-embedding-3-large sigue siendo una de las mejores opciones. El modelo adecuado depende de tu tipo de contenido, requisitos de idioma y restricciones de latencia; siempre realiza pruebas comparativas con tu corpus real antes de comprometerte.
Capa 2: Recuperación híbrida
La búsqueda vectorial pura captura la similitud semántica, pero pierde términos exactos que importan: nombres de productos, códigos de error, IDs internos. La búsqueda por palabras clave BM25 captura esas coincidencias exactas que los vectores pasan por alto. Usar ambas en paralelo te ofrece lo mejor de ambos mundos.
Pero la verdadera magia está en el re-ranking con cross-encoders. Tomas los candidatos combinados de ambas búsquedas y calificas cada uno con un modelo que evalúa pares consulta-documento para determinar la relevancia real, no solo la proximidad vectorial.
Aquí está nuestra implementación de recuperación híbrida. Usamos Qdrant para los vectores y una implementación simple de BM25, luego fusionamos los resultados:
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
import math
from collections import Counter
class BM25Index:
"""Lightweight BM25 implementation for keyword search."""
def __init__(self, k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
self.doc_freqs: dict[str, int] = {}
self.doc_lengths: dict[str, int] = {}
self.avg_doc_length: float = 0
self.corpus_size: int = 0
self.index: dict[str, dict[str, int]] = {} # term -> {doc_id: freq}
self.documents: dict[str, str] = {}
def add_documents(self, docs: list[dict]):
for doc in docs:
doc_id = doc["id"]
text = doc["text"].lower()
tokens = text.split()
self.documents[doc_id] = doc["text"]
self.doc_lengths[doc_id] = len(tokens)
term_freqs = Counter(tokens)
for term, freq in term_freqs.items():
if term not in self.index:
self.index[term] = {}
self.index[term][doc_id] = freq
self.doc_freqs[term] = len(self.index.get(term, {}))
self.corpus_size = len(self.documents)
self.avg_doc_length = sum(self.doc_lengths.values()) / max(self.corpus_size, 1)
def search(self, query: str, top_k: int = 20) -> list[tuple[str, float]]:
query_terms = query.lower().split()
scores: dict[str, float] = {}
for term in query_terms:
if term not in self.index:
continue
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
for doc_id, tf in self.index[term].items():
dl = self.doc_lengths[doc_id]
numerator = tf * (self.k1 + 1)
denominator = tf + self.k1 * (1 - self.b + self.b * dl / self.avg_doc_length)
scores[doc_id] = scores.get(doc_id, 0) + idf * numerator / denominator
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return ranked[:top_k]
class HybridRetriever:
"""Combine vector search and BM25 with reciprocal rank fusion."""
def __init__(self, qdrant_url: str, collection_name: str):
self.vector_client = QdrantClient(url=qdrant_url)
self.collection = collection_name
self.bm25 = BM25Index()
self.embedding_pipeline = EmbeddingPipeline()
def search(self, query: str, top_k: int = 10, vector_weight: float = 0.6) -> list[dict]:
"""Run hybrid search with reciprocal rank fusion."""
# Run both searches in parallel (simplified here as sequential)
vector_results = self._vector_search(query, top_k=top_k * 2)
bm25_results = self.bm25.search(query, top_k=top_k * 2)
# Reciprocal rank fusion
fused_scores: dict[str, float] = {}
k = 60 # RRF constant
for rank, (doc_id, _score) in enumerate(vector_results):
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + vector_weight / (k + rank + 1)
for rank, (doc_id, _score) in enumerate(bm25_results):
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + (1 - vector_weight) / (k + rank + 1)
# Sort by fused score and return top_k
ranked = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
return [{"id": doc_id, "score": score} for doc_id, score in ranked]
def _vector_search(self, query: str, top_k: int) -> list[tuple[str, float]]:
query_embedding = self.embedding_pipeline.embed_query(query)
results = self.vector_client.search(
collection_name=self.collection,
query_vector=query_embedding,
limit=top_k,
)
return [(hit.id, hit.score) for hit in results]
Utilizamos la fusión de rangos recíprocos (RRF) en lugar de la normalización de puntuaciones porque RRF es más robusta: las puntuaciones de similitud vectorial y las puntuaciones BM25 viven en escalas completamente diferentes, y normalizarlas introduce artefactos. A RRF solo le importa la posición del rango, lo que hace que la fusión sea estable a través de diferentes consultas y tamaños de corpus.
El parámetro vector_weight tiene un valor predeterminado de 0.6, lo que significa que nos inclinamos ligeramente hacia la búsqueda semántica. Para documentación técnica con muchos términos específicos (códigos de error, endpoints de API), aumentamos el peso de BM25 a 0.5 o incluso 0.6. Para contenido conversacional (artículos de soporte, preguntas frecuentes), mantenemos el dominio de la búsqueda semántica.
Re-ranking para mayor precisión
Los resultados combinados de la búsqueda híbrida son buenos, pero el re-ranking con un modelo cross-encoder eleva la precisión significativamente. Un cross-encoder califica cada par consulta-documento directamente en lugar de comparar embeddings precalculados:
from sentence_transformers import CrossEncoder
class Reranker:
"""Re-rank retrieved chunks using a cross-encoder for maximum relevance."""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: list[dict], top_k: int = 5) -> list[dict]:
"""Score each document against the query and return the top_k most relevant."""
if not documents:
return []
# Build query-document pairs for the cross-encoder
pairs = [(query, doc["text"]) for doc in documents]
scores = self.model.predict(pairs)
# Attach scores and sort
for doc, score in zip(documents, scores):
doc["rerank_score"] = float(score)
ranked = sorted(documents, key=lambda d: d["rerank_score"], reverse=True)
return ranked[:top_k]
El re-ranking suele tardar entre 50 y 200 ms para 20 candidatos, lo cual es aceptable para la mayoría de las aplicaciones. La mejora en la precisión es sustancial: medimos consistentemente una mejora del 20-30% en la relevancia del top-5 después del re-ranking. El modelo cross-encoder es pequeño (66 millones de parámetros) y puede ejecutarse en CPU, por lo que añade un costo de infraestructura mínimo.
Para aplicaciones sensibles a la latencia, puedes omitir el re-ranking y confiar en las puntuaciones de la fusión híbrida. Pero para cualquier caso donde la calidad de la respuesta importe más que el tiempo de respuesta (bases de conocimiento internas, preguntas y respuestas sobre documentos, búsqueda de cumplimiento), el re-ranking vale esos 100 ms adicionales.
Capa 3: Ensamblaje de contexto
Tener buenos fragmentos no es suficiente. La forma en que los ensamblas en el prompt final determina si el LLM da una respuesta coherente o un resumen inconexo.
La expansión padre-hijo es clave: cuando recuperas un fragmento, extrae su sección principal para obtener el contexto completo. La deduplicación de fuentes evita enviar contenido superpuesto. La gestión del presupuesto de tokens garantiza que quepa el máximo contexto relevante dentro de la ventana del modelo. Y el seguimiento de citas vincula cada afirmación con su fragmento de origen.
Aquí está el ensamblador de contexto que utilizamos:
import tiktoken
class ContextAssembler:
"""Assemble retrieved chunks into a prompt with citations and token budgets."""
def __init__(self, max_context_tokens: int = 12000):
self.max_tokens = max_context_tokens
self.encoder = tiktoken.encoding_for_model("gpt-4") # Token counting
def assemble(self, query: str, ranked_chunks: list[dict], chunk_store: dict) -> dict:
"""Build the context block with citation tracking."""
context_parts = []
citations = []
used_tokens = 0
seen_texts = set() # For deduplication
for rank, chunk in enumerate(ranked_chunks):
# Parent expansion: if the chunk has a parent, include it for context
expanded_text = self._expand_with_parent(chunk, chunk_store)
# Deduplication: skip if we have seen substantially similar content
text_fingerprint = self._fingerprint(expanded_text)
if text_fingerprint in seen_texts:
continue
seen_texts.add(text_fingerprint)
# Token budget check
chunk_tokens = len(self.encoder.encode(expanded_text))
if used_tokens + chunk_tokens > self.max_tokens:
# Try to fit a truncated version
remaining = self.max_tokens - used_tokens
if remaining > 100: # Only include if meaningful
expanded_text = self._truncate_to_tokens(expanded_text, remaining)
chunk_tokens = remaining
else:
break
# Add with citation marker
citation_id = f"[{len(citations) + 1}]"
source = chunk.get("metadata", {}).get("source", "Unknown")
section = chunk.get("metadata", {}).get("heading_hierarchy", [])
section_str = " > ".join(section) if section else "N/A"
context_parts.append(
f"--- Source {citation_id}: {source} | Section: {section_str} ---\n"
f"{expanded_text}\n"
)
citations.append({
"id": citation_id,
"source": source,
"section": section_str,
"chunk_id": chunk.get("id"),
"relevance_score": chunk.get("rerank_score", chunk.get("score", 0)),
})
used_tokens += chunk_tokens
context_block = "\n".join(context_parts)
return {
"context": context_block,
"citations": citations,
"tokens_used": used_tokens,
"chunks_included": len(citations),
"prompt": self._build_prompt(query, context_block, citations),
}
def _build_prompt(self, query: str, context: str, citations: list) -> str:
citation_legend = "\n".join(
f" {c['id']} = {c['source']} ({c['section']})" for c in citations
)
return (
f"Answer the following question using ONLY the provided sources. "
f"Cite your sources using the bracket notation (e.g., [1], [2]).\n"
f"If the sources do not contain enough information, say so explicitly.\n\n"
f"Sources:\n{context}\n\n"
f"Citation legend:\n{citation_legend}\n\n"
f"Question: {query}\n\n"
f"Answer:"
)
def _expand_with_parent(self, chunk: dict, chunk_store: dict) -> str:
"""If the chunk has a parent, prepend the parent's heading for context."""
parent_id = chunk.get("metadata", {}).get("parent_id")
if parent_id and parent_id in chunk_store:
parent = chunk_store[parent_id]
headings = parent.get("metadata", {}).get("heading_hierarchy", [])
if headings:
return f"## {headings[-1]}\n\n{chunk['text']}"
return chunk["text"]
def _fingerprint(self, text: str) -> str:
"""Create a rough fingerprint for deduplication."""
words = text.lower().split()[:50]
return " ".join(sorted(set(words)))
def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
tokens = self.encoder.encode(text)[:max_tokens]
return self.encoder.decode(tokens)
El seguimiento de citas es esencial para un sistema RAG en producción. Los usuarios necesitan verificar las respuestas y tu sistema necesita una pista de auditoría. Cada afirmación en la respuesta del LLM puede rastrearse hasta un fragmento específico, que a su vez puede rastrearse hasta un documento y sección específicos. Sin citas, RAG es una caja negra en la que nadie confía.
Evaluación: midiendo la calidad de la recuperación
No puedes mejorar lo que no mides. Mantenemos un conjunto de evaluación de pares consulta-documento_esperado y ejecutamos métricas automatizadas después de cada cambio en el pipeline:
from dataclasses import dataclass
@dataclass
class RetrievalEvalResult:
query: str
expected_doc_ids: list[str]
retrieved_doc_ids: list[str]
precision_at_5: float
recall_at_5: float
mrr: float # Mean reciprocal rank
hit: bool # Was any expected doc in top 5?
class RAGEvaluator:
"""Evaluate retrieval pipeline quality against a labeled dataset."""
def __init__(self, retriever: HybridRetriever, reranker: Reranker):
self.retriever = retriever
self.reranker = reranker
def evaluate(self, eval_set: list[dict], top_k: int = 5) -> dict:
"""Run evaluation on a set of {query, expected_doc_ids} pairs."""
results = []
for item in eval_set:
query = item["query"]
expected = set(item["expected_doc_ids"])
# Run the full retrieval pipeline
candidates = self.retriever.search(query, top_k=20)
reranked = self.reranker.rerank(query, candidates, top_k=top_k)
retrieved = [doc["id"] for doc in reranked]
# Calculate metrics
hits_at_k = [1 if doc_id in expected else 0 for doc_id in retrieved[:top_k]]
precision = sum(hits_at_k) / top_k if top_k > 0 else 0
recall = sum(hits_at_k) / len(expected) if expected else 0
# Mean reciprocal rank
mrr = 0.0
for rank, doc_id in enumerate(retrieved, 1):
if doc_id in expected:
mrr = 1.0 / rank
break
results.append(RetrievalEvalResult(
query=query,
expected_doc_ids=list(expected),
retrieved_doc_ids=retrieved,
precision_at_5=precision,
recall_at_5=recall,
mrr=mrr,
hit=any(hits_at_k),
))
# Aggregate metrics
n = len(results)
return {
"num_queries": n,
"avg_precision_at_5": sum(r.precision_at_5 for r in results) / n,
"avg_recall_at_5": sum(r.recall_at_5 for r in results) / n,
"avg_mrr": sum(r.mrr for r in results) / n,
"hit_rate": sum(1 for r in results if r.hit) / n,
"results": results, # For detailed analysis
}
Ejecutamos esta evaluación como un paso de CI (Integración Continua). Cada cambio en la estrategia de fragmentación, el modelo de embedding, los pesos de recuperación o el modelo de re-ranking activa una ejecución de evaluación completa. La métrica clave que optimizamos es la tasa de aciertos (hit rate) —el porcentaje de consultas donde al menos un documento relevante está en el top 5—, ya que es la que más correlaciona con la satisfacción del usuario final.
Un sistema RAG saludable en producción debería alcanzar: una tasa de aciertos superior al 90%, un MRR superior a 0.6 y una precisión@5 superior a 0.4. Si tus números están por debajo de estos umbrales, lo primero que debes corregir suele ser la fragmentación: los fragmentos deficientes envenenan todo lo que viene después.
Alternativas de frameworks
La arquitectura de tres capas descrita anteriormente es independiente del framework: puedes implementarla con Python puro como se muestra, o usar frameworks establecidos que proporcionan componentes preconstruidos. LlamaIndex Workflows ofrece pipelines RAG listos para producción con cargadores de documentos integrados, estrategias de fragmentación y módulos de recuperación que se ajustan estrechamente a las capas descritas aquí. LangGraph (del equipo de LangChain, pero una librería separada y más orientada a la producción) proporciona orquestación de flujos de trabajo basada en grafos que