# Author: Nirmal
# Commit: cf71c95 ("file upload")
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from typing import Dict, List, Any, Iterator
import cohere
from cohere import StreamedChatResponseV2
from app.config.settings import LLM_MODEL
from app.utils.exceptions import DocumentProcessingError, NoRelevantContentError
from app.utils.performance import timeit
# Module-level logger named after this module, so its records can be
# filtered and configured independently of other modules.
logger = logging.getLogger(name=__name__)
class DocumentSummarizer:
    """
    Processes documents and generates streaming summaries using vector search
    and LLM-based summarization with Cohere's streaming API.

    For every key in ``COMPONENT_TYPES`` one summary component is produced:
    relevant chunks are fetched through ``retriever`` and passed, together with
    that component's prompt, to Cohere's ``chat_stream``. The ``resource_link``
    component is special-cased: it asks Cohere (with the ``web-search``
    connector) for a link to the original paper instead.
    """

    # Component key -> human-readable description. The description is used in
    # the retrieval query and as the section title in compile_summary.
    COMPONENT_TYPES = {
        'basic_info': "Basic Paper Information",
        'abstract': "Abstract Summary",
        'methods': "Methodology Summary",
        'results': "Key Results",
        'limitations': "Limitations & Future Work",
        'related_work': "Related Work",
        'applications': "Practical Applications",
        'technical': "Technical Details",
        'equations': "Key Equations",
        'resource_link': "Original Research Link",
    }

    # Order of sections in the compiled document; generate_summarizer_components
    # also returns its components in this order.
    SECTIONS_ORDER = [
        'basic_info', 'abstract', 'methods', 'results',
        'equations', 'technical', 'related_work',
        'applications', 'limitations', 'resource_link'
    ]

    def __init__(self, retriever, max_workers: int = 16, batch_size: int = 4):
        """
        Initialize summarizer with vector retriever and configuration.

        Args:
            retriever: Object providing ``get_relevant_docs(chromdb_query=...,
                rerank_query=..., filter=..., chunk_size=...)``.
            max_workers: Thread-pool size used by
                ``generate_summarizer_components``.
            batch_size: Stored on the instance; not read anywhere in this class.

        Raises:
            ValueError: If no prompts could be loaded.
        """
        self.retriever = retriever
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.cohere_client = cohere.ClientV2()
        self._prompts = self._load_prompts()
        # Fail fast if prompts are missing entirely; warn on partial coverage.
        self._validate_config()

    def _load_prompts(self) -> Dict[str, str]:
        """
        Load the prompt template for each component type.

        This is called once from ``__init__`` and the result is stored on
        ``self._prompts``, so no memoization is needed. (An ``lru_cache`` on an
        instance method keys on ``self`` and keeps every instance alive for the
        lifetime of the cache.)

        Returns:
            Mapping of component key -> prompt, or an empty dict if the prompts
            module cannot be imported (``_validate_config`` then raises).
        """
        try:
            from ..summarization.prompts import (
                basic_info_prompt, abstract_prompt,
                methods_prompt, results_prompt, limitations_prompt,
                related_work_prompt, applications_prompt,
                technical_prompt, equations_prompt, resource_link_prompt
            )
        except ImportError as e:
            logger.error(f"Failed to load summarization prompts: {e}")
            return {}
        return {
            'basic_info': basic_info_prompt,
            'abstract': abstract_prompt,
            'methods': methods_prompt,
            'results': results_prompt,
            'limitations': limitations_prompt,
            'related_work': related_work_prompt,
            'applications': applications_prompt,
            'technical': technical_prompt,
            'equations': equations_prompt,
            'resource_link': resource_link_prompt,
        }

    def _validate_config(self) -> None:
        """
        Validate that all components have corresponding prompts.

        Raises:
            ValueError: If no prompts were loaded at all. Components that lack a
                prompt only produce a warning; they are skipped at generation.
        """
        if not self._prompts:
            raise ValueError("No prompts loaded for document summarization")
        missing_prompts = [comp for comp in self.COMPONENT_TYPES if comp not in self._prompts]
        if missing_prompts:
            logger.warning(f"Missing prompts for components: {missing_prompts}")

    def get_streaming_summary(
        self,
        documents: List[str],
        prompt: str,
        language: str = "en"
    ) -> Iterator[StreamedChatResponseV2]:
        """
        Generate a streaming summary using Cohere's chat API.

        Args:
            documents: Document chunks passed to Cohere as grounding documents.
            prompt: User prompt for the component being summarized.
            language: Language the model is asked to respond in.

        Returns:
            The iterator returned by ``cohere_client.chat_stream``, which yields
            events as content is generated.

        Raises:
            NoRelevantContentError: If ``documents`` is empty.
            DocumentProcessingError: If the ``chat_stream`` call itself raises.
                NOTE(review): if the SDK defers the HTTP request until
                iteration, API errors surface from the iterator unwrapped.
        """
        if not documents:
            raise NoRelevantContentError("No document content provided for summarization")
        try:
            return self.cohere_client.chat_stream(
                model=LLM_MODEL,
                documents=documents,
                messages=[
                    {"role": "system", "content": f"You are an expert summarization AI. Please respond in {language}."},
                    {"role": "user", "content": prompt}
                ],
            )
        except Exception as e:
            logger.error(f"Cohere API error: {e}")
            raise DocumentProcessingError(f"Failed to generate summary: {str(e)}") from e

    def get_relevant_document_chunks(self, component: str, filename: str, chunk_size: int) -> List[str]:
        """
        Retrieve relevant document chunks for a specific component using vector search.

        The same natural-language query is used for the vector search and the
        rerank step, and results are restricted to ``filename``.

        Returns:
            The retriever's chunks, or an empty list if retrieval raises. This is
            deliberately best-effort: the error is logged, and the caller treats
            an empty result as "no relevant content".
        """
        component_description = self.COMPONENT_TYPES.get(component, component)
        query = f"Analyze the {component_description} section from the document titled '{filename}'."
        try:
            return self.retriever.get_relevant_docs(
                chromdb_query=query,
                rerank_query=query,
                filter={'filename': filename},
                chunk_size=chunk_size
            )
        except Exception as e:
            logger.error(f"Document retrieval error for {component}: {e}")
            return []

    def _process_resource_link(self, comp_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build the ``resource_link`` component (run inside the thread pool).

        Args:
            comp_data: Task dict with ``'filename'`` and optional ``'document_text'``.

        Returns:
            ``{'filename', 'comp_name': 'resource_link', 'resource_link': <iterator>,
            'success': True}`` on success. If creating the stream fails,
            including empty ``document_text``, which
            ``get_resource_link_stream`` now rejects eagerly, the result is
            ``{'filename', 'comp_name', 'success': False, 'error'}``.
        """
        filename = comp_data['filename']
        document_text = comp_data.get('document_text', '')
        try:
            stream_generator = self.get_resource_link_stream(document_text)
        except Exception as e:
            logger.error(f"Failed to process resource link for '{filename}': {e}")
            return {
                'filename': filename,
                'comp_name': 'resource_link',
                'success': False,
                'error': str(e)
            }
        logger.info(f"Created resource link stream generator for '{filename}'")
        return {
            'filename': filename,
            'comp_name': 'resource_link',
            'resource_link': stream_generator,
            'success': True
        }

    def get_resource_link_stream(self, document_text: str) -> Iterator:
        """
        Generate a streaming response for finding the original research paper link.

        Input validation happens immediately when this method is called. The
        Cohere request itself is made lazily, on first iteration of the
        returned iterator. Previously this was a generator function, so even
        the empty-input check was deferred until iteration, and callers could
        not catch it.

        Args:
            document_text: Document text; only the first 1000 characters are sent.

        Returns:
            An iterator yielding a single string, the model's response text.

        Raises:
            NoRelevantContentError: Immediately, if ``document_text`` is empty.
        """
        if not document_text:
            logger.error("Empty document content provided for resource link lookup")
            raise NoRelevantContentError("No document content provided for resource link lookup")
        return self._resource_link_events(document_text)

    def _resource_link_events(self, document_text: str) -> Iterator[str]:
        """
        Lazily query Cohere (web-search connector) and yield the response text.

        Raises:
            DocumentProcessingError: On iteration, if the Cohere call fails.
        """
        try:
            # Uses the v1 client because the call relies on `connectors`.
            cohere_client = cohere.Client()
            text = cohere_client.chat(
                model=LLM_MODEL,
                message=f"Find the research paper link for this document: {document_text[:1000]} Respond only with the link.",
                connectors=[{"id": "web-search"}],
            ).text
        except Exception as e:
            logger.error(f"Cohere API error in resource link lookup: {e}")
            raise DocumentProcessingError(f"Failed to generate resource link: {str(e)}") from e
        yield text

    def _process_component(self, comp_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build one regular summary component (run inside the thread pool).

        Args:
            comp_data: Task dict with ``'comp_name'``, ``'filename'`` and optional
                ``'language'`` (default ``'en'``) and ``'chunk_size'`` (default 1000).

        Returns:
            ``{'filename', 'comp_name', <comp_name>: <iterator>, 'success': True}``
            on success, otherwise ``{'filename', 'comp_name', 'success': False,
            'error'}``.
        """
        comp_name = comp_data['comp_name']
        filename = comp_data['filename']
        language = comp_data.get('language', 'en')
        chunk_size = comp_data.get('chunk_size', 1000)
        try:
            # Check the prompt first so a missing prompt does not cost a
            # vector-search round trip.
            prompt = self._prompts.get(comp_name)
            if not prompt:
                logger.warning(f"No prompt defined for component: {comp_name}")
                return {
                    'filename': filename,
                    'comp_name': comp_name,
                    'success': False,
                    'error': 'No prompt defined'
                }

            document_chunks = self.get_relevant_document_chunks(comp_name, filename, chunk_size)
            if not document_chunks:
                logger.warning(f"No relevant content found for {comp_name} in '{filename}'")
                return {
                    'filename': filename,
                    'comp_name': comp_name,
                    'success': False,
                    'error': 'No relevant content found'
                }

            stream_generator = self.get_streaming_summary(document_chunks, prompt, language)
            logger.info(f"Created stream generator for '{filename}' component '{comp_name}'")
            return {
                'filename': filename,
                'comp_name': comp_name,
                comp_name: stream_generator,
                'success': True
            }
        except Exception as e:
            logger.error(f"Failed to process component '{comp_name}' for '{filename}': {e}")
            return {
                'filename': filename,
                'comp_name': comp_name,
                'success': False,
                'error': str(e)
            }

    @timeit
    def generate_summarizer_components(
        self,
        filename: str,
        language: str = "en",
        chunk_size: int = 1000,
        document_text: str = ""
    ) -> List[Dict[str, Any]]:
        """
        Generate streaming summary components for a document using parallel processing.

        Each component type is prepared in a ``ThreadPoolExecutor``. Failed
        components are dropped; their errors are logged by the per-component
        handlers.

        Args:
            filename: Document identifier used to filter retrieval.
            language: Response language passed to the LLM.
            chunk_size: Chunk size passed to the retriever.
            document_text: Raw text, used only by the ``resource_link`` component.

        Returns:
            Successful component dicts, each holding a streaming iterator under
            its component key, ordered by ``SECTIONS_ORDER``. Keys not in that
            list are placed last.
        """
        logger.info(f"Generating summaries for '{filename}' using ThreadPoolExecutor with {self.max_workers} workers")
        component_tasks = [
            {
                'comp_name': comp_name,
                'filename': filename,
                'language': language,
                'chunk_size': chunk_size,
                'document_text': document_text
            }
            for comp_name in self.COMPONENT_TYPES
        ]

        components = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._process_resource_link if task['comp_name'] == 'resource_link'
                    else self._process_component,
                    task,
                ): task['comp_name']
                for task in component_tasks
            }
            for future in as_completed(futures):
                comp_name = futures[future]
                try:
                    result = future.result()
                except Exception as e:
                    logger.error(f"Thread execution error for '{comp_name}': {e}")
                    continue
                if result['success']:
                    components.append(result)

        # as_completed yields in completion order; restore a deterministic order.
        position = {name: idx for idx, name in enumerate(self.SECTIONS_ORDER)}
        components.sort(key=lambda comp: position.get(comp['comp_name'], len(position)))

        logger.info(f"Generated {len(components)}/{len(self.COMPONENT_TYPES)} components for '{filename}'")
        return components

    def compile_summary(self, filename: str, results: Dict[str, str]) -> str:
        """
        Compile a full Markdown document summary from component results.

        Sections listed in ``SECTIONS_ORDER`` come first, in that order, followed
        by any other non-empty entries of ``results`` in their insertion order.
        Empty results are skipped.

        Args:
            filename: Used in the top-level heading.
            results: Mapping of component key -> generated text.

        Returns:
            The Markdown summary, stamped with the local generation time.
        """
        generation_time = time.strftime('%Y-%m-%d %H:%M:%S')
        lines = [
            f"# Summary of {filename}",
            f"Generated on: {generation_time}\n"
        ]

        def _title(section: str) -> str:
            # Unknown keys fall back to the key itself; make snake_case readable.
            return self.COMPONENT_TYPES.get(section, section.replace('_', ' ')).title()

        # Add sections in the predefined order
        for section in self.SECTIONS_ORDER:
            content = results.get(section)
            if content:
                lines.append(f"## {_title(section)}\n")
                lines.append(f"{content}\n")

        # Add any additional sections not in predefined order
        for section, content in results.items():
            if section not in self.SECTIONS_ORDER and content:
                lines.append(f"## {_title(section)}\n")
                lines.append(f"{content}\n")

        return "\n".join(lines)