import os
import json
import uuid
import time
import base64
import tempfile
import shutil
import threading
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
import tiktoken
import requests

from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
from msrest.authentication import CognitiveServicesCredentials

from file_processors import FileProcessor
from image_extraction import VideoFrameExtractor

load_dotenv()


@dataclass
class SummaryJob:
    job_id: str
    user_id: str
    original_files: List[str]
    summary_type: str
    user_prompt: str
    status: str
    created_at: str
    completed_at: Optional[str] = None
    summary_text: Optional[str] = None
    processed_files: Optional[Dict] = None
    extracted_images: Optional[List[str]] = None
    transcript_text: Optional[str] = None
    error_message: Optional[str] = None
    settings: Optional[Dict] = None
    chat_response_url: Optional[str] = None
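
    # Illustrative usage (not executed): SummaryJob is a plain dataclass, so it
    # round-trips through JSON via dataclasses.asdict, as the fallback storage
    # below relies on. Argument values here are hypothetical:
    #
    #   job = SummaryJob(
    #       job_id=str(uuid.uuid4()), user_id="u1", original_files=[],
    #       summary_type="executive", user_prompt="Summarize the meeting",
    #       status="pending", created_at=datetime.now().isoformat(),
    #   )
    #   payload = json.dumps(asdict(job), default=str)
    #   restored = SummaryJob(**json.loads(payload))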


class TokenManager:
    """Manage token counting and content truncation for GPT models"""

    def __init__(self, model_name: str = "gpt-4o-mini"):
        try:
            # Map deployment names to their tiktoken encodings.
            model_encoding_map = {
                "gpt-4o": "o200k_base",
                "gpt-4o-mini": "o200k_base",
                "gpt-4": "cl100k_base",
                "gpt-4-turbo": "cl100k_base",
                "gpt-35-turbo": "cl100k_base"
            }

            encoding_name = model_encoding_map.get(model_name, "cl100k_base")
            self.encoder = tiktoken.get_encoding(encoding_name)
        except Exception as e:
            print(f"Warning: Could not load tokenizer for {model_name}, using fallback: {e}")
            self.encoder = tiktoken.get_encoding("cl100k_base")

        # Leave headroom below the model's context window for the completion.
        if "gpt-4o" in model_name:
            self.max_input_tokens = 120000
        else:
            self.max_input_tokens = 100000

        # Per-content-type budgets within the overall input limit.
        self.max_transcript_tokens = 80000
        self.max_document_tokens = 30000
        self.max_image_analysis_tokens = 10000

    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        try:
            return len(self.encoder.encode(text))
        except Exception:
            # Rough fallback: ~4 characters per token for English text.
            return len(text) // 4

    def truncate_text(self, text: str, max_tokens: int) -> str:
        """Truncate text to fit within token limit"""
        if not text:
            return text

        current_tokens = self.count_tokens(text)
        if current_tokens <= max_tokens:
            return text

        lines = text.split('\n')
        if len(lines) == 1:
            # Single line: estimate a character budget from the observed
            # chars-per-token ratio, with a 10% safety margin.
            chars_per_token = len(text) / current_tokens
            target_chars = int(max_tokens * chars_per_token * 0.9)
            return text[:target_chars] + "\n[Content truncated due to length]"

        # Multi-line: keep whole lines until the budget is exhausted.
        truncated_lines = []
        current_tokens = 0

        for line in lines:
            line_tokens = self.count_tokens(line + '\n')
            if current_tokens + line_tokens > max_tokens:
                truncated_lines.append("[Content truncated due to length]")
                break
            truncated_lines.append(line)
            current_tokens += line_tokens

        return '\n'.join(truncated_lines)
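
    # Illustrative usage (not executed): truncate_text drops whole lines where
    # it can, so transcript structure survives truncation. `long_transcript`
    # is a stand-in for any large string:
    #
    #   tm = TokenManager("gpt-4o-mini")
    #   tm.count_tokens("hello world")             # exact BPE token count
    #   tm.truncate_text(long_transcript, 1000)    # <= ~1000 tokens, line-aligned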

    def optimize_content_for_tokens(self, transcripts: List[Dict], documents: List[Dict],
                                    image_insights: List[Dict], user_prompt: str) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Optimize content to fit within token limits"""

        # Budget transcripts: split the transcript allowance evenly.
        total_transcript_text = ""
        for transcript in transcripts:
            total_transcript_text += transcript.get('content', '') + "\n\n"

        transcript_tokens = self.count_tokens(total_transcript_text)
        if transcript_tokens > self.max_transcript_tokens:
            print(f"Transcripts too long ({transcript_tokens} tokens), truncating to {self.max_transcript_tokens}")
            tokens_per_transcript = self.max_transcript_tokens // len(transcripts) if transcripts else 0
            for transcript in transcripts:
                transcript['content'] = self.truncate_text(
                    transcript['content'], tokens_per_transcript
                )

        # Budget documents the same way.
        total_document_text = ""
        for doc in documents:
            total_document_text += doc.get('content', '') + "\n\n"

        doc_tokens = self.count_tokens(total_document_text)
        if doc_tokens > self.max_document_tokens:
            print(f"Documents too long ({doc_tokens} tokens), truncating to {self.max_document_tokens}")
            tokens_per_doc = self.max_document_tokens // len(documents) if documents else 0
            for doc in documents:
                doc['content'] = self.truncate_text(
                    doc['content'], tokens_per_doc
                )

        # Cap image insights and trim each description/OCR snippet.
        if len(image_insights) > 10:
            print(f"Too many images ({len(image_insights)}), limiting to 10")
            image_insights = image_insights[:10]

        for img in image_insights:
            if 'analysis' in img:
                desc = img['analysis'].get('description', '')
                text = img['analysis'].get('extracted_text', '')
                if desc:
                    img['analysis']['description'] = self.truncate_text(desc, 200)
                if text:
                    img['analysis']['extracted_text'] = self.truncate_text(text, 300)

        return transcripts, documents, image_insights
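
    # Illustrative usage (not executed): the optimizer takes the plain-dict
    # shapes produced elsewhere in this module; filenames here are hypothetical:
    #
    #   transcripts    = [{'source': 'meeting.mp4', 'content': '...'}]
    #   documents      = [{'filename': 'deck.pdf', 'content': '...'}]
    #   image_insights = [{'source': 'frame_001.jpg',
    #                      'analysis': {'description': '...',
    #                                   'extracted_text': '...'}}]
    #   transcripts, documents, image_insights = tm.optimize_content_for_tokens(
    #       transcripts, documents, image_insights, user_prompt="Focus on decisions")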


class AISummaryManager:
    """Enhanced AI-powered conference summarization using Azure OpenAI with full backend integration"""

    def __init__(self):
        # Azure OpenAI configuration
        self.azure_openai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
        self.azure_openai_key = os.environ.get("AZURE_OPENAI_KEY")
        self.azure_openai_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")
        self.azure_openai_api_version = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")

        # Azure Computer Vision configuration (optional)
        self.cv_endpoint = os.environ.get("COMPUTER_VISION_ENDPOINT")
        self.cv_key = os.environ.get("COMPUTER_VISION_KEY")

        # Processing helpers
        self.cv_client = None
        self.file_processor = FileProcessor()
        self.frame_extractor = VideoFrameExtractor()
        self.token_manager = TokenManager(self.azure_openai_deployment)

        # Worker pool for summary jobs
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.running = True

        # Optional integration with the transcription backend
        self.backend_manager = None
        self._init_backend_integration()

        self._init_services()

        # Poll for queued summary jobs in the background
        self.worker_thread = threading.Thread(target=self._background_summary_worker, daemon=True)
        self.worker_thread.start()

        print("🤖 Enhanced AI Summary Manager initialized with full backend integration")

    def _init_backend_integration(self):
        """Initialize integration with the enhanced backend"""
        try:
            # Imported lazily so this module still loads without the backend.
            from backend import transcription_manager
            self.backend_manager = transcription_manager
            print("🔗 Backend integration initialized successfully")
        except ImportError as e:
            print(f"Warning: Could not initialize backend integration: {e}")
            self.backend_manager = None

    def _background_summary_worker(self):
        """Background worker specifically for AI summary processing"""
        iteration_count = 0
        while self.running:
            try:
                if self.backend_manager:
                    pending_summary_jobs = self.backend_manager.db.get_pending_summary_jobs()

                    # Log queue depth roughly once a minute (every sixth 10 s poll).
                    if pending_summary_jobs and iteration_count % 6 == 0:
                        active_summaries = len([j for j in pending_summary_jobs if j.status == 'processing'])
                        queued_summaries = len([j for j in pending_summary_jobs if j.status == 'pending'])
                        if active_summaries > 0 or queued_summaries > 0:
                            print(f"🤖 AI Summary worker: {active_summaries} processing, {queued_summaries} queued")

                    for job in pending_summary_jobs:
                        if job.status == 'pending':
                            self.executor.submit(self._process_summary_job_background, job.job_id)

                time.sleep(10)
                iteration_count += 1

            except Exception as e:
                print(f"❌ AI Summary background worker error: {e}")
                time.sleep(30)

    def _init_services(self):
        """Initialize services with validation"""
        if not all([self.azure_openai_endpoint, self.azure_openai_key, self.azure_openai_deployment]):
            print("ERROR: Missing Azure OpenAI configuration")
            print("Required environment variables:")
            print("- AZURE_OPENAI_ENDPOINT")
            print("- AZURE_OPENAI_KEY")
            print("- AZURE_OPENAI_DEPLOYMENT")
            raise ValueError("Azure OpenAI configuration incomplete")

        if not self.azure_openai_endpoint.startswith("https://"):
            raise ValueError("AZURE_OPENAI_ENDPOINT must be a valid HTTPS URL")

        self.azure_openai_endpoint = self.azure_openai_endpoint.rstrip('/')

        print(f"🤖 Azure OpenAI initialized: {self.azure_openai_deployment} at {self.azure_openai_endpoint}")

        try:
            self._test_azure_openai_connection()
        except Exception as e:
            print(f"WARNING: Azure OpenAI connection test failed: {e}")

        if self.cv_key and self.cv_endpoint:
            try:
                self.cv_client = ComputerVisionClient(
                    self.cv_endpoint,
                    CognitiveServicesCredentials(self.cv_key)
                )
                print("👁️ Computer Vision Client initialized")
            except Exception as e:
                print(f"WARNING: Computer Vision initialization failed: {e}")
        else:
            print("Computer Vision key/endpoint not found - image processing disabled")

    def _test_azure_openai_connection(self):
        """Test Azure OpenAI connection"""
        url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}"

        headers = {
            "Content-Type": "application/json",
            "api-key": self.azure_openai_key
        }

        test_data = {
            "messages": [{"role": "user", "content": "Hello"}],
            "max_tokens": 5,
            "temperature": 0
        }

        try:
            response = requests.post(url, headers=headers, json=test_data, timeout=10)
            if response.status_code == 200:
                print("✅ Azure OpenAI connection test: SUCCESS")
            else:
                print(f"Azure OpenAI connection test failed: {response.status_code} - {response.text}")
                raise Exception(f"Connection test failed: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Azure OpenAI connection test error: {e}")
            raise
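
    # For reference (not executed): both the test above and the summary call
    # below use the standard Azure OpenAI chat-completions REST route:
    #
    #   POST {AZURE_OPENAI_ENDPOINT}/openai/deployments/{deployment}/chat/completions
    #        ?api-version={AZURE_OPENAI_API_VERSION}
    #   headers: api-key: <AZURE_OPENAI_KEY>, Content-Type: application/json
    #   body:    {"messages": [...], "max_tokens": ..., "temperature": ...}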

    def submit_summary_job_enhanced(
        self,
        user_id: str,
        content_mode: str,
        summary_type: str,
        user_prompt: str,
        existing_transcript_ids=None,
        audio_video_files=None,
        document_files=None,
        settings=None,
    ) -> str:
        """
        Compatibility wrapper expected by app.py.
        Normalizes inputs and forwards to submit_summary_job().
        """
        existing_transcript_ids = existing_transcript_ids or []
        files = (audio_video_files or []) + (document_files or [])
        settings = dict(settings or {})

        settings.setdefault("content_mode", content_mode)
        if existing_transcript_ids:
            settings.setdefault("transcript_job_ids", existing_transcript_ids)

        return self.submit_summary_job(
            user_id=user_id,
            summary_type=summary_type,
            user_prompt=user_prompt,
            files=files,
            transcript_job_ids=existing_transcript_ids,
            settings=settings,
        )
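
    # Illustrative usage (not executed): a caller such as app.py might invoke
    # the wrapper like this (argument values are hypothetical):
    #
    #   job_id = ai_summary_manager.submit_summary_job_enhanced(
    #       user_id=user_id,
    #       content_mode="transcripts_only",
    #       summary_type="executive",
    #       user_prompt="Summarize decisions and action items",
    #       existing_transcript_ids=[transcript_job_id],
    #   )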

    def submit_summary_job(
        self,
        user_id: str,
        summary_type: str,
        user_prompt: str,
        files: List = None,
        transcript_job_ids: List[str] = None,
        settings: Dict = None
    ) -> str:
        """Submit a new AI summary job with enhanced backend integration"""
        job_id = str(uuid.uuid4())

        original_files = []
        if files:
            original_files.extend([f.name if hasattr(f, 'name') else str(f) for f in files])
        if transcript_job_ids:
            original_files.extend([f"transcript_{tid[:8]}..." for tid in transcript_job_ids])

        print(f"🤖 [{user_id[:8]}...] New AI summary job: {summary_type}")
        print(f"User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}")

        job = SummaryJob(
            job_id=job_id,
            user_id=user_id,
            original_files=original_files,
            summary_type=summary_type,
            user_prompt=user_prompt,
            status="pending",
            created_at=datetime.now().isoformat(),
            settings=settings or {}
        )

        # Persist the job before processing starts.
        if self.backend_manager:
            self.backend_manager.save_summary_job(job)
        else:
            self._save_summary_job_fallback(job)

        if files and len(files) > 0:
            # Uploaded files are processed immediately on the worker pool.
            self.executor.submit(self._process_summary_job, job_id, files, transcript_job_ids)
        # Transcript-only jobs stay "pending" and are picked up by the
        # background summary worker.

        return job_id

    def _process_summary_job_background(self, job_id: str):
        """Process AI summary job from background worker"""
        self._process_summary_job(job_id, [], [])

    def _save_summary_job_fallback(self, job: SummaryJob):
        """Fallback method to save summary job if backend is not available"""
        try:
            # Local JSON files under temp/ stand in for the backend database.
            os.makedirs("temp/summary_jobs", exist_ok=True)
            job_file = f"temp/summary_jobs/{job.job_id}.json"

            with open(job_file, 'w', encoding='utf-8') as f:
                json.dump(asdict(job), f, ensure_ascii=False, indent=2, default=str)

            print(f"Job saved to fallback storage: {job_file}")
        except Exception as e:
            print(f"Error saving job to fallback storage: {e}")

    def _process_summary_job(self, job_id: str, files: List = None, transcript_job_ids: List[str] = None):
        """Process AI summary job using Azure OpenAI with enhanced error handling and chat storage"""
        job = None
        try:
            job = self.get_summary_status(job_id)
            if not job:
                print(f"Job {job_id[:8]}... not found")
                return

            print(f"🤖 [{job.user_id[:8]}...] Processing AI summary job: {job_id[:8]}...")

            job.status = "processing"
            self._update_job_status(job)

            processed_content = {
                'transcripts': [],
                'documents': [],
                'images': [],
                'extracted_frames': []
            }

            # Pull in transcripts referenced directly by the caller.
            if transcript_job_ids:
                processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id)
                print(f"Processed {len(processed_content['transcripts'])} existing transcripts")

            # Otherwise fall back to transcript IDs stored in the job settings
            # (the path taken by background-worker jobs).
            if not transcript_job_ids and job.settings and 'transcript_job_ids' in job.settings:
                transcript_job_ids = job.settings['transcript_job_ids']
                processed_content['transcripts'] = self._get_existing_transcripts(transcript_job_ids, job.user_id)
                print(f"Processed {len(processed_content['transcripts'])} transcripts from job settings")

            # Sort uploaded files into videos, documents, and images.
            if files:
                for i, file in enumerate(files):
                    print(f"Processing file {i+1}/{len(files)}: {getattr(file, 'name', 'unknown')}")
                    file_content = self._process_uploaded_file(file, job.user_id)
                    if file_content:
                        if file_content['type'] == 'video':
                            frames = self._extract_significant_frames(file_content['path'])
                            processed_content['extracted_frames'].extend(frames)
                            print(f"Extracted {len(frames)} frames from video")
                        elif file_content['type'] == 'document':
                            processed_content['documents'].append(file_content)
                        elif file_content['type'] == 'image':
                            processed_content['images'].append(file_content)

            # Run Computer Vision over uploaded images and extracted frames.
            image_insights = []
            all_images = processed_content['images'] + processed_content['extracted_frames']

            print(f"Analyzing {len(all_images)} images...")
            for image_info in all_images:
                analysis = self._analyze_image_content(image_info['path'])
                if analysis:
                    image_insights.append({
                        'source': image_info['filename'],
                        'analysis': analysis
                    })

            print(f"Analysis complete: {len(processed_content['transcripts'])} transcripts, {len(processed_content['documents'])} documents, {len(image_insights)} images")

            # Trim everything to fit the model's context window.
            optimized_transcripts, optimized_documents, optimized_images = self.token_manager.optimize_content_for_tokens(
                processed_content['transcripts'],
                processed_content['documents'],
                image_insights,
                job.user_prompt
            )

            output_language = job.settings.get('output_language', 'English') if job.settings else 'English'

            summary_result = self._generate_ai_summary_with_openai(
                transcripts=optimized_transcripts,
                documents=optimized_documents,
                image_insights=optimized_images,
                user_prompt=job.user_prompt,
                output_language=output_language
            )

            # Optionally store the summary as a chat response in the backend.
            chat_url = ""
            if self.backend_manager and hasattr(self.backend_manager.db, '_store_chat_response'):
                try:
                    chat_url = self.backend_manager.db._store_chat_response(job_id, summary_result, job.user_id)
                    print(f"💬 Chat response stored successfully: {chat_url}")
                except Exception as e:
                    print(f"⚠️ Warning: Could not store chat response: {e}")

            job.status = "completed"
            job.summary_text = summary_result
            job.completed_at = datetime.now().isoformat()
            job.processed_files = {
                'transcript_count': len(processed_content['transcripts']),
                'document_count': len(processed_content['documents']),
                'image_count': len(all_images),
                'extracted_frames': len(processed_content['extracted_frames'])
            }
            job.extracted_images = [img['filename'] for img in processed_content['extracted_frames']]
            job.chat_response_url = chat_url

            self._update_job_status(job)

            print(f"✅ [{job.user_id[:8]}...] AI summary completed: {job_id[:8]}...")

        except Exception as e:
            print(f"❌ AI summary processing failed: {e}")
            if job:
                job.status = "failed"
                job.error_message = str(e)
                job.completed_at = datetime.now().isoformat()
                self._update_job_status(job)

    def _update_job_status(self, job: SummaryJob):
        """Update job status in backend or fallback storage"""
        if self.backend_manager:
            self.backend_manager.save_summary_job(job)
        else:
            self._save_summary_job_fallback(job)

    def _generate_ai_summary_with_openai(
        self,
        transcripts: List[Dict],
        documents: List[Dict],
        image_insights: List[Dict],
        user_prompt: str,
        output_language: str = "English"
    ) -> str:
        """Generate AI summary using Azure OpenAI with proper error handling"""
        try:
            print("🤖 Generating AI summary with Azure OpenAI...")

            context = self._prepare_text_content(transcripts, documents, image_insights, user_prompt, output_language)

            final_tokens = self.token_manager.count_tokens(context)
            print(f"Final input tokens: {final_tokens} / {self.token_manager.max_input_tokens}")

            if final_tokens > self.token_manager.max_input_tokens:
                print("Content too long, applying emergency truncation")
                context = self.token_manager.truncate_text(context, self.token_manager.max_input_tokens)

            url = f"{self.azure_openai_endpoint}/openai/deployments/{self.azure_openai_deployment}/chat/completions?api-version={self.azure_openai_api_version}"

            headers = {
                "Content-Type": "application/json",
                "api-key": self.azure_openai_key
            }

            # Allow a longer completion for larger-context deployments.
            max_completion_tokens = 4000
            if "gpt-4o" in self.azure_openai_deployment:
                max_completion_tokens = 8000

            data = {
                "messages": [
                    {
                        "role": "system",
                        "content": f"You are an expert conference analyst and AI assistant. Create comprehensive, professional summaries in {output_language}. Focus on key insights, strategic decisions, action items, and important discussions. Integrate information from all sources provided (transcripts, documents, visual content). Structure your response clearly with headers and bullet points where appropriate."
                    },
                    {
                        "role": "user",
                        "content": context
                    }
                ],
                "max_tokens": max_completion_tokens,
                "temperature": 0.2,
                "top_p": 0.3,
                "frequency_penalty": 0,
                "presence_penalty": 0
            }

            print(f"Making API request to: {url}")
            print(f"Using model: {self.azure_openai_deployment}")

            # Retry with exponential backoff on rate limits and transient errors.
            max_retries = 3
            response = None
            for attempt in range(max_retries):
                try:
                    response = requests.post(
                        url,
                        headers=headers,
                        json=data,
                        timeout=300
                    )

                    print(f"API Response Status: {response.status_code}")

                    if response.status_code == 200:
                        break
                    elif response.status_code == 429:
                        wait_time = 2 ** attempt
                        print(f"Rate limited, waiting {wait_time} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        error_msg = f"Azure OpenAI API error: {response.status_code} - {response.text}"
                        print(error_msg)
                        if attempt == max_retries - 1:
                            raise Exception(error_msg)
                        time.sleep(1)
                        continue

                except requests.exceptions.Timeout:
                    if attempt == max_retries - 1:
                        raise Exception("Azure OpenAI request timed out after multiple retries")
                    print(f"Request timeout, retrying... (attempt {attempt + 1})")
                    time.sleep(2)
                    continue
                except requests.exceptions.RequestException as e:
                    if attempt == max_retries - 1:
                        raise Exception(f"Azure OpenAI request failed: {str(e)}")
                    print(f"Request error, retrying... (attempt {attempt + 1}): {e}")
                    time.sleep(2)
                    continue

            # Guard against falling through the loop on a persistent 429.
            if response is None or response.status_code != 200:
                raise Exception("Azure OpenAI request did not succeed after retries")

            try:
                result = response.json()
            except json.JSONDecodeError as e:
                raise Exception(f"Invalid JSON response from Azure OpenAI: {str(e)}")

            if 'choices' in result and len(result['choices']) > 0:
                choice = result['choices'][0]

                if 'message' in choice and 'content' in choice['message']:
                    ai_response = choice['message']['content']

                    finish_reason = choice.get('finish_reason', '')
                    if finish_reason == 'content_filter':
                        raise Exception("Content was filtered by Azure OpenAI safety systems")
                    elif finish_reason == 'length':
                        print("Response was truncated due to length limit")
                        ai_response += "\n\n[Response was truncated due to length limit]"

                    print(f"✅ AI summary generated successfully in {output_language}")
                    return ai_response
                else:
                    raise Exception(f"Unexpected response format: {result}")
            else:
                raise Exception(f"No response generated from Azure OpenAI: {result}")

        except Exception as e:
            error_msg = f"Azure OpenAI generation failed: {str(e)}"
            print(error_msg)
            raise Exception(error_msg)
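
    # For reference (not executed): a successful chat-completions payload has
    # roughly the shape parsed above:
    #
    #   {
    #     "choices": [
    #       {"message": {"role": "assistant", "content": "..."},
    #        "finish_reason": "stop"}
    #     ],
    #     "usage": {"prompt_tokens": 0, "completion_tokens": 0, ...}
    #   }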

    def _prepare_text_content(self, transcripts, documents, image_insights, user_prompt, output_language="English"):
        """Prepare comprehensive text content for AI analysis"""
        context_parts = [
            "# Enhanced AI Conference Analysis Request",
            f"**User Instructions:** {user_prompt}",
            f"**Output Language:** {output_language}",
            "",
            "## Comprehensive Content Analysis:",
            ""
        ]

        if transcripts:
            context_parts.append("### 🎙️ Transcription Content:")
            for i, transcript in enumerate(transcripts, 1):
                context_parts.append(f"**Source {i}: {transcript['source']}**")
                context_parts.append(transcript['content'])
                context_parts.append("")

        if documents:
            context_parts.append("### 📄 Document Content:")
            for i, doc in enumerate(documents, 1):
                context_parts.append(f"**Document {i}: {doc['filename']}**")
                context_parts.append(doc['content'])
                context_parts.append("")

        if image_insights:
            context_parts.append("### 👁️ Visual Content Analysis:")
            for i, img in enumerate(image_insights, 1):
                context_parts.append(f"**Image {i}: {img['source']}**")
                analysis = img['analysis']
                context_parts.append(f"Description: {analysis.get('description', 'N/A')}")
                if analysis.get('extracted_text'):
                    context_parts.append(f"Text Content: {analysis['extracted_text']}")
                context_parts.append("")

        context_parts.extend([
            "## AI Analysis Instructions:",
            f"Create a comprehensive, professional conference analysis in {output_language}.",
            "Follow the user's specific instructions for format and focus areas.",
            "Integrate information from ALL sources (transcripts, documents, visual content).",
            "Structure your response with clear headers and organize information logically.",
            "Highlight key insights, strategic decisions, action items, and important discussions.",
            "Provide actionable recommendations based on the comprehensive content analysis.",
            "Include timestamps and speaker references where relevant.",
            "Ensure the summary is professional, accurate, and valuable for business decision-making.",
        ])

        return "\n".join(context_parts)

    def _process_uploaded_file(self, file, user_id: str) -> Optional[Dict]:
        """Process uploaded file"""
        try:
            # Accept either a file-like object with .name or a plain path.
            if hasattr(file, 'name'):
                file_path = file.name
                filename = os.path.basename(file_path)
            elif isinstance(file, str):
                file_path = file
                filename = os.path.basename(file_path)
            else:
                return None

            if not os.path.exists(file_path):
                return None

            # Skip anything over 500 MB.
            file_size = os.path.getsize(file_path)
            if file_size > 500 * 1024 * 1024:
                return None

            ext = filename.split('.')[-1].lower() if '.' in filename else ''

            if ext in ['mp4', 'mov', 'avi', 'mkv', 'webm', 'flv', '3gp', 'wmv']:
                return {
                    'type': 'video',
                    'filename': filename,
                    'path': file_path,
                    'extension': ext,
                    'size': file_size
                }
            elif ext in ['jpg', 'jpeg', 'png', 'bmp', 'gif', 'tiff', 'webp']:
                return {
                    'type': 'image',
                    'filename': filename,
                    'path': file_path,
                    'extension': ext,
                    'size': file_size
                }
            elif ext in ['pdf', 'docx', 'doc', 'pptx', 'ppt', 'xlsx', 'xls', 'txt', 'json', 'csv']:
                content = self.file_processor.process_file(file_path, ext)
                if content:
                    return {
                        'type': 'document',
                        'filename': filename,
                        'path': file_path,
                        'extension': ext,
                        'content': content,
                        'size': file_size
                    }

            return None

        except Exception as e:
            print(f"Error processing file: {e}")
            return None
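
    # Illustrative (not executed): return values are tagged dicts keyed off
    # the extension, e.g. for a hypothetical "slides.pdf":
    #
    #   {'type': 'document', 'filename': 'slides.pdf', 'path': '/tmp/slides.pdf',
    #    'extension': 'pdf', 'content': '<extracted text>', 'size': 123456}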

    def _extract_significant_frames(self, video_path: str) -> List[Dict]:
        """Extract significant frames from video"""
        try:
            frames = self.frame_extractor.extract_frames(video_path)
            return frames if frames else []
        except Exception as e:
            print(f"Frame extraction failed: {e}")
            return []

    def _analyze_image_content(self, image_path: str) -> Optional[Dict]:
        """Analyze image content with Computer Vision"""
        if not self.cv_client:
            return None

        try:
            with open(image_path, 'rb') as image_stream:
                # Start the asynchronous Read (OCR) operation.
                ocr_result = self.cv_client.read_in_stream(image_stream, raw=True)
                operation_id = ocr_result.headers["Operation-Location"].split("/")[-1]

                # Poll for the OCR result, giving up after 30 seconds.
                timeout = 30
                start_time = time.time()
                while True:
                    if time.time() - start_time > timeout:
                        break

                    read_result = self.cv_client.get_read_result(operation_id)
                    if read_result.status not in ['notStarted', 'running']:
                        break
                    time.sleep(1)

                extracted_text = ""
                if read_result.status == OperationStatusCodes.succeeded:
                    for text_result in read_result.analyze_result.read_results:
                        for line in text_result.lines:
                            extracted_text += line.text + "\n"

                # Rewind the stream and request an image description.
                image_stream.seek(0)
                description_result = self.cv_client.describe_image_in_stream(
                    image_stream,
                    max_candidates=3,
                    language='en'
                )

            return {
                'extracted_text': extracted_text.strip(),
                'description': description_result.captions[0].text if description_result.captions else "",
                'confidence': description_result.captions[0].confidence if description_result.captions else 0
            }

        except Exception as e:
            print(f"Image analysis failed: {e}")
            return None
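
    # Illustrative (not executed): on success the analysis dict looks like
    # this, with hypothetical values:
    #
    #   {'extracted_text': 'Q3 Roadmap\n...',
    #    'description': 'a person presenting a slide',
    #    'confidence': 0.87}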

    def _get_existing_transcripts(self, transcript_job_ids: List[str], user_id: str) -> List[Dict]:
        """Get existing transcripts using backend integration"""
        transcripts = []

        if self.backend_manager:
            for job_id in transcript_job_ids:
                try:
                    job = self.backend_manager.get_job_status(job_id)
                    # Only use transcripts owned by the requesting user.
                    if job and job.user_id == user_id and job.transcript_text:
                        transcripts.append({
                            'source': f"Previous transcript: {job.original_filename}",
                            'content': job.transcript_text
                        })
                except Exception as e:
                    print(f"Error getting transcript {job_id[:8]}...: {e}")
        else:
            print("Backend manager not available, cannot retrieve existing transcripts")

        return transcripts

    def get_summary_status(self, job_id: str) -> Optional[SummaryJob]:
        """Get current summary job status using backend integration"""
        if self.backend_manager:
            return self.backend_manager.get_summary_job(job_id)
        else:
            # Fall back to the local JSON files written by
            # _save_summary_job_fallback().
            try:
                job_file = f"temp/summary_jobs/{job_id}.json"
                if os.path.exists(job_file):
                    with open(job_file, 'r', encoding='utf-8') as f:
                        job_data = json.load(f)

                    return SummaryJob(**job_data)
            except Exception as e:
                print(f"Error loading job from fallback storage: {e}")

        return None

    def get_user_summary_history(self, user_id: str, limit: int = 20) -> List[SummaryJob]:
        """Get user's summary history using backend integration"""
        if self.backend_manager:
            return self.backend_manager.get_user_summary_history(user_id, limit)
        else:
            # Scan fallback storage for this user's jobs.
            try:
                history = []
                jobs_dir = "temp/summary_jobs"
                if os.path.exists(jobs_dir):
                    for filename in os.listdir(jobs_dir):
                        if filename.endswith('.json'):
                            job_file = os.path.join(jobs_dir, filename)
                            try:
                                with open(job_file, 'r', encoding='utf-8') as f:
                                    job_data = json.load(f)

                                if job_data.get('user_id') == user_id:
                                    history.append(SummaryJob(**job_data))
                            except Exception as e:
                                print(f"Error loading job file {filename}: {e}")

                # Newest first.
                history.sort(key=lambda x: x.created_at, reverse=True)
                return history[:limit]
            except Exception as e:
                print(f"Error getting user history from fallback storage: {e}")

        return []


# Module-level singleton used by the rest of the application.
ai_summary_manager = AISummaryManager()
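
# Illustrative usage (not executed): typical call flow for consumers of this
# module, with hypothetical argument values:
#
#   job_id = ai_summary_manager.submit_summary_job(
#       user_id="u1",
#       summary_type="executive",
#       user_prompt="Summarize decisions and action items",
#       transcript_job_ids=["abc123"],
#       settings={"output_language": "English"},
#   )
#   while True:
#       job = ai_summary_manager.get_summary_status(job_id)
#       if job and job.status in ("completed", "failed"):
#           break
#       time.sleep(5)
#   print(job.summary_text or job.error_message)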