# Azure_Powered_AI_Summary / ai_summary.py
import os
import json
import uuid
import time
import base64
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
import tempfile
import shutil
from dotenv import load_dotenv
import tiktoken
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
from msrest.authentication import CognitiveServicesCredentials
from file_processors import FileProcessor
from image_extraction import VideoFrameExtractor
# Load Environment
load_dotenv()
@dataclass
class SummaryJob:
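    """Record of a single AI-summary request and its lifecycle state."""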
job_id: str
user_id: str
original_files: List[str]
summary_type: str
user_prompt: str
status: str
created_at: str
completed_at: Optional[str] = None
summary_text: Optional[str] = None
processed_files: Optional[Dict] = None
extracted_images: Optional[List[str]] = None
transcript_text: Optional[str] = None
error_message: Optional[str] = None
settings: Optional[Dict] = None
chat_response_url: Optional[str] = None
class TokenManager:
"""Manage token counting and content truncation for GPT models"""
def __init__(self, model_name: str = "gpt-4o-mini"):
try:
# Map common model names to encodings
model_encoding_map = {
"gpt-4o": "o200k_base",
"gpt-4o-mini": "o200k_base",
"gpt-4": "cl100k_base",
"gpt-4-turbo": "cl100k_base",
"gpt-35-turbo": "cl100k_base"
}
encoding_name = model_encoding_map.get(model_name, "cl100k_base")
self.encoder = tiktoken.get_encoding(encoding_name)
except Exception as e:
print(f"Warning: Could not load tokenizer for {model_name}, using fallback: {e}")
self.encoder = tiktoken.get_encoding("cl100k_base")
# Token limits based on model
if "gpt-4o" in model_name:
self.max_input_tokens = 120000 # 128k context window
else:
self.max_input_tokens = 100000 # Conservative limit
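        # Per-source-type budgets; the combined prompt is checked once more before the API call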
self.max_transcript_tokens = 80000
self.max_document_tokens = 30000
self.max_image_analysis_tokens = 10000
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
try:
return len(self.encoder.encode(text))
except Exception:
# Fallback estimation: ~4 characters per token
return len(text) // 4
def truncate_text(self, text: str, max_tokens: int) -> str:
"""Truncate text to fit within token limit"""
if not text:
return text
current_tokens = self.count_tokens(text)
if current_tokens <= max_tokens:
return text
lines = text.split('\n')
if len(lines) == 1:
# Single line - truncate by character estimation
chars_per_token = len(text) / current_tokens
target_chars = int(max_tokens * chars_per_token * 0.9)
return text[:target_chars] + "\n[Content truncated due to length]"
# Multi-line - truncate by lines
truncated_lines = []
current_tokens = 0
for line in lines:
line_tokens = self.count_tokens(line + '\n')
if current_tokens + line_tokens > max_tokens:
truncated_lines.append("[Content truncated due to length]")
break
truncated_lines.append(line)
current_tokens += line_tokens
return '\n'.join(truncated_lines)
def optimize_content_for_tokens(self, transcripts: List[Dict], documents: List[Dict],
image_insights: List[Dict], user_prompt: str) -> Tuple[List[Dict], List[Dict], List[Dict]]:
"""Optimize content to fit within token limits"""
# Truncate transcripts
total_transcript_text = ""
for transcript in transcripts:
total_transcript_text += transcript.get('content', '') + "\n\n"
transcript_tokens = self.count_tokens(total_transcript_text)
if transcript_tokens > self.max_transcript_tokens:
print(f"Transcripts too long ({transcript_tokens} tokens), truncating to {self.max_transcript_tokens}")
tokens_per_transcript = self.max_transcript_tokens // len(transcripts) if transcripts else 0
for transcript in transcripts:
transcript['content'] = self.truncate_text(
transcript['content'], tokens_per_transcript
)
# Truncate documents
total_document_text = ""
for doc in documents:
total_document_text += doc.get('content', '') + "\n\n"
doc_tokens = self.count_tokens(total_document_text)
if doc_tokens > self.max_document_tokens:
print(f"Documents too long ({doc_tokens} tokens), truncating to {self.max_document_tokens}")
tokens_per_doc = self.max_document_tokens // len(documents) if documents else 0
for doc in documents:
doc['content'] = self.truncate_text(
doc['content'], tokens_per_doc
)
# Truncate image analysis
if len(image_insights) > 10:
print(f"Too many images ({len(image_insights)}), limiting to 10")
image_insights = image_insights[:10]
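        # Trim each image's description and OCR text so visual content cannot dominate the prompt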
for img in image_insights:
if 'analysis' in img:
desc = img['analysis'].get('description', '')
text = img['analysis'].get('extracted_text', '')
if desc:
img['analysis']['description'] = self.truncate_text(desc, 200)
if text:
img['analysis']['extracted_text'] = self.truncate_text(text, 300)
return transcripts, documents, image_insights
class AISummaryManager:
"""Enhanced AI-powered conference summarization using Azure OpenAI with full backend integration"""
def __init__(self):
# Azure OpenAI Configuration
self.azure_openai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
self.azure_openai_key = os.environ.get("AZURE_OPENAI_KEY")
self.azure_openai_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")
self.azure_openai_api_version = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")
# Computer Vision Configuration
self.cv_endpoint = os.environ.get("COMPUTER_VISION_ENDPOINT")
self.cv_key = os.environ.get("COMPUTER_VISION_KEY")
# Initialize services
self.cv_client = None
self.file_processor = FileProcessor()
self.frame_extractor = VideoFrameExtractor()
self.token_manager = TokenManager(self.azure_openai_deployment)
# Background processing
self.executor = ThreadPoolExecutor(max_workers=3)
self.running = True
# Initialize backend integration
self.backend_manager = None
self._init_backend_integration()
self._init_services()
# Start background worker for AI summary processing
self.worker_thread = threading.Thread(target=self._background_summary_worker, daemon=True)
self.worker_thread.start()
print("🤖 Enhanced AI Summary Manager initialized with full backend integration")
def _init_backend_integration(self):
"""Initialize integration with the enhanced backend"""
try:
# Import the transcription manager from backend
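            # (imported lazily inside this method so the module still loads when backend.py is unavailable)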
from backend import transcription_manager
self.backend_manager = transcription_manager
print("🔗 Backend integration initialized successfully")
except ImportError as e:
print(f"Warning: Could not initialize backend integration: {e}")
self.backend_manager = None
def _background_summary_worker(self):
"""Background worker specifically for AI summary processing"""
iteration_count = 0
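        # iteration_count throttles the status log to roughly once per minute (6 polls x 10-second sleep)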
while self.running:
try:
if self.backend_manager:
# Get pending AI summary jobs from the backend
pending_summary_jobs = self.backend_manager.db.get_pending_summary_jobs()
if pending_summary_jobs and iteration_count % 6 == 0: # Log every minute
active_summaries = len([j for j in pending_summary_jobs if j.status == 'processing'])
queued_summaries = len([j for j in pending_summary_jobs if j.status == 'pending'])
if active_summaries > 0 or queued_summaries > 0:
print(f"🤖 AI Summary worker: {active_summaries} processing, {queued_summaries} queued")
# Process pending AI summary jobs
for job in pending_summary_jobs:
if job.status == 'pending':
self.executor.submit(self._process_summary_job_background, job.job_id)
time.sleep(10) # Check every 10 seconds
iteration_count += 1
except Exception as e:
print(f"❌ AI Summary background worker error: {e}")
time.sleep(30)
def _init_services(self):
"""Initialize services with validation"""
# Validate Azure OpenAI configuration
if not all([self.azure_openai_endpoint, self.azure_openai_key, self.azure_openai_deployment]):
print("ERROR: Missing Azure OpenAI configuration")
print("Required environment variables:")
print("- AZURE_OPENAI_ENDPOINT")
print("- AZURE_OPENAI_KEY")
print("- AZURE_OPENAI_DEPLOYMENT")
raise ValueError("Azure OpenAI configuration incomplete")
# Validate endpoint format
if not self.azure_openai_endpoint.startswith("https://"):
raise ValueError("AZURE_OPENAI_ENDPOINT must be a valid HTTPS URL")
# Remove trailing slash from endpoint
self.azure_openai_endpoint = self.azure_openai_endpoint.rstrip('/')
print(f"🤖 Azure OpenAI initialized: {self.azure_openai_deployment} at {self.azure_openai_endpoint}")
# Test Azure OpenAI connection
try:
self._test_azure_openai_connection()
except Exception as e:
print(f"WARNING: Azure OpenAI connection test failed: {e}")
# Initialize Computer Vision Client
if self.cv_key and self.cv_endpoint:
try:
self.cv_client = ComputerVisionClient(
self.cv_endpoint,
CognitiveServicesCredentials(self.cv_key)
)
print("👁️ Computer Vision Client initialized")
except Exception as e:
print(f"WARNING: Computer Vision initialization failed: {e}")
else:
print("Computer Vision key/endpoint not found - image processing disabled")
def _test_azure_openai_connection(self):
"""Test Azure OpenAI connection"""
url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}"
headers = {
"Content-Type": "application/json",
"api-key": self.azure_openai_key
}
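        # Minimal 5-token probe request: only verifies the endpoint, key, and deployment routing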
test_data = {
"messages": [{"role": "user", "content": "Hello"}],
"max_tokens": 5,
"temperature": 0
}
try:
response = requests.post(url, headers=headers, json=test_data, timeout=10)
if response.status_code == 200:
print("✅ Azure OpenAI connection test: SUCCESS")
else:
print(f"Azure OpenAI connection test failed: {response.status_code} - {response.text}")
raise Exception(f"Connection test failed: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Azure OpenAI connection test error: {e}")
raise
def submit_summary_job_enhanced(
self,
user_id: str,
content_mode: str,
summary_type: str,
user_prompt: str,
existing_transcript_ids=None,
audio_video_files=None,
document_files=None,
settings=None,
) -> str:
"""
Compatibility wrapper expected by app.py.
Normalizes inputs and forwards to submit_summary_job().
"""
existing_transcript_ids = existing_transcript_ids or []
files = (audio_video_files or []) + (document_files or [])
settings = dict(settings or {})
# Preserve content mode and transcript ids for background worker
settings.setdefault("content_mode", content_mode)
if existing_transcript_ids:
settings.setdefault("transcript_job_ids", existing_transcript_ids)
return self.submit_summary_job(
user_id=user_id,
summary_type=summary_type,
user_prompt=user_prompt,
files=files,
transcript_job_ids=existing_transcript_ids,
settings=settings,
)
def submit_summary_job(
self,
user_id: str,
summary_type: str,
user_prompt: str,
files: List = None,
transcript_job_ids: List[str] = None,
settings: Dict = None
) -> str:
"""Submit a new AI summary job with enhanced backend integration"""
job_id = str(uuid.uuid4())
original_files = []
if files:
original_files.extend([f.name if hasattr(f, 'name') else str(f) for f in files])
if transcript_job_ids:
original_files.extend([f"transcript_{tid[:8]}..." for tid in transcript_job_ids])
print(f"🤖 [{user_id[:8]}...] New AI summary job: {summary_type}")
print(f"User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}")
job = SummaryJob(
job_id=job_id,
user_id=user_id,
original_files=original_files,
summary_type=summary_type,
user_prompt=user_prompt,
status="pending",
created_at=datetime.now().isoformat(),
settings=settings or {}
)
# Save job to backend database
if self.backend_manager:
self.backend_manager.save_summary_job(job)
else:
self._save_summary_job_fallback(job)
        # New file uploads are processed immediately; transcript-only jobs are
        # picked up by the background summary worker.
        if files and len(files) > 0:
            self.executor.submit(self._process_summary_job, job_id, files, transcript_job_ids)
        return job_id
def _process_summary_job_background(self, job_id: str):
"""Process AI summary job from background worker"""
self._process_summary_job(job_id, [], [])
def _save_summary_job_fallback(self, job: SummaryJob):
"""Fallback method to save summary job if backend is not available"""
try:
# Create a simple local storage fallback
os.makedirs("temp/summary_jobs", exist_ok=True)
job_file = f"temp/summary_jobs/{job.job_id}.json"
with open(job_file, 'w', encoding='utf-8') as f:
json.dump(asdict(job), f, ensure_ascii=False, indent=2, default=str)
print(f"Job saved to fallback storage: {job_file}")
except Exception as e:
print(f"Error saving job to fallback storage: {e}")
def _process_summary_job(self, job_id: str, files: List = None, transcript_job_ids: List[str] = None):
"""Process AI summary job using Azure OpenAI with enhanced error handling and chat storage"""
job = None
try:
job = self.get_summary_status(job_id)
if not job:
print(f"Job {job_id[:8]}... not found")
return
print(f"🤖 [{job.user_id[:8]}...] Processing AI summary job: {job_id[:8]}...")
job.status = "processing"
self._update_job_status(job)
# Process all input sources
processed_content = {
'transcripts': [],
'documents': [],
'images': [],
'extracted_frames': []
}
# Process existing transcripts
if transcript_job_ids:
processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id)
print(f"Processed {len(processed_content['transcripts'])} existing transcripts")
# Also check job's settings for transcript IDs (from background processing)
if not transcript_job_ids and job.settings and 'transcript_job_ids' in job.settings:
transcript_job_ids = job.settings['transcript_job_ids']
processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id)
print(f"Processed {len(processed_content['transcripts'])} transcripts from job settings")
# Process uploaded files
if files:
for i, file in enumerate(files):
print(f"Processing file {i+1}/{len(files)}: {getattr(file, 'name', 'unknown')}")
file_content = self._process_uploaded_file(file, job.user_id)
if file_content:
if file_content['type'] == 'video':
frames = self._extract_significant_frames(file_content['path'])
processed_content['extracted_frames'].extend(frames)
print(f"Extracted {len(frames)} frames from video")
elif file_content['type'] == 'document':
processed_content['documents'].append(file_content)
elif file_content['type'] == 'image':
processed_content['images'].append(file_content)
# Analyze images with Computer Vision
image_insights = []
all_images = processed_content['images'] + processed_content['extracted_frames']
print(f"Analyzing {len(all_images)} images...")
for image_info in all_images:
analysis = self._analyze_image_content(image_info['path'])
if analysis:
image_insights.append({
'source': image_info['filename'],
'analysis': analysis
})
print(f"Analysis complete: {len(processed_content['transcripts'])} transcripts, {len(processed_content['documents'])} documents, {len(image_insights)} images")
# Optimize content for token limits
optimized_transcripts, optimized_documents, optimized_images = self.token_manager.optimize_content_for_tokens(
processed_content['transcripts'],
processed_content['documents'],
image_insights,
job.user_prompt
)
output_language = job.settings.get('output_language', 'English') if job.settings else 'English'
# Generate AI summary using Azure OpenAI
summary_result = self._generate_ai_summary_with_openai(
transcripts=optimized_transcripts,
documents=optimized_documents,
image_insights=optimized_images,
user_prompt=job.user_prompt,
output_language=output_language
)
# Store response to chat container using backend integration
chat_url = ""
if self.backend_manager and hasattr(self.backend_manager.db, '_store_chat_response'):
try:
chat_url = self.backend_manager.db._store_chat_response(job_id, summary_result, job.user_id)
print(f"💬 Chat response stored successfully: {chat_url}")
except Exception as e:
print(f"⚠️ Warning: Could not store chat response: {e}")
# Update job with results
job.status = "completed"
job.summary_text = summary_result
job.completed_at = datetime.now().isoformat()
job.processed_files = {
'transcript_count': len(processed_content['transcripts']),
'document_count': len(processed_content['documents']),
'image_count': len(all_images),
'extracted_frames': len(processed_content['extracted_frames'])
}
job.extracted_images = [img['filename'] for img in processed_content['extracted_frames']]
job.chat_response_url = chat_url
self._update_job_status(job)
print(f"✅ [{job.user_id[:8]}...] AI summary completed: {job_id[:8]}...")
except Exception as e:
print(f"❌ AI summary processing failed: {e}")
if job:
job.status = "failed"
job.error_message = str(e)
job.completed_at = datetime.now().isoformat()
self._update_job_status(job)
def _update_job_status(self, job: SummaryJob):
"""Update job status in backend or fallback storage"""
if self.backend_manager:
self.backend_manager.save_summary_job(job)
else:
self._save_summary_job_fallback(job)
def _generate_ai_summary_with_openai(
self,
transcripts: List[Dict],
documents: List[Dict],
image_insights: List[Dict],
user_prompt: str,
output_language: str = "English"
) -> str:
"""Generate AI summary using Azure OpenAI with proper error handling"""
try:
print("🤖 Generating AI summary with Azure OpenAI...")
# Prepare the context
context = self._prepare_text_content(transcripts, documents, image_insights, user_prompt, output_language)
# Check final token count
final_tokens = self.token_manager.count_tokens(context)
print(f"Final input tokens: {final_tokens} / {self.token_manager.max_input_tokens}")
if final_tokens > self.token_manager.max_input_tokens:
print("Content too long, applying emergency truncation")
context = self.token_manager.truncate_text(context, self.token_manager.max_input_tokens)
# Prepare the Azure OpenAI API request with correct URL format
url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}"
headers = {
"Content-Type": "application/json",
"api-key": self.azure_openai_key
}
# Adjust max tokens based on model
max_completion_tokens = 4000
if "gpt-4o" in self.azure_openai_deployment:
max_completion_tokens = 8000
data = {
"messages": [
{
"role": "system",
"content": f"You are an expert conference analyst and AI assistant. Create comprehensive, professional summaries in {output_language}. Focus on key insights, strategic decisions, action items, and important discussions. Integrate information from all sources provided (transcripts, documents, visual content). Structure your response clearly with headers and bullet points where appropriate."
},
{
"role": "user",
"content": context
}
],
"max_tokens": max_completion_tokens,
"temperature": 0.2,
"top_p": 0.3,
"frequency_penalty": 0,
"presence_penalty": 0
}
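            # Low temperature/top_p keep the summary focused and repeatable rather than creative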
print(f"Making API request to: {url}")
print(f"Using model: {self.azure_openai_deployment}")
# Make the API request with retries
max_retries = 3
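            # Retry transient failures: exponential backoff on 429s, short sleeps on other errors and timeouts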
for attempt in range(max_retries):
try:
response = requests.post(
url,
headers=headers,
json=data,
timeout=300 # 5 minute timeout
)
print(f"API Response Status: {response.status_code}")
if response.status_code == 200:
break
                    elif response.status_code == 429:
                        # Rate limit - fail clearly if retries are exhausted, otherwise back off and retry
                        if attempt == max_retries - 1:
                            raise Exception(f"Azure OpenAI rate limit (429) persisted after {max_retries} attempts")
                        wait_time = 2 ** attempt
                        print(f"Rate limited, waiting {wait_time} seconds...")
                        time.sleep(wait_time)
                        continue
else:
error_msg = f"Azure OpenAI API error: {response.status_code} - {response.text}"
print(error_msg)
if attempt == max_retries - 1:
raise Exception(error_msg)
time.sleep(1)
continue
except requests.exceptions.Timeout:
if attempt == max_retries - 1:
raise Exception("Azure OpenAI request timed out after multiple retries")
print(f"Request timeout, retrying... (attempt {attempt + 1})")
time.sleep(2)
continue
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise Exception(f"Azure OpenAI request failed: {str(e)}")
print(f"Request error, retrying... (attempt {attempt + 1}): {e}")
time.sleep(2)
continue
try:
result = response.json()
except json.JSONDecodeError as e:
raise Exception(f"Invalid JSON response from Azure OpenAI: {str(e)}")
# Extract the AI response
if 'choices' in result and len(result['choices']) > 0:
choice = result['choices'][0]
if 'message' in choice and 'content' in choice['message']:
ai_response = choice['message']['content']
# Check for completion reason
finish_reason = choice.get('finish_reason', '')
if finish_reason == 'content_filter':
raise Exception("Content was filtered by Azure OpenAI safety systems")
elif finish_reason == 'length':
print("Response was truncated due to length limit")
ai_response += "\n\n[Response was truncated due to length limit]"
print(f"✅ AI summary generated successfully in {output_language}")
return ai_response
else:
raise Exception(f"Unexpected response format: {result}")
else:
raise Exception(f"No response generated from Azure OpenAI: {result}")
except Exception as e:
error_msg = f"Azure OpenAI generation failed: {str(e)}"
print(error_msg)
raise Exception(error_msg)
def _prepare_text_content(self, transcripts, documents, image_insights, user_prompt, output_language="English"):
"""Prepare comprehensive text content for AI analysis"""
context_parts = [
"# Enhanced AI Conference Analysis Request",
f"**User Instructions:** {user_prompt}",
f"**Output Language:** {output_language}",
"",
"## Comprehensive Content Analysis:",
""
]
# Add transcript content
if transcripts:
context_parts.append("### 🎙️ Transcription Content:")
for i, transcript in enumerate(transcripts, 1):
context_parts.append(f"**Source {i}: {transcript['source']}**")
context_parts.append(transcript['content'])
context_parts.append("")
# Add document content
if documents:
context_parts.append("### 📄 Document Content:")
for i, doc in enumerate(documents, 1):
context_parts.append(f"**Document {i}: {doc['filename']}**")
context_parts.append(doc['content'])
context_parts.append("")
# Add image analysis
if image_insights:
context_parts.append("### 👁️ Visual Content Analysis:")
for i, img in enumerate(image_insights, 1):
context_parts.append(f"**Image {i}: {img['source']}**")
analysis = img['analysis']
context_parts.append(f"Description: {analysis.get('description', 'N/A')}")
if analysis.get('extracted_text'):
context_parts.append(f"Text Content: {analysis['extracted_text']}")
context_parts.append("")
context_parts.extend([
"## AI Analysis Instructions:",
f"Create a comprehensive, professional conference analysis in {output_language}.",
"Follow the user's specific instructions for format and focus areas.",
"Integrate information from ALL sources (transcripts, documents, visual content).",
"Structure your response with clear headers and organize information logically.",
"Highlight key insights, strategic decisions, action items, and important discussions.",
"Provide actionable recommendations based on the comprehensive content analysis.",
"Include timestamps and speaker references where relevant.",
"Ensure the summary is professional, accurate, and valuable for business decision-making.",
])
return "\n".join(context_parts)
def _process_uploaded_file(self, file, user_id: str) -> Optional[Dict]:
"""Process uploaded file"""
try:
if hasattr(file, 'name'):
file_path = file.name
filename = os.path.basename(file_path)
elif isinstance(file, str):
file_path = file
filename = os.path.basename(file_path)
else:
return None
if not os.path.exists(file_path):
return None
file_size = os.path.getsize(file_path)
if file_size > 500 * 1024 * 1024: # 500MB limit
return None
ext = filename.split('.')[-1].lower() if '.' in filename else ''
if ext in ['mp4', 'mov', 'avi', 'mkv', 'webm', 'flv', '3gp', 'wmv']:
return {
'type': 'video',
'filename': filename,
'path': file_path,
'extension': ext,
'size': file_size
}
elif ext in ['jpg', 'jpeg', 'png', 'bmp', 'gif', 'tiff', 'webp']:
return {
'type': 'image',
'filename': filename,
'path': file_path,
'extension': ext,
'size': file_size
}
elif ext in ['pdf', 'docx', 'doc', 'pptx', 'ppt', 'xlsx', 'xls', 'txt', 'json', 'csv']:
content = self.file_processor.process_file(file_path, ext)
if content:
return {
'type': 'document',
'filename': filename,
'path': file_path,
'extension': ext,
'content': content,
'size': file_size
}
return None
except Exception as e:
print(f"Error processing file: {e}")
return None
def _extract_significant_frames(self, video_path: str) -> List[Dict]:
"""Extract significant frames from video"""
try:
frames = self.frame_extractor.extract_frames(video_path)
return frames if frames else []
except Exception as e:
print(f"Frame extraction failed: {e}")
return []
def _analyze_image_content(self, image_path: str) -> Optional[Dict]:
"""Analyze image content with Computer Vision"""
if not self.cv_client:
return None
try:
with open(image_path, 'rb') as image_stream:
# OCR
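                # The Read API call is asynchronous: it returns an Operation-Location header that is polled below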
ocr_result = self.cv_client.read_in_stream(image_stream, raw=True)
operation_id = ocr_result.headers["Operation-Location"].split("/")[-1]
# Wait for completion
timeout = 30
start_time = time.time()
while True:
if time.time() - start_time > timeout:
break
read_result = self.cv_client.get_read_result(operation_id)
if read_result.status not in ['notStarted', 'running']:
break
time.sleep(1)
# Extract text
extracted_text = ""
if read_result.status == OperationStatusCodes.succeeded:
for text_result in read_result.analyze_result.read_results:
for line in text_result.lines:
extracted_text += line.text + "\n"
# Get description
image_stream.seek(0)
description_result = self.cv_client.describe_image_in_stream(
image_stream,
max_candidates=3,
language='en'
)
return {
'extracted_text': extracted_text.strip(),
'description': description_result.captions[0].text if description_result.captions else "",
'confidence': description_result.captions[0].confidence if description_result.captions else 0
}
except Exception as e:
print(f"Image analysis failed: {e}")
return None
def _get_existing_transcripts(self, transcript_job_ids: List[str], user_id: str) -> List[Dict]:
"""Get existing transcripts using backend integration"""
transcripts = []
if self.backend_manager:
for job_id in transcript_job_ids:
try:
job = self.backend_manager.get_job_status(job_id)
if job and job.user_id == user_id and job.transcript_text:
transcripts.append({
'source': f"Previous transcript: {job.original_filename}",
'content': job.transcript_text
})
except Exception as e:
print(f"Error getting transcript {job_id[:8]}...: {e}")
else:
print("Backend manager not available, cannot retrieve existing transcripts")
return transcripts
def get_summary_status(self, job_id: str) -> Optional[SummaryJob]:
"""Get current summary job status using backend integration"""
if self.backend_manager:
return self.backend_manager.get_summary_job(job_id)
else:
# Fallback to local storage
try:
job_file = f"temp/summary_jobs/{job_id}.json"
if os.path.exists(job_file):
with open(job_file, 'r', encoding='utf-8') as f:
job_data = json.load(f)
# Convert back to SummaryJob object
return SummaryJob(**job_data)
except Exception as e:
print(f"Error loading job from fallback storage: {e}")
return None
def get_user_summary_history(self, user_id: str, limit: int = 20) -> List[SummaryJob]:
"""Get user's summary history using backend integration"""
if self.backend_manager:
return self.backend_manager.get_user_summary_history(user_id, limit)
else:
# Fallback to local storage
try:
history = []
jobs_dir = "temp/summary_jobs"
if os.path.exists(jobs_dir):
for filename in os.listdir(jobs_dir):
if filename.endswith('.json'):
job_file = os.path.join(jobs_dir, filename)
try:
with open(job_file, 'r', encoding='utf-8') as f:
job_data = json.load(f)
if job_data.get('user_id') == user_id:
history.append(SummaryJob(**job_data))
except Exception as e:
print(f"Error loading job file {filename}: {e}")
# Sort by creation date and limit
history.sort(key=lambda x: x.created_at, reverse=True)
return history[:limit]
except Exception as e:
print(f"Error getting user history from fallback storage: {e}")
return []
# Global AI summary manager instance with enhanced backend integration
ai_summary_manager = AISummaryManager()
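# --- Minimal usage sketch (illustrative only) ---
# Assumes the Azure OpenAI environment variables above are configured and that
# "meeting_notes.pdf" exists locally; the mode and type labels below are
# hypothetical example values, not ones defined by this module.
if __name__ == "__main__":
    demo_job_id = ai_summary_manager.submit_summary_job_enhanced(
        user_id="local-test-user",
        content_mode="files",              # hypothetical mode label
        summary_type="comprehensive",      # hypothetical summary type
        user_prompt="Summarize the key decisions and action items.",
        document_files=["meeting_notes.pdf"],
    )
    print(f"Submitted demo summary job: {demo_job_id}")
    print(ai_summary_manager.get_summary_status(demo_job_id))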