# anujjoshi3105's picture
# first commit
# 361bd3e
import json
import hashlib
import time
import re
from typing import Optional, List, Tuple
from langsmith import uuid7
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from core.llm import get_model
from core.settings import settings
from scripts.portfolio.prompt import PORTFOLIO_INGESTION_SYSTEM_PROMPT
class DocumentChunker:
    """Service for splitting documents into chunks.

    Wraps a RecursiveCharacterTextSplitter and tags each chunk with the
    metadata needed downstream for change detection (``content_hash``) and
    parent lookup (``base_id``).
    """

    def __init__(self, chunk_size: int = 1500, chunk_overlap: int = 200):
        """
        Args:
            chunk_size: Maximum characters per chunk.
            chunk_overlap: Characters shared between consecutive chunks.
        """
        # FIX: the original passed is_separator_regex=True while relying on the
        # splitter's *default* separators ("\n\n", "\n", " ", ""), which are
        # plain literals. Interpreting them as regexes was unintended and
        # fragile, so the flag is dropped; the default literal behavior is kept.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        print(f"DEBUG: Initialized DocumentChunker with chunk_size={chunk_size}, overlap={chunk_overlap}")

    def chunk_document(self, doc: Document, base_id: str, content_hash: str) -> List[Tuple[Document, str]]:
        """
        Splits a document into chunks and prepares them for storage.

        Args:
            doc: The document to chunk
            base_id: The base document ID
            content_hash: The content hash for change detection

        Returns:
            List of tuples (chunk_document, chunk_id)
        """
        chunks = self.text_splitter.split_documents([doc])
        chunked_docs: List[Tuple[Document, str]] = []
        for idx, chunk in enumerate(chunks):
            # Deterministic per-chunk IDs so a re-ingest of the same base_id
            # overwrites the previous chunks instead of duplicating them.
            chunk_id = f"{base_id}_chunk_{idx}"
            chunk.metadata["content_hash"] = content_hash
            chunk.metadata["base_id"] = base_id
            chunked_docs.append((chunk, chunk_id))
        print(f"DEBUG: Split document {base_id} into {len(chunked_docs)} chunks")
        return chunked_docs
class DocumentEnricher:
    """Service for enriching documents using LLM with generalized retry logic."""

    def __init__(self):
        self.llm = get_model(settings.DEFAULT_MODEL)
        self.enrich_prompt = ChatPromptTemplate.from_messages([
            ("system", PORTFOLIO_INGESTION_SYSTEM_PROMPT),
            ("human", "Category: {category}\n\nMetadata:\n{metadata}\n\nContent:\n{content}")
        ])
        print(f"INFO: Initialized DocumentEnricher with {settings.DEFAULT_MODEL}")

    def enrich(self, doc: Document, category: str, max_retries: int = 5) -> Tuple[Optional[Document], str, str]:
        """Enrich a document's content via the LLM, retrying transient API errors.

        Args:
            doc: Source document; its metadata and page_content feed the prompt.
            category: Category label injected into the prompt.
            max_retries: Maximum number of attempts before giving up.

        Returns:
            ``(enriched_document, base_id, content_hash)`` on success, or
            ``(None, base_id, "")`` on a non-retriable error or once retries
            are exhausted.
        """
        pid = str(doc.metadata.get("id", uuid7()))
        title = doc.metadata.get("Title", "Untitled")
        # Server-suggested retry delay parsed from the previous error, if any.
        # Carrying it here (instead of sleeping inside the except block AND at
        # the top of the next iteration) fixes the original's double sleep.
        wait_override: Optional[float] = None
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Prefer the server's suggested delay; otherwise use
                    # exponential backoff capped at 60s.
                    wait_time = wait_override if wait_override is not None else min(2 ** attempt, 60)
                    wait_override = None
                    print(f"INFO: Retrying {title} (attempt {attempt + 1}/{max_retries}) in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"INFO: Enriching document: {title} (PID: {pid})")
                res = self.llm.invoke(
                    self.enrich_prompt.format_messages(
                        category=category,
                        metadata=json.dumps(doc.metadata, default=str),
                        content=doc.page_content or "No content provided."
                    ),
                    config=RunnableConfig(run_id=uuid7())
                )
                enriched_content = res.content.strip()
                content_hash = hashlib.sha256(enriched_content.encode('utf-8')).hexdigest()
                enriched_doc = Document(
                    page_content=enriched_content,
                    metadata={
                        **doc.metadata,
                        "category": category,
                        "content_hash": content_hash,
                        "base_id": pid
                    }
                )
                return enriched_doc, pid, content_hash
            except Exception as e:
                error_msg = str(e).lower()
                error_type = type(e).__name__.lower()
                # --- Rate Limit Detection ---
                is_rate_limit = any(keyword in error_msg or keyword in error_type
                                    for keyword in ["429", "rate_limit", "rate limit", "too many requests", "throttled"])
                # --- Overloaded/Server Error Detection ---
                is_server_error = any(keyword in error_msg
                                      for keyword in ["500", "502", "503", "overloaded", "unavailable", "deadline_exceeded"])
                if (is_rate_limit or is_server_error) and attempt < max_retries - 1:
                    # Honor an explicit "retry after Ns" hint when present;
                    # the sleep itself happens at the top of the next loop.
                    match = re.search(r'(?:try again in|retry after|wait)\s*([\d.]+)\s*s', error_msg)
                    if match:
                        wait_override = float(match.group(1)) + 1
                    print(f"WARN: API issue (Rate Limit/Overload) for {title}. Will retry...")
                    continue
                # Non-retriable error, or final attempt failed.
                # (The original had a dead `if attempt >= 1` branch here that
                # returned the same value as the fall-through — removed.)
                print(f"ERROR: Enrichment failed for {title}: {e}")
                return None, pid, ""
        # Defensive: only reachable if max_retries <= 0.
        return None, pid, ""