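"""Pipeline entry points for the document summarization app.

Wires together document extraction (DocumentProcessorAdapter), retrieval
setup (Retriever), and summarization (DocumentSummarizer) for both the
Streamlit UI and the command-line entry point.
"""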
import logging
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Tuple
from app.config.settings import DOCS_FOLDER
# Import classes from the renamed modules
from app.document_processing.extractors import DocumentProcessorAdapter
from app.retrieval.vector_store import Retriever
from app.summarization.output import SummaryOutputManager
from app.summarization.summarizer import DocumentSummarizer
# Module-level logger (configuration happens in the entry point)
logger = logging.getLogger(__name__)
def process_uploaded_files(uploaded_files) -> List[Dict[str, Any]]:
"""
Processes a list of files uploaded via Streamlit.
    Saves them to a temporary folder and uses the DocumentProcessorAdapter
    to process that folder.
Args:
uploaded_files: List of Streamlit UploadedFile objects.
            The type hint is omitted to avoid importing Streamlit at module level.
Returns:
List of dictionaries with original extraction results, including chunk_size.
"""
# Import streamlit here, as it's used for st.warning
import streamlit as st
start_time = time.time()
logger.info(f"Starting processing for {len(uploaded_files)} uploaded files.")
# Create a temporary directory to save uploaded files
# This directory will be automatically cleaned up when the 'with' block exits
with tempfile.TemporaryDirectory() as tmpdir:
logger.info(f"Using temporary directory: {tmpdir}")
# Save all uploaded files into the temporary directory
for uploaded_file in uploaded_files:
            # Build the destination path inside the temporary directory.
            # Note: the directory name itself is unique, but two uploads with
            # the same filename would overwrite each other here.
            file_path = os.path.join(tmpdir, uploaded_file.name)
# Write the file content to the temporary path
try:
with open(file_path, "wb") as f:
f.write(uploaded_file.getvalue())
logger.debug(f"Saved uploaded file '{uploaded_file.name}' to '{file_path}'")
except Exception as e:
logger.error(f"Error saving uploaded file '{uploaded_file.name}' to temporary directory: {e}", exc_info=True)
# Log a warning in Streamlit if a file couldn't be saved
st.warning(f"Could not save uploaded file '{uploaded_file.name}' temporarily. It will be skipped.")
# Use the DocumentProcessorAdapter to process the entire temporary folder
        processor = DocumentProcessorAdapter()
# Call process_folder with the temporary directory path
extraction_results = processor.process_folder(tmpdir)
# The process_folder method returns the list of extraction results
end_time = time.time()
logger.info(f"Finished processing uploaded files in {end_time - start_time:.2f} seconds.")
# The extraction_results list now contains dictionaries with 'filename', 'text', 'error', etc.
return extraction_results
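
# A minimal sketch of how a Streamlit page might call this function
# (hypothetical caller code; the module path `app.main` is an assumption):
#
#     import streamlit as st
#     from app.main import process_uploaded_files
#
#     uploaded = st.file_uploader("Upload documents", accept_multiple_files=True)
#     if uploaded:
#         extraction_results = process_uploaded_files(uploaded)
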
def setup_retrieval_system(extraction_results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Retriever]:
"""
Sets up the retrieval system (vector store) from extraction results.
Args:
extraction_results: List of dictionaries from document extraction.
Should contain 'filename' and 'text'.
Returns:
A tuple containing:
- The updated extraction_results list (with 'chunk_size' populated by Retriever).
- An initialized Retriever instance.
"""
start_time = time.time()
logger.info("Setting up retrieval system.")
try:
retriever = Retriever()
# create_from_documents takes extraction results, chunks text, embeds, and builds the DB.
# It also updates the extraction_results list with the 'chunk_size' for each document.
updated_extraction_results = retriever.create_from_documents(extraction_results)
end_time = time.time()
logger.info(f"Retriever setup complete in {end_time - start_time:.2f} seconds.")
return updated_extraction_results, retriever
except Exception as e:
end_time = time.time()
logger.error(f"Error during retrieval system setup: {e}", exc_info=True)
# If retrieval setup fails, the summarization cannot proceed.
# Re-raise the exception so the Streamlit app can catch and display it.
raise
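
# Hedged usage sketch: feeding extraction results into setup_retrieval_system
# and surfacing a failure to the user (hypothetical Streamlit caller code):
#
#     try:
#         extraction_results, retriever = setup_retrieval_system(extraction_results)
#     except Exception as e:
#         st.error(f"Retrieval setup failed: {e}")
#         st.stop()
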
def summarize_extracted_documents(extraction_results: List[Dict[str, Any]], retriever: Retriever) -> List[Dict[str, Any]]:
"""
Summarizes documents based on extraction results and a configured retriever.
Args:
extraction_results: List of dictionaries from document extraction (should include chunk_size
populated by setup_retrieval_system).
retriever: An initialized Retriever instance.
Returns:
A list of dictionaries, each containing the summary result for a file.
Each dictionary includes:
- 'filename': The name of the file.
- 'success': Boolean indicating if summarization was successful.
- 'summary': The generated summary string (if successful), or None.
- 'error': An error message string (if not successful), or None.
- 'processing_time': Time taken for summarization of this file.
"""
start_time = time.time()
logger.info(f"Starting summarization for {len(extraction_results)} documents.")
# Initialize the summarizer with the retriever
summarizer = DocumentSummarizer(retriever)
results = [] # List to store results for each document
# Filter out results that failed extraction or have no text/chunks
# Summarization requires extracted text and successful chunking (chunk_size > 0)
summarizable_results = [
res for res in extraction_results
if res.get('text') and res.get('chunk_size', 0) > 0 and res.get('error') is None
]
skipped_results = [
res for res in extraction_results
if res not in summarizable_results
]
if skipped_results:
logger.warning(f"Skipping summarization for {len(skipped_results)} files due to extraction errors or no text/chunks.")
for res in skipped_results:
# Add entries for skipped files to the results list
results.append({
'filename': res.get('filename', 'unknown'),
'success': False,
'summary': None,
'error': res.get('error', 'Extraction failed or no text/chunks'),
'processing_time': 0, # No summarization time for skipped files
})
def process_single_summary(result: Dict[str, Any]) -> Dict[str, Any]:
"""Helper function to summarize a single document result."""
file_start_time = time.time()
filename = result.get('filename', 'unknown')
# Use detected language, default to English if detection failed
language = result.get('language', 'en')
chunk_size = result.get('chunk_size', 0) # Should be > 0 for summarizable_results
logger.info(f"Summarizing document: {filename}")
try:
# Call the summarizer for a single document
# The summerize_document method handles parallel processing of components internally
summary = summarizer.summerize_document(filename, language, chunk_size)
file_end_time = time.time()
logger.info(f"Finished summarizing {filename} in {file_end_time - file_start_time:.2f} seconds.")
return {
'filename': filename,
'success': True,
'summary': summary, # Return the summary string
'error': None,
'processing_time': file_end_time - file_start_time,
}
except Exception as e:
file_end_time = time.time()
error_msg = str(e)
logger.error(f"Error summarizing document {filename}: {e}", exc_info=True)
return {
'filename': filename,
'success': False,
'summary': None,
'error': error_msg,
'processing_time': file_end_time - file_start_time,
}
with ThreadPoolExecutor(max_workers=None) as executor: # Adjust max_workers as needed
# Submit summarizable document results to the executor
        futures = {executor.submit(process_single_summary, res): res.get('filename', 'unknown') for res in summarizable_results}
# Process results as they complete
for future in as_completed(futures):
filename = futures[future]
try:
summary_result = future.result()
results.append(summary_result)
logger.debug(f"Summary result received for {filename}")
except Exception as exc:
# This catches exceptions *within* the future's result retrieval
logger.error(f"Exception retrieving summary result for {filename}: {exc}", exc_info=True)
results.append({
'filename': filename,
'success': False,
'summary': None,
'error': f"Failed to retrieve result: {exc}",
'processing_time': 0, # Can't determine processing time if result retrieval failed
})
end_time = time.time()
logger.info(f"Finished batch summarization in {end_time - start_time:.2f} seconds.")
return results
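
# Hedged sketch of rendering the per-file results in a Streamlit view
# (hypothetical caller code; assumes `extraction_results` and `retriever`
# come from the two functions above):
#
#     summary_results = summarize_extracted_documents(extraction_results, retriever)
#     for res in summary_results:
#         if res['success']:
#             st.subheader(res['filename'])
#             st.markdown(res['summary'])
#         else:
#             st.warning(f"{res['filename']}: {res['error']}")
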
if __name__ == "__main__":
    # Ensure log output is visible when the script is run directly.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")

    start_time = time.time()
    logger.info("Starting document summarization process (command line).")

    try:
        # Step 1: Process documents from the predefined folder
        logger.info(f"Processing documents from: {DOCS_FOLDER}")
        # DocumentProcessorAdapter().process_folder returns a list of extraction result dicts
        extraction_results = DocumentProcessorAdapter().process_folder(DOCS_FOLDER)
        logger.info(f"Document Processing Time taken: {time.time() - start_time:.2f} seconds")

        # Step 2: Set up the retrieval system
        setup_start_time = time.time()
        # setup_retrieval_system returns the updated results (with chunk_size) and the retriever
        extraction_results_with_chunks, retriever = setup_retrieval_system(extraction_results)
        logger.info(f"Retriever Setup Time taken: {time.time() - setup_start_time:.2f} seconds")

        # Step 3: Summarize the documents
        summarization_start_time = time.time()
        output_manager = SummaryOutputManager()  # Uses default output_dir from settings
        summary_results = summarize_extracted_documents(extraction_results_with_chunks, retriever)

        # Step 4: Save summaries to files (command line only; the Streamlit UI renders them instead)
        logger.info("Saving summaries to files.")
        saved_count = 0
        for res in summary_results:
            if res['success'] and res['summary']:
                output_manager.save_summary(res['filename'], res['summary'], formats=['markdown'])
                saved_count += 1
        logger.info(f"Saved {saved_count} summaries.")

        logger.info(f"Summarization Time taken: {time.time() - summarization_start_time:.2f} seconds")

        # Output a results summary to the console
        logger.info("\n" + "=" * 50)
        logger.info("Summarization Process Complete.")
        logger.info("=" * 50)
        successful_count = sum(res.get('success', False) for res in summary_results)
        total_processed = len(summary_results)  # Includes files skipped during extraction
        total_time = time.time() - start_time

        logger.info(f"Total files attempted: {len(extraction_results)}")
        logger.info(f"Files successfully extracted and summarizable: {len(extraction_results_with_chunks)}")
        logger.info(f"Files summarized: {successful_count}/{total_processed}")
        logger.info(f"Total process time: {total_time:.2f} seconds")
        logger.info("=" * 50)

        # Print individual result statuses
        logger.info("\nIndividual File Results:")
        for result in summary_results:
            name = result.get('filename', 'unknown')
            status = "SUCCESS" if result.get('success') else "FAILED"
            time_taken = result.get('processing_time', 0)
            error_msg = result.get('error', '')
            logger.info(f"- {name}: {status} ({time_taken:.2f}s) {f'Error: {error_msg}' if error_msg else ''}")

    except FileNotFoundError as fnf_error:
        logger.error(f"Configuration Error: {fnf_error}")
        print(f"Error: {fnf_error}")
    except Exception as main_error:
        logger.error(f"An unexpected error occurred during the main process: {main_error}", exc_info=True)
        print(f"An unexpected error occurred: {main_error}")