import os
import time
import uuid
import json
import requests
import subprocess
import asyncio
import threading
import hashlib
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
import sqlite3
from contextlib import contextmanager
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient
import tempfile
import shutil

load_dotenv()


def _require_env_var(varname):
    value = os.environ.get(varname)
    if not value or value.strip() == "" or "your" in value.lower():
        raise ValueError(f"Environment variable {varname} is missing or invalid. Check your .env file.")
    return value


AZURE_SPEECH_KEY = _require_env_var("AZURE_SPEECH_KEY")
AZURE_SPEECH_KEY_ENDPOINT = _require_env_var("AZURE_SPEECH_KEY_ENDPOINT").rstrip('/')
AZURE_REGION = _require_env_var("AZURE_REGION")
AZURE_BLOB_CONNECTION = _require_env_var("AZURE_BLOB_CONNECTION")
AZURE_CONTAINER = _require_env_var("AZURE_CONTAINER")
AZURE_BLOB_SAS_TOKEN = _require_env_var("AZURE_BLOB_SAS_TOKEN")
ALLOWED_LANGS = json.loads(os.environ.get("ALLOWED_LANGS", "{}"))
API_VERSION = os.environ.get("API_VERSION", "v3.2")
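
# Illustrative .env layout for the loader above (keys are the ones required by
# _require_env_var; the values shown are placeholders, not working credentials):
#
#   AZURE_SPEECH_KEY=<speech resource key>
#   AZURE_SPEECH_KEY_ENDPOINT=https://<region>.api.cognitive.microsoft.com
#   AZURE_REGION=<region>
#   AZURE_BLOB_CONNECTION=<storage connection string>
#   AZURE_CONTAINER=<container name>
#   AZURE_BLOB_SAS_TOKEN=?sv=...&sig=...
#
# Note that _require_env_var rejects any value containing the substring "your",
# which filters out template placeholders such as "your-key-here".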

AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT", "")
AZURE_OPENAI_KEY = os.environ.get("AZURE_OPENAI_KEY", "")
AZURE_OPENAI_DEPLOYMENT = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")

TRANSCRIPTS_CONTAINER = AZURE_CONTAINER
AI_SUMMARIES_CONTAINER = os.environ.get("AI_SUMMARIES_CONTAINER", f"{AZURE_CONTAINER}-summaries")
CHAT_RESPONSES_CONTAINER = os.environ.get("CHAT_RESPONSES_CONTAINER", f"{AZURE_CONTAINER}-chats")

UPLOAD_DIR = "uploads"
DB_DIR = "database"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(DB_DIR, exist_ok=True)

AUDIO_FORMATS = [
    "wav", "mp3", "ogg", "opus", "flac", "wma", "aac", "alaw", "mulaw", "amr", "webm", "speex"
]


@dataclass
class User:
    user_id: str
    email: str
    username: str
    password_hash: str
    created_at: str
    last_login: Optional[str] = None
    is_active: bool = True
    gdpr_consent: bool = False
    data_retention_agreed: bool = False
    marketing_consent: bool = False


@dataclass
class TranscriptionJob:
    job_id: str
    user_id: str
    original_filename: str
    audio_url: str
    language: str
    status: str
    created_at: str
    completed_at: Optional[str] = None
    transcript_text: Optional[str] = None
    transcript_url: Optional[str] = None
    error_message: Optional[str] = None
    azure_trans_id: Optional[str] = None
    settings: Optional[Dict] = None


@dataclass
class SummaryJob:
    job_id: str
    user_id: str
    original_files: List[str]
    summary_type: str
    user_prompt: str
    status: str
    created_at: str
    completed_at: Optional[str] = None
    summary_text: Optional[str] = None
    processed_files: Optional[Dict] = None
    extracted_images: Optional[List[str]] = None
    transcript_text: Optional[str] = None
    error_message: Optional[str] = None
    settings: Optional[Dict] = None
    chat_response_url: Optional[str] = None
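
# The dataclasses above serialize cleanly with dataclasses.asdict (imported at
# the top). Illustrative only:
#   job_dict = asdict(job)      # job: TranscriptionJob or SummaryJob
#   payload = json.dumps(job_dict)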


class AuthManager:
    """Handle user authentication and PDPA compliance"""

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash password using SHA-256 with salt"""
        # Note: a single application-wide salt; per-user salts (or a dedicated
        # password-hashing scheme such as bcrypt/argon2) would be stronger.
        salt = "azure_ai_conference_service_salt_2024"
        return hashlib.sha256((password + salt).encode()).hexdigest()

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Verify password against hash"""
        return AuthManager.hash_password(password) == password_hash

    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate email format"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, email) is not None

    @staticmethod
    def validate_username(username: str) -> bool:
        """Validate username format: 3-30 characters, alphanumeric and underscore"""
        pattern = r'^[a-zA-Z0-9_]{3,30}$'
        return re.match(pattern, username) is not None

    @staticmethod
    def validate_password(password: str) -> Tuple[bool, str]:
        """Validate password strength"""
        if len(password) < 8:
            return False, "Password must be at least 8 characters long"
        if not re.search(r'[A-Z]', password):
            return False, "Password must contain at least one uppercase letter"
        if not re.search(r'[a-z]', password):
            return False, "Password must contain at least one lowercase letter"
        if not re.search(r'\d', password):
            return False, "Password must contain at least one number"
        return True, "Password is valid"
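
# Illustrative round trip through the helpers above:
#   ok, msg = AuthManager.validate_password("Sup3rSecret")   # -> (True, "Password is valid")
#   digest = AuthManager.hash_password("Sup3rSecret")
#   AuthManager.verify_password("Sup3rSecret", digest)       # -> True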


class DatabaseManager:
    def __init__(self, db_path: str = None):
        self.db_path = db_path or os.path.join(DB_DIR, "ai_conference_service.db")
        self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        self.db_blob_name = "shared/database/ai_conference_service.db"
        self._lock = threading.Lock()
        self._last_backup_time = 0
        self._backup_interval = 30  # minimum seconds between blob backups

        self.init_database()

    def _download_db_from_blob(self):
        """Download database from Azure Blob Storage if it exists"""
        try:
            blob_client = self.blob_service.get_blob_client(container=TRANSCRIPTS_CONTAINER, blob=self.db_blob_name)

            if blob_client.exists():
                print("📥 Downloading existing shared database from Azure Blob Storage...")

                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_path = temp_file.name

                with open(temp_path, "wb") as download_file:
                    download_file.write(blob_client.download_blob().readall())

                os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
                shutil.move(temp_path, self.db_path)

                print("✅ Shared database downloaded successfully")
                return True
            else:
                print("🔍 No existing shared database found in blob storage, will create new one")
                return False

        except Exception as e:
            print(f"⚠️ Warning: Could not download shared database from blob storage: {e}")
            print("🔍 Will create new local database")
            return False

    def _upload_db_to_blob(self):
        """Upload database to Azure Blob Storage with rate limiting"""
        try:
            current_time = time.time()
            if current_time - self._last_backup_time < self._backup_interval:
                return

            if not os.path.exists(self.db_path):
                return

            blob_client = self.blob_service.get_blob_client(container=TRANSCRIPTS_CONTAINER, blob=self.db_blob_name)

            with open(self.db_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)

            self._last_backup_time = current_time

        except Exception as e:
            print(f"⚠️ Warning: Could not upload shared database to blob storage: {e}")

    def _store_chat_response(self, job_id: str, response_content: str, user_id: str) -> str:
        """Store AI chat response in dedicated blob container"""
        try:
            chat_blob_name = f"users/{user_id}/chats/summary_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

            temp_path = os.path.join(tempfile.gettempdir(), f"chat_response_{job_id}.txt")
            with open(temp_path, "w", encoding="utf-8") as f:
                f.write(response_content)

            chat_blob_client = self.blob_service.get_blob_client(
                container=CHAT_RESPONSES_CONTAINER,
                blob=chat_blob_name
            )

            with open(temp_path, "rb") as data:
                chat_blob_client.upload_blob(data, overwrite=True)

            os.remove(temp_path)

            sas = AZURE_BLOB_SAS_TOKEN.lstrip("?")
            chat_url = f"{chat_blob_client.url}?{sas}"

            print(f"💬 Chat response stored for user {user_id[:8]}...")
            return chat_url

        except Exception as e:
            print(f"⚠️ Error storing chat response: {e}")
            return ""

    @contextmanager
    def get_connection(self):
        with self._lock:
            conn = sqlite3.connect(self.db_path, timeout=30.0)
            conn.row_factory = sqlite3.Row
            try:
                yield conn
            finally:
                conn.close()
        # Schedule a backup after the connection (and lock) is released;
        # _upload_db_to_blob itself skips runs within the backup interval.
        threading.Thread(target=self._upload_db_to_blob, daemon=True).start()

    def init_database(self):
        self._download_db_from_blob()

        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id TEXT PRIMARY KEY,
                    email TEXT UNIQUE NOT NULL,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    last_login TEXT,
                    is_active BOOLEAN DEFAULT 1,
                    gdpr_consent BOOLEAN DEFAULT 0,
                    data_retention_agreed BOOLEAN DEFAULT 0,
                    marketing_consent BOOLEAN DEFAULT 0
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS transcriptions (
                    job_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_filename TEXT NOT NULL,
                    audio_url TEXT,
                    language TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    completed_at TEXT,
                    transcript_text TEXT,
                    transcript_url TEXT,
                    error_message TEXT,
                    azure_trans_id TEXT,
                    settings TEXT,
                    file_size INTEGER DEFAULT 0,
                    processing_duration REAL DEFAULT 0.0,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS ai_summaries (
                    job_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_files TEXT NOT NULL,
                    summary_type TEXT NOT NULL,
                    user_prompt TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    completed_at TEXT,
                    summary_text TEXT,
                    processed_files TEXT,
                    extracted_images TEXT,
                    transcript_text TEXT,
                    error_message TEXT,
                    settings TEXT,
                    chat_response_url TEXT,
                    input_token_count INTEGER DEFAULT 0,
                    output_token_count INTEGER DEFAULT 0,
                    processing_duration REAL DEFAULT 0.0,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            """)

            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_id ON transcriptions(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_status ON transcriptions(status)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_created_at ON transcriptions(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_created ON transcriptions(user_id, created_at DESC)")

            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_user_id ON ai_summaries(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_status ON ai_summaries(status)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_created_at ON ai_summaries(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_user_created ON ai_summaries(user_id, created_at DESC)")

            conn.commit()
            print("✅ Enhanced database schema initialized (transcriptions + AI summaries)")

    def create_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
                    data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
        """Create new user account"""
        try:
            if not AuthManager.validate_email(email):
                return False, "Invalid email format", None

            if not AuthManager.validate_username(username):
                return False, "Username must be 3-30 characters, alphanumeric and underscore only", None

            is_valid, message = AuthManager.validate_password(password)
            if not is_valid:
                return False, message, None

            if not gdpr_consent:
                return False, "GDPR consent is required to create an account", None

            if not data_retention_agreed:
                return False, "Data retention agreement is required", None

            user_id = str(uuid.uuid4())
            password_hash = AuthManager.hash_password(password)

            with self.get_connection() as conn:
                existing = conn.execute(
                    "SELECT email, username FROM users WHERE email = ? OR username = ?",
                    (email, username)
                ).fetchone()

                if existing:
                    if existing['email'] == email:
                        return False, "Email already registered", None
                    else:
                        return False, "Username already taken", None

                user = User(
                    user_id=user_id,
                    email=email,
                    username=username,
                    password_hash=password_hash,
                    created_at=datetime.now().isoformat(),
                    gdpr_consent=gdpr_consent,
                    data_retention_agreed=data_retention_agreed,
                    marketing_consent=marketing_consent
                )

                conn.execute("""
                    INSERT INTO users
                    (user_id, email, username, password_hash, created_at, is_active,
                     gdpr_consent, data_retention_agreed, marketing_consent)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    user.user_id, user.email, user.username, user.password_hash,
                    user.created_at, user.is_active, user.gdpr_consent,
                    user.data_retention_agreed, user.marketing_consent
                ))
                conn.commit()

            print(f"👤 New user registered: {username} ({email})")
            return True, "Account created successfully", user_id

        except Exception as e:
            print(f"❌ Error creating user: {str(e)}")
            return False, f"Registration failed: {str(e)}", None

    def authenticate_user(self, login: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """Authenticate user by email or username"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute("""
                    SELECT * FROM users
                    WHERE (email = ? OR username = ?) AND is_active = 1
                """, (login, login)).fetchone()

                if not user_row:
                    return False, "Invalid credentials", None

                if not AuthManager.verify_password(password, user_row['password_hash']):
                    return False, "Invalid credentials", None

                conn.execute(
                    "UPDATE users SET last_login = ? WHERE user_id = ?",
                    (datetime.now().isoformat(), user_row['user_id'])
                )
                conn.commit()

                user = User(
                    user_id=user_row['user_id'],
                    email=user_row['email'],
                    username=user_row['username'],
                    password_hash=user_row['password_hash'],
                    created_at=user_row['created_at'],
                    last_login=datetime.now().isoformat(),
                    is_active=bool(user_row['is_active']),
                    gdpr_consent=bool(user_row['gdpr_consent']),
                    data_retention_agreed=bool(user_row['data_retention_agreed']),
                    marketing_consent=bool(user_row['marketing_consent'])
                )

            print(f"🔐 User logged in: {user.username} ({user.email})")
            return True, "Login successful", user

        except Exception as e:
            print(f"❌ Authentication error: {str(e)}")
            return False, f"Login failed: {str(e)}", None

    def get_user_by_id(self, user_id: str) -> Optional[User]:
        """Get user by ID"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ? AND is_active = 1",
                    (user_id,)
                ).fetchone()

                if user_row:
                    return User(
                        user_id=user_row['user_id'],
                        email=user_row['email'],
                        username=user_row['username'],
                        password_hash=user_row['password_hash'],
                        created_at=user_row['created_at'],
                        last_login=user_row['last_login'],
                        is_active=bool(user_row['is_active']),
                        gdpr_consent=bool(user_row['gdpr_consent']),
                        data_retention_agreed=bool(user_row['data_retention_agreed']),
                        marketing_consent=bool(user_row['marketing_consent'])
                    )
        except Exception as e:
            print(f"❌ Error getting user: {str(e)}")
        return None

    def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
        """Update user marketing consent"""
        try:
            with self.get_connection() as conn:
                conn.execute(
                    "UPDATE users SET marketing_consent = ? WHERE user_id = ?",
                    (marketing_consent, user_id)
                )
                conn.commit()
            return True
        except Exception as e:
            print(f"❌ Error updating consent: {str(e)}")
            return False

    def delete_user_account(self, user_id: str) -> bool:
        """Delete user account and all associated data (GDPR compliance)"""
        try:
            with self.get_connection() as conn:
                conn.execute("DELETE FROM transcriptions WHERE user_id = ?", (user_id,))
                conn.execute("DELETE FROM ai_summaries WHERE user_id = ?", (user_id,))
                # The user row itself is anonymized and deactivated rather than
                # deleted, preserving referential integrity.
                conn.execute(
                    "UPDATE users SET is_active = 0, email = ?, username = ? WHERE user_id = ?",
                    (f"deleted_{user_id}@deleted.com", f"deleted_{user_id}", user_id)
                )
                conn.commit()
            print(f"🗑️ Complete user account deleted: {user_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting user account: {str(e)}")
            return False

    def delete_user_summary_data(self, user_id: str) -> bool:
        """Delete user's AI summary data specifically"""
        try:
            with self.get_connection() as conn:
                conn.execute("DELETE FROM ai_summaries WHERE user_id = ?", (user_id,))
                conn.commit()
            print(f"🗑️ User AI summary data deleted: {user_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting user AI summary data: {str(e)}")
            return False

    def save_job(self, job: TranscriptionJob):
        """Insert or update a transcription job"""
        with self.get_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO transcriptions
                (job_id, user_id, original_filename, audio_url, language, status,
                 created_at, completed_at, transcript_text, transcript_url, error_message,
                 azure_trans_id, settings, file_size, processing_duration)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id, job.user_id, job.original_filename, job.audio_url,
                job.language, job.status, job.created_at, job.completed_at,
                job.transcript_text, job.transcript_url, job.error_message,
                job.azure_trans_id, json.dumps(job.settings) if job.settings else None,
                0, 0.0  # file_size/processing_duration are not tracked on the job object yet
            ))
            conn.commit()

    def save_summary_job(self, job: SummaryJob):
        """Save or update AI summary job"""
        with self.get_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO ai_summaries
                (job_id, user_id, original_files, summary_type, user_prompt, status,
                 created_at, completed_at, summary_text, processed_files, extracted_images,
                 transcript_text, error_message, settings, chat_response_url,
                 input_token_count, output_token_count, processing_duration)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id, job.user_id, json.dumps(job.original_files), job.summary_type,
                job.user_prompt, job.status, job.created_at, job.completed_at,
                job.summary_text, json.dumps(job.processed_files) if job.processed_files else None,
                json.dumps(job.extracted_images) if job.extracted_images else None,
                job.transcript_text, job.error_message,
                json.dumps(job.settings) if job.settings else None,
                job.chat_response_url, 0, 0, 0.0  # token counts/duration not tracked on the job object yet
            ))
            conn.commit()

    def get_summary_job(self, job_id: str) -> Optional[SummaryJob]:
        """Get AI summary job by ID"""
        with self.get_connection() as conn:
            row = conn.execute(
                "SELECT * FROM ai_summaries WHERE job_id = ?", (job_id,)
            ).fetchone()
            if row:
                return self._row_to_summary_job(row)
            return None

    def get_user_summary_jobs(self, user_id: str, limit: int = 50) -> List[SummaryJob]:
        """Get AI summary jobs for a specific user"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM ai_summaries
                WHERE user_id = ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (user_id, limit)).fetchall()
            return [self._row_to_summary_job(row) for row in rows]

    def _row_to_summary_job(self, row) -> SummaryJob:
        """Convert database row to SummaryJob object"""
        return SummaryJob(
            job_id=row['job_id'],
            user_id=row['user_id'],
            original_files=json.loads(row['original_files']) if row['original_files'] else [],
            summary_type=row['summary_type'],
            user_prompt=row['user_prompt'],
            status=row['status'],
            created_at=row['created_at'],
            completed_at=row['completed_at'],
            summary_text=row['summary_text'],
            processed_files=json.loads(row['processed_files']) if row['processed_files'] else None,
            extracted_images=json.loads(row['extracted_images']) if row['extracted_images'] else None,
            transcript_text=row['transcript_text'],
            error_message=row['error_message'],
            settings=json.loads(row['settings']) if row['settings'] else None,
            chat_response_url=row['chat_response_url']
        )

    def get_job(self, job_id: str) -> Optional[TranscriptionJob]:
        with self.get_connection() as conn:
            row = conn.execute(
                "SELECT * FROM transcriptions WHERE job_id = ?", (job_id,)
            ).fetchone()
            if row:
                return self._row_to_job(row)
            return None

    def get_user_jobs(self, user_id: str, limit: int = 50) -> List[TranscriptionJob]:
        """Get all transcription jobs for a specific user - PDPA compliant"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                WHERE user_id = ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (user_id, limit)).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_all_jobs(self, limit: int = 100) -> List[TranscriptionJob]:
        """Get all transcription jobs across all users (for admin/global view)"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                ORDER BY created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_pending_jobs(self) -> List[TranscriptionJob]:
        """Get pending transcription jobs across all users for background processing"""
        with self.get_connection() as conn:
            rows = conn.execute(
                "SELECT * FROM transcriptions WHERE status IN ('pending', 'processing')"
            ).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_pending_summary_jobs(self) -> List[SummaryJob]:
        """Get pending AI summary jobs for background processing"""
        with self.get_connection() as conn:
            rows = conn.execute(
                "SELECT * FROM ai_summaries WHERE status IN ('pending', 'processing')"
            ).fetchall()
            return [self._row_to_summary_job(row) for row in rows]

    def get_user_stats(self, user_id: str) -> Dict:
        """Get comprehensive statistics for a specific user (transcriptions)"""
        with self.get_connection() as conn:
            stats = {}

            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions WHERE user_id = ?
            """, (user_id,)).fetchone()
            stats['total_jobs'] = result[0] if result else 0

            result = conn.execute("""
                SELECT status, COUNT(*) FROM transcriptions
                WHERE user_id = ?
                GROUP BY status
            """, (user_id,)).fetchall()
            stats['by_status'] = {row[0]: row[1] for row in result}

            week_ago = (datetime.now() - timedelta(days=7)).isoformat()
            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions
                WHERE user_id = ? AND created_at >= ?
            """, (user_id, week_ago)).fetchone()
            stats['recent_jobs'] = result[0] if result else 0

            return stats

    def get_user_summary_stats(self, user_id: str) -> Dict:
        """Get comprehensive statistics for a specific user (AI summaries)"""
        with self.get_connection() as conn:
            stats = {}

            result = conn.execute("""
                SELECT COUNT(*) FROM ai_summaries WHERE user_id = ?
            """, (user_id,)).fetchone()
            stats['total_jobs'] = result[0] if result else 0

            result = conn.execute("""
                SELECT status, COUNT(*) FROM ai_summaries
                WHERE user_id = ?
                GROUP BY status
            """, (user_id,)).fetchall()
            stats['by_status'] = {row[0]: row[1] for row in result}

            week_ago = (datetime.now() - timedelta(days=7)).isoformat()
            result = conn.execute("""
                SELECT COUNT(*) FROM ai_summaries
                WHERE user_id = ? AND created_at >= ?
            """, (user_id, week_ago)).fetchone()
            stats['recent_jobs'] = result[0] if result else 0

            return stats

    def export_user_data(self, user_id: str) -> Dict:
        """Export comprehensive user data including AI summaries"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ?", (user_id,)
                ).fetchone()

                transcription_rows = conn.execute(
                    "SELECT * FROM transcriptions WHERE user_id = ?", (user_id,)
                ).fetchall()

                summary_rows = conn.execute(
                    "SELECT * FROM ai_summaries WHERE user_id = ?", (user_id,)
                ).fetchall()

            # The stats helpers open their own connections, so they must be
            # called after the block above releases the non-reentrant DB lock;
            # calling them inside it would deadlock.
            # Note: user_info contains the full users row, including the
            # password hash; callers may want to redact it before export.
            export_data = {
                "export_date": datetime.now().isoformat(),
                "export_type": "comprehensive_ai_conference_service",
                "user_info": dict(user_row) if user_row else {},
                "transcriptions": [dict(row) for row in transcription_rows],
                "ai_summaries": [dict(row) for row in summary_rows],
                "transcription_statistics": self.get_user_stats(user_id),
                "ai_summary_statistics": self.get_user_summary_stats(user_id),
                "service_features": [
                    "speech_transcription",
                    "ai_summarization",
                    "computer_vision",
                    "multi_modal_analysis"
                ]
            }

            return export_data

        except Exception as e:
            print(f"❌ Error exporting comprehensive user data: {str(e)}")
            return {}

    def _row_to_job(self, row) -> TranscriptionJob:
        settings = json.loads(row['settings']) if row['settings'] else None
        return TranscriptionJob(
            job_id=row['job_id'],
            user_id=row['user_id'],
            original_filename=row['original_filename'],
            audio_url=row['audio_url'],
            language=row['language'],
            status=row['status'],
            created_at=row['created_at'],
            completed_at=row['completed_at'],
            transcript_text=row['transcript_text'],
            transcript_url=row['transcript_url'],
            error_message=row['error_message'],
            azure_trans_id=row['azure_trans_id'],
            settings=settings
        )
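
# Illustrative use of DatabaseManager on its own (it is normally owned by
# TranscriptionManager below); credentials in .env must be valid:
#   db = DatabaseManager()
#   ok, msg, user_id = db.create_user("bob@example.com", "bob", "Str0ngPass")
#   jobs = db.get_user_jobs(user_id)   # newest first, up to 50 by default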


class TranscriptionManager:
    def __init__(self):
        self.db = DatabaseManager()
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        self._job_status_cache = {}

        self.running = True
        self.worker_thread = threading.Thread(target=self._background_worker, daemon=True)
        self.worker_thread.start()

        print("✅ Enhanced Transcription Manager initialized with AI integration")

    def _log_status_change(self, job_id: str, old_status: str, new_status: str, filename: str = "", user_id: str = ""):
        """Only log when status actually changes"""
        cache_key = f"{job_id}_{old_status}_{new_status}"
        if cache_key not in self._job_status_cache:
            self._job_status_cache[cache_key] = True
            user_display = f"[{user_id[:8]}...]" if user_id else ""
            if filename:
                print(f"🔄 {user_display} Job {job_id[:8]}... ({filename}): {old_status} → {new_status}")
            else:
                print(f"🔄 {user_display} Job {job_id[:8]}...: {old_status} → {new_status}")

    def _background_worker(self):
        """Enhanced background worker to process both transcriptions and AI summaries"""
        iteration_count = 0
        while self.running:
            try:
                pending_transcription_jobs = self.db.get_pending_jobs()
                pending_summary_jobs = self.db.get_pending_summary_jobs()

                # Log a progress snapshot roughly once per minute (every 6th 10-second pass)
                if (pending_transcription_jobs or pending_summary_jobs) and iteration_count % 6 == 0:
                    active_transcripts = len([j for j in pending_transcription_jobs if j.status == 'processing'])
                    queued_transcripts = len([j for j in pending_transcription_jobs if j.status == 'pending'])
                    active_summaries = len([j for j in pending_summary_jobs if j.status == 'processing'])
                    queued_summaries = len([j for j in pending_summary_jobs if j.status == 'pending'])

                    if any([active_transcripts, queued_transcripts, active_summaries, queued_summaries]):
                        print(f"📊 Background worker: 🎙️ {active_transcripts} transcribing, {queued_transcripts} queued | 🤖 {active_summaries} summarizing, {queued_summaries} queued")

                # Only transcription jobs are dispatched here; summary jobs are
                # polled above for the status snapshot and processed elsewhere.
                for job in pending_transcription_jobs:
                    if job.status == 'pending':
                        self.executor.submit(self._process_transcription_job, job.job_id)
                    elif job.status == 'processing' and job.azure_trans_id:
                        self.executor.submit(self._check_transcription_status, job.job_id)

                time.sleep(10)
                iteration_count += 1

            except Exception as e:
                print(f"❌ Background worker error: {e}")
                time.sleep(30)

    def submit_transcription(
        self,
        file_bytes: bytes,
        original_filename: str,
        user_id: str,
        language: str,
        settings: Dict
    ) -> str:
        """Submit a new transcription job for authenticated user"""
        job_id = str(uuid.uuid4())

        print(f"🚀 [{user_id[:8]}...] New transcription: {original_filename} ({len(file_bytes):,} bytes)")

        job = TranscriptionJob(
            job_id=job_id,
            user_id=user_id,
            original_filename=original_filename,
            audio_url="",
            language=language,
            status="pending",
            created_at=datetime.now().isoformat(),
            settings=settings
        )

        self.db.save_job(job)

        self.executor.submit(self._prepare_audio_file, job_id, file_bytes, original_filename, settings)

        return job_id

    def _prepare_audio_file(self, job_id: str, file_bytes: bytes, original_filename: str, settings: Dict):
        """Prepare audio file and upload to blob storage with user-specific paths"""
        try:
            job = self.db.get_job(job_id)
            if not job:
                return

            user_id = job.user_id

            src_ext = original_filename.split('.')[-1].lower() if '.' in original_filename else "bin"
            upload_path = os.path.join(UPLOAD_DIR, f"{job_id}_original.{src_ext}")

            with open(upload_path, "wb") as f:
                f.write(file_bytes)

            audio_format = settings.get('audio_format', 'wav')

            if src_ext == audio_format and audio_format == 'wav':
                # Already a WAV file: probe it and skip conversion when it is
                # already 16 kHz mono.
                try:
                    probe_cmd = [
                        'ffprobe', '-v', 'quiet', '-print_format', 'json',
                        '-show_streams', upload_path
                    ]
                    result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)

                    if result.returncode == 0:
                        probe_data = json.loads(result.stdout)
                        audio_stream = probe_data.get('streams', [{}])[0]

                        sample_rate = int(audio_stream.get('sample_rate', 0))
                        channels = int(audio_stream.get('channels', 0))

                        if sample_rate == 16000 and channels == 1:
                            out_path = upload_path
                        else:
                            print(f"🔄 [{user_id[:8]}...] Converting {original_filename} to 16kHz mono")
                            out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                            self._convert_to_audio(upload_path, out_path, audio_format)
                    else:
                        out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                        self._convert_to_audio(upload_path, out_path, audio_format)

                except Exception as e:
                    print(f"⚠️ [{user_id[:8]}...] Audio probing failed for {original_filename}: {e}")
                    out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                    self._convert_to_audio(upload_path, out_path, audio_format)
            else:
                print(f"🔄 [{user_id[:8]}...] Converting {original_filename}: {src_ext} → {audio_format}")
                out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")

                try:
                    self._convert_to_audio(upload_path, out_path, audio_format)
                except Exception as e:
                    print(f"❌ [{user_id[:8]}...] Audio conversion failed for {original_filename}: {str(e)}")
                    job.status = "failed"
                    job.error_message = f"Audio conversion failed: {str(e)}"
                    job.completed_at = datetime.now().isoformat()
                    self.db.save_job(job)

                    try:
                        os.remove(upload_path)
                    except OSError:
                        pass
                    return

            try:
                final_audio_name = f"users/{user_id}/audio/{job_id}.{audio_format}"
                audio_url = self._upload_blob(out_path, final_audio_name, TRANSCRIPTS_CONTAINER)

                # Archive the original upload alongside the processed audio
                # (done whether or not a conversion took place).
                orig_blob_name = f"users/{user_id}/originals/{job_id}_{original_filename}"
                self._upload_blob(upload_path, orig_blob_name, TRANSCRIPTS_CONTAINER)

                print(f"☁️ [{user_id[:8]}...] {original_filename} uploaded to user-specific blob storage")

                job.audio_url = audio_url
                job.status = "pending"
                self.db.save_job(job)

            except Exception as e:
                print(f"❌ [{user_id[:8]}...] Blob upload failed for {original_filename}: {str(e)}")
                job.status = "failed"
                job.error_message = f"Blob storage upload failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)

            try:
                if os.path.exists(upload_path):
                    os.remove(upload_path)
                if out_path != upload_path and os.path.exists(out_path):
                    os.remove(out_path)
            except Exception as e:
                print(f"⚠️ [{user_id[:8]}...] Warning: Could not clean up local files for {original_filename}: {e}")

        except Exception as e:
            print(f"❌ File preparation error for {original_filename}: {e}")
            job = self.db.get_job(job_id)
            if job:
                job.status = "failed"
                job.error_message = f"File preparation failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)

    def _process_transcription_job(self, job_id: str):
        """Process a transcription job"""
        try:
            job = self.db.get_job(job_id)
            if not job or job.status != 'pending' or not job.audio_url:
                return

            old_status = job.status
            job.status = "processing"
            self.db.save_job(job)

            self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)

            settings = job.settings or {}
            azure_trans_id = self._create_transcription(
                job.audio_url,
                job.language,
                settings.get('diarization_enabled', False),
                settings.get('speakers', 2),
                settings.get('profanity', 'masked'),
                settings.get('punctuation', 'automatic'),
                settings.get('timestamps', True),
                settings.get('lexical', False),
                settings.get('language_id_enabled', False),
                settings.get('candidate_locales', None)
            )

            job.azure_trans_id = azure_trans_id
            self.db.save_job(job)

        except Exception as e:
            print(f"❌ Transcription submission failed for job {job_id[:8]}...: {str(e)}")
            job = self.db.get_job(job_id)
            if job:
                old_status = job.status
                job.status = "failed"
                job.error_message = f"Transcription submission failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)
                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)

    def _check_transcription_status(self, job_id: str):
        """Check status of Azure transcription"""
        try:
            job = self.db.get_job(job_id)
            if not job or job.status != 'processing' or not job.azure_trans_id:
                return

            url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{job.azure_trans_id}"
            headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}

            r = requests.get(url, headers=headers, timeout=30)
            data = r.json()

            if data.get("status") == "Succeeded":
                content_url = self._get_transcription_result_url(job.azure_trans_id)
                if content_url:
                    transcript = self._fetch_transcript(content_url)

                    transcript_blob_name = f"users/{job.user_id}/transcripts/{job_id}.txt"
                    transcript_path = os.path.join(UPLOAD_DIR, f"{job_id}_transcript.txt")

                    with open(transcript_path, "w", encoding="utf-8") as f:
                        f.write(transcript)

                    transcript_url = self._upload_blob(transcript_path, transcript_blob_name, TRANSCRIPTS_CONTAINER)

                    old_status = job.status
                    job.status = "completed"
                    job.transcript_text = transcript
                    job.transcript_url = transcript_url
                    job.completed_at = datetime.now().isoformat()
                    self.db.save_job(job)

                    self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
                    print(f"✅ [{job.user_id[:8]}...] Transcription completed: {job.original_filename}")

                    try:
                        os.remove(transcript_path)
                    except OSError:
                        pass

            elif data.get("status") in ("Failed", "FailedWithPartialResults"):
                error_message = ""
                if "properties" in data and "error" in data["properties"]:
                    error_message = data["properties"]["error"].get("message", "")
                elif "error" in data:
                    error_message = data["error"].get("message", "")

                old_status = job.status
                job.status = "failed"
                job.error_message = f"Azure transcription failed: {error_message}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)

                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
                print(f"❌ [{job.user_id[:8]}...] Transcription failed: {job.original_filename} - {error_message}")

        except Exception as e:
            print(f"❌ Status check failed for job {job_id[:8]}...: {str(e)}")
            job = self.db.get_job(job_id)
            if job:
                old_status = job.status
                job.status = "failed"
                job.error_message = f"Status check failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)
                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)

    def get_job_status(self, job_id: str) -> Optional[TranscriptionJob]:
        """Get current transcription job status"""
        return self.db.get_job(job_id)

    def get_user_history(self, user_id: str, limit: int = 50) -> List[TranscriptionJob]:
        """Get user's transcription history - PDPA compliant"""
        return self.db.get_user_jobs(user_id, limit)

    def get_all_history(self, limit: int = 100) -> List[TranscriptionJob]:
        """Get all transcription history across all users (admin view)"""
        return self.db.get_all_jobs(limit)

    def get_user_stats(self, user_id: str) -> Dict:
        """Get user transcription statistics"""
        return self.db.get_user_stats(user_id)

    def get_user_summary_stats(self, user_id: str) -> Dict:
        """Get user AI summary statistics"""
        return self.db.get_user_summary_stats(user_id)

    def download_transcript(self, job_id: str, user_id: str) -> Optional[str]:
        """Download transcript content - with user verification for PDPA compliance"""
        job = self.db.get_job(job_id)
        if job and job.user_id == user_id and job.transcript_text:
            return job.transcript_text
        return None

    def save_summary_job(self, job: SummaryJob):
        """Save AI summary job to database"""
        self.db.save_summary_job(job)

    def get_summary_job(self, job_id: str) -> Optional[SummaryJob]:
        """Get AI summary job by ID"""
        return self.db.get_summary_job(job_id)

    def get_user_summary_history(self, user_id: str, limit: int = 50) -> List[SummaryJob]:
        """Get user's AI summary history"""
        return self.db.get_user_summary_jobs(user_id, limit)

    def register_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
                      data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
        """Register new user"""
        return self.db.create_user(email, username, password, gdpr_consent, data_retention_agreed, marketing_consent)

    def login_user(self, login: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """Login user"""
        return self.db.authenticate_user(login, password)

    def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID"""
        return self.db.get_user_by_id(user_id)

    def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
        """Update user marketing consent"""
        return self.db.update_user_consent(user_id, marketing_consent)

    def export_user_data(self, user_id: str) -> Dict:
        """Export comprehensive user data including AI summaries"""
        return self.db.export_user_data(user_id)

    def delete_user_account(self, user_id: str) -> bool:
        """Delete user account and all data"""
        return self.db.delete_user_account(user_id)

    def delete_user_summary_data(self, user_id: str) -> bool:
        """Delete user's AI summary data specifically"""
        return self.db.delete_user_summary_data(user_id)

    def _convert_to_audio(self, input_path, output_path, audio_format="wav"):
        """Convert audio/video file to specified audio format"""
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        if audio_format in {"wav", "alaw", "mulaw"}:
            # Downmix to 16 kHz mono for the speech-oriented formats
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                "-ar", "16000", "-ac", "1",
                output_path
            ]
        else:
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                output_path
            ]

        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=300,
                text=True
            )

            if result.returncode != 0:
                raise Exception(f"FFmpeg conversion failed: {result.stderr}")

            if not os.path.exists(output_path):
                raise Exception(f"Output file was not created: {output_path}")

            if os.path.getsize(output_path) == 0:
                raise Exception(f"Output file is empty: {output_path}")

        except subprocess.TimeoutExpired:
            raise Exception("FFmpeg conversion timed out after 5 minutes")
        except Exception as e:
            if "FFmpeg conversion failed" in str(e):
                raise
            else:
                raise Exception(f"FFmpeg error: {str(e)}")

    def _upload_blob(self, local_file, blob_name, container_name=None):
        """Upload file to specified blob container"""
        if container_name is None:
            container_name = TRANSCRIPTS_CONTAINER

        blob_client = self.blob_service.get_blob_client(container=container_name, blob=blob_name)
        with open(local_file, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        sas = AZURE_BLOB_SAS_TOKEN.lstrip("?")
        return f"{blob_client.url}?{sas}"
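
    # The SAS join above produces URLs of the (illustrative) form:
    #   https://<account>.blob.core.windows.net/<container>/<blob>?sv=...&sig=...
    # lstrip("?") keeps a leading "?" on the configured token from doubling up.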

    def _create_transcription(self, audio_url, language, diarization_enabled, speakers,
                              profanity, punctuation, timestamps, lexical,
                              language_id_enabled=False, candidate_locales=None):
        """Create Azure Speech transcription job"""
        url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions"
        headers = {
            "Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY,
            "Content-Type": "application/json"
        }

        properties = {
            "profanityFilterMode": profanity,
            "punctuationMode": punctuation,
            "wordLevelTimestampsEnabled": timestamps,
            "displayFormWordLevelTimestampsEnabled": timestamps,
            "lexical": lexical
        }
        if diarization_enabled:
            properties["diarizationEnabled"] = True
            properties["diarization"] = {
                "speakers": {
                    "minCount": 1,
                    "maxCount": int(speakers)
                }
            }
        if language_id_enabled and candidate_locales:
            properties["languageIdentification"] = {
                "mode": "continuous",
                "candidateLocales": candidate_locales
            }

        properties = {k: v for k, v in properties.items() if v is not None}
        body = {
            "displayName": f"AI_Conference_Transcription_{uuid.uuid4()}",
            "description": "Enhanced batch speech-to-text with AI integration support",
            "locale": language,
            "contentUrls": [audio_url],
            "properties": properties,
            "customProperties": {}
        }
        r = requests.post(url, headers=headers, json=body, timeout=30)
        r.raise_for_status()
        # The new transcription's ID is the last path segment of the Location header
        trans_id = r.headers["Location"].split("/")[-1].split("?")[0]
        return trans_id

    def _get_transcription_result_url(self, trans_id):
        """Get transcription result URL from Azure"""
        url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{trans_id}"
        headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}

        r = requests.get(url, headers=headers, timeout=30)
        data = r.json()

        if data.get("status") == "Succeeded":
            files_url = None
            if "links" in data and "files" in data["links"]:
                files_url = data["links"]["files"]
            if files_url:
                r2 = requests.get(files_url, headers=headers, timeout=30)
                file_list = r2.json().get("values", [])
                for f in file_list:
                    if f.get("kind", "").lower() == "transcription":
                        return f["links"]["contentUrl"]
        return None

    def _fetch_transcript(self, content_url):
        """Enhanced transcript fetching with improved timestamp handling"""
        r = requests.get(content_url, timeout=60)
        try:
            j = r.json()
            out = []

            def get_text(phrase):
                if 'nBest' in phrase and phrase['nBest']:
                    return phrase['nBest'][0].get('display', '') or phrase.get('display', '')
                return phrase.get('display', '')

            def safe_offset(val):
                try:
                    return int(val)
                except (ValueError, TypeError):
                    return None

            def format_time(seconds):
                """Format seconds into HH:MM:SS format"""
                try:
                    td = timedelta(seconds=int(seconds))
                    hours, remainder = divmod(td.total_seconds(), 3600)
                    minutes, seconds = divmod(remainder, 60)
                    return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
                except (ValueError, TypeError, OverflowError):
                    return "00:00:00"

            if 'recognizedPhrases' in j:
                for phrase in j['recognizedPhrases']:
                    speaker_id = phrase.get('speaker', 0)
                    text = get_text(phrase)

                    if not text.strip():
                        continue

                    timestamp_seconds = None

                    # Offsets are reported in 100-nanosecond ticks; divide by 1e7 for seconds.
                    if 'offset' in phrase and phrase['offset'] is not None:
                        offset_100ns = safe_offset(phrase['offset'])
                        if offset_100ns is not None:
                            timestamp_seconds = offset_100ns / 10_000_000

                    # Fall back to the first word's offset...
                    if timestamp_seconds is None and 'words' in phrase and phrase['words']:
                        first_word = phrase['words'][0]
                        if 'offset' in first_word and first_word['offset'] is not None:
                            offset_100ns = safe_offset(first_word['offset'])
                            if offset_100ns is not None:
                                timestamp_seconds = offset_100ns / 10_000_000

                    # ...and finally to the phrase-level tick count.
                    if timestamp_seconds is None and 'offsetInTicks' in phrase:
                        offset_ticks = safe_offset(phrase['offsetInTicks'])
                        if offset_ticks is not None:
                            timestamp_seconds = offset_ticks / 10_000_000

                    if timestamp_seconds is not None:
                        time_str = format_time(timestamp_seconds)
                        if 'speaker' in phrase:
                            out.append(f"[{time_str}] Speaker {speaker_id}: {text}")
                        else:
                            out.append(f"[{time_str}] {text}")
                    else:
                        if 'speaker' in phrase:
                            out.append(f"Speaker {speaker_id}: {text}")
                        else:
                            out.append(text)

            if out:
                return '\n\n'.join(out)

            # Fall back to the combined display text when per-phrase data is absent
            if 'combinedRecognizedPhrases' in j:
                combined_results = []
                for combined_phrase in j['combinedRecognizedPhrases']:
                    text = combined_phrase.get('display', '')
                    if text.strip():
                        combined_results.append(text)

                if combined_results:
                    return '\n\n'.join(combined_results)

            # Last resort: return the raw JSON payload for inspection
            return json.dumps(j, ensure_ascii=False, indent=2)

        except Exception as e:
            return f"Unable to parse transcription result: {str(e)}\n\nRaw response: {r.text[:1000]}..."


transcription_manager = TranscriptionManager()


def allowed_file(filename):
    """Check if file extension is supported"""
    # Files with no usable name or extension are allowed through; format
    # detection is deferred to the conversion step.
    if not filename or filename in ["upload.unknown", ""]:
        return True

    if '.' not in filename:
        return True

    ext = filename.rsplit('.', 1)[1].lower()
    supported_extensions = set(AUDIO_FORMATS) | {
        # Video/media containers
        'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v',
        'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob',
        # Documents
        'pdf', 'docx', 'doc', 'pptx', 'ppt', 'xlsx', 'xls', 'csv', 'txt', 'json',
        # Images
        'jpg', 'jpeg', 'png', 'bmp', 'gif', 'tiff', 'webp'
    }

    return ext in supported_extensions


def generate_user_session():
    """Generate a unique user session ID - kept for compatibility"""
    return str(uuid.uuid4())
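

if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative only; requires valid Azure
    # credentials in .env). The email/username/password values are made up,
    # and "sample.wav" is assumed to exist locally.
    ok, msg, user_id = transcription_manager.register_user(
        "alice@example.com", "alice", "Str0ngPass"
    )
    print(msg)
    if ok:
        with open("sample.wav", "rb") as f:
            job_id = transcription_manager.submit_transcription(
                f.read(), "sample.wav", user_id, "en-US",
                {"audio_format": "wav", "diarization_enabled": False}
            )
        print(f"Submitted job {job_id}; poll transcription_manager.get_job_status(job_id)")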