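# NOTE(review): "Spaces: Sleeping / Sleeping" -- appears to be page-scrape residue captured with this file, not part of the module.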
"""
Database models and persistence helpers for voiceprint tracking.
"""
import json
import logging
import os
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, ForeignKey, LargeBinary
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
# Shared declarative base: every ORM model in this module subclasses it, and
# Database.__init__ calls Base.metadata.create_all() to create their tables.
Base = declarative_base()
class Voiceprint(Base):
    """A single distinct voice identity.

    One row per voice. The row keeps the raw speaker embedding, running
    usage counters, an automatic/manual flag, and free-form fields a user
    can edit. Individual sightings of the voice are stored separately as
    ``VoiceprintAppearance`` rows and exposed through ``appearances``.
    """

    __tablename__ = 'voiceprints'

    # --- identity -------------------------------------------------------
    # Application-assigned key, formatted like "vp_xxxxxxxx".
    id = Column(String(length=20), primary_key=True)
    # Serialized embedding (originally documented as a 192-dim vector);
    # Database.find_matching_voiceprint decodes these bytes as float32.
    embedding = Column(LargeBinary, nullable=False)

    # --- usage counters -------------------------------------------------
    first_seen = Column(DateTime, default=datetime.utcnow)  # naive UTC timestamp
    times_seen = Column(Integer, default=1)
    total_audio_seconds = Column(Float, default=0.0)

    # --- flagging -------------------------------------------------------
    is_flagged = Column(Boolean, default=False)
    flag_reason = Column(String(length=200), nullable=True)

    # --- user-editable fields -------------------------------------------
    # Human-friendly name (e.g., "Juan Pérez").
    label = Column(String(length=100), nullable=True)
    # User comments/notes.
    notes = Column(String(length=1000), nullable=True)

    # --- relationships --------------------------------------------------
    appearances = relationship(
        "VoiceprintAppearance",
        back_populates="voiceprint",
    )
class VoiceprintAppearance(Base):
    """One sighting of a voiceprint inside a specific test recording.

    Each row links a ``Voiceprint`` to the test it was detected in, along
    with the role the voice played there and how long it spoke.
    """

    __tablename__ = 'voiceprint_appearances'

    id = Column(Integer, autoincrement=True, primary_key=True)

    # Which voice, and in which test it was heard.
    voiceprint_id = Column(
        String(length=20),
        ForeignKey('voiceprints.id'),
        nullable=False,
    )
    test_id = Column(String(length=50), nullable=False)
    test_filename = Column(String(length=200), nullable=False)

    # Either 'main' or 'additional'.
    role = Column(String(length=20), nullable=False)
    duration_seconds = Column(Float, nullable=False)
    detected_at = Column(DateTime, default=datetime.utcnow)  # naive UTC timestamp

    # Path to the extracted audio clip, when one was saved.
    clip_path = Column(String(length=500), nullable=True)

    # Back-reference to the owning voiceprint.
    voiceprint = relationship(
        "Voiceprint",
        back_populates="appearances",
    )
class TestAnalysis(Base):
    """Stored analysis outcome for one test recording.

    A handful of summary values are kept as real columns for cheap
    querying; the complete analysis is kept as a JSON string in
    ``results_json``. ``test_id`` is unique, so each test has at most one
    row.
    """

    __tablename__ = 'test_analyses'

    id = Column(Integer, autoincrement=True, primary_key=True)
    test_id = Column(String(length=50), nullable=False, unique=True)
    filename = Column(String(length=200), nullable=False)
    duration_seconds = Column(Float, nullable=False)
    analyzed_at = Column(DateTime, default=datetime.utcnow)  # naive UTC timestamp

    # --- main speaker ---------------------------------------------------
    main_voiceprint_id = Column(
        String(length=20),
        ForeignKey('voiceprints.id'),
        nullable=True,
    )
    main_speech_seconds = Column(Float, default=0.0)
    main_quality = Column(String(length=20), nullable=True)

    # --- detection counts -----------------------------------------------
    additional_speakers_count = Column(Integer, default=0)
    background_anomalies_count = Column(Integer, default=0)
    wake_words_count = Column(Integer, default=0)

    # --- synthetic-voice detection --------------------------------------
    synthetic_score = Column(Float, default=0.0)
    is_synthetic = Column(Boolean, default=False)

    # --- full analysis, serialized as JSON text -------------------------
    results_json = Column(String, nullable=True)
class Database:
    """SQLite-backed persistence layer for voiceprints and test analyses.

    Every public method opens its own short-lived session and closes it
    before returning. The session factory is created with
    ``expire_on_commit=False`` so that ORM objects handed back to callers
    keep their already-loaded column values after the session is gone.
    Lazy-loaded relationships (e.g. ``Voiceprint.appearances``) still need
    a live session and must not be touched on returned objects.
    """

    # Cosine similarity at or above which two embeddings count as one voice.
    DEFAULT_SIMILARITY_THRESHOLD = 0.80
    # A voiceprint is auto-flagged once it has been seen this many times.
    FLAG_AFTER_APPEARANCES = 4
    # risk_score strictly above this counts as high risk / fraud.
    HIGH_RISK_THRESHOLD = 60
    # risk_score strictly above this counts as an alert on the dashboard.
    ALERT_THRESHOLD = 30

    def __init__(self, db_path: Optional[str] = None):
        """Open (creating if needed) the SQLite database and its tables.

        Args:
            db_path: Path of the SQLite file. When omitted, defaults to
                ``<DATA_DIR>/db/voiceprints.db`` where ``DATA_DIR`` comes
                from the environment (fallback ``"data"``).
        """
        if db_path is None:
            data_dir = os.environ.get("DATA_DIR", "data")
            db_path = os.path.join(data_dir, "db", "voiceprints.db")
        self.db_path = db_path

        # A bare filename (or ":memory:") has no directory component, and
        # os.makedirs("") raises, so only create a directory when there is one.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.engine = create_engine(f'sqlite:///{db_path}')
        Base.metadata.create_all(self.engine)
        # expire_on_commit=False: objects returned after commit + close would
        # otherwise be expired and raise DetachedInstanceError on first access.
        self.Session = sessionmaker(bind=self.engine, expire_on_commit=False)

    def get_session(self):
        """Return a new session; the caller is responsible for closing it."""
        return self.Session()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @contextmanager
    def _session_scope(self, commit: bool = False):
        """Yield a session that is always closed, and rolled back on error.

        Args:
            commit: When true, commit after the ``with`` body succeeds.
        """
        session = self.get_session()
        try:
            yield session
            if commit:
                session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    @staticmethod
    def _parse_results(results_json):
        """Decode a stored ``results_json`` value.

        Returns the decoded dict, or ``None`` when the value is empty,
        is not valid JSON, or does not decode to a JSON object.
        """
        if not results_json:
            return None
        try:
            parsed = json.loads(results_json)
        except (TypeError, ValueError) as exc:
            logging.getLogger(__name__).warning(
                "Ignoring malformed results_json: %s", exc
            )
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _risk_score(results: dict):
        """Return ``results['risk_score']`` if numeric, otherwise 0."""
        value = results.get('risk_score', 0)
        return value if isinstance(value, (int, float)) else 0

    @staticmethod
    def _cosine_similarity(vec_a, vec_b) -> float:
        """Cosine similarity of two 1-D numpy vectors as a Python float.

        Returns 0.0 instead of raising or producing NaN when the vectors
        differ in shape, are empty, or either one has zero norm.
        """
        import numpy as np  # imported lazily, as elsewhere in this class

        if vec_a.shape != vec_b.shape or vec_a.size == 0:
            return 0.0
        denominator = float(np.linalg.norm(vec_a)) * float(np.linalg.norm(vec_b))
        if denominator == 0.0:
            return 0.0
        return float(np.dot(vec_a, vec_b)) / denominator

    def _update_voiceprint(self, vp_id: str, **fields) -> bool:
        """Set the given attributes on one voiceprint and commit.

        Returns ``True`` if the voiceprint exists, ``False`` otherwise.
        """
        with self._session_scope(commit=True) as session:
            vp = session.query(Voiceprint).filter_by(id=vp_id).first()
            if vp is None:
                return False
            for attr_name, value in fields.items():
                setattr(vp, attr_name, value)
            return True

    # ------------------------------------------------------------------
    # Voiceprints
    # ------------------------------------------------------------------

    def add_voiceprint(self, vp_id: str, embedding: bytes,
                       test_id: str, filename: str, role: str,
                       duration: float, clip_path: str = None):
        """Add or update a voiceprint and record one appearance of it.

        A new voiceprint is inserted with ``duration`` as its total audio
        time. An existing one has ``times_seen`` incremented and
        ``duration`` added to its total; once ``times_seen`` reaches
        ``FLAG_AFTER_APPEARANCES`` it is flagged and ``flag_reason`` is
        overwritten with "Seen in N tests". ``embedding`` is only stored
        for new voiceprints.

        Returns:
            The (detached) ``Voiceprint`` row.

        Raises:
            Any database error, after the transaction is rolled back.
        """
        with self._session_scope(commit=True) as session:
            vp = session.query(Voiceprint).filter_by(id=vp_id).first()
            if vp is None:
                vp = Voiceprint(
                    id=vp_id,
                    embedding=embedding,
                    total_audio_seconds=duration,
                )
                session.add(vp)
            else:
                # Counters are nullable columns; treat NULL as zero.
                vp.times_seen = (vp.times_seen or 0) + 1
                vp.total_audio_seconds = (vp.total_audio_seconds or 0.0) + duration
                if vp.times_seen >= self.FLAG_AFTER_APPEARANCES:
                    vp.is_flagged = True
                    vp.flag_reason = f"Seen in {vp.times_seen} tests"

            session.add(VoiceprintAppearance(
                voiceprint_id=vp_id,
                test_id=test_id,
                test_filename=filename,
                role=role,
                duration_seconds=duration,
                clip_path=clip_path,
            ))
            return vp

    def get_voiceprint(self, vp_id: str):
        """Return the voiceprint with this ID, or ``None``."""
        with self._session_scope() as session:
            return session.query(Voiceprint).filter_by(id=vp_id).first()

    def get_all_voiceprints(self):
        """Return all voiceprints, most frequently seen first."""
        with self._session_scope() as session:
            return session.query(Voiceprint).order_by(
                Voiceprint.times_seen.desc()
            ).all()

    def get_flagged_voiceprints(self):
        """Return all voiceprints whose ``is_flagged`` is true."""
        with self._session_scope() as session:
            return session.query(Voiceprint).filter_by(is_flagged=True).all()

    def get_multi_appearance_voiceprints(self, min_appearances: int = 2):
        """Return voiceprints seen at least ``min_appearances`` times,
        most frequently seen first."""
        with self._session_scope() as session:
            return session.query(Voiceprint).filter(
                Voiceprint.times_seen >= min_appearances
            ).order_by(Voiceprint.times_seen.desc()).all()

    def get_voiceprint_appearances(self, vp_id: str):
        """Return all appearances of one voiceprint, newest first."""
        with self._session_scope() as session:
            return session.query(VoiceprintAppearance).filter_by(
                voiceprint_id=vp_id
            ).order_by(VoiceprintAppearance.detected_at.desc()).all()

    def find_matching_voiceprint(self, embedding: bytes,
                                 threshold: float = DEFAULT_SIMILARITY_THRESHOLD):
        """Find the stored voiceprint most similar to ``embedding``.

        Both the argument and the stored blobs are decoded as float32
        vectors and compared by cosine similarity. Stored vectors of a
        different length, or with zero norm, score 0.0.

        Returns:
            ``(voiceprint, similarity)`` for the highest-scoring voiceprint
            whose similarity is ``>= threshold``, or ``(None, 0.0)`` when
            none qualifies.
        """
        import numpy as np

        query_vec = np.frombuffer(bytes(embedding), dtype=np.float32)
        best_vp, best_similarity = None, 0.0
        with self._session_scope() as session:
            for vp in session.query(Voiceprint).all():
                stored_vec = np.frombuffer(bytes(vp.embedding), dtype=np.float32)
                similarity = self._cosine_similarity(query_vec, stored_vec)
                if similarity < threshold:
                    continue
                # Keep the best candidate rather than the first one found,
                # since the row order of the query is unspecified.
                if best_vp is None or similarity > best_similarity:
                    best_vp, best_similarity = vp, similarity
        return best_vp, best_similarity

    def update_voiceprint_label(self, vp_id: str, label: str):
        """Set a voiceprint's label; return ``False`` if it does not exist."""
        return self._update_voiceprint(vp_id, label=label)

    def update_voiceprint_notes(self, vp_id: str, notes: str):
        """Set a voiceprint's notes; return ``False`` if it does not exist."""
        return self._update_voiceprint(vp_id, notes=notes)

    def toggle_voiceprint_flag(self, vp_id: str, flagged: bool, reason: str = None):
        """Manually flag or unflag a voiceprint.

        ``reason`` is stored only when flagging; unflagging clears it.
        Returns ``False`` if the voiceprint does not exist.
        """
        return self._update_voiceprint(
            vp_id,
            is_flagged=flagged,
            flag_reason=reason if flagged else None,
        )

    def get_similarity_threshold(self):
        """Return the current similarity threshold (default 0.80)."""
        # Could be stored in a settings table; for now return the default.
        return self.DEFAULT_SIMILARITY_THRESHOLD

    def get_appearance_timeline(self, vp_id: str = None):
        """Return appearances, oldest first, as plain dicts for a timeline chart.

        Args:
            vp_id: When given (and non-empty), restrict to that voiceprint.

        Returns:
            A list of dicts with keys ``date``, ``voiceprint_id``,
            ``test_id``, ``role`` and ``duration``.
        """
        with self._session_scope() as session:
            query = session.query(VoiceprintAppearance)
            if vp_id:
                query = query.filter_by(voiceprint_id=vp_id)
            return [
                {
                    'date': appearance.detected_at,
                    'voiceprint_id': appearance.voiceprint_id,
                    'test_id': appearance.test_id,
                    'role': appearance.role,
                    'duration': appearance.duration_seconds,
                }
                for appearance in query.order_by(VoiceprintAppearance.detected_at).all()
            ]

    # ------------------------------------------------------------------
    # Test analyses
    # ------------------------------------------------------------------

    def save_test_analysis(self, test_id: str, filename: str,
                           duration: float, results: dict):
        """Persist one test's analysis, including the full results as JSON.

        Summary columns are read from ``results`` with defaults when a key
        is missing; list-valued keys are stored as their length.

        Returns:
            The (detached) ``TestAnalysis`` row.

        Raises:
            TypeError/ValueError if ``results`` is not JSON-serializable, or
            any database error (e.g. a duplicate ``test_id``, which is a
            unique column), after the transaction is rolled back.
        """
        analysis = TestAnalysis(
            test_id=test_id,
            filename=filename,
            duration_seconds=duration,
            main_voiceprint_id=results.get('main_voiceprint_id'),
            main_speech_seconds=results.get('main_speech_seconds', 0),
            main_quality=results.get('main_quality'),
            # "or []" tolerates keys that are present but null.
            additional_speakers_count=len(results.get('additional_speakers') or []),
            background_anomalies_count=len(results.get('background_anomalies') or []),
            wake_words_count=len(results.get('wake_words') or []),
            synthetic_score=results.get('synthetic_score', 0),
            is_synthetic=results.get('is_synthetic', False),
            results_json=json.dumps(results),
        )
        with self._session_scope(commit=True) as session:
            session.add(analysis)
        return analysis

    def get_all_tests(self):
        """Return all test analyses, newest first."""
        with self._session_scope() as session:
            return session.query(TestAnalysis).order_by(
                TestAnalysis.analyzed_at.desc()
            ).all()

    def get_test_results(self, test_id: str) -> Optional[dict]:
        """Return the decoded results JSON for a test.

        Returns ``None`` when the test is unknown or has no stored JSON.
        Malformed JSON raises ``json.JSONDecodeError``.
        """
        with self._session_scope() as session:
            row = session.query(TestAnalysis).filter_by(test_id=test_id).first()
            if row is None or not row.results_json:
                return None
            return json.loads(row.results_json)

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def get_stats(self):
        """Return basic row counts.

        Keys: ``total_tests``, ``total_voiceprints``,
        ``flagged_voiceprints`` and ``multi_appearance`` (voiceprints seen
        at least twice).
        """
        with self._session_scope() as session:
            voiceprints = session.query(Voiceprint)
            return {
                'total_tests': session.query(TestAnalysis).count(),
                'total_voiceprints': voiceprints.count(),
                'flagged_voiceprints': voiceprints.filter_by(is_flagged=True).count(),
                'multi_appearance': voiceprints.filter(Voiceprint.times_seen >= 2).count(),
            }

    def get_analyzer_dashboard_stats(self):
        """Return extended stats for the Analyzer tab dashboard.

        Keys:
            total_tests: number of stored analyses.
            fraud_rate: percentage of all tests whose ``risk_score``
                exceeds ``HIGH_RISK_THRESHOLD``, rounded to one decimal.
            unique_voices: number of voiceprints.
            alerts_today: tests analyzed since 00:00 UTC today whose
                ``risk_score`` exceeds ``ALERT_THRESHOLD``.

        Rows with missing or malformed results JSON count toward
        ``total_tests`` but never as high risk or as alerts.
        """
        # Stored timestamps are naive UTC (datetime.utcnow), so compare
        # against a naive UTC midnight.
        today_start = datetime.utcnow().replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        with self._session_scope() as session:
            total_tests = session.query(TestAnalysis).count()
            total_vp = session.query(Voiceprint).count()

            high_risk = 0
            alerts_today = 0
            # Single pass: each row's JSON is parsed once for both counters.
            for analysis in session.query(TestAnalysis).all():
                results = self._parse_results(analysis.results_json)
                if results is None:
                    continue
                risk = self._risk_score(results)
                if risk > self.HIGH_RISK_THRESHOLD:
                    high_risk += 1
                analyzed_today = (
                    analysis.analyzed_at is not None
                    and analysis.analyzed_at >= today_start
                )
                if analyzed_today and risk > self.ALERT_THRESHOLD:
                    alerts_today += 1

        fraud_rate = (high_risk / total_tests * 100) if total_tests > 0 else 0.0
        return {
            'total_tests': total_tests,
            'fraud_rate': round(fraud_rate, 1),
            'unique_voices': total_vp,
            'alerts_today': alerts_today,
        }

    def get_trend_data(self):
        """Return aggregated trend data for charts.

        Returns a dict with:
            daily_scores: per day (``YYYY-MM-DD`` or ``'unknown'``), the
                average risk score, the number of scored tests and the
                number above ``HIGH_RISK_THRESHOLD``.
            daily_flags: per day, how many tests raised each flag
                (synthetic, playback, reading, whispers, pauses,
                wake_words); days with no flags are omitted.
            top_voices: the ten most frequently seen voiceprints.
            summary: total tests, average risk and high-risk percentage
                (both over all tests, including those without results),
                and the number of voiceprints seen at least twice.

        Tests whose results JSON is missing or malformed contribute only
        to the total count. A non-numeric ``risk_score`` is treated as 0.
        """
        # (chart key, results key) for the simple top-level boolean flags.
        simple_flags = (
            ('playback', 'playback_detected'),
            ('reading', 'reading_pattern_detected'),
            ('whispers', 'whisper_detected'),
            ('pauses', 'suspicious_pauses_detected'),
        )
        daily_scores = defaultdict(lambda: {'scores': [], 'count': 0, 'high_risk': 0})
        daily_flags = defaultdict(lambda: {
            'synthetic': 0, 'playback': 0, 'reading': 0,
            'whispers': 0, 'pauses': 0, 'wake_words': 0
        })
        total_risk = 0.0
        high_risk_count = 0

        with self._session_scope() as session:
            all_tests = session.query(TestAnalysis).order_by(
                TestAnalysis.analyzed_at
            ).all()

            for test in all_tests:
                results = self._parse_results(test.results_json)
                if results is None:
                    continue
                day = (test.analyzed_at.strftime('%Y-%m-%d')
                       if test.analyzed_at else 'unknown')

                risk = self._risk_score(results)
                bucket = daily_scores[day]
                bucket['scores'].append(risk)
                bucket['count'] += 1
                if risk > self.HIGH_RISK_THRESHOLD:
                    bucket['high_risk'] += 1
                    high_risk_count += 1
                total_risk += risk

                # daily_flags[day] is only touched when a flag fires, so
                # flag-free days stay out of the output.
                main_speaker = results.get('main_speaker')
                if isinstance(main_speaker, dict) and main_speaker.get('is_synthetic', False):
                    daily_flags[day]['synthetic'] += 1
                for chart_key, results_key in simple_flags:
                    if results.get(results_key, False):
                        daily_flags[day][chart_key] += 1
                if results.get('wake_words'):
                    daily_flags[day]['wake_words'] += 1

            total = len(all_tests)

            top_voices = [
                {
                    'id': vp.id,
                    'label': vp.label or vp.id,
                    'times_seen': vp.times_seen,
                    'flagged': vp.is_flagged,
                }
                for vp in session.query(Voiceprint).order_by(
                    Voiceprint.times_seen.desc()
                ).limit(10).all()
            ]
            recurring = session.query(Voiceprint).filter(
                Voiceprint.times_seen >= 2
            ).count()

        avg_risk = (total_risk / total) if total > 0 else 0
        high_risk_pct = (high_risk_count / total * 100) if total > 0 else 0

        scores_list = [
            {
                'date': day,
                'avg_score': round(sum(stats['scores']) / len(stats['scores']), 1),
                'count': stats['count'],
                'high_risk': stats['high_risk'],
            }
            for day, stats in sorted(daily_scores.items())
        ]
        flags_list = [
            {'date': day, **flags}
            for day, flags in sorted(daily_flags.items())
        ]

        return {
            'daily_scores': scores_list,
            'daily_flags': flags_list,
            'top_voices': top_voices,
            'summary': {
                'total': total,
                'avg_risk': round(avg_risk, 1),
                'high_risk_pct': round(high_risk_pct, 1),
                'recurring': recurring,
            },
        }