|
|
import os |
|
|
import time |
|
|
import json |
|
|
import requests |
|
|
from datetime import datetime |
|
|
from supabase import create_client |
|
|
from flask import Flask, jsonify |
|
|
import threading |
|
|
|
|
|
|
|
|
import whisper |
|
|
import librosa |
|
|
import numpy as np |
|
|
import soundfile as sf |
|
|
import nltk |
|
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
|
|
|
|
|
|
from reportlab.lib import colors |
|
|
from reportlab.lib.pagesizes import A4 |
|
|
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, PageBreak |
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
|
from reportlab.lib.units import inch |
|
|
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY |
|
|
from reportlab.pdfbase import pdfmetrics |
|
|
from reportlab.pdfbase.ttfonts import TTFont |
|
|
|
|
|
|
|
|
# --- Configuration ---------------------------------------------------------
# All settings come from the environment, with hard-coded fallbacks.
# SECURITY NOTE(review): shipping a real Supabase JWT as a default value is
# risky even if it is only the anon key — prefer failing fast when the env
# var is missing. TODO: confirm whether this key is live and rotate it.
SUPABASE_URL = os.getenv('SUPABASE_URL', 'https://zccaimlxjhktttzqsleb.supabase.co')
SUPABASE_KEY = os.getenv('SUPABASE_KEY', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InpjY2FpbWx4amhrdHR0enFzbGViIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTc0ODAxNzEsImV4cCI6MjA3MzA1NjE3MX0.BIgHVR-u2fzCINVsLG1FXfnRu79rezgDMF8JTiMpbfQ')
# Bucket that incoming interview videos are read from.
BUCKET_NAME = os.getenv('BUCKET_NAME', 'interview-videos')
# Bucket that generated PDF reports are written to.
REPORTS_BUCKET_NAME = os.getenv('REPORTS_BUCKET_NAME', 'analysis-reports')

# Shared Supabase client and Flask app used throughout this module.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
app = Flask(__name__)
# Names of videos already handled by this process. In-memory only, so it
# resets on restart — but processed videos are also deleted from the bucket.
PROCESSED_FILES = set()
|
|
|
|
|
|
|
|
# Best-effort download of NLTK corpora; analysis still works without them,
# so network failures here are deliberately non-fatal.
try:
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    pass

# Load the AI-generated-text detector once at startup; it is reused for
# every video by detect_ai_text().
print("🔄 Loading AI detection model...")
detector_tokenizer = AutoTokenizer.from_pretrained("andreas122001/roberta-academic-detector")
detector_model = AutoModelForSequenceClassification.from_pretrained("andreas122001/roberta-academic-detector")
print("✅ AI detection model loaded")
|
|
|
|
|
|
|
|
def create_bucket_if_not_exists(bucket_name=REPORTS_BUCKET_NAME):
    """Ensure a Supabase storage bucket exists, creating it if necessary.

    Args:
        bucket_name: name/id of the bucket to check or create.

    Returns:
        True when the bucket exists (or was just created), False otherwise.
    """
    try:
        print(f"π Checking if bucket '{bucket_name}' exists...")

        try:
            # Listing the bucket succeeds only when it already exists.
            supabase.storage.from_(bucket_name).list()
            print(f"✅ Bucket '{bucket_name}' already exists")
            return True
        except Exception as e:
            if "not found" in str(e).lower():
                print(f"π¦ Creating new bucket: {bucket_name}")

                # Bucket creation goes through the storage REST endpoint
                # directly, authenticated with the same key as the client.
                headers = {
                    "Authorization": f"Bearer {SUPABASE_KEY}",
                    "Content-Type": "application/json"
                }

                data = {
                    "name": bucket_name,
                    "id": bucket_name,
                    "public": True,
                    "file_size_limit": 52428800,  # 50 MB per file
                    "allowed_mime_types": ["application/pdf"]
                }

                response = requests.post(
                    f"{SUPABASE_URL}/storage/v1/bucket",
                    headers=headers,
                    json=data
                )

                # 409 (already exists) counts as success — another worker
                # may have created the bucket between our check and POST.
                if response.status_code in [200, 201, 409]:
                    print(f"✅ Bucket '{bucket_name}' created successfully")
                    return True
                else:
                    print(f"β Failed to create bucket: {response.text}")
                    return False
            else:
                print(f"β Error checking bucket: {e}")
                return False

    except Exception as e:
        print(f"β Bucket creation error: {e}")
        return False
|
|
|
|
|
def setup_storage():
    """Ensure the bucket that receives PDF reports exists.

    Returns:
        True on success, False when the bucket could not be created.
    """
    print("π Setting up storage buckets...")

    if create_bucket_if_not_exists(REPORTS_BUCKET_NAME):
        print("✅ Storage setup completed")
        return True
    else:
        print("β Storage setup failed")
        return False
|
|
|
|
|
|
|
|
def get_bucket_files():
    """Return the video files currently present in the interview bucket.

    Any listing error is swallowed and reported, yielding an empty list so
    the polling loop keeps running.
    """
    try:
        entries = supabase.storage.from_(BUCKET_NAME).list()
        allowed = ('.mp4', '.avi', '.mov', '.mkv', '.webm')
        return [entry for entry in entries
                if entry['name'].lower().endswith(allowed)]
    except Exception as e:
        print(f"β Error getting files: {e}")
        return []
|
|
|
|
|
def download_video(filename):
    """Download one video from the Supabase bucket into ./downloads.

    Args:
        filename: object name inside BUCKET_NAME.

    Returns:
        The local file path on success, or None on any failure.
    """
    try:
        file_url = supabase.storage.from_(BUCKET_NAME).get_public_url(filename)
        response = requests.get(file_url, stream=True, timeout=120)

        if response.status_code == 200:
            os.makedirs('downloads', exist_ok=True)
            file_path = os.path.join('downloads', filename)

            # Stream to disk in chunks so large videos never sit in memory.
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            print(f"✅ Downloaded: {filename}")
            return file_path
        else:
            print(f"β Download failed: HTTP {response.status_code}")
            return None
    except Exception as e:
        print(f"β Download error: {e}")
        return None
|
|
|
|
|
def delete_from_supabase(filename):
    """Delete a processed video from the source bucket.

    Returns:
        True on success, False when removal failed.
    """
    try:
        supabase.storage.from_(BUCKET_NAME).remove([filename])
        print(f"ποΈ Deleted from Supabase: {filename}")
        return True
    except Exception as e:
        print(f"β Delete error: {e}")
        return False
|
|
|
|
|
|
|
|
def extract_audio(video_path, audio_path):
    """Extract mono 16 kHz PCM audio from a video via ffmpeg.

    Args:
        video_path: source video file.
        audio_path: destination WAV path (overwritten if present).

    Returns:
        The audio duration in seconds, or 0 on any failure (callers treat
        0 as "skip this video").
    """
    try:
        import subprocess

        print(" π΅ Extracting audio with ffmpeg...")

        # 16 kHz mono signed 16-bit PCM is the format downstream tools
        # (Whisper, librosa) handle best.
        cmd = [
            'ffmpeg', '-i', video_path,
            '-vn',                      # drop the video stream
            '-acodec', 'pcm_s16le',
            '-ar', '16000',
            '-ac', '1',
            '-y',                       # overwrite stale output silently
            audio_path
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)

        if result.returncode != 0:
            print(f"β FFmpeg error: {result.stderr}")
            return 0

        audio_info = sf.info(audio_path)
        print(f" ✅ Audio extracted: {audio_info.duration:.1f}s")
        return audio_info.duration

    except Exception as e:
        print(f"β Audio extraction error: {e}")
        import traceback
        traceback.print_exc()
        return 0
|
|
|
|
|
def transcribe_audio(audio_path):
    """Transcribe an audio file with Whisper's "base" model.

    Returns:
        A dict with "full_text" (complete transcript) and "segments"
        (list of {start, end, text} with float timestamps). On failure
        both fields are empty.
    """
    try:
        print(" π€ Loading Whisper model...")
        asr = whisper.load_model("base")
        print(" π€ Transcribing...")
        output = asr.transcribe(audio_path)

        # Normalise Whisper's segment dicts into plain floats + text.
        timed_segments = [
            {
                "start": float(seg.get("start", 0)),
                "end": float(seg.get("end", 0)),
                "text": seg.get("text", ""),
            }
            for seg in output.get("segments", [])
        ]

        return {"full_text": output["text"], "segments": timed_segments}
    except Exception as e:
        print(f"β Transcription error: {e}")
        return {"full_text": "", "segments": []}
|
|
|
|
|
def extract_acoustic_features(audio_path):
    """Extract simple prosodic features from (at most) the first 60s of audio.

    Returns:
        Dict of floats: pitch mean/std (Hz), RMS energy mean/std, and mean
        spectral centroid. All zeros when extraction fails entirely.
    """
    try:
        # Only analyse the first minute to keep processing fast.
        y, sr = librosa.load(audio_path, duration=60)

        # Pitch tracking can fail on noisy or silent audio, so it gets its
        # own guard with zeroed fallbacks.
        try:
            pitch, _, _ = librosa.pyin(y, fmin=50, fmax=300)
            # pyin marks unvoiced frames as NaN; drop them before stats.
            pitch_clean = pitch[~np.isnan(pitch)]
            pitch_std = float(np.std(pitch_clean)) if len(pitch_clean) > 0 else 0.0
            pitch_mean = float(np.mean(pitch_clean)) if len(pitch_clean) > 0 else 0.0
        except Exception:
            pitch_std = 0.0
            pitch_mean = 0.0

        # Frame-level loudness (root-mean-square energy).
        rms = librosa.feature.rms(y=y)[0]
        energy_mean = float(np.mean(rms))
        energy_std = float(np.std(rms))

        # Brightness proxy: centre of mass of the spectrum.
        spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr)))

        return {
            "pitch_mean": pitch_mean,
            "pitch_std": pitch_std,
            "energy_mean": energy_mean,
            "energy_std": energy_std,
            "spectral_centroid": spectral_centroid
        }
    except Exception as e:
        print(f"β Acoustic feature extraction error: {e}")
        return {
            "pitch_mean": 0, "pitch_std": 0, "energy_mean": 0,
            "energy_std": 0, "spectral_centroid": 0
        }
|
|
|
|
|
def extract_linguistic_features(transcription_data, duration_sec):
    """Compute speaking-rate, vocabulary and filler-word metrics.

    Args:
        transcription_data: dict with at least a "full_text" key.
        duration_sec: audio duration in seconds; 0 yields zeroed metrics.

    Returns:
        Dict of floats: words_per_minute, lexical_diversity (unique/total
        tokens) and filler_ratio (filler occurrences / total tokens).
    """
    try:
        text = transcription_data["full_text"]
        words = text.lower().split()
        word_count = len(words)

        if word_count == 0 or duration_sec == 0:
            return {
                "words_per_minute": 0,
                "lexical_diversity": 0,
                "filler_ratio": 0
            }

        words_per_minute = (word_count / duration_sec) * 60
        lexical_diversity = len(set(words)) / word_count

        # Bug fix: the multi-word filler "you know" could never match a
        # single whitespace-split token, and "um," (trailing punctuation)
        # was missed. Single-word fillers are now matched on
        # punctuation-stripped tokens; multi-word fillers on token bigrams.
        single_fillers = {'um', 'uh', 'like', 'so', 'actually', 'basically'}
        multi_fillers = {'you know'}
        cleaned = [w.strip('.,!?;:') for w in words]
        filler_count = sum(1 for w in cleaned if w in single_fillers)
        filler_count += sum(1 for a, b in zip(cleaned, cleaned[1:])
                            if f"{a} {b}" in multi_fillers)
        filler_ratio = filler_count / word_count

        return {
            "words_per_minute": float(words_per_minute),
            "lexical_diversity": float(lexical_diversity),
            "filler_ratio": float(filler_ratio)
        }
    except Exception as e:
        print(f"β Linguistic feature extraction error: {e}")
        return {"words_per_minute": 0, "lexical_diversity": 0, "filler_ratio": 0}
|
|
|
|
|
def detect_ai_text(text):
    """Score a transcript as human- vs AI-written with the RoBERTa detector.

    Returns:
        {"Human": p, "AI": p} rounded to 4 decimals; an even 50/50 split
        when the text is too short to classify or inference fails.
    """
    try:
        if not text or len(text.strip()) < 10:
            return {"Human": 0.5, "AI": 0.5}

        encoded = detector_tokenizer(text, return_tensors="pt", truncation=True,
                                     padding=True, max_length=512)
        with torch.no_grad():
            logits = detector_model(**encoded).logits
        scores = F.softmax(logits, dim=1)

        return {
            "Human": round(float(scores[0][0]), 4),
            "AI": round(float(scores[0][1]), 4)
        }
    except Exception as e:
        print(f"β AI detection error: {e}")
        return {"Human": 0.5, "AI": 0.5}
|
|
|
|
|
def analyze_video(video_path, video_name):
    """Run the full analysis pipeline on one downloaded video.

    Steps: audio extraction, Whisper transcription, acoustic features,
    linguistic features and AI-text detection, combined into one report.

    Args:
        video_path: local path of the downloaded video.
        video_name: original object name (used in the report).

    Returns:
        The report dict, or None when a required step fails.
    """
    try:
        print(f"\n{'='*60}")
        print(f"π¬ ANALYZING: {video_name}")
        print(f"{'='*60}")

        temp_dir = "temp_analysis"
        os.makedirs(temp_dir, exist_ok=True)
        audio_path = os.path.join(temp_dir, "audio.wav")

        print(" πΉ Step 1/5: Extracting audio...")
        duration = extract_audio(video_path, audio_path)
        if duration == 0:
            # No audio means nothing downstream can run.
            return None
        print(f" ✅ Audio extracted ({duration:.1f}s)")

        print(" πΉ Step 2/5: Transcribing...")
        transcription = transcribe_audio(audio_path)
        print(f" ✅ Transcription complete ({len(transcription['full_text'])} chars)")

        print(" πΉ Step 3/5: Extracting acoustic features...")
        acoustic = extract_acoustic_features(audio_path)
        print(" ✅ Acoustic features extracted")

        print(" πΉ Step 4/5: Analyzing language...")
        audio_info = sf.info(audio_path)
        linguistic = extract_linguistic_features(transcription, audio_info.duration)
        print(" ✅ Linguistic features extracted")

        print(" πΉ Step 5/5: Running AI detection...")
        ai_result = detect_ai_text(transcription["full_text"])
        print(" ✅ AI detection complete")

        # Heuristic 0..1 confidence score: strong voice energy, few fillers,
        # varied vocabulary and a pace close to 150 WPM all push it up.
        confidence_score = (
            acoustic['energy_mean'] * 0.3 +
            (1 - linguistic['filler_ratio']) * 0.3 +
            linguistic['lexical_diversity'] * 0.2 +
            (1 - abs(linguistic['words_per_minute'] - 150) / 150) * 0.2
        )
        confidence_score = max(0, min(1, confidence_score))

        report = {
            "video_name": video_name,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "duration_seconds": float(audio_info.duration),
            "confidence_score": float(confidence_score),
            "acoustic_features": acoustic,
            "linguistic_features": linguistic,
            "ai_detection": ai_result,
            "transcription_preview": transcription["full_text"][:300],
            "full_transcription": transcription["full_text"]
        }

        # Remove the temporary WAV now that every consumer has run.
        if os.path.exists(audio_path):
            os.remove(audio_path)

        print(f"\n✅ ANALYSIS COMPLETE!")
        print(f" π Confidence Score: {confidence_score:.2f}")
        print(f" π€ AI Detection: {max(ai_result, key=ai_result.get)}")
        print(f"{'='*60}\n")

        return report

    except Exception as e:
        print(f"β Analysis error for {video_name}: {e}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
|
|
|
|
|
|
def extract_name_mobile_email(filename):
    """Parse candidate name, mobile and email from a video filename.

    Expected pattern: "<name>_<mobile>_<email>.<ext>". With only two parts
    the second is accepted as a mobile number only if it is a 10-digit
    string. Anything unparseable falls back to the raw filename and
    "Not Provided" placeholders.

    Returns:
        (name, mobile, email) tuple of strings.
    """
    try:
        stem = os.path.splitext(filename)[0]
        parts = stem.split('_')

        if len(parts) >= 3:
            return parts[0].title(), parts[1], parts[2]
        if len(parts) == 2:
            mobile = parts[1] if parts[1].isdigit() and len(parts[1]) == 10 else "Not Provided"
            return parts[0].title(), mobile, "Not Provided"
        return filename, "Not Provided", "Not Provided"

    except Exception:
        # Defensive fallback (was a bare `except:`): never let a weird
        # filename break report creation.
        return filename, "Not Provided", "Not Provided"
|
|
|
|
|
|
|
|
def create_pdf_report(report, filename):
    """Render one analysis report as a compact single-page A4 PDF.

    Args:
        report: report dict produced by analyze_video().
        filename: destination path for the generated PDF.

    Returns:
        True when the PDF was written, False on any error.
    """
    try:
        print("\nπ Creating single-page A4 PDF report...")

        # Tight margins so the whole report fits on one A4 page.
        doc = SimpleDocTemplate(filename, pagesize=A4,
                                topMargin=0.15*inch,
                                bottomMargin=0.15*inch,
                                leftMargin=0.3*inch,
                                rightMargin=0.3*inch)
        story = []
        styles = getSampleStyleSheet()

        # Report colour palette.
        PRIMARY_COLOR = colors.HexColor('#1E40AF')
        SECONDARY_COLOR = colors.HexColor('#3B82F6')
        ACCENT_COLOR = colors.HexColor('#10B981')
        WARNING_COLOR = colors.HexColor('#F59E0B')
        DANGER_COLOR = colors.HexColor('#EF4444')
        LIGHT_BG = colors.HexColor('#F8FAFC')
        DARK_TEXT = colors.HexColor('#1F2937')
        LIGHT_TEXT = colors.HexColor('#6B7280')

        # Candidate identity is encoded in the video filename.
        candidate_name, mobile_number, email_id = extract_name_mobile_email(report['video_name'])

        # --- Paragraph styles ---
        title_style = ParagraphStyle(
            'CompactTitle',
            parent=styles['Heading1'],
            fontSize=16,
            textColor=colors.white,
            alignment=TA_CENTER,
            fontName='Helvetica-Bold',
            spaceAfter=12
        )

        section_style = ParagraphStyle(
            'CompactSection',
            parent=styles['Heading2'],
            fontSize=11,
            textColor=PRIMARY_COLOR,
            fontName='Helvetica-Bold',
            spaceAfter=8,
            spaceBefore=12
        )

        metric_label_style = ParagraphStyle(
            'CompactMetricLabel',
            parent=styles['Normal'],
            fontSize=8,
            textColor=LIGHT_TEXT,
            alignment=TA_CENTER,
            fontName='Helvetica'
        )

        metric_value_style = ParagraphStyle(
            'CompactMetricValue',
            parent=styles['Normal'],
            fontSize=12,
            textColor=DARK_TEXT,
            alignment=TA_CENTER,
            fontName='Helvetica-Bold'
        )

        # --- Header banner ---
        header_data = [[
            Paragraph("INTERVIEW ANALYSIS REPORT", title_style)
        ]]
        header_table = Table(header_data, colWidths=[7.2*inch])
        header_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), PRIMARY_COLOR),
            ('VALIGN', (0, 0), (-1, 0), 'MIDDLE'),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('TOPPADDING', (0, 0), (-1, 0), 12),
        ]))
        story.append(header_table)
        story.append(Spacer(1, 0.05*inch))

        # --- Candidate details ---
        candidate_data = [
            ['π€ Candidate:', candidate_name, 'π± Mobile:', mobile_number],
            ['π§ Email:', email_id, 'β±οΈ Duration:', f"{report['duration_seconds']:.1f}s"],
            ['π₯ Video:', report['video_name'][:25] + '...' if len(report['video_name']) > 25 else report['video_name'], '', ''],
        ]

        candidate_table = Table(candidate_data, colWidths=[1.2*inch, 2.2*inch, 1.2*inch, 2.2*inch])
        candidate_table.setStyle(TableStyle([
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('SPAN', (2, 2), (3, 2)),
        ]))
        story.append(candidate_table)
        story.append(Spacer(1, 0.1*inch))

        # --- Overall-score banner, colour-coded by band ---
        confidence = report['confidence_score']
        if confidence >= 0.8:
            conf_color = ACCENT_COLOR
            conf_text = "EXCELLENT"
        elif confidence >= 0.6:
            conf_color = WARNING_COLOR
            conf_text = "GOOD"
        else:
            conf_color = DANGER_COLOR
            conf_text = "NEEDS WORK"

        confidence_data = [[
            Paragraph(f"Overall Score: {confidence:.2f}/1.00",
                      ParagraphStyle('ConfScore', fontSize=11, textColor=colors.white,
                                     alignment=TA_CENTER, fontName='Helvetica-Bold')),
            Paragraph(conf_text,
                      ParagraphStyle('ConfText', fontSize=10, textColor=colors.white,
                                     alignment=TA_CENTER, fontName='Helvetica'))
        ]]

        confidence_table = Table(confidence_data, colWidths=[4*inch, 2.8*inch])
        confidence_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), conf_color),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
            ('TOPPADDING', (0, 0), (-1, 0), 8),
        ]))
        story.append(confidence_table)
        story.append(Spacer(1, 0.15*inch))

        # --- Key metrics grid (2 rows x 3 mini-tables) ---
        story.append(Paragraph("π KEY METRICS", section_style))

        # Winning AI-detection label and its probability.
        ai_label = max(report['ai_detection'], key=report['ai_detection'].get)
        ai_conf = report['ai_detection'][ai_label]

        metrics_data = [
            [
                Table([
                    [Paragraph('SPEAKING PACE', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['words_per_minute']:.0f}", metric_value_style)],
                    [Paragraph('WPM', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),

                Table([
                    [Paragraph('FILLER WORDS', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['filler_ratio']:.1%}", metric_value_style)],
                    [Paragraph('Ratio', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),

                Table([
                    [Paragraph('VOCABULARY', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['lexical_diversity']:.2f}", metric_value_style)],
                    [Paragraph('Diversity', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')])
            ],
            [
                Table([
                    [Paragraph('VOICE STABILITY', metric_label_style)],
                    [Paragraph(f"{report['acoustic_features']['pitch_std']:.1f}", metric_value_style)],
                    [Paragraph('Std Dev', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),

                Table([
                    [Paragraph('VOICE ENERGY', metric_label_style)],
                    [Paragraph(f"{report['acoustic_features']['energy_mean']:.3f}", metric_value_style)],
                    [Paragraph('Level', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),

                Table([
                    [Paragraph('AI DETECTION', metric_label_style)],
                    [Paragraph(f"{ai_conf:.1%}", metric_value_style)],
                    [Paragraph(ai_label, metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')])
            ]
        ]

        metrics_table = Table(metrics_data, colWidths=[2.2*inch, 2.2*inch, 2.2*inch])
        metrics_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, -1), LIGHT_BG),
            ('BOX', (0, 0), (-1, -1), 1, colors.HexColor('#E5E7EB')),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 10),
            ('TOPPADDING', (0, 0), (-1, -1), 10),
        ]))
        story.append(metrics_table)
        story.append(Spacer(1, 0.15*inch))

        # --- Traffic-light status table ---
        story.append(Paragraph("π PERFORMANCE STATUS", section_style))

        wpm_status = 'π’' if 120 <= report['linguistic_features']['words_per_minute'] <= 180 else 'π‘' if 80 <= report['linguistic_features']['words_per_minute'] <= 220 else 'π΄'
        filler_status = 'π’' if report['linguistic_features']['filler_ratio'] <= 0.05 else 'π‘' if report['linguistic_features']['filler_ratio'] <= 0.1 else 'π΄'
        vocab_status = 'π’' if report['linguistic_features']['lexical_diversity'] >= 0.7 else 'π‘' if report['linguistic_features']['lexical_diversity'] >= 0.5 else 'π΄'
        ai_status = 'π’' if ai_label == 'Human' else 'π΄'

        status_data = [
            ['Speaking Pace', get_pace_status(report['linguistic_features']['words_per_minute']), wpm_status],
            ['Speech Fluency', get_fluency_status(report['linguistic_features']['filler_ratio']), filler_status],
            ['Vocabulary Range', get_vocab_status(report['linguistic_features']['lexical_diversity']), vocab_status],
            ['AI Detection', ai_label.upper(), ai_status],
        ]

        status_table = Table(status_data, colWidths=[2.5*inch, 3.5*inch, 0.6*inch])
        status_table.setStyle(TableStyle([
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#E5E7EB')),
        ]))
        story.append(status_table)
        story.append(Spacer(1, 0.15*inch))

        # --- Transcription preview (truncated to keep the page compact) ---
        story.append(Paragraph("π¬ TRANSCRIPTION", section_style))

        trans_text = report['transcription_preview']
        if len(trans_text) > 150:
            trans_text = trans_text[:150] + "..."

        trans_style = ParagraphStyle(
            'CompactTranscription',
            parent=styles['Normal'],
            fontSize=9,
            textColor=DARK_TEXT,
            alignment=TA_JUSTIFY,
            backColor=LIGHT_BG,
            borderPadding=8,
            leftIndent=5,
            rightIndent=5
        )

        story.append(Paragraph(trans_text, trans_style))
        story.append(Spacer(1, 0.1*inch))

        # --- Rule-based recommendations (max three) ---
        story.append(Paragraph("π‘ KEY RECOMMENDATIONS", section_style))

        recommendations = []
        if report['linguistic_features']['filler_ratio'] > 0.1:
            recommendations.append("β’ Reduce filler words")
        if report['linguistic_features']['words_per_minute'] < 120:
            recommendations.append("β’ Increase speaking pace")
        elif report['linguistic_features']['words_per_minute'] > 200:
            recommendations.append("β’ Slow down for clarity")
        if report['linguistic_features']['lexical_diversity'] < 0.6:
            recommendations.append("β’ Expand vocabulary")
        if ai_label != 'Human':
            recommendations.append("β’ Use natural speech patterns")

        if not recommendations:
            recommendations.append("β’ Excellent! Maintain current performance")

        if len(recommendations) > 3:
            recommendations = recommendations[:3]

        recommendations_text = "<br/>".join(recommendations)
        rec_style = ParagraphStyle(
            'CompactRecommendations',
            parent=styles['Normal'],
            fontSize=9,
            textColor=DARK_TEXT,
            alignment=TA_LEFT,
            backColor=colors.HexColor('#ECFDF5'),
            borderPadding=8,
            leftIndent=5
        )
        story.append(Paragraph(recommendations_text, rec_style))

        # --- Footer ---
        story.append(Spacer(1, 0.1*inch))

        footer_text = f"Interview Analysis System (Developed by Avinash Kumar) β’ {report['timestamp']}"
        footer_style = ParagraphStyle(
            'CompactFooter',
            parent=styles['Normal'],
            fontSize=7,
            textColor=LIGHT_TEXT,
            alignment=TA_CENTER
        )
        story.append(Paragraph(footer_text, footer_style))

        doc.build(story)
        print(f"✅ PDF created: {filename}")
        return True

    except Exception as e:
        print(f"β PDF creation error: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
|
|
|
|
def get_pace_status(wpm):
    """Classify a words-per-minute value into a readable pace label."""
    if wpm < 80 or wpm > 220:
        return "Extreme"
    if wpm < 120:
        return "Slow"
    if wpm > 180:
        return "Fast"
    return "Optimal"
|
|
|
|
|
def get_fluency_status(filler_ratio):
    """Map a filler-word ratio onto a fluency rating."""
    for limit, label in ((0.05, "Excellent"), (0.1, "Good")):
        if filler_ratio <= limit:
            return label
    return "Needs Work"
|
|
|
|
|
def get_vocab_status(lexical_diversity):
    """Map lexical diversity onto a vocabulary-richness label."""
    if lexical_diversity >= 0.7:
        return "Rich"
    return "Average" if lexical_diversity >= 0.5 else "Limited"
|
|
|
|
|
|
|
|
def upload_to_supabase(file_path, filename, bucket_name=REPORTS_BUCKET_NAME):
    """Upload a file (PDF reports only) to Supabase storage.

    Args:
        file_path: local path of the file to upload.
        filename: object name to store it under.
        bucket_name: target bucket (defaults to the reports bucket).

    Returns:
        The public URL of the uploaded file, or None on failure.
    """
    try:
        print(f"π€ Uploading {filename} to Supabase...")

        with open(file_path, 'rb') as file:
            result = supabase.storage.from_(bucket_name).upload(
                file=file,
                path=filename,
                file_options={"content-type": "application/pdf"}
            )

        print(f"✅ Uploaded to Supabase: {filename}")

        public_url = supabase.storage.from_(bucket_name).get_public_url(filename)
        print(f"π Public URL: {public_url}")

        return public_url

    except Exception as e:
        print(f"β Upload error: {e}")
        return None
|
|
|
|
|
def store_analysis_data(report):
    """Insert one analysis result row into the video_analysis_results table.

    Returns:
        True on success, False when the insert fails.
    """
    try:
        print("πΎ Storing analysis data in database...")

        data = {
            "video_name": report["video_name"],
            "timestamp": report["timestamp"],
            "duration_seconds": report["duration_seconds"],
            "confidence_score": report["confidence_score"],
            "acoustic_features": report["acoustic_features"],
            "linguistic_features": report["linguistic_features"],
            "ai_detection": report["ai_detection"],
            "transcription_preview": report["transcription_preview"],
            "full_transcription": report["full_transcription"],
            "created_at": datetime.now().isoformat()
        }

        result = supabase.table("video_analysis_results").insert(data).execute()
        print(f"✅ Stored analysis data for: {report['video_name']}")

        return True
    except Exception as e:
        print(f"β Database storage error: {e}")
        return False
|
|
|
|
|
def create_and_store_single_report(report):
    """Create a PDF for one analysed video and store it in Supabase.

    The PDF is named after the video. Database writes are best-effort:
    a failed insert does not undo a successful PDF upload.

    Returns:
        True when the PDF was created and uploaded, False otherwise.
    """
    try:
        print("\nπ Creating and storing individual PDF report...")

        video_name_without_ext = os.path.splitext(report['video_name'])[0]
        pdf_filename = f"{video_name_without_ext}_analysis_report.pdf"

        if not create_pdf_report(report, pdf_filename):
            print("β Failed to create PDF")
            return False

        pdf_url = upload_to_supabase(pdf_filename, pdf_filename)
        if not pdf_url:
            print("β Failed to upload PDF")
            return False

        # Best-effort: keep going even when the analysis row can't be stored.
        try:
            if not store_analysis_data(report):
                print("β οΈ Failed to store analysis data, but PDF uploaded successfully")
        except Exception as e:
            print(f"β οΈ Database storage failed, but PDF uploaded: {e}")

        # Best-effort metadata row pointing at the uploaded PDF.
        try:
            report_metadata = {
                "pdf_url": pdf_url,
                "video_name": report['video_name'],
                "confidence_score": report["confidence_score"],
                "timestamp": datetime.now().isoformat(),
                "report_id": f"report_{video_name_without_ext}"
            }

            supabase.table("analysis_reports").insert(report_metadata).execute()
            print("✅ Report metadata stored")
        except Exception as e:
            print(f"β οΈ Could not store report metadata: {e}")

        # Remove the local temp PDF once it lives in the bucket.
        if os.path.exists(pdf_filename):
            os.remove(pdf_filename)

        print(f"✅ Individual PDF report stored successfully in Supabase!")
        print(f"π PDF URL: {pdf_url}")
        return True

    except Exception as e:
        print(f"β Report storage error: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
|
|
|
|
def setup_database_tables():
    """Print a reminder about the database tables this pipeline expects.

    Tables are not created programmatically; they must already exist in
    the Supabase project.
    """
    try:
        print("π§ Checking database tables...")
        tables_needed = ["video_analysis_results", "analysis_reports"]
        print(f"π Required tables: {tables_needed}")
        print("π‘ Note: Create these tables in Supabase Dashboard -> Table Editor")
    except Exception as e:
        print(f"β Database setup error: {e}")
|
|
|
|
|
|
|
|
def process_videos():
    """Background worker: poll the bucket and process one video at a time.

    Each new video is downloaded, analysed, cleaned up (locally and in the
    bucket) and turned into a stored PDF report. Runs forever; any
    unexpected error sleeps 30s and retries.
    """
    while True:
        try:
            print(f"\n{'='*60}")
            print(f"π CHECKING FOR NEW VIDEOS... ({datetime.now().strftime('%H:%M:%S')})")
            print(f"{'='*60}")

            videos = get_bucket_files()
            new_videos = [v for v in videos if v['name'] not in PROCESSED_FILES]

            if not new_videos:
                print("✅ No new videos found. Waiting...")
                time.sleep(30)
                continue

            print(f"π― Found {len(new_videos)} new video(s) to process")
            print("π Processing ONE VIDEO AT A TIME...\n")

            # Deliberately take only the first video; the next loop pass
            # picks up the rest.
            video = new_videos[0]
            filename = video['name']

            print(f"π¬ PROCESSING: {filename}")

            video_path = download_video(filename)
            if not video_path:
                # Mark it processed so one broken file can't wedge the loop.
                PROCESSED_FILES.add(filename)
                continue

            report = analyze_video(video_path, filename)

            # Clean up the local copy regardless of the analysis outcome.
            if os.path.exists(video_path):
                os.remove(video_path)
                print(f"ποΈ Deleted local: {filename}")

            # Source videos are removed from the bucket once handled.
            delete_from_supabase(filename)

            PROCESSED_FILES.add(filename)

            if report:
                print(f"\n{'='*60}")
                print(f"π ANALYSIS COMPLETE - Storing individual PDF report")
                print(f"{'='*60}")
                create_and_store_single_report(report)

            print(f"\n✅ Video '{filename}' processing complete. Waiting 10 seconds for next video...\n")
            time.sleep(10)

        except Exception as e:
            print(f"β Process error: {e}")
            import traceback
            traceback.print_exc()
            time.sleep(30)
|
|
|
|
|
|
|
|
@app.route('/')
def home():
    """Root endpoint: summarise service status and processing progress."""
    payload = {
        "status": "running",
        "service": "Video Analysis System",
        "processed_files": len(PROCESSED_FILES),
        "reports_bucket": REPORTS_BUCKET_NAME,
        "processing_mode": "ONE_VIDEO_AT_A_TIME",
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    return jsonify(payload)
|
|
|
|
|
@app.route('/health')
def health():
    """Liveness probe endpoint."""
    body = {"status": "healthy"}
    return jsonify(body), 200
|
|
|
|
|
@app.route('/storage-status')
def storage_status():
    """Report whether the PDF reports bucket is reachable and how full it is."""
    try:
        files = supabase.storage.from_(REPORTS_BUCKET_NAME).list()
        pdf_count = sum(1 for entry in files if entry['name'].endswith('.pdf'))
        return jsonify({
            "status": "healthy",
            "reports_bucket": REPORTS_BUCKET_NAME,
            "total_files": len(files),
            "pdf_files": pdf_count,
            "bucket_accessible": True
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "reports_bucket": REPORTS_BUCKET_NAME,
            "bucket_accessible": False,
            "error": str(e)
        }), 500
|
|
|
|
|
@app.route('/stats')
def stats():
    """Expose processing statistics for this worker process."""
    payload = {
        "total_processed": len(PROCESSED_FILES),
        "processed_files": list(PROCESSED_FILES),
        "bucket": BUCKET_NAME,
        "processing_mode": "sequential"
    }
    return jsonify(payload)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Startup banner describing the runtime configuration.
    print("\n" + "="*60)
    print("π VIDEO ANALYSIS SYSTEM STARTING")
    print("="*60)
    print(f"π Videos Bucket: {BUCKET_NAME}")
    print(f"π Reports Bucket: {REPORTS_BUCKET_NAME}")
    print(f"π Storage Type: PDF ONLY")
    print(f"π― Processing: ONE VIDEO AT A TIME")
    print(f"β±οΈ Check interval: 30 seconds")
    print("="*60 + "\n")

    # Ensure the reports bucket exists and print the table checklist.
    setup_storage()
    setup_database_tables()

    # Background worker that polls for videos and processes them one at a
    # time; daemon=True so it dies with the Flask process.
    processor = threading.Thread(target=process_videos, daemon=True)
    processor.start()

    # HTTP status endpoints. PORT is supplied by the host environment
    # (7860 default — presumably a Hugging Face Spaces deployment; verify).
    port = int(os.getenv("PORT", 7860))
    app.run(host='0.0.0.0', port=port, debug=False)