File size: 5,477 Bytes
361bd3e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import json
import hashlib
import time
import re
from typing import Optional, List, Tuple

from langsmith import uuid7
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

from core.llm import get_model
from core.settings import settings
from scripts.portfolio.prompt import PORTFOLIO_INGESTION_SYSTEM_PROMPT

class DocumentChunker:
    """Service for splitting documents into chunks for vector storage.

    Wraps a ``RecursiveCharacterTextSplitter`` and tags every produced chunk
    with its parent document's id and content hash so downstream code can
    detect content changes and trace each chunk back to its source document.
    """

    def __init__(self, chunk_size: int = 1500, chunk_overlap: int = 200):
        """
        Args:
            chunk_size: Maximum number of characters per chunk.
            chunk_overlap: Number of characters shared between consecutive chunks.
        """
        # NOTE(review): the previous `is_separator_regex=True` flag was dropped.
        # This splitter is used with its default *literal* separators, which
        # contain no regex metacharacters, so the flag was a behavioral no-op —
        # but it misdeclared the separators as regexes and would silently break
        # if a separator containing metacharacters were ever configured.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        print(f"DEBUG: Initialized DocumentChunker with chunk_size={chunk_size}, overlap={chunk_overlap}")

    def chunk_document(self, doc: Document, base_id: str, content_hash: str) -> List[Tuple[Document, str]]:
        """
        Splits a document into chunks and prepares them for storage.

        Args:
            doc: The document to chunk
            base_id: The base document ID
            content_hash: The content hash for change detection

        Returns:
            List of tuples (chunk_document, chunk_id); chunk ids are
            "<base_id>_chunk_<index>" with a zero-based index.
        """
        chunks = self.text_splitter.split_documents([doc])
        chunked_docs: List[Tuple[Document, str]] = []

        for idx, chunk in enumerate(chunks):
            chunk_id = f"{base_id}_chunk_{idx}"
            # Propagate provenance onto every chunk for change detection
            # and for mapping chunks back to their parent document.
            chunk.metadata["content_hash"] = content_hash
            chunk.metadata["base_id"] = base_id
            chunked_docs.append((chunk, chunk_id))

        print(f"DEBUG: Split document {base_id} into {len(chunked_docs)} chunks")
        return chunked_docs

class DocumentEnricher:
    """Service for enriching documents using LLM with generalized retry logic.

    Transient API failures (rate limits, server overload) are retried with
    back-off; any other exception fails the document immediately.
    """

    # Substrings (matched against lowercased error text/type) that identify
    # retriable API failures.
    _RATE_LIMIT_KEYWORDS = ("429", "rate_limit", "rate limit", "too many requests", "throttled")
    _SERVER_ERROR_KEYWORDS = ("500", "502", "503", "overloaded", "unavailable", "deadline_exceeded")
    # Extracts a server-suggested wait, e.g. "try again in 12.5s". Compiled
    # once at class level rather than per exception.
    _RETRY_AFTER_RE = re.compile(r'(?:try again in|retry after|wait)\s*([\d.]+)\s*s')

    def __init__(self):
        """Build the enrichment chain from the configured default model."""
        self.llm = get_model(settings.DEFAULT_MODEL)
        self.enrich_prompt = ChatPromptTemplate.from_messages([
            ("system", PORTFOLIO_INGESTION_SYSTEM_PROMPT),
            ("human", "Category: {category}\n\nMetadata:\n{metadata}\n\nContent:\n{content}")
        ])
        print(f"INFO: Initialized DocumentEnricher with {settings.DEFAULT_MODEL}")

    def enrich(self, doc: Document, category: str, max_retries: int = 5) -> Tuple[Optional[Document], str, str]:
        """Enrich a document via the LLM, retrying on transient API errors.

        Args:
            doc: Source document; its metadata is serialized into the prompt.
            category: Category label injected into the prompt and the result
                metadata.
            max_retries: Total number of attempts (including the first).

        Returns:
            ``(enriched_document, base_id, content_hash)`` on success, or
            ``(None, base_id, "")`` on failure.
        """
        pid = str(doc.metadata.get("id", uuid7()))
        title = doc.metadata.get("Title", "Untitled")

        for attempt in range(max_retries):
            try:
                if attempt == 0:
                    print(f"INFO: Enriching document: {title} (PID: {pid})")
                else:
                    print(f"INFO: Retrying {title} (attempt {attempt + 1}/{max_retries})...")

                res = self.llm.invoke(
                    self.enrich_prompt.format_messages(
                        category=category,
                        metadata=json.dumps(doc.metadata, default=str),
                        content=doc.page_content or "No content provided."
                    ),
                    config=RunnableConfig(run_id=uuid7())
                )

                enriched_content = res.content.strip()
                content_hash = hashlib.sha256(enriched_content.encode('utf-8')).hexdigest()

                enriched_doc = Document(
                    page_content=enriched_content,
                    metadata={
                        **doc.metadata,
                        "category": category,
                        "content_hash": content_hash,
                        "base_id": pid
                    }
                )
                return enriched_doc, pid, content_hash

            except Exception as e:
                error_msg = str(e).lower()
                error_type = type(e).__name__.lower()

                is_rate_limit = any(k in error_msg or k in error_type
                                    for k in self._RATE_LIMIT_KEYWORDS)
                is_server_error = any(k in error_msg for k in self._SERVER_ERROR_KEYWORDS)

                if (is_rate_limit or is_server_error) and attempt < max_retries - 1:
                    # Single consolidated wait: prefer the server-suggested
                    # delay when the message carries one, otherwise capped
                    # exponential back-off. (Previously the code slept here
                    # AND slept again at the top of the next iteration,
                    # compounding the two waits.)
                    match = self._RETRY_AFTER_RE.search(error_msg)
                    if match:
                        wait_time = float(match.group(1)) + 1
                    else:
                        wait_time = min(2 ** (attempt + 1), 60)
                    print(f"WARN: API issue (Rate Limit/Overload) for {title}. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue

                # Non-retriable error, or retries exhausted: fail now.
                # (Previously a non-retriable error on the first attempt fell
                # through the `attempt >= 1` guard and was silently retried
                # once before giving up.)
                print(f"ERROR: Enrichment failed for {title}: {e}")
                return None, pid, ""

        return None, pid, ""