# Pipeline entry points: process uploaded files, set up retrieval, summarize documents.
| import logging # Import logging | |
| import os | |
| import tempfile | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import List, Dict, Any, Tuple | |
| from app.config.settings import DOCS_FOLDER | |
| # Import classes from the renamed modules | |
| from app.document_processing.extractors import DocumentProcessorAdapter | |
| from app.retrieval.vector_store import Retriever | |
| from app.summarization.output import SummaryOutputManager | |
| from app.summarization.summarizer import DocumentSummarizer | |
| # Configure logging for the main script | |
| logger = logging.getLogger(__name__) | |
def process_uploaded_files(uploaded_files) -> List[Dict[str, Any]]:
    """
    Process a list of files uploaded via Streamlit.

    Saves each uploaded file into a temporary folder and uses the
    DocumentProcessorAdapter to process that folder.

    Args:
        uploaded_files: List of Streamlit UploadedFile objects. The type hint
            is omitted to avoid needing a Streamlit import at module level.

    Returns:
        List of extraction-result dictionaries (each with 'filename', 'text',
        'error', etc.; 'chunk_size' is populated later by the retriever).
    """
    # Import streamlit lazily: it is only needed here for st.warning.
    import streamlit as st

    start_time = time.time()
    logger.info(f"Starting processing for {len(uploaded_files)} uploaded files.")

    # The temporary directory (and every file saved into it) is cleaned up
    # automatically when the 'with' block exits.
    with tempfile.TemporaryDirectory() as tmpdir:
        logger.info(f"Using temporary directory: {tmpdir}")
        for uploaded_file in uploaded_files:
            # Use only the base name: uploaded filenames are untrusted input
            # and could otherwise contain path separators escaping tmpdir.
            safe_name = os.path.basename(uploaded_file.name)
            file_path = os.path.join(tmpdir, safe_name)
            try:
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue())
                logger.debug(f"Saved uploaded file '{uploaded_file.name}' to '{file_path}'")
            except Exception as e:
                logger.error(f"Error saving uploaded file '{uploaded_file.name}' to temporary directory: {e}", exc_info=True)
                # Surface the failure in the Streamlit UI; the file is skipped.
                st.warning(f"Could not save uploaded file '{uploaded_file.name}' temporarily. It will be skipped.")

        # Process the whole temporary folder in one call; this returns the
        # list of per-file extraction results.
        processor = DocumentProcessorAdapter()
        extraction_results = processor.process_folder(tmpdir)

    end_time = time.time()
    logger.info(f"Finished processing uploaded files in {end_time - start_time:.2f} seconds.")
    return extraction_results
def setup_retrieval_system(extraction_results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Retriever]:
    """
    Build the retrieval system (vector store) from extraction results.

    Args:
        extraction_results: Extraction dictionaries containing at least
            'filename' and 'text'.

    Returns:
        A tuple of:
            - the extraction results updated with 'chunk_size' per document;
            - the initialized Retriever instance.

    Raises:
        Exception: Any failure during retriever setup is re-raised so the
            Streamlit app can catch and display it.
    """
    t0 = time.time()
    logger.info("Setting up retrieval system.")
    try:
        retriever = Retriever()
        # create_from_documents chunks the text, embeds it, builds the DB,
        # and writes 'chunk_size' back into each extraction dict.
        enriched_results = retriever.create_from_documents(extraction_results)
    except Exception as e:
        logger.error(f"Error during retrieval system setup: {e}", exc_info=True)
        # Summarization cannot proceed without a retriever; let the caller handle it.
        raise
    logger.info(f"Retriever setup complete in {time.time() - t0:.2f} seconds.")
    return enriched_results, retriever
def summarize_extracted_documents(extraction_results: List[Dict[str, Any]], retriever: Retriever) -> List[Dict[str, Any]]:
    """
    Summarize documents based on extraction results and a configured retriever.

    Args:
        extraction_results: Extraction dictionaries (with 'chunk_size'
            populated by setup_retrieval_system).
        retriever: An initialized Retriever instance.

    Returns:
        A list of per-file result dictionaries with keys:
            - 'filename': the name of the file.
            - 'success': True if summarization succeeded.
            - 'summary': the generated summary string, or None.
            - 'error': an error message string, or None.
            - 'processing_time': seconds spent summarizing this file.
    """
    start_time = time.time()
    logger.info(f"Starting summarization for {len(extraction_results)} documents.")
    summarizer = DocumentSummarizer(retriever)
    results: List[Dict[str, Any]] = []

    # Partition in a single pass (the previous 'res not in summarizable' check
    # was O(n^2) with dict equality). A document is summarizable only if
    # extraction produced text, chunking yielded chunks, and there was no error.
    summarizable_results: List[Dict[str, Any]] = []
    skipped_results: List[Dict[str, Any]] = []
    for res in extraction_results:
        if res.get('text') and res.get('chunk_size', 0) > 0 and res.get('error') is None:
            summarizable_results.append(res)
        else:
            skipped_results.append(res)

    if skipped_results:
        logger.warning(f"Skipping summarization for {len(skipped_results)} files due to extraction errors or no text/chunks.")
        for res in skipped_results:
            # Record skipped files so callers see every input in the output.
            results.append({
                'filename': res.get('filename', 'unknown'),
                'success': False,
                'summary': None,
                'error': res.get('error', 'Extraction failed or no text/chunks'),
                'processing_time': 0,  # no summarization time for skipped files
            })

    def process_single_summary(result: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize one document and return its result dictionary."""
        file_start_time = time.time()
        filename = result.get('filename', 'unknown')
        language = result.get('language', 'en')  # default to English if detection failed
        chunk_size = result.get('chunk_size', 0)  # > 0 for summarizable results
        logger.info(f"Summarizing document: {filename}")
        try:
            # summerize_document (sic — project API name) handles parallel
            # processing of summary components internally.
            summary = summarizer.summerize_document(filename, language, chunk_size)
            elapsed = time.time() - file_start_time
            logger.info(f"Finished summarizing {filename} in {elapsed:.2f} seconds.")
            return {
                'filename': filename,
                'success': True,
                'summary': summary,
                'error': None,
                'processing_time': elapsed,
            }
        except Exception as e:
            elapsed = time.time() - file_start_time
            logger.error(f"Error summarizing document {filename}: {e}", exc_info=True)
            return {
                'filename': filename,
                'success': False,
                'summary': None,
                'error': str(e),
                'processing_time': elapsed,
            }

    with ThreadPoolExecutor(max_workers=None) as executor:
        # Fan out one task per summarizable document.
        futures = {executor.submit(process_single_summary, res): res['filename'] for res in summarizable_results}
        for future in as_completed(futures):
            filename = futures[future]
            try:
                summary_result = future.result()
                results.append(summary_result)
                logger.debug(f"Summary result received for {filename}")
            except Exception as exc:
                # Only reached if retrieving the future's result itself fails;
                # process_single_summary already catches its own exceptions.
                logger.error(f"Exception retrieving summary result for {filename}: {exc}", exc_info=True)
                results.append({
                    'filename': filename,
                    'success': False,
                    'summary': None,
                    'error': f"Failed to retrieve result: {exc}",
                    'processing_time': 0,  # cannot determine time if retrieval failed
                })

    logger.info(f"Finished batch summarization in {time.time() - start_time:.2f} seconds.")
    return results
| # if __name__ == "__main__": | |
| # start_time = time.time() | |
| # logger.info("Starting document summarization process (command line).") | |
| # | |
| # try: | |
| # # Step 1: Process documents from the predefined folder | |
| # logger.info(f"Processing documents from: {DOCS_FOLDER}") | |
| # # DocumentProcessorAdapter().process_folder returns a list of extraction result dicts | |
| # extraction_results = DocumentProcessorAdapter().process_folder(DOCS_FOLDER) | |
| # logger.info(f"Document Processing Time taken: {time.time()-start_time:.2f} seconds") | |
| # | |
| # # Step 2: Setup retrieval system | |
| # setup_start_time = time.time() | |
| # # setup_retrieval_system takes extraction results and returns updated results (with chunk_size) and the retriever | |
| # extraction_results_with_chunks, retriever = setup_retrieval_system(extraction_results) | |
| # logger.info(f"Retriever Setup Time taken: {time.time() - setup_start_time:.2f} seconds") | |
| # | |
| # # Step 3: Summarize the documents | |
| # summarization_start_time = time.time() | |
| # # For command line, we might still want to save files locally | |
| # output_manager = SummaryOutputManager() # Uses default output_dir from settings | |
| # # summarize_extracted_documents performs the summarization and returns results | |
| # summary_results = summarize_extracted_documents(extraction_results_with_chunks, retriever) | |
| # | |
| # # Step 4: Save summaries to files (for command-line only) | |
| # logger.info("Saving summaries to files.") | |
| # saved_count = 0 | |
| # for res in summary_results: | |
| # if res['success'] and res['summary']: | |
| # # Use the output_manager to save the summary string | |
| # output_manager.save_summary(res['filename'], res['summary'], formats=['markdown']) | |
| # saved_count += 1 | |
| # logger.info(f"Saved {saved_count} summaries.") | |
| # | |
| # | |
| # logger.info(f"Summarization Time taken: {time.time() - summarization_start_time:.2f} seconds") | |
| # | |
| # | |
| # # Output results summary to console | |
| # logger.info("\n" + "=" * 50) | |
| # logger.info("Summarization Process Complete.") | |
| # logger.info("=" * 50) | |
| # successful_count = sum(res.get('success', False) for res in summary_results) | |
| # total_processed = len(summary_results) # Includes skipped files if they were added to results list earlier | |
| # total_time = time.time() - start_time | |
| # | |
| # logger.info(f"Total files attempted: {len(extraction_results)}") # Total files found/attempted extraction | |
| # logger.info(f"Files successfully extracted and summarizable: {len(extraction_results_with_chunks)}") # Files with text and chunks | |
| # logger.info(f"Files summarized: {successful_count}/{total_processed}") | |
| # logger.info(f"Total process time: {total_time:.2f} seconds") | |
| # logger.info("=" * 50) | |
| # | |
| # # Print individual results status | |
| # logger.info("\nIndividual File Results:") | |
| # for result in summary_results: | |
| # name = result.get('filename', 'unknown') | |
| # status = "SUCCESS" if result['success'] else "FAILED" | |
| # time_taken = result.get('processing_time', 0) | |
| # error_msg = result.get('error', '') | |
| # logger.info(f"- {name}: {status} ({time_taken:.2f}s) {f'Error: {error_msg}' if error_msg else ''}") | |
| # | |
| # | |
| # except FileNotFoundError as fnf_error: | |
| # logger.error(f"Configuration Error: {fnf_error}") | |
| # print(f"Error: {fnf_error}") | |
| # except Exception as main_error: | |
| # logger.error(f"An unexpected error occurred during the main process: {main_error}", exc_info=True) | |
| # print(f"An unexpected error occurred: {main_error}") |