import os import json import uuid import time import base64 from datetime import datetime from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, asdict import tempfile import shutil from dotenv import load_dotenv import tiktoken import requests import threading from concurrent.futures import ThreadPoolExecutor from azure.cognitiveservices.vision.computervision import ComputerVisionClient from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes from msrest.authentication import CognitiveServicesCredentials from file_processors import FileProcessor from image_extraction import VideoFrameExtractor # Load Environment load_dotenv() @dataclass class SummaryJob: job_id: str user_id: str original_files: List[str] summary_type: str user_prompt: str status: str created_at: str completed_at: Optional[str] = None summary_text: Optional[str] = None processed_files: Optional[Dict] = None extracted_images: Optional[List[str]] = None transcript_text: Optional[str] = None error_message: Optional[str] = None settings: Optional[Dict] = None chat_response_url: Optional[str] = None class TokenManager: """Manage token counting and content truncation for GPT models""" def __init__(self, model_name: str = "gpt-4o-mini"): try: # Map common model names to encodings model_encoding_map = { "gpt-4o": "o200k_base", "gpt-4o-mini": "o200k_base", "gpt-4": "cl100k_base", "gpt-4-turbo": "cl100k_base", "gpt-35-turbo": "cl100k_base" } encoding_name = model_encoding_map.get(model_name, "cl100k_base") self.encoder = tiktoken.get_encoding(encoding_name) except Exception as e: print(f"Warning: Could not load tokenizer for {model_name}, using fallback: {e}") self.encoder = tiktoken.get_encoding("cl100k_base") # Token limits based on model if "gpt-4o" in model_name: self.max_input_tokens = 120000 # 128k context window else: self.max_input_tokens = 100000 # Conservative limit self.max_transcript_tokens = 80000 self.max_document_tokens = 30000 self.max_image_analysis_tokens = 10000 def count_tokens(self, text: str) -> int: """Count tokens in text""" try: return len(self.encoder.encode(text)) except Exception: # Fallback estimation: ~4 characters per token return len(text) // 4 def truncate_text(self, text: str, max_tokens: int) -> str: """Truncate text to fit within token limit""" if not text: return text current_tokens = self.count_tokens(text) if current_tokens <= max_tokens: return text lines = text.split('\n') if len(lines) == 1: # Single line - truncate by character estimation chars_per_token = len(text) / current_tokens target_chars = int(max_tokens * chars_per_token * 0.9) return text[:target_chars] + "\n[Content truncated due to length]" # Multi-line - truncate by lines truncated_lines = [] current_tokens = 0 for line in lines: line_tokens = self.count_tokens(line + '\n') if current_tokens + line_tokens > max_tokens: truncated_lines.append("[Content truncated due to length]") break truncated_lines.append(line) current_tokens += line_tokens return '\n'.join(truncated_lines) def optimize_content_for_tokens(self, transcripts: List[Dict], documents: List[Dict], image_insights: List[Dict], user_prompt: str) -> Tuple[List[Dict], List[Dict], List[Dict]]: """Optimize content to fit within token limits""" # Truncate transcripts total_transcript_text = "" for transcript in transcripts: total_transcript_text += transcript.get('content', '') + "\n\n" transcript_tokens = self.count_tokens(total_transcript_text) if transcript_tokens > self.max_transcript_tokens: print(f"Transcripts too long ({transcript_tokens} tokens), truncating to {self.max_transcript_tokens}") tokens_per_transcript = self.max_transcript_tokens // len(transcripts) if transcripts else 0 for transcript in transcripts: transcript['content'] = self.truncate_text( transcript['content'], tokens_per_transcript ) # Truncate documents total_document_text = "" for doc in documents: total_document_text += doc.get('content', '') + "\n\n" doc_tokens = self.count_tokens(total_document_text) if doc_tokens > self.max_document_tokens: print(f"Documents too long ({doc_tokens} tokens), truncating to {self.max_document_tokens}") tokens_per_doc = self.max_document_tokens // len(documents) if documents else 0 for doc in documents: doc['content'] = self.truncate_text( doc['content'], tokens_per_doc ) # Truncate image analysis if len(image_insights) > 10: print(f"Too many images ({len(image_insights)}), limiting to 10") image_insights = image_insights[:10] for img in image_insights: if 'analysis' in img: desc = img['analysis'].get('description', '') text = img['analysis'].get('extracted_text', '') if desc: img['analysis']['description'] = self.truncate_text(desc, 200) if text: img['analysis']['extracted_text'] = self.truncate_text(text, 300) return transcripts, documents, image_insights class AISummaryManager: """Enhanced AI-powered conference summarization using Azure OpenAI with full backend integration""" def __init__(self): # Azure OpenAI Configuration self.azure_openai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT") self.azure_openai_key = os.environ.get("AZURE_OPENAI_KEY") self.azure_openai_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini") self.azure_openai_api_version = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-08-01-preview") # Computer Vision Configuration self.cv_endpoint = os.environ.get("COMPUTER_VISION_ENDPOINT") self.cv_key = os.environ.get("COMPUTER_VISION_KEY") # Initialize services self.cv_client = None self.file_processor = FileProcessor() self.frame_extractor = VideoFrameExtractor() self.token_manager = TokenManager(self.azure_openai_deployment) # Background processing self.executor = ThreadPoolExecutor(max_workers=3) self.running = True # Initialize backend integration self.backend_manager = None self._init_backend_integration() self._init_services() # Start background worker for AI summary processing self.worker_thread = threading.Thread(target=self._background_summary_worker, daemon=True) self.worker_thread.start() print("🤖 Enhanced AI Summary Manager initialized with full backend integration") def _init_backend_integration(self): """Initialize integration with the enhanced backend""" try: # Import the transcription manager from backend from backend import transcription_manager self.backend_manager = transcription_manager print("🔗 Backend integration initialized successfully") except ImportError as e: print(f"Warning: Could not initialize backend integration: {e}") self.backend_manager = None def _background_summary_worker(self): """Background worker specifically for AI summary processing""" iteration_count = 0 while self.running: try: if self.backend_manager: # Get pending AI summary jobs from the backend pending_summary_jobs = self.backend_manager.db.get_pending_summary_jobs() if pending_summary_jobs and iteration_count % 6 == 0: # Log every minute active_summaries = len([j for j in pending_summary_jobs if j.status == 'processing']) queued_summaries = len([j for j in pending_summary_jobs if j.status == 'pending']) if active_summaries > 0 or queued_summaries > 0: print(f"🤖 AI Summary worker: {active_summaries} processing, {queued_summaries} queued") # Process pending AI summary jobs for job in pending_summary_jobs: if job.status == 'pending': self.executor.submit(self._process_summary_job_background, job.job_id) time.sleep(10) # Check every 10 seconds iteration_count += 1 except Exception as e: print(f"❌ AI Summary background worker error: {e}") time.sleep(30) def _init_services(self): """Initialize services with validation""" # Validate Azure OpenAI configuration if not all([self.azure_openai_endpoint, self.azure_openai_key, self.azure_openai_deployment]): print("ERROR: Missing Azure OpenAI configuration") print("Required environment variables:") print("- AZURE_OPENAI_ENDPOINT") print("- AZURE_OPENAI_KEY") print("- AZURE_OPENAI_DEPLOYMENT") raise ValueError("Azure OpenAI configuration incomplete") # Validate endpoint format if not self.azure_openai_endpoint.startswith("https://"): raise ValueError("AZURE_OPENAI_ENDPOINT must be a valid HTTPS URL") # Remove trailing slash from endpoint self.azure_openai_endpoint = self.azure_openai_endpoint.rstrip('/') print(f"🤖 Azure OpenAI initialized: {self.azure_openai_deployment} at {self.azure_openai_endpoint}") # Test Azure OpenAI connection try: self._test_azure_openai_connection() except Exception as e: print(f"WARNING: Azure OpenAI connection test failed: {e}") # Initialize Computer Vision Client if self.cv_key and self.cv_endpoint: try: self.cv_client = ComputerVisionClient( self.cv_endpoint, CognitiveServicesCredentials(self.cv_key) ) print("👁️ Computer Vision Client initialized") except Exception as e: print(f"WARNING: Computer Vision initialization failed: {e}") else: print("Computer Vision key/endpoint not found - image processing disabled") def _test_azure_openai_connection(self): """Test Azure OpenAI connection""" url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}" headers = { "Content-Type": "application/json", "api-key": self.azure_openai_key } test_data = { "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 5, "temperature": 0 } try: response = requests.post(url, headers=headers, json=test_data, timeout=10) if response.status_code == 200: print("✅ Azure OpenAI connection test: SUCCESS") else: print(f"Azure OpenAI connection test failed: {response.status_code} - {response.text}") raise Exception(f"Connection test failed: {response.status_code}") except requests.exceptions.RequestException as e: print(f"Azure OpenAI connection test error: {e}") raise def submit_summary_job_enhanced( self, user_id: str, content_mode: str, summary_type: str, user_prompt: str, existing_transcript_ids=None, audio_video_files=None, document_files=None, settings=None, ) -> str: """ Compatibility wrapper expected by app.py. Normalizes inputs and forwards to submit_summary_job(). """ existing_transcript_ids = existing_transcript_ids or [] files = (audio_video_files or []) + (document_files or []) settings = dict(settings or {}) # Preserve content mode and transcript ids for background worker settings.setdefault("content_mode", content_mode) if existing_transcript_ids: settings.setdefault("transcript_job_ids", existing_transcript_ids) return self.submit_summary_job( user_id=user_id, summary_type=summary_type, user_prompt=user_prompt, files=files, transcript_job_ids=existing_transcript_ids, settings=settings, ) def submit_summary_job( self, user_id: str, summary_type: str, user_prompt: str, files: List = None, transcript_job_ids: List[str] = None, settings: Dict = None ) -> str: """Submit a new AI summary job with enhanced backend integration""" job_id = str(uuid.uuid4()) original_files = [] if files: original_files.extend([f.name if hasattr(f, 'name') else str(f) for f in files]) if transcript_job_ids: original_files.extend([f"transcript_{tid[:8]}..." for tid in transcript_job_ids]) print(f"🤖 [{user_id[:8]}...] New AI summary job: {summary_type}") print(f"User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}") job = SummaryJob( job_id=job_id, user_id=user_id, original_files=original_files, summary_type=summary_type, user_prompt=user_prompt, status="pending", created_at=datetime.now().isoformat(), settings=settings or {} ) # Save job to backend database if self.backend_manager: self.backend_manager.save_summary_job(job) else: self._save_summary_job_fallback(job) # Start foreground processing for immediate jobs or let background worker handle it if files and len(files) > 0: # For new files, start processing immediately self.executor.submit(self._process_summary_job, job_id, files, transcript_job_ids) else: # For existing transcripts only, let background worker handle it pass return job_id def _process_summary_job_background(self, job_id: str): """Process AI summary job from background worker""" self._process_summary_job(job_id, [], []) def _save_summary_job_fallback(self, job: SummaryJob): """Fallback method to save summary job if backend is not available""" try: # Create a simple local storage fallback os.makedirs("temp/summary_jobs", exist_ok=True) job_file = f"temp/summary_jobs/{job.job_id}.json" with open(job_file, 'w', encoding='utf-8') as f: json.dump(asdict(job), f, ensure_ascii=False, indent=2, default=str) print(f"Job saved to fallback storage: {job_file}") except Exception as e: print(f"Error saving job to fallback storage: {e}") def _process_summary_job(self, job_id: str, files: List = None, transcript_job_ids: List[str] = None): """Process AI summary job using Azure OpenAI with enhanced error handling and chat storage""" job = None try: job = self.get_summary_status(job_id) if not job: print(f"Job {job_id[:8]}... not found") return print(f"🤖 [{job.user_id[:8]}...] Processing AI summary job: {job_id[:8]}...") job.status = "processing" self._update_job_status(job) # Process all input sources processed_content = { 'transcripts': [], 'documents': [], 'images': [], 'extracted_frames': [] } # Process existing transcripts if transcript_job_ids: processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id) print(f"Processed {len(processed_content['transcripts'])} existing transcripts") # Also check job's settings for transcript IDs (from background processing) if not transcript_job_ids and job.settings and 'transcript_job_ids' in job.settings: transcript_job_ids = job.settings['transcript_job_ids'] processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id) print(f"Processed {len(processed_content['transcripts'])} transcripts from job settings") # Process uploaded files if files: for i, file in enumerate(files): print(f"Processing file {i+1}/{len(files)}: {getattr(file, 'name', 'unknown')}") file_content = self._process_uploaded_file(file, job.user_id) if file_content: if file_content['type'] == 'video': frames = self._extract_significant_frames(file_content['path']) processed_content['extracted_frames'].extend(frames) print(f"Extracted {len(frames)} frames from video") elif file_content['type'] == 'document': processed_content['documents'].append(file_content) elif file_content['type'] == 'image': processed_content['images'].append(file_content) # Analyze images with Computer Vision image_insights = [] all_images = processed_content['images'] + processed_content['extracted_frames'] print(f"Analyzing {len(all_images)} images...") for image_info in all_images: analysis = self._analyze_image_content(image_info['path']) if analysis: image_insights.append({ 'source': image_info['filename'], 'analysis': analysis }) print(f"Analysis complete: {len(processed_content['transcripts'])} transcripts, {len(processed_content['documents'])} documents, {len(image_insights)} images") # Optimize content for token limits optimized_transcripts, optimized_documents, optimized_images = self.token_manager.optimize_content_for_tokens( processed_content['transcripts'], processed_content['documents'], image_insights, job.user_prompt ) output_language = job.settings.get('output_language', 'English') if job.settings else 'English' # Generate AI summary using Azure OpenAI summary_result = self._generate_ai_summary_with_openai( transcripts=optimized_transcripts, documents=optimized_documents, image_insights=optimized_images, user_prompt=job.user_prompt, output_language=output_language ) # Store response to chat container using backend integration chat_url = "" if self.backend_manager and hasattr(self.backend_manager.db, '_store_chat_response'): try: chat_url = self.backend_manager.db._store_chat_response(job_id, summary_result, job.user_id) print(f"💬 Chat response stored successfully: {chat_url}") except Exception as e: print(f"⚠️ Warning: Could not store chat response: {e}") # Update job with results job.status = "completed" job.summary_text = summary_result job.completed_at = datetime.now().isoformat() job.processed_files = { 'transcript_count': len(processed_content['transcripts']), 'document_count': len(processed_content['documents']), 'image_count': len(all_images), 'extracted_frames': len(processed_content['extracted_frames']) } job.extracted_images = [img['filename'] for img in processed_content['extracted_frames']] job.chat_response_url = chat_url self._update_job_status(job) print(f"✅ [{job.user_id[:8]}...] AI summary completed: {job_id[:8]}...") except Exception as e: print(f"❌ AI summary processing failed: {e}") if job: job.status = "failed" job.error_message = str(e) job.completed_at = datetime.now().isoformat() self._update_job_status(job) def _update_job_status(self, job: SummaryJob): """Update job status in backend or fallback storage""" if self.backend_manager: self.backend_manager.save_summary_job(job) else: self._save_summary_job_fallback(job) def _generate_ai_summary_with_openai( self, transcripts: List[Dict], documents: List[Dict], image_insights: List[Dict], user_prompt: str, output_language: str = "English" ) -> str: """Generate AI summary using Azure OpenAI with proper error handling""" try: print("🤖 Generating AI summary with Azure OpenAI...") # Prepare the context context = self._prepare_text_content(transcripts, documents, image_insights, user_prompt, output_language) # Check final token count final_tokens = self.token_manager.count_tokens(context) print(f"Final input tokens: {final_tokens} / {self.token_manager.max_input_tokens}") if final_tokens > self.token_manager.max_input_tokens: print("Content too long, applying emergency truncation") context = self.token_manager.truncate_text(context, self.token_manager.max_input_tokens) # Prepare the Azure OpenAI API request with correct URL format url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}" headers = { "Content-Type": "application/json", "api-key": self.azure_openai_key } # Adjust max tokens based on model max_completion_tokens = 4000 if "gpt-4o" in self.azure_openai_deployment: max_completion_tokens = 8000 data = { "messages": [ { "role": "system", "content": f"You are an expert conference analyst and AI assistant. Create comprehensive, professional summaries in {output_language}. Focus on key insights, strategic decisions, action items, and important discussions. Integrate information from all sources provided (transcripts, documents, visual content). Structure your response clearly with headers and bullet points where appropriate." }, { "role": "user", "content": context } ], "max_tokens": max_completion_tokens, "temperature": 0.2, "top_p": 0.3, "frequency_penalty": 0, "presence_penalty": 0 } print(f"Making API request to: {url}") print(f"Using model: {self.azure_openai_deployment}") # Make the API request with retries max_retries = 3 for attempt in range(max_retries): try: response = requests.post( url, headers=headers, json=data, timeout=300 # 5 minute timeout ) print(f"API Response Status: {response.status_code}") if response.status_code == 200: break elif response.status_code == 429: # Rate limit - wait and retry wait_time = 2 ** attempt print(f"Rate limited, waiting {wait_time} seconds...") time.sleep(wait_time) continue else: error_msg = f"Azure OpenAI API error: {response.status_code} - {response.text}" print(error_msg) if attempt == max_retries - 1: raise Exception(error_msg) time.sleep(1) continue except requests.exceptions.Timeout: if attempt == max_retries - 1: raise Exception("Azure OpenAI request timed out after multiple retries") print(f"Request timeout, retrying... (attempt {attempt + 1})") time.sleep(2) continue except requests.exceptions.RequestException as e: if attempt == max_retries - 1: raise Exception(f"Azure OpenAI request failed: {str(e)}") print(f"Request error, retrying... (attempt {attempt + 1}): {e}") time.sleep(2) continue try: result = response.json() except json.JSONDecodeError as e: raise Exception(f"Invalid JSON response from Azure OpenAI: {str(e)}") # Extract the AI response if 'choices' in result and len(result['choices']) > 0: choice = result['choices'][0] if 'message' in choice and 'content' in choice['message']: ai_response = choice['message']['content'] # Check for completion reason finish_reason = choice.get('finish_reason', '') if finish_reason == 'content_filter': raise Exception("Content was filtered by Azure OpenAI safety systems") elif finish_reason == 'length': print("Response was truncated due to length limit") ai_response += "\n\n[Response was truncated due to length limit]" print(f"✅ AI summary generated successfully in {output_language}") return ai_response else: raise Exception(f"Unexpected response format: {result}") else: raise Exception(f"No response generated from Azure OpenAI: {result}") except Exception as e: error_msg = f"Azure OpenAI generation failed: {str(e)}" print(error_msg) raise Exception(error_msg) def _prepare_text_content(self, transcripts, documents, image_insights, user_prompt, output_language="English"): """Prepare comprehensive text content for AI analysis""" context_parts = [ "# Enhanced AI Conference Analysis Request", f"**User Instructions:** {user_prompt}", f"**Output Language:** {output_language}", "", "## Comprehensive Content Analysis:", "" ] # Add transcript content if transcripts: context_parts.append("### 🎙️ Transcription Content:") for i, transcript in enumerate(transcripts, 1): context_parts.append(f"**Source {i}: {transcript['source']}**") context_parts.append(transcript['content']) context_parts.append("") # Add document content if documents: context_parts.append("### 📄 Document Content:") for i, doc in enumerate(documents, 1): context_parts.append(f"**Document {i}: {doc['filename']}**") context_parts.append(doc['content']) context_parts.append("") # Add image analysis if image_insights: context_parts.append("### 👁️ Visual Content Analysis:") for i, img in enumerate(image_insights, 1): context_parts.append(f"**Image {i}: {img['source']}**") analysis = img['analysis'] context_parts.append(f"Description: {analysis.get('description', 'N/A')}") if analysis.get('extracted_text'): context_parts.append(f"Text Content: {analysis['extracted_text']}") context_parts.append("") context_parts.extend([ "## AI Analysis Instructions:", f"Create a comprehensive, professional conference analysis in {output_language}.", "Follow the user's specific instructions for format and focus areas.", "Integrate information from ALL sources (transcripts, documents, visual content).", "Structure your response with clear headers and organize information logically.", "Highlight key insights, strategic decisions, action items, and important discussions.", "Provide actionable recommendations based on the comprehensive content analysis.", "Include timestamps and speaker references where relevant.", "Ensure the summary is professional, accurate, and valuable for business decision-making.", ]) return "\n".join(context_parts) def _process_uploaded_file(self, file, user_id: str) -> Optional[Dict]: """Process uploaded file""" try: if hasattr(file, 'name'): file_path = file.name filename = os.path.basename(file_path) elif isinstance(file, str): file_path = file filename = os.path.basename(file_path) else: return None if not os.path.exists(file_path): return None file_size = os.path.getsize(file_path) if file_size > 500 * 1024 * 1024: # 500MB limit return None ext = filename.split('.')[-1].lower() if '.' in filename else '' if ext in ['mp4', 'mov', 'avi', 'mkv', 'webm', 'flv', '3gp', 'wmv']: return { 'type': 'video', 'filename': filename, 'path': file_path, 'extension': ext, 'size': file_size } elif ext in ['jpg', 'jpeg', 'png', 'bmp', 'gif', 'tiff', 'webp']: return { 'type': 'image', 'filename': filename, 'path': file_path, 'extension': ext, 'size': file_size } elif ext in ['pdf', 'docx', 'doc', 'pptx', 'ppt', 'xlsx', 'xls', 'txt', 'json', 'csv']: content = self.file_processor.process_file(file_path, ext) if content: return { 'type': 'document', 'filename': filename, 'path': file_path, 'extension': ext, 'content': content, 'size': file_size } return None except Exception as e: print(f"Error processing file: {e}") return None def _extract_significant_frames(self, video_path: str) -> List[Dict]: """Extract significant frames from video""" try: frames = self.frame_extractor.extract_frames(video_path) return frames if frames else [] except Exception as e: print(f"Frame extraction failed: {e}") return [] def _analyze_image_content(self, image_path: str) -> Optional[Dict]: """Analyze image content with Computer Vision""" if not self.cv_client: return None try: with open(image_path, 'rb') as image_stream: # OCR ocr_result = self.cv_client.read_in_stream(image_stream, raw=True) operation_id = ocr_result.headers["Operation-Location"].split("/")[-1] # Wait for completion timeout = 30 start_time = time.time() while True: if time.time() - start_time > timeout: break read_result = self.cv_client.get_read_result(operation_id) if read_result.status not in ['notStarted', 'running']: break time.sleep(1) # Extract text extracted_text = "" if read_result.status == OperationStatusCodes.succeeded: for text_result in read_result.analyze_result.read_results: for line in text_result.lines: extracted_text += line.text + "\n" # Get description image_stream.seek(0) description_result = self.cv_client.describe_image_in_stream( image_stream, max_candidates=3, language='en' ) return { 'extracted_text': extracted_text.strip(), 'description': description_result.captions[0].text if description_result.captions else "", 'confidence': description_result.captions[0].confidence if description_result.captions else 0 } except Exception as e: print(f"Image analysis failed: {e}") return None def _get_existing_transcripts(self, transcript_job_ids: List[str], user_id: str) -> List[Dict]: """Get existing transcripts using backend integration""" transcripts = [] if self.backend_manager: for job_id in transcript_job_ids: try: job = self.backend_manager.get_job_status(job_id) if job and job.user_id == user_id and job.transcript_text: transcripts.append({ 'source': f"Previous transcript: {job.original_filename}", 'content': job.transcript_text }) except Exception as e: print(f"Error getting transcript {job_id[:8]}...: {e}") else: print("Backend manager not available, cannot retrieve existing transcripts") return transcripts def get_summary_status(self, job_id: str) -> Optional[SummaryJob]: """Get current summary job status using backend integration""" if self.backend_manager: return self.backend_manager.get_summary_job(job_id) else: # Fallback to local storage try: job_file = f"temp/summary_jobs/{job_id}.json" if os.path.exists(job_file): with open(job_file, 'r', encoding='utf-8') as f: job_data = json.load(f) # Convert back to SummaryJob object return SummaryJob(**job_data) except Exception as e: print(f"Error loading job from fallback storage: {e}") return None def get_user_summary_history(self, user_id: str, limit: int = 20) -> List[SummaryJob]: """Get user's summary history using backend integration""" if self.backend_manager: return self.backend_manager.get_user_summary_history(user_id, limit) else: # Fallback to local storage try: history = [] jobs_dir = "temp/summary_jobs" if os.path.exists(jobs_dir): for filename in os.listdir(jobs_dir): if filename.endswith('.json'): job_file = os.path.join(jobs_dir, filename) try: with open(job_file, 'r', encoding='utf-8') as f: job_data = json.load(f) if job_data.get('user_id') == user_id: history.append(SummaryJob(**job_data)) except Exception as e: print(f"Error loading job file {filename}: {e}") # Sort by creation date and limit history.sort(key=lambda x: x.created_at, reverse=True) return history[:limit] except Exception as e: print(f"Error getting user history from fallback storage: {e}") return [] # Global AI summary manager instance with enhanced backend integration ai_summary_manager = AISummaryManager()