# daasime's picture
# Add Trends tab with analytics charts and Coming Soon sections
# 5b529ce
"""
Database models for voiceprint tracking.
"""
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, ForeignKey, LargeBinary
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
import os
# Shared declarative base class: every model below subclasses it, and
# Database.__init__ creates their tables through Base.metadata.
Base = declarative_base()
class Voiceprint(Base):
    """One distinct voice identity and its running usage counters.

    A row is created the first time a voice is recorded and is updated on
    every later sighting (see ``Database.add_voiceprint``).
    """

    __tablename__ = 'voiceprints'

    # Identifier of the form vp_xxxxxxxx.
    id = Column(String(20), primary_key=True)
    # Raw bytes of the embedding vector (192-dim); it is decoded as float32
    # when voiceprints are compared.
    embedding = Column(LargeBinary, nullable=False)
    # Filled in with the current UTC time when the row is inserted.
    first_seen = Column(DateTime, default=datetime.utcnow)
    times_seen = Column(Integer, default=1)
    total_audio_seconds = Column(Float, default=0.0)
    is_flagged = Column(Boolean, default=False)
    flag_reason = Column(String(200), nullable=True)

    # --- Fields edited by the user ---
    # Human-friendly name (e.g., "Juan Pérez").
    label = Column(String(100), nullable=True)
    # Free-form user comments/notes.
    notes = Column(String(1000), nullable=True)

    # --- Relationships ---
    # Every recorded appearance of this voice; mirrored by
    # VoiceprintAppearance.voiceprint.
    appearances = relationship(
        "VoiceprintAppearance",
        back_populates="voiceprint",
    )
class VoiceprintAppearance(Base):
    """A single occurrence of a voiceprint inside one test recording."""

    __tablename__ = 'voiceprint_appearances'

    id = Column(Integer, primary_key=True, autoincrement=True)
    voiceprint_id = Column(
        String(20),
        ForeignKey('voiceprints.id'),
        nullable=False,
    )
    test_id = Column(String(50), nullable=False)
    test_filename = Column(String(200), nullable=False)
    # Either 'main' or 'additional'.
    role = Column(String(20), nullable=False)
    duration_seconds = Column(Float, nullable=False)
    # Filled in with the current UTC time when the row is inserted.
    detected_at = Column(DateTime, default=datetime.utcnow)
    # Path to the extracted audio clip, when one was saved.
    clip_path = Column(String(500), nullable=True)

    # --- Relationships ---
    # Owning voice identity; mirrored by Voiceprint.appearances.
    voiceprint = relationship(
        "Voiceprint",
        back_populates="appearances",
    )
class TestAnalysis(Base):
    """Stored analysis outcome for one test (one row per ``test_id``)."""

    __tablename__ = 'test_analyses'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Unique: a second insert with the same test_id is rejected by the DB.
    test_id = Column(String(50), unique=True, nullable=False)
    filename = Column(String(200), nullable=False)
    duration_seconds = Column(Float, nullable=False)
    # Filled in with the current UTC time when the row is inserted.
    analyzed_at = Column(DateTime, default=datetime.utcnow)

    # --- Main speaker ---
    main_voiceprint_id = Column(
        String(20),
        ForeignKey('voiceprints.id'),
        nullable=True,
    )
    main_speech_seconds = Column(Float, default=0.0)
    main_quality = Column(String(20), nullable=True)

    # --- Detection counts ---
    additional_speakers_count = Column(Integer, default=0)
    background_anomalies_count = Column(Integer, default=0)
    wake_words_count = Column(Integer, default=0)

    # --- Synthetic detection ---
    synthetic_score = Column(Float, default=0.0)
    is_synthetic = Column(Boolean, default=False)

    # --- Full analysis, serialized as a JSON string ---
    results_json = Column(String, nullable=True)
class Database:
    """Database manager for voiceprints, their appearances and test analyses.

    Every public method opens its own short-lived session and closes it
    before returning. ORM objects handed back to the caller are therefore
    detached: their column attributes are readable, but lazy relationships
    (e.g. ``Voiceprint.appearances``) cannot be loaded from them.
    """

    # Number of sightings at which a voiceprint is flagged automatically.
    AUTO_FLAG_MIN_APPEARANCES = 4
    # A test whose risk_score is strictly above this value is "high risk".
    HIGH_RISK_THRESHOLD = 60
    # A test analyzed today whose risk_score is strictly above this value
    # counts as an alert on the dashboard.
    ALERT_RISK_THRESHOLD = 30

    def __init__(self, db_path: str = None):
        """Open (creating it if needed) the SQLite database at ``db_path``.

        Args:
            db_path: Path of the SQLite file. When None, defaults to
                ``<DATA_DIR>/db/voiceprints.db`` where ``DATA_DIR`` is read
                from the environment and falls back to ``"data"``.
        """
        if db_path is None:
            data_dir = os.environ.get("DATA_DIR", "data")
            db_path = os.path.join(data_dir, "db", "voiceprints.db")
        self.db_path = db_path

        # A bare filename has an empty dirname, and os.makedirs("") raises,
        # so only create the parent directory when there is one.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.engine = create_engine(f'sqlite:///{db_path}')
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def get_session(self):
        """Return a new session; the caller is responsible for closing it."""
        return self.Session()

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_results(raw):
        """Decode a stored ``results_json`` string.

        Returns the decoded dict, or None when ``raw`` is empty, is not
        valid JSON, or does not decode to a JSON object.
        """
        import json

        if not raw:
            return None
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _risk_score(results):
        """Return ``results['risk_score']`` as a finite float, else 0.0."""
        import math

        try:
            score = float(results.get('risk_score', 0))
        except (TypeError, ValueError):
            return 0.0
        return score if math.isfinite(score) else 0.0

    @staticmethod
    def _cosine_similarity(a, b):
        """Cosine similarity of two 1-D numpy vectors.

        Returns None when the vectors cannot be compared: different shapes,
        empty input, or a zero/non-finite norm (which would otherwise cause
        a division by zero).
        """
        import numpy as np

        if a.shape != b.shape or a.size == 0:
            return None
        denom = float(np.linalg.norm(a)) * float(np.linalg.norm(b))
        if denom == 0.0 or not np.isfinite(denom):
            return None
        return float(np.dot(a, b)) / denom

    # ------------------------------------------------------------------
    # Voiceprints
    # ------------------------------------------------------------------

    def add_voiceprint(self, vp_id: str, embedding: bytes,
                       test_id: str, filename: str, role: str,
                       duration: float, clip_path: str = None):
        """Add or update a voiceprint and record one appearance of it.

        An existing voiceprint has its counters incremented and is flagged
        once it reaches ``AUTO_FLAG_MIN_APPEARANCES`` sightings; otherwise a
        new voiceprint is created from ``embedding``. An appearance row is
        added in both cases.

        Returns:
            The (detached) ``Voiceprint`` with its committed values loaded.

        Raises:
            Exception: any database error, after the transaction has been
                rolled back.
        """
        session = self.get_session()
        try:
            vp = session.query(Voiceprint).filter_by(id=vp_id).first()
            if vp:
                # Treat a NULL counter as zero instead of crashing on +=.
                vp.times_seen = (vp.times_seen or 0) + 1
                vp.total_audio_seconds = (vp.total_audio_seconds or 0.0) + duration
                if vp.times_seen >= self.AUTO_FLAG_MIN_APPEARANCES:
                    vp.is_flagged = True
                    vp.flag_reason = f"Seen in {vp.times_seen} tests"
            else:
                vp = Voiceprint(
                    id=vp_id,
                    embedding=embedding,
                    total_audio_seconds=duration,
                )
                session.add(vp)

            session.add(VoiceprintAppearance(
                voiceprint_id=vp_id,
                test_id=test_id,
                test_filename=filename,
                role=role,
                duration_seconds=duration,
                clip_path=clip_path,
            ))
            session.commit()
            # commit() expires every attribute; reload them now so the
            # object is still readable after the session is closed.
            session.refresh(vp)
            return vp
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def get_voiceprint(self, vp_id: str):
        """Get a voiceprint by ID, or None when it does not exist."""
        session = self.get_session()
        try:
            return session.query(Voiceprint).filter_by(id=vp_id).first()
        finally:
            session.close()

    def get_all_voiceprints(self):
        """Get all voiceprints, most frequently seen first."""
        session = self.get_session()
        try:
            return session.query(Voiceprint).order_by(
                Voiceprint.times_seen.desc()
            ).all()
        finally:
            session.close()

    def get_flagged_voiceprints(self):
        """Get every voiceprint whose ``is_flagged`` is true."""
        session = self.get_session()
        try:
            return session.query(Voiceprint).filter_by(is_flagged=True).all()
        finally:
            session.close()

    def get_multi_appearance_voiceprints(self, min_appearances: int = 2):
        """Get voiceprints seen at least ``min_appearances`` times.

        Results are ordered by ``times_seen``, highest first.
        """
        session = self.get_session()
        try:
            return session.query(Voiceprint).filter(
                Voiceprint.times_seen >= min_appearances
            ).order_by(Voiceprint.times_seen.desc()).all()
        finally:
            session.close()

    def get_voiceprint_appearances(self, vp_id: str):
        """Get all appearances of a voiceprint, newest first."""
        session = self.get_session()
        try:
            return session.query(VoiceprintAppearance).filter_by(
                voiceprint_id=vp_id
            ).order_by(VoiceprintAppearance.detected_at.desc()).all()
        finally:
            session.close()

    def find_matching_voiceprint(self, embedding: bytes, threshold: float = 0.80):
        """Find the stored voiceprint most similar to ``embedding``.

        Both the query and the stored embeddings are decoded as float32
        vectors and compared by cosine similarity. Stored embeddings that
        cannot be compared with the query (missing, wrong length, zero
        norm) are skipped.

        Args:
            embedding: Raw bytes of the query vector.
            threshold: Minimum similarity for a voiceprint to qualify.

        Returns:
            ``(voiceprint, similarity)`` for the best-scoring voiceprint
            whose similarity is at least ``threshold``, or ``(None, 0.0)``
            when nothing qualifies.
        """
        import numpy as np

        session = self.get_session()
        try:
            query_emb = np.frombuffer(bytes(embedding), dtype=np.float32)
            best_vp, best_similarity = None, 0.0
            for vp in session.query(Voiceprint).all():
                raw = bytes(vp.embedding) if vp.embedding is not None else b""
                # frombuffer rejects buffers that are not whole float32s.
                if len(raw) % query_emb.itemsize:
                    continue
                stored_emb = np.frombuffer(raw, dtype=np.float32)
                similarity = self._cosine_similarity(query_emb, stored_emb)
                # "not >=" (rather than "<") also rejects NaN.
                if similarity is None or not similarity >= threshold:
                    continue
                if best_vp is None or similarity > best_similarity:
                    best_vp, best_similarity = vp, similarity
            return best_vp, best_similarity
        finally:
            session.close()

    # ------------------------------------------------------------------
    # Test analyses
    # ------------------------------------------------------------------

    def save_test_analysis(self, test_id: str, filename: str,
                           duration: float, results: dict):
        """Save the full analysis of one test.

        Summary columns are extracted from ``results`` and the whole dict is
        also stored as JSON. List-valued entries that are missing or None
        are counted as empty.

        Returns:
            The (detached) ``TestAnalysis`` with its committed values loaded.

        Raises:
            Exception: any serialization or database error (for example a
                duplicate ``test_id``), after the transaction is rolled back.
        """
        import json

        session = self.get_session()
        try:
            analysis = TestAnalysis(
                test_id=test_id,
                filename=filename,
                duration_seconds=duration,
                main_voiceprint_id=results.get('main_voiceprint_id'),
                main_speech_seconds=results.get('main_speech_seconds', 0),
                main_quality=results.get('main_quality'),
                additional_speakers_count=len(results.get('additional_speakers') or []),
                background_anomalies_count=len(results.get('background_anomalies') or []),
                wake_words_count=len(results.get('wake_words') or []),
                synthetic_score=results.get('synthetic_score', 0),
                is_synthetic=results.get('is_synthetic', False),
                results_json=json.dumps(results),
            )
            session.add(analysis)
            session.commit()
            # commit() expires every attribute; reload them now so the
            # object is still readable after the session is closed.
            session.refresh(analysis)
            return analysis
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def get_stats(self):
        """Get database statistics as a dict of four counters."""
        session = self.get_session()
        try:
            voiceprints = session.query(Voiceprint)
            return {
                'total_tests': session.query(TestAnalysis).count(),
                'total_voiceprints': voiceprints.count(),
                'flagged_voiceprints': voiceprints.filter_by(is_flagged=True).count(),
                'multi_appearance': voiceprints.filter(Voiceprint.times_seen >= 2).count(),
            }
        finally:
            session.close()

    def get_analyzer_dashboard_stats(self):
        """Get extended stats for the Analyzer tab dashboard.

        Returns:
            A dict with ``total_tests``, ``fraud_rate`` (percentage of tests
            above ``HIGH_RISK_THRESHOLD``, one decimal), ``unique_voices``
            and ``alerts_today`` (tests analyzed since 00:00 UTC today that
            are above ``ALERT_RISK_THRESHOLD``). Tests whose stored results
            are missing or unreadable count as risk 0.
        """
        from datetime import datetime as _dt

        session = self.get_session()
        try:
            rows = session.query(
                TestAnalysis.analyzed_at, TestAnalysis.results_json
            ).all()
            total_tests = len(rows)
            total_vp = session.query(Voiceprint).count()
            today_start = _dt.utcnow().replace(
                hour=0, minute=0, second=0, microsecond=0
            )

            high_risk = 0
            alerts_today = 0
            # One pass over the analyses feeds both counters.
            for analyzed_at, raw in rows:
                results = self._parse_results(raw)
                if results is None:
                    continue
                risk = self._risk_score(results)
                if risk > self.HIGH_RISK_THRESHOLD:
                    high_risk += 1
                is_today = analyzed_at is not None and analyzed_at >= today_start
                if is_today and risk > self.ALERT_RISK_THRESHOLD:
                    alerts_today += 1

            fraud_rate = (high_risk / total_tests * 100) if total_tests > 0 else 0.0
            return {
                'total_tests': total_tests,
                'fraud_rate': round(fraud_rate, 1),
                'unique_voices': total_vp,
                'alerts_today': alerts_today,
            }
        finally:
            session.close()

    def get_all_tests(self):
        """Get all test analyses ordered by date descending."""
        session = self.get_session()
        try:
            return session.query(TestAnalysis).order_by(
                TestAnalysis.analyzed_at.desc()
            ).all()
        finally:
            session.close()

    def get_test_results(self, test_id: str) -> dict:
        """Get the full results JSON for a test.

        Returns:
            The decoded results, or None when the test is unknown or has no
            stored results.

        Raises:
            ValueError: if the stored JSON cannot be decoded.
        """
        import json as _json

        session = self.get_session()
        try:
            test = session.query(TestAnalysis).filter_by(test_id=test_id).first()
            if test and test.results_json:
                return _json.loads(test.results_json)
            return None
        finally:
            session.close()

    # ------------------------------------------------------------------
    # User edits
    # ------------------------------------------------------------------

    def _update_voiceprint(self, vp_id, **changes):
        """Set ``changes`` on one voiceprint and commit.

        Returns True when the voiceprint exists and was updated, False when
        it does not exist. Database errors are re-raised after a rollback.
        """
        session = self.get_session()
        try:
            vp = session.query(Voiceprint).filter_by(id=vp_id).first()
            if vp is None:
                return False
            for field, value in changes.items():
                setattr(vp, field, value)
            session.commit()
            return True
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def update_voiceprint_label(self, vp_id: str, label: str):
        """Update voiceprint label/name; returns False if it does not exist."""
        return self._update_voiceprint(vp_id, label=label)

    def update_voiceprint_notes(self, vp_id: str, notes: str):
        """Update voiceprint notes/comments; returns False if it does not exist."""
        return self._update_voiceprint(vp_id, notes=notes)

    def toggle_voiceprint_flag(self, vp_id: str, flagged: bool, reason: str = None):
        """Manually flag/unflag a voiceprint.

        The stored reason is cleared whenever the voiceprint is unflagged.
        Returns False if the voiceprint does not exist.
        """
        return self._update_voiceprint(
            vp_id,
            is_flagged=flagged,
            flag_reason=reason if flagged else None,
        )

    def get_similarity_threshold(self):
        """Get current similarity threshold (default 0.80)."""
        # Could be stored in a settings table, for now return default
        return 0.80

    # ------------------------------------------------------------------
    # Charts
    # ------------------------------------------------------------------

    def get_appearance_timeline(self, vp_id: str = None):
        """Get appearances over time for the timeline chart.

        Args:
            vp_id: Restrict the timeline to this voiceprint; a falsy value
                returns the appearances of every voiceprint.

        Returns:
            A list of dicts (``date``, ``voiceprint_id``, ``test_id``,
            ``role``, ``duration``) ordered by detection time, oldest first.
        """
        session = self.get_session()
        try:
            query = session.query(VoiceprintAppearance)
            if vp_id:
                query = query.filter_by(voiceprint_id=vp_id)
            appearances = query.order_by(VoiceprintAppearance.detected_at).all()
            return [
                {
                    'date': a.detected_at,
                    'voiceprint_id': a.voiceprint_id,
                    'test_id': a.test_id,
                    'role': a.role,
                    'duration': a.duration_seconds,
                }
                for a in appearances
            ]
        finally:
            session.close()

    def get_trend_data(self):
        """Get aggregated trend data for charts.

        Returns:
            A dict with:

            * ``daily_scores``: per day (``YYYY-MM-DD``, or ``'unknown'``
              when a test has no timestamp) the average risk score, the
              number of scored tests and the number of high-risk tests.
            * ``daily_flags``: per day the number of tests raising each
              detection flag; days on which no flag was raised are omitted.
            * ``top_voices``: the ten most frequently seen voiceprints.
            * ``summary``: overall totals. ``avg_risk`` and
              ``high_risk_pct`` are computed over all tests, so tests
              without readable results contribute a risk of 0.
        """
        from collections import defaultdict

        session = self.get_session()
        try:
            rows = session.query(
                TestAnalysis.analyzed_at, TestAnalysis.results_json
            ).order_by(TestAnalysis.analyzed_at).all()

            daily_scores = defaultdict(
                lambda: {'scores': [], 'count': 0, 'high_risk': 0}
            )
            daily_flags = defaultdict(lambda: {
                'synthetic': 0, 'playback': 0, 'reading': 0,
                'whispers': 0, 'pauses': 0, 'wake_words': 0,
            })
            total_risk = 0.0
            high_risk_count = 0

            for analyzed_at, raw in rows:
                results = self._parse_results(raw)
                if results is None:
                    continue
                day = analyzed_at.strftime('%Y-%m-%d') if analyzed_at else 'unknown'

                # _risk_score always yields a number, so the per-day list
                # can be summed safely below.
                risk = self._risk_score(results)
                bucket = daily_scores[day]
                bucket['scores'].append(risk)
                bucket['count'] += 1
                if risk > self.HIGH_RISK_THRESHOLD:
                    bucket['high_risk'] += 1
                    high_risk_count += 1
                total_risk += risk

                main_speaker = results.get('main_speaker')
                raised = {
                    'synthetic': isinstance(main_speaker, dict)
                    and bool(main_speaker.get('is_synthetic', False)),
                    'playback': bool(results.get('playback_detected', False)),
                    'reading': bool(results.get('reading_pattern_detected', False)),
                    'whispers': bool(results.get('whisper_detected', False)),
                    'pauses': bool(results.get('suspicious_pauses_detected', False)),
                    'wake_words': bool(results.get('wake_words')),
                }
                for flag_name, is_raised in raised.items():
                    if is_raised:
                        daily_flags[day][flag_name] += 1

            total = len(rows)
            avg_risk = (total_risk / total) if total > 0 else 0
            high_risk_pct = (high_risk_count / total * 100) if total > 0 else 0

            scores_list = [
                {
                    'date': day,
                    'avg_score': round(sum(d['scores']) / len(d['scores']), 1),
                    'count': d['count'],
                    'high_risk': d['high_risk'],
                }
                for day, d in sorted(daily_scores.items())
            ]
            flags_list = [
                {'date': day, **counts}
                for day, counts in sorted(daily_flags.items())
            ]

            # Top voices
            top_voices = [
                {
                    'id': vp.id,
                    'label': vp.label or vp.id,
                    'times_seen': vp.times_seen,
                    'flagged': vp.is_flagged,
                }
                for vp in session.query(Voiceprint).order_by(
                    Voiceprint.times_seen.desc()
                ).limit(10).all()
            ]
            recurring = session.query(Voiceprint).filter(
                Voiceprint.times_seen >= 2
            ).count()

            return {
                'daily_scores': scores_list,
                'daily_flags': flags_list,
                'top_voices': top_voices,
                'summary': {
                    'total': total,
                    'avg_risk': round(avg_risk, 1),
                    'high_risk_pct': round(high_risk_pct, 1),
                    'recurring': recurring,
                },
            }
        finally:
            session.close()