""" Database models for voiceprint tracking. """ from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, ForeignKey, LargeBinary from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from datetime import datetime import os Base = declarative_base() class Voiceprint(Base): """Unique voice identity.""" __tablename__ = 'voiceprints' id = Column(String(20), primary_key=True) # vp_xxxxxxxx embedding = Column(LargeBinary, nullable=False) # 192-dim vector as bytes first_seen = Column(DateTime, default=datetime.utcnow) times_seen = Column(Integer, default=1) total_audio_seconds = Column(Float, default=0.0) is_flagged = Column(Boolean, default=False) flag_reason = Column(String(200), nullable=True) # User-editable fields label = Column(String(100), nullable=True) # Human-friendly name (e.g., "Juan Pérez") notes = Column(String(1000), nullable=True) # User comments/notes # Relationships appearances = relationship("VoiceprintAppearance", back_populates="voiceprint") class VoiceprintAppearance(Base): """Track where a voiceprint appears.""" __tablename__ = 'voiceprint_appearances' id = Column(Integer, primary_key=True, autoincrement=True) voiceprint_id = Column(String(20), ForeignKey('voiceprints.id'), nullable=False) test_id = Column(String(50), nullable=False) test_filename = Column(String(200), nullable=False) role = Column(String(20), nullable=False) # 'main' or 'additional' duration_seconds = Column(Float, nullable=False) detected_at = Column(DateTime, default=datetime.utcnow) clip_path = Column(String(500), nullable=True) # Path to extracted audio clip # Relationships voiceprint = relationship("Voiceprint", back_populates="appearances") class TestAnalysis(Base): """Store analysis results per test.""" __tablename__ = 'test_analyses' id = Column(Integer, primary_key=True, autoincrement=True) test_id = Column(String(50), unique=True, nullable=False) filename = Column(String(200), nullable=False) duration_seconds = Column(Float, nullable=False) analyzed_at = Column(DateTime, default=datetime.utcnow) # Main speaker main_voiceprint_id = Column(String(20), ForeignKey('voiceprints.id'), nullable=True) main_speech_seconds = Column(Float, default=0.0) main_quality = Column(String(20), nullable=True) # Detection counts additional_speakers_count = Column(Integer, default=0) background_anomalies_count = Column(Integer, default=0) wake_words_count = Column(Integer, default=0) # Synthetic detection synthetic_score = Column(Float, default=0.0) is_synthetic = Column(Boolean, default=False) # JSON results (full analysis) results_json = Column(String, nullable=True) class Database: """Database manager.""" def __init__(self, db_path: str = None): if db_path is None: data_dir = os.environ.get("DATA_DIR", "data") db_path = os.path.join(data_dir, "db", "voiceprints.db") self.db_path = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) self.engine = create_engine(f'sqlite:///{db_path}') Base.metadata.create_all(self.engine) self.Session = sessionmaker(bind=self.engine) def get_session(self): return self.Session() def add_voiceprint(self, vp_id: str, embedding: bytes, test_id: str, filename: str, role: str, duration: float, clip_path: str = None): """Add or update voiceprint and record appearance.""" session = self.get_session() try: # Check if voiceprint exists vp = session.query(Voiceprint).filter_by(id=vp_id).first() if vp: # Update existing vp.times_seen += 1 vp.total_audio_seconds += duration # Check for flag conditions if vp.times_seen >= 4: vp.is_flagged = True vp.flag_reason = f"Seen in {vp.times_seen} tests" else: # Create new vp = Voiceprint( id=vp_id, embedding=embedding, total_audio_seconds=duration ) session.add(vp) # Record appearance appearance = VoiceprintAppearance( voiceprint_id=vp_id, test_id=test_id, test_filename=filename, role=role, duration_seconds=duration, clip_path=clip_path ) session.add(appearance) session.commit() return vp except Exception as e: session.rollback() raise e finally: session.close() def get_voiceprint(self, vp_id: str): """Get voiceprint by ID.""" session = self.get_session() try: return session.query(Voiceprint).filter_by(id=vp_id).first() finally: session.close() def get_all_voiceprints(self): """Get all voiceprints.""" session = self.get_session() try: return session.query(Voiceprint).order_by(Voiceprint.times_seen.desc()).all() finally: session.close() def get_flagged_voiceprints(self): """Get flagged voiceprints.""" session = self.get_session() try: return session.query(Voiceprint).filter_by(is_flagged=True).all() finally: session.close() def get_multi_appearance_voiceprints(self, min_appearances: int = 2): """Get voiceprints seen in multiple tests.""" session = self.get_session() try: return session.query(Voiceprint).filter( Voiceprint.times_seen >= min_appearances ).order_by(Voiceprint.times_seen.desc()).all() finally: session.close() def get_voiceprint_appearances(self, vp_id: str): """Get all appearances of a voiceprint.""" session = self.get_session() try: return session.query(VoiceprintAppearance).filter_by( voiceprint_id=vp_id ).order_by(VoiceprintAppearance.detected_at.desc()).all() finally: session.close() def find_matching_voiceprint(self, embedding: bytes, threshold: float = 0.80): """Find existing voiceprint matching the embedding.""" import numpy as np session = self.get_session() try: new_emb = np.frombuffer(bytes(embedding), dtype=np.float32) for vp in session.query(Voiceprint).all(): stored_emb = np.frombuffer(bytes(vp.embedding), dtype=np.float32) # Cosine similarity similarity = np.dot(new_emb, stored_emb) / ( np.linalg.norm(new_emb) * np.linalg.norm(stored_emb) ) if similarity >= threshold: return vp, similarity return None, 0.0 finally: session.close() def save_test_analysis(self, test_id: str, filename: str, duration: float, results: dict): """Save full test analysis.""" import json session = self.get_session() try: analysis = TestAnalysis( test_id=test_id, filename=filename, duration_seconds=duration, main_voiceprint_id=results.get('main_voiceprint_id'), main_speech_seconds=results.get('main_speech_seconds', 0), main_quality=results.get('main_quality'), additional_speakers_count=len(results.get('additional_speakers', [])), background_anomalies_count=len(results.get('background_anomalies', [])), wake_words_count=len(results.get('wake_words', [])), synthetic_score=results.get('synthetic_score', 0), is_synthetic=results.get('is_synthetic', False), results_json=json.dumps(results) ) session.add(analysis) session.commit() return analysis except Exception as e: session.rollback() raise e finally: session.close() def get_stats(self): """Get database statistics.""" session = self.get_session() try: return { 'total_tests': session.query(TestAnalysis).count(), 'total_voiceprints': session.query(Voiceprint).count(), 'flagged_voiceprints': session.query(Voiceprint).filter_by(is_flagged=True).count(), 'multi_appearance': session.query(Voiceprint).filter(Voiceprint.times_seen >= 2).count() } finally: session.close() def get_analyzer_dashboard_stats(self): """Get extended stats for the Analyzer tab dashboard.""" import json as _json from datetime import datetime as _dt session = self.get_session() try: total_tests = session.query(TestAnalysis).count() total_vp = session.query(Voiceprint).count() high_risk = 0 for a in session.query(TestAnalysis).all(): if a.results_json: try: r = _json.loads(a.results_json) if r.get('risk_score', 0) > 60: high_risk += 1 except Exception: pass fraud_rate = (high_risk / total_tests * 100) if total_tests > 0 else 0.0 today_start = _dt.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) today_analyses = session.query(TestAnalysis).filter( TestAnalysis.analyzed_at >= today_start ).all() alerts_today = 0 for a in today_analyses: if a.results_json: try: r = _json.loads(a.results_json) if r.get('risk_score', 0) > 30: alerts_today += 1 except Exception: pass return { 'total_tests': total_tests, 'fraud_rate': round(fraud_rate, 1), 'unique_voices': total_vp, 'alerts_today': alerts_today, } finally: session.close() def get_all_tests(self): """Get all test analyses ordered by date descending.""" session = self.get_session() try: return session.query(TestAnalysis).order_by( TestAnalysis.analyzed_at.desc() ).all() finally: session.close() def get_test_results(self, test_id: str) -> dict: """Get full results JSON for a test.""" import json as _json session = self.get_session() try: t = session.query(TestAnalysis).filter_by(test_id=test_id).first() if t and t.results_json: return _json.loads(t.results_json) return None finally: session.close() def update_voiceprint_label(self, vp_id: str, label: str): """Update voiceprint label/name.""" session = self.get_session() try: vp = session.query(Voiceprint).filter_by(id=vp_id).first() if vp: vp.label = label session.commit() return True return False except Exception as e: session.rollback() raise e finally: session.close() def update_voiceprint_notes(self, vp_id: str, notes: str): """Update voiceprint notes/comments.""" session = self.get_session() try: vp = session.query(Voiceprint).filter_by(id=vp_id).first() if vp: vp.notes = notes session.commit() return True return False except Exception as e: session.rollback() raise e finally: session.close() def toggle_voiceprint_flag(self, vp_id: str, flagged: bool, reason: str = None): """Manually flag/unflag a voiceprint.""" session = self.get_session() try: vp = session.query(Voiceprint).filter_by(id=vp_id).first() if vp: vp.is_flagged = flagged vp.flag_reason = reason if flagged else None session.commit() return True return False except Exception as e: session.rollback() raise e finally: session.close() def get_similarity_threshold(self): """Get current similarity threshold (default 0.80).""" # Could be stored in a settings table, for now return default return 0.80 def get_appearance_timeline(self, vp_id: str = None): """Get appearances over time for timeline chart.""" session = self.get_session() try: query = session.query(VoiceprintAppearance) if vp_id: query = query.filter_by(voiceprint_id=vp_id) appearances = query.order_by(VoiceprintAppearance.detected_at).all() return [ { 'date': a.detected_at, 'voiceprint_id': a.voiceprint_id, 'test_id': a.test_id, 'role': a.role, 'duration': a.duration_seconds } for a in appearances ] finally: session.close() def get_trend_data(self): """Get aggregated trend data for charts.""" import json as _json from collections import defaultdict session = self.get_session() try: all_tests = session.query(TestAnalysis).order_by( TestAnalysis.analyzed_at ).all() daily_scores = defaultdict(lambda: {'scores': [], 'count': 0, 'high_risk': 0}) daily_flags = defaultdict(lambda: { 'synthetic': 0, 'playback': 0, 'reading': 0, 'whispers': 0, 'pauses': 0, 'wake_words': 0 }) total_risk = 0.0 high_risk_count = 0 for t in all_tests: day = t.analyzed_at.strftime('%Y-%m-%d') if t.analyzed_at else 'unknown' risk = 0 if t.results_json: try: r = _json.loads(t.results_json) risk = r.get('risk_score', 0) daily_scores[day]['scores'].append(risk) daily_scores[day]['count'] += 1 if risk > 60: daily_scores[day]['high_risk'] += 1 high_risk_count += 1 total_risk += risk if r.get('main_speaker', {}).get('is_synthetic', False): daily_flags[day]['synthetic'] += 1 if r.get('playback_detected', False): daily_flags[day]['playback'] += 1 if r.get('reading_pattern_detected', False): daily_flags[day]['reading'] += 1 if r.get('whisper_detected', False): daily_flags[day]['whispers'] += 1 if r.get('suspicious_pauses_detected', False): daily_flags[day]['pauses'] += 1 if len(r.get('wake_words', [])) > 0: daily_flags[day]['wake_words'] += 1 except Exception: pass total = len(all_tests) avg_risk = (total_risk / total) if total > 0 else 0 high_risk_pct = (high_risk_count / total * 100) if total > 0 else 0 scores_list = [] for day in sorted(daily_scores.keys()): d = daily_scores[day] scores_list.append({ 'date': day, 'avg_score': round(sum(d['scores']) / len(d['scores']), 1), 'count': d['count'], 'high_risk': d['high_risk'], }) flags_list = [] for day in sorted(daily_flags.keys()): entry = {'date': day} entry.update(daily_flags[day]) flags_list.append(entry) # Top voices all_vps = session.query(Voiceprint).order_by( Voiceprint.times_seen.desc() ).limit(10).all() top_voices = [ { 'id': vp.id, 'label': vp.label or vp.id, 'times_seen': vp.times_seen, 'flagged': vp.is_flagged, } for vp in all_vps ] recurring = session.query(Voiceprint).filter( Voiceprint.times_seen >= 2 ).count() return { 'daily_scores': scores_list, 'daily_flags': flags_list, 'top_voices': top_voices, 'summary': { 'total': total, 'avg_risk': round(avg_risk, 1), 'high_risk_pct': round(high_risk_pct, 1), 'recurring': recurring, }, } finally: session.close()