"""Document processing and summarization pipeline.

Extracts documents, builds a retrieval index per document, and streams
summary components in parallel.
"""

import logging
import os
import shutil
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

from app.config.settings import DOCS_FOLDER
from app.document_processing.extractors import DocumentProcessorAdapter
from app.retrieval.vector_store import Retriever
from app.summarization.output import SummaryOutputManager
from app.summarization.summarizer import DocumentSummarizer
from app.utils.logger import setup_logger
from app.utils.performance import timeit

logger = logging.getLogger(__name__)

# Persistent scratch directory for uploads; cleared explicitly via
# clear_upload_directory() rather than automatically on exit.
TEMP_UPLOAD_DIR = os.path.join("/tmp", "temp_uploads")
os.makedirs(TEMP_UPLOAD_DIR, exist_ok=True)

def clear_upload_directory():
    """Clears all files from the persistent temporary upload directory."""
    if os.path.exists(TEMP_UPLOAD_DIR):
        logger.info(f"Clearing temporary upload directory: {TEMP_UPLOAD_DIR}")
        for item in os.listdir(TEMP_UPLOAD_DIR):
            item_path = os.path.join(TEMP_UPLOAD_DIR, item)
            try:
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.unlink(item_path)
                    logger.debug(f"Deleted file/link: {item_path}")
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)
                    logger.debug(f"Deleted directory: {item_path}")
            except Exception as e:
                logger.error(f"Error deleting {item_path}: {e}", exc_info=True)
    else:
        logger.info(f"Temporary upload directory does not exist, no need to clear: {TEMP_UPLOAD_DIR}")

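# NOTE: `uploaded_files` below is assumed to be a sequence of file-like objects
# exposing `.name` and `.getvalue()`, e.g. Streamlit's UploadedFile instances;
# any object with that interface will work.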
def process_uploaded_files(uploaded_files) -> List[Dict[str, Any]]:
    """Write uploaded files to a temporary directory and extract their contents."""
    start_time = time.time()
    logger.info(f"Starting processing for {len(uploaded_files)} uploaded files.")
    with tempfile.TemporaryDirectory() as tmpdir:
        for uploaded_file in uploaded_files:
            file_path = os.path.join(tmpdir, uploaded_file.name)
            with open(file_path, "wb") as f:
                f.write(uploaded_file.getvalue())

        processor = DocumentProcessorAdapter()
        extraction_results = processor.process_folder(tmpdir)

    end_time = time.time()
    logger.info(f"Finished processing uploaded files in {end_time - start_time:.2f} seconds.")
    return extraction_results

@timeit
def process_documents(directory=None) -> List[Dict[str, Any]]:
    """Extract and preprocess documents from `directory`, or from the configured DOCS_FOLDER."""
    processor = DocumentProcessorAdapter()
    if directory:
        return processor.process_folder(directory)
    return processor.process_folder(DOCS_FOLDER)


def setup_retrieval_system(doc_data: Dict[str, Any]) -> Tuple[Dict[str, Any], Retriever]:
    """Initialize the retriever with document data."""
    retriever = Retriever()
    updated_doc_data = retriever.create_from_documents(doc_data)
    return updated_doc_data, retriever

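# Illustrative shape of a `doc_data` entry, based on the keys this module reads
# (actual entries come from DocumentProcessorAdapter and may carry more fields):
#
#     {
#         "filename": "report.pdf",
#         "language": "en",
#         "chunk_size": 1000,
#         "text": "full extracted text...",
#     }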
def process_single_document(doc_data: Dict[str, Any], max_workers: int = 4) -> List[Dict[str, Any]]:
    """Process a single document and generate its summary components."""
    try:
        doc_data, retriever = setup_retrieval_system(doc_data)
        summarizer = DocumentSummarizer(retriever, max_workers=max_workers)
        components = summarizer.generate_summarizer_components(
            filename=doc_data.get("filename"),
            language=doc_data.get("language", "en"),
            chunk_size=doc_data.get("chunk_size", 1000),
            document_text=doc_data.get("text", "")[:1000],
        )
        return components
    except Exception as e:
        logger.error(f"Failed to summarize {doc_data.get('filename')}: {str(e)}")
        return []

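# Two levels of parallelism are used below: an outer thread pool fans out over
# documents, and each document's summarizer receives its own smaller worker
# budget (`doc_workers`) for its internal steps, so the combined thread count
# stays roughly within `max_workers`.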
@timeit
def batch_summarize_documents(extraction_results: List[Dict[str, Any]],
                              max_workers: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Generate summary components with stream generators for all documents in parallel.

    Args:
        extraction_results: List of document data dictionaries.
        max_workers: Maximum number of worker threads (defaults to the CPU count, capped at 8).

    Returns:
        List of summary component dictionaries with stream generators.
    """
    if max_workers is None:
        max_workers = min(os.cpu_count() or 4, 8)

    # Give each document a share of the total thread budget, capped at 4 workers.
    doc_count = len(extraction_results)
    doc_workers = max(1, min(4, max_workers // max(1, doc_count)))

    logger.info(f"Processing {doc_count} documents with {max_workers} total workers "
                f"({doc_workers} workers per document)")

    summary_component_streams = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_single_document, doc_data, doc_workers): doc_data.get('filename', 'unknown')
            for doc_data in extraction_results
        }

        for future in as_completed(futures):
            doc_name = futures[future]
            try:
                components = future.result()
                if components:
                    summary_component_streams.extend(components)
                    logger.info(f"Successfully processed components for '{doc_name}'")
                else:
                    logger.warning(f"No components generated for '{doc_name}'")
            except Exception as e:
                logger.error(f"Error processing '{doc_name}': {str(e)}")

    logger.info(f"Generated {len(summary_component_streams)} total summary components")
    return summary_component_streams

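# Each stream component dict is expected to hold "filename", "comp_name", and a
# generator stored under the component's own name, e.g. (illustrative only):
#
#     {"filename": "report.pdf", "comp_name": "overview", "overview": <generator>}
#
# Non-"resource_link" generators are assumed to yield chat-style streaming events
# whose text arrives in "content-delta" events under `event.delta.message.content.text`
# (this matches, for example, the event layout of Cohere's v2 chat streaming API,
# but any generator yielding events with that shape will work).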
def consume_stream(stream_data: Dict[str, Any]) -> Tuple[str, str]:
    """Consume a single streaming generator and return (component id, status)."""
    file_id = f"{stream_data['filename']}-{stream_data['comp_name']}"
    component_type = stream_data['comp_name']

    try:
        stream_generator = stream_data[component_type]
        content_buffer = []

        logger.info(f"Processing stream for {file_id}")
        print(f"\n{'=' * 50}\nProcessing: {file_id}\n")

        if component_type == 'resource_link':
            # Resource links arrive as plain values rather than chat events.
            for event in stream_generator:
                content_buffer.append(str(event))
        else:
            # Chat-style streams yield typed events; only content deltas carry text.
            for event in stream_generator:
                if event.type == "content-delta":
                    delta_text = event.delta.message.content.text
                    content_buffer.append(delta_text)
                    print(delta_text, end="", flush=True)

        print(f"\n{'=' * 50}")
        return file_id, "success"
    except Exception as e:
        logger.error(f"Error processing stream {file_id}: {str(e)}")
        return file_id, f"Error: {str(e)}"

@timeit
def process_stream_components(stream_components: List[Dict[str, Any]], max_workers: int = 4) -> Dict[str, str]:
    """Process all streaming components in parallel with controlled concurrency."""
    results = {}

    logger.info(f"Processing {len(stream_components)} summary components with {max_workers} workers")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(consume_stream, component): component
            for component in stream_components
        }

        for future in as_completed(futures):
            component_id, status = future.result()
            results[component_id] = status

    return results


def display_summary_results(results: Dict[str, str]) -> None:
    """Display a results summary in a clean format."""
    successful = [k for k, v in results.items() if v == 'success']
    failed = [(k, v) for k, v in results.items() if v != 'success']

    print("\n" + "=" * 60)
    print(f"SUMMARY: {len(successful)}/{len(results)} components successfully processed")

    if successful:
        print("\nSuccessful components:")
        for comp in successful:
            print(f"  ✓ {comp}")

    if failed:
        print("\nFailed components:")
        for comp, error in failed:
            print(f"  ✗ {comp}: {error}")

    print("=" * 60)

def main():
    """Main execution flow for document processing and summarization."""
    try:
        setup_logger()
        logger.info(f"Starting document processing from: {DOCS_FOLDER}")

        extraction_results = process_documents()
        logger.info(f"Processed {len(extraction_results)} documents")

        cpu_count = os.cpu_count() or 4

        # Use most of the available cores for summarization, bounded to 2-8 workers.
        summary_workers = min(max(2, cpu_count), 8)

        logger.info(f"Starting parallel streaming summarization with {summary_workers} workers")
        stream_components = batch_summarize_documents(
            extraction_results,
            max_workers=summary_workers
        )

        # Stream consumption prints to stdout, so keep its concurrency lower (2-4 workers).
        stream_workers = min(max(2, cpu_count // 2), 4)
        logger.info(f"Processing streams with {stream_workers} workers")
        results = process_stream_components(stream_components, max_workers=stream_workers)

        display_summary_results(results)

    except Exception as e:
        logger.critical(f"Critical error in main execution: {str(e)}")
        raise


if __name__ == "__main__":
    main()