File size: 10,547 Bytes
55fd686
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d77d99a
55fd686
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import shutil
import tempfile
import time
import concurrent.futures
from typing import List, Dict, Any, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import os

from app.config.settings import DOCS_FOLDER
from app.document_processing.extractors import DocumentProcessorAdapter
from app.retrieval.vector_store import Retriever
from app.summarization.summarizer import DocumentSummarizer
from app.summarization.output import SummaryOutputManager
from app.utils.performance import timeit
from app.utils.logger import setup_logger

logger = logging.getLogger(__name__)

# Persistent scratch directory for user uploads; created eagerly at import
# time so clear_upload_directory() and upload handlers can rely on it existing.
TEMP_UPLOAD_DIR = os.path.join("/tmp", "temp_uploads")
os.makedirs(TEMP_UPLOAD_DIR, exist_ok=True)

def clear_upload_directory():
    """Remove every file, symlink, and subdirectory inside TEMP_UPLOAD_DIR.

    Failures on individual entries are logged (with traceback) and skipped,
    so one undeletable entry does not abort the whole sweep.
    """
    if not os.path.exists(TEMP_UPLOAD_DIR):
        logger.info(f"Temporary upload directory does not exist, no need to clear: {TEMP_UPLOAD_DIR}")
        return

    logger.info(f"Clearing temporary upload directory: {TEMP_UPLOAD_DIR}")
    for entry in os.listdir(TEMP_UPLOAD_DIR):
        entry_path = os.path.join(TEMP_UPLOAD_DIR, entry)
        try:
            # Real directories are removed recursively; files and symlinks
            # (including symlinks to directories) are unlinked directly.
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                os.unlink(entry_path)
                logger.debug(f"Deleted file/link: {entry_path}")
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
                logger.debug(f"Deleted directory: {entry_path}")
        except Exception as e:
            logger.error(f"Error deleting {entry_path}: {e}", exc_info=True)



def process_uploaded_files(uploaded_files) -> List[Dict[str, Any]]:
    """Persist in-memory uploads to a scratch directory and extract them.

    Args:
        uploaded_files: Iterable of upload objects exposing ``.name`` and
            ``.getvalue()`` — presumably Streamlit UploadedFile objects;
            TODO confirm against the caller.

    Returns:
        List of per-document extraction result dictionaries.
    """
    started = time.time()
    logger.info(f"Starting processing for {len(uploaded_files)} uploaded files.")

    with tempfile.TemporaryDirectory() as workdir:
        # Materialize each upload as a real file so the folder processor
        # can pick it up.
        for upload in uploaded_files:
            destination = os.path.join(workdir, upload.name)
            with open(destination, "wb") as handle:
                handle.write(upload.getvalue())

        # Extraction must complete before the scratch directory is removed.
        results = DocumentProcessorAdapter().process_folder(workdir)

    elapsed = time.time() - started
    logger.info(f"Finished processing uploaded files in {elapsed:.2f} seconds.")
    return results


@timeit
def process_documents(directory=None) -> List[Dict[str, Any]]:
    """Extract and preprocess documents from the configured folder.

    Args:
        directory: Optional folder to process; any falsy value falls back
            to the configured DOCS_FOLDER.

    Returns:
        List of extraction result dictionaries.
    """
    target = directory if directory else DOCS_FOLDER
    return DocumentProcessorAdapter().process_folder(target)


def setup_retrieval_system(doc_data: Dict[str, Any]) -> Tuple[Dict[str, Any], Retriever]:
    """Build a Retriever over *doc_data*.

    Returns:
        Tuple of (document data as returned by the retriever build step,
        the initialized Retriever instance).
    """
    retriever = Retriever()
    enriched = retriever.create_from_documents(doc_data)
    return enriched, retriever


def process_single_document(doc_data: Dict[str, Any], max_workers: int = 4) -> List[Dict[str, Any]]:
    """Process a single document and generate its summary components.

    Args:
        doc_data: Extraction result dict; keys read here are "filename",
            "language", "chunk_size", and "text".
        max_workers: Worker thread count forwarded to DocumentSummarizer.

    Returns:
        List of summary component dicts, or [] on failure — the failure is
        logged (with traceback) instead of raised, so batch processing of
        other documents continues.
    """
    try:
        doc_data, retriever = setup_retrieval_system(doc_data)
        # Pass max_workers to DocumentSummarizer
        summarizer = DocumentSummarizer(retriever, max_workers=max_workers)

        components = summarizer.generate_summarizer_components(
            filename=doc_data.get("filename"),
            language=doc_data.get("language", "en"),
            chunk_size=doc_data.get("chunk_size", 1000),
            # Only the first 1000 chars are forwarded — presumably a preview
            # for prompt context; confirm the cap is intended.
            document_text=doc_data.get("text", '')[:1000]
        )
        return components
    except Exception as e:
        # logger.exception records the full traceback; plain logger.error
        # was silently dropping it.
        logger.exception(f"Failed to summarize {doc_data.get('filename')}: {str(e)}")
        return []


@timeit
def batch_summarize_documents(extraction_results: List[Dict[str, Any]],
                              max_workers: int = None) -> List[Dict[str, Any]]:
    """Generate summary components with stream generators for all documents in parallel.

    Args:
        extraction_results: List of document data dictionaries.
        max_workers: Maximum number of worker threads (defaults to CPU count,
            capped at 8 to avoid API rate limits).

    Returns:
        List of summary component dictionaries with stream generators.
    """
    # Determine optimal number of workers if not specified
    if max_workers is None:
        max_workers = min(os.cpu_count() or 4, 8)  # Limit to 8 max to avoid API rate limits

    # Split the worker budget across documents; each document gets 1-4 workers.
    doc_count = len(extraction_results)
    doc_workers = max(1, min(4, max_workers // max(1, doc_count)))

    logger.info(f"Processing {doc_count} documents with {max_workers} total workers "
                f"({doc_workers} workers per document)")

    summary_component_streams = []

    # Process documents in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its filename for logging.
        futures = {
            executor.submit(process_single_document, doc_data, doc_workers): doc_data.get('filename', 'unknown')
            for doc_data in extraction_results
        }

        # Collect results as they complete
        for future in as_completed(futures):
            doc_name = futures[future]
            try:
                components = future.result()
                if components:
                    summary_component_streams.extend(components)
                    logger.info(f"Successfully processed components for '{doc_name}'")
                else:
                    logger.warning(f"No components generated for '{doc_name}'")
            except Exception as e:
                # exc_info=True keeps the traceback; it was being dropped.
                logger.error(f"Error processing '{doc_name}': {str(e)}", exc_info=True)

    logger.info(f"Generated {len(summary_component_streams)} total summary components")
    return summary_component_streams


def consume_stream(stream_data: Dict[str, Any]) -> Tuple[str, str]:
    """Drain one component stream, echoing deltas to stdout.

    Args:
        stream_data: Dict holding 'filename', 'comp_name', and (under the
            comp_name key) the stream generator itself.

    Returns:
        Tuple of (component id, status) where status is "success" or an
        "Error: ..." string.
    """
    stream_id = f"{stream_data['filename']}-{stream_data['comp_name']}"
    kind = stream_data['comp_name']

    try:
        generator = stream_data[kind]
        pieces = []

        logger.info(f"Processing stream for {stream_id}")
        print(f"\n{'=' * 50}\nProcessing: {stream_id}\n")

        if kind == 'resource_link':
            # resource_link streams yield plain values rather than delta events.
            pieces.extend(str(item) for item in generator)
        else:
            # Regular streams emit typed events; only content deltas carry text.
            for event in generator:
                if event.type == "content-delta":
                    text = event.delta.message.content.text
                    pieces.append(text)
                    print(text, end="", flush=True)

        print(f"\n{'=' * 50}")
        return stream_id, "success"
    except Exception as e:
        logger.error(f"Error processing stream {stream_id}: {str(e)}")
        return stream_id, f"Error: {str(e)}"


@timeit
def process_stream_components(stream_components: List[Dict[str, Any]], max_workers: int = 4) -> Dict[str, str]:
    """Drain all component streams concurrently.

    Args:
        stream_components: Component dicts as produced by batch summarization.
        max_workers: Thread count bounding concurrent stream consumption.

    Returns:
        Mapping of component id -> status string ("success" or error text).
    """
    logger.info(f"Processing {len(stream_components)} summary components with {max_workers} workers")

    statuses: Dict[str, str] = {}
    # The pool size itself bounds concurrency here.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {
            pool.submit(consume_stream, component): component
            for component in stream_components
        }
        # Record each stream's outcome as soon as it finishes.
        for done in as_completed(pending):
            comp_id, state = done.result()
            statuses[comp_id] = state

    return statuses


def display_summary_results(results: Dict[str, str]) -> None:
    """Print a formatted pass/fail report for processed components.

    Args:
        results: Mapping of component id -> status ("success" or error text).
    """
    successful = []
    failed = []
    # Partition components into successes and failures in one pass.
    for comp_id, status in results.items():
        if status == 'success':
            successful.append(comp_id)
        else:
            failed.append((comp_id, status))

    print("\n" + "=" * 60)
    print(f"SUMMARY: {len(successful)}/{len(results)} components successfully processed")

    if successful:
        print("\nSuccessful components:")
        for comp_id in successful:
            print(f"  ✓ {comp_id}")

    if failed:
        print("\nFailed components:")
        for comp_id, status in failed:
            print(f"  ✗ {comp_id}: {status}")

    print("=" * 60)


def main():
    """Main execution flow for document processing and summarization.

    Pipeline: extract documents -> summarize in parallel (streaming) ->
    drain streams -> print a results report. Any unhandled error is logged
    as critical and re-raised.
    """
    try:
        # Configure logging
        setup_logger()
        logger.info(f"Starting document processing from: {DOCS_FOLDER}")

        # Process documents
        extraction_results = process_documents()
        logger.info(f"Processed {len(extraction_results)} documents")

        # Size worker pools from available CPUs; the summarization pool is
        # capped at 8 to avoid downstream API rate limits.
        cpu_count = os.cpu_count() or 4
        summary_workers = min(max(2, cpu_count), 8)

        # Generate summaries with streaming in parallel
        logger.info(f"Starting parallel streaming summarization with {summary_workers} workers")
        stream_components = batch_summarize_documents(
            extraction_results,
            max_workers=summary_workers
        )

        # Use fewer workers for consuming streams so interleaved console
        # output stays readable.
        stream_workers = min(max(2, cpu_count // 2), 4)
        logger.info(f"Processing streams with {stream_workers} workers")
        results = process_stream_components(stream_components, max_workers=stream_workers)

        # Display results
        display_summary_results(results)

    except Exception as e:
        logger.critical(f"Critical error in main execution: {str(e)}")
        raise


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()