# app.py — interview video analysis service
# (upload metadata: avinashprajapati, commit ff98e57, verified)
import os
import time
import json
import requests
from datetime import datetime
from supabase import create_client
from flask import Flask, jsonify
import threading
# For video analysis
import whisper
import librosa
import numpy as np
import soundfile as sf
import nltk
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F
# For PDF generation
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# ==================== CONFIGURATION =====================
# Connection settings come from the environment with hard-coded fallbacks.
# NOTE(review): the fallback key below ships in source — presumably the
# public Supabase "anon" key; confirm, and rotate if it is anything more
# privileged.
SUPABASE_URL = os.getenv('SUPABASE_URL', 'https://zccaimlxjhktttzqsleb.supabase.co')
SUPABASE_KEY = os.getenv('SUPABASE_KEY', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InpjY2FpbWx4amhrdHR0enFzbGViIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTc0ODAxNzEsImV4cCI6MjA3MzA1NjE3MX0.BIgHVR-u2fzCINVsLG1FXfnRu79rezgDMF8JTiMpbfQ')
BUCKET_NAME = os.getenv('BUCKET_NAME', 'interview-videos')          # incoming videos
REPORTS_BUCKET_NAME = os.getenv('REPORTS_BUCKET_NAME', 'analysis-reports')  # generated PDFs
# Initialize shared singletons: Supabase client, Flask app, and the set of
# bucket file names already handled by the background worker.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
app = Flask(__name__)
PROCESSED_FILES = set()
# Download NLTK data (best-effort: startup must not crash if the download
# fails, e.g. when the container has no network access).
try:
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    # Bug fix: was a bare `except:` which would also swallow
    # KeyboardInterrupt/SystemExit during startup.
    pass
# Load the AI-text detection model once at import time so every analysis
# reuses the same tokenizer/model pair.
print("πŸ”„ Loading AI detection model...")
detector_tokenizer = AutoTokenizer.from_pretrained("andreas122001/roberta-academic-detector")
detector_model = AutoModelForSequenceClassification.from_pretrained("andreas122001/roberta-academic-detector")
print("βœ… AI detection model loaded")
# ==================== BUCKET MANAGEMENT ====================
def create_bucket_if_not_exists(bucket_name=REPORTS_BUCKET_NAME):
    """Ensure a Supabase storage bucket exists, creating it if missing.

    The bucket is probed with a list() call; a "not found" error triggers
    creation of a public, 50MB-limited, PDF-only bucket via the storage
    REST API. Returns True when the bucket exists or was created, False
    on any other error.
    """
    try:
        print(f"πŸ” Checking if bucket '{bucket_name}' exists...")
        try:
            # A successful listing means the bucket is already there.
            supabase.storage.from_(bucket_name).list()
        except Exception as probe_error:
            if "not found" not in str(probe_error).lower():
                print(f"❌ Error checking bucket: {probe_error}")
                return False
            print(f"πŸ“¦ Creating new bucket: {bucket_name}")
            response = requests.post(
                f"{SUPABASE_URL}/storage/v1/bucket",
                headers={
                    "Authorization": f"Bearer {SUPABASE_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "name": bucket_name,
                    "id": bucket_name,
                    "public": True,
                    "file_size_limit": 52428800,  # 50MB
                    "allowed_mime_types": ["application/pdf"],
                },
            )
            # 409 means "already exists" — another worker won the race.
            if response.status_code not in (200, 201, 409):
                print(f"❌ Failed to create bucket: {response.text}")
                return False
            print(f"βœ… Bucket '{bucket_name}' created successfully")
            return True
        print(f"βœ… Bucket '{bucket_name}' already exists")
        return True
    except Exception as e:
        print(f"❌ Bucket creation error: {e}")
        return False
def setup_storage():
    """Provision the storage buckets the service needs at startup.

    Currently only the reports bucket. Returns True on success.
    """
    print("πŸ”„ Setting up storage buckets...")
    ok = create_bucket_if_not_exists(REPORTS_BUCKET_NAME)
    if not ok:
        print("❌ Storage setup failed")
        return False
    print("βœ… Storage setup completed")
    return True
# ==================== SUPABASE FILE FUNCTIONS ====================
def get_bucket_files():
    """List video entries (filtered by extension) in the interview bucket.

    Returns the raw Supabase file entries; an empty list on any error.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.webm')
    try:
        entries = supabase.storage.from_(BUCKET_NAME).list()
    except Exception as e:
        print(f"❌ Error getting files: {e}")
        return []
    # str.endswith accepts a tuple, so one call covers all extensions.
    return [entry for entry in entries
            if entry['name'].lower().endswith(video_extensions)]
def download_video(filename):
    """Download a video from the Supabase bucket into ./downloads.

    Streams the file in 8KB chunks. Returns the local file path, or
    None on any failure.
    """
    try:
        file_url = supabase.storage.from_(BUCKET_NAME).get_public_url(filename)
        response = requests.get(file_url, stream=True, timeout=120)
        if response.status_code == 200:
            os.makedirs('downloads', exist_ok=True)
            # basename() guards against path traversal should a bucket
            # entry name ever contain path separators.
            file_path = os.path.join('downloads', os.path.basename(filename))
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            # Bug fix: this message printed the literal "(unknown)"
            # instead of the downloaded file's name.
            print(f"βœ… Downloaded: {filename}")
            return file_path
        else:
            print(f"❌ Download failed: HTTP {response.status_code}")
            return None
    except Exception as e:
        print(f"❌ Download error: {e}")
        return None
def delete_from_supabase(filename):
    """Remove one file from the videos bucket. Returns True on success."""
    try:
        supabase.storage.from_(BUCKET_NAME).remove([filename])
        # Bug fix: this message printed the literal "(unknown)" instead of
        # the deleted file's name.
        print(f"πŸ—‘οΈ Deleted from Supabase: {filename}")
        return True
    except Exception as e:
        print(f"❌ Delete error: {e}")
        return False
# ==================== VIDEO ANALYSIS FUNCTIONS ====================
def extract_audio(video_path, audio_path):
    """Extract mono 16 kHz PCM audio from a video using ffmpeg.

    Writes a WAV file to `audio_path` and returns its duration in
    seconds; returns 0 on any failure.
    """
    import subprocess
    try:
        print(" 🎡 Extracting audio with ffmpeg...")
        # -vn drops video; 16-bit PCM / 16 kHz / mono suits the speech models.
        command = [
            'ffmpeg', '-i', video_path,
            '-vn',
            '-acodec', 'pcm_s16le',
            '-ar', '16000',
            '-ac', '1',
            '-y',
            audio_path,
        ]
        proc = subprocess.run(command, capture_output=True, text=True, timeout=60)
        if proc.returncode != 0:
            print(f"❌ FFmpeg error: {proc.stderr}")
            return 0
        info = sf.info(audio_path)
        print(f" βœ… Audio extracted: {info.duration:.1f}s")
        return info.duration
    except Exception as e:
        print(f"❌ Audio extraction error: {e}")
        import traceback
        traceback.print_exc()
        return 0
def transcribe_audio(audio_path):
    """Transcribe an audio file with Whisper ("base" model).

    Returns {"full_text": str, "segments": [{"start", "end", "text"}]};
    on any failure an empty transcription is returned instead of raising.
    """
    try:
        print(" 🎀 Loading Whisper model...")
        model = whisper.load_model("base")
        print(" 🎀 Transcribing...")
        result = model.transcribe(audio_path)
        segments = [
            {
                "start": float(seg.get("start", 0)),
                "end": float(seg.get("end", 0)),
                "text": seg.get("text", ""),
            }
            for seg in result.get("segments", [])
        ]
        return {"full_text": result["text"], "segments": segments}
    except Exception as e:
        print(f"❌ Transcription error: {e}")
        return {"full_text": "", "segments": []}
def extract_acoustic_features(audio_path):
    """Extract pitch, energy and spectral summary features from audio.

    Only the first 60 seconds are analyzed to bound compute. Returns a
    dict of floats (pitch_mean/std in Hz, RMS energy mean/std, spectral
    centroid); all zeros on failure.
    """
    try:
        y, sr = librosa.load(audio_path, duration=60)
        # Pitch via pyin; 50-300 Hz covers typical speech F0. Unvoiced
        # frames come back as NaN and are dropped before the statistics.
        try:
            pitch, _, _ = librosa.pyin(y, fmin=50, fmax=300)
            pitch_clean = pitch[~np.isnan(pitch)]
            pitch_std = float(np.std(pitch_clean)) if len(pitch_clean) > 0 else 0.0
            pitch_mean = float(np.mean(pitch_clean)) if len(pitch_clean) > 0 else 0.0
        except Exception:
            # Bug fix: was a bare `except:`, which would also swallow
            # KeyboardInterrupt/SystemExit.
            pitch_std = 0.0
            pitch_mean = 0.0
        # Frame-wise RMS energy -> loudness level and variability.
        rms = librosa.feature.rms(y=y)[0]
        energy_mean = float(np.mean(rms))
        energy_std = float(np.std(rms))
        # Spectral centroid as a single brightness measure.
        spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr)))
        return {
            "pitch_mean": pitch_mean,
            "pitch_std": pitch_std,
            "energy_mean": energy_mean,
            "energy_std": energy_std,
            "spectral_centroid": spectral_centroid
        }
    except Exception as e:
        print(f"❌ Acoustic feature extraction error: {e}")
        return {
            "pitch_mean": 0, "pitch_std": 0, "energy_mean": 0,
            "energy_std": 0, "spectral_centroid": 0
        }
def extract_linguistic_features(transcription_data, duration_sec):
    """Compute speaking-rate, vocabulary and filler-word features.

    Args:
        transcription_data: dict with a "full_text" transcript string.
        duration_sec: audio duration in seconds (drives words-per-minute).

    Returns:
        dict with words_per_minute, lexical_diversity (unique/total word
        ratio) and filler_ratio (filler occurrences / total words).
        All zeros when the transcript or duration is empty, or on error.
    """
    try:
        text = transcription_data["full_text"]
        words = text.lower().split()
        word_count = len(words)
        if word_count == 0 or duration_sec == 0:
            return {
                "words_per_minute": 0,
                "lexical_diversity": 0,
                "filler_ratio": 0
            }
        words_per_minute = (word_count / duration_sec) * 60
        lexical_diversity = len(set(words)) / word_count
        # Bug fix: the old code put the two-word filler "you know" in the
        # same list that was matched against single split() tokens, so it
        # could never match. Single-token fillers are matched per word;
        # multi-word fillers are matched against adjacent word pairs.
        single_fillers = {'um', 'uh', 'like', 'so', 'actually', 'basically'}
        phrase_fillers = {'you know'}
        filler_count = sum(1 for word in words if word in single_fillers)
        filler_count += sum(1 for a, b in zip(words, words[1:])
                            if f"{a} {b}" in phrase_fillers)
        filler_ratio = filler_count / word_count
        return {
            "words_per_minute": float(words_per_minute),
            "lexical_diversity": float(lexical_diversity),
            "filler_ratio": float(filler_ratio)
        }
    except Exception as e:
        print(f"❌ Linguistic feature extraction error: {e}")
        return {"words_per_minute": 0, "lexical_diversity": 0, "filler_ratio": 0}
def detect_ai_text(text):
    """Classify text as human- vs AI-written with the RoBERTa detector.

    Returns {"Human": p, "AI": p} rounded to 4 decimals. Falls back to a
    50/50 split for empty or too-short text, or on any model error.
    """
    try:
        if not text or len(text.strip()) < 10:
            return {"Human": 0.5, "AI": 0.5}
        encoded = detector_tokenizer(text, return_tensors="pt", truncation=True,
                                     padding=True, max_length=512)
        with torch.no_grad():
            logits = detector_model(**encoded).logits
        probs = F.softmax(logits, dim=1)
        human_p = float(probs[0][0])
        ai_p = float(probs[0][1])
        return {"Human": round(human_p, 4), "AI": round(ai_p, 4)}
    except Exception as e:
        print(f"❌ AI detection error: {e}")
        return {"Human": 0.5, "AI": 0.5}
def analyze_video(video_path, video_name):
    """Run the complete analysis pipeline on one video.

    Steps: extract audio -> transcribe -> acoustic features -> linguistic
    features -> AI detection; the results are combined into a report dict
    with a heuristic 0..1 confidence score. Returns None on failure. The
    temporary WAV is always removed, even when a step raises.
    """
    temp_dir = "temp_analysis"
    os.makedirs(temp_dir, exist_ok=True)
    audio_path = os.path.join(temp_dir, "audio.wav")
    try:
        print(f"\n{'='*60}")
        print(f"🎬 ANALYZING: {video_name}")
        print(f"{'='*60}")
        # 1. Extract Audio
        print(" πŸ“Ή Step 1/5: Extracting audio...")
        duration = extract_audio(video_path, audio_path)
        if duration == 0:
            return None
        print(f" βœ… Audio extracted ({duration:.1f}s)")
        # 2. Transcribe
        print(" πŸ“Ή Step 2/5: Transcribing...")
        transcription = transcribe_audio(audio_path)
        print(f" βœ… Transcription complete ({len(transcription['full_text'])} chars)")
        # 3. Acoustic Features
        print(" πŸ“Ή Step 3/5: Extracting acoustic features...")
        acoustic = extract_acoustic_features(audio_path)
        print(" βœ… Acoustic features extracted")
        # 4. Linguistic Features
        print(" πŸ“Ή Step 4/5: Analyzing language...")
        audio_info = sf.info(audio_path)
        linguistic = extract_linguistic_features(transcription, audio_info.duration)
        print(" βœ… Linguistic features extracted")
        # 5. AI Detection
        print(" πŸ“Ή Step 5/5: Running AI detection...")
        ai_result = detect_ai_text(transcription["full_text"])
        print(" βœ… AI detection complete")
        # Heuristic confidence: voice energy + low filler ratio + vocabulary
        # diversity + closeness to a 150 WPM target pace, clamped to [0, 1].
        # NOTE(review): energy_mean is a raw RMS level (typically << 1), so
        # its 0.3 weight contributes little — confirm intended calibration.
        confidence_score = (
            acoustic['energy_mean'] * 0.3 +
            (1 - linguistic['filler_ratio']) * 0.3 +
            linguistic['lexical_diversity'] * 0.2 +
            (1 - abs(linguistic['words_per_minute'] - 150) / 150) * 0.2
        )
        confidence_score = max(0, min(1, confidence_score))
        report = {
            "video_name": video_name,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "duration_seconds": float(audio_info.duration),
            "confidence_score": float(confidence_score),
            "acoustic_features": acoustic,
            "linguistic_features": linguistic,
            "ai_detection": ai_result,
            "transcription_preview": transcription["full_text"][:300],
            "full_transcription": transcription["full_text"]
        }
        print(f"\nβœ… ANALYSIS COMPLETE!")
        print(f" πŸ“Š Confidence Score: {confidence_score:.2f}")
        print(f" πŸ€– AI Detection: {max(ai_result, key=ai_result.get)}")
        print(f"{'='*60}\n")
        return report
    except Exception as e:
        print(f"❌ Analysis error for {video_name}: {e}")
        import traceback
        traceback.print_exc()
        return None
    finally:
        # Bug fix: the temp WAV previously leaked whenever a step raised or
        # the function returned early (cleanup only ran on the happy path).
        if os.path.exists(audio_path):
            os.remove(audio_path)
# ==================== COMPACT PDF GENERATION (SINGLE PAGE) ====================
def extract_name_mobile_email(filename):
    """Parse candidate name, mobile and email from a video filename.

    Expected pattern: "<name>_<mobile>_<email>.<ext>". Missing fields fall
    back to "Not Provided"; with no underscores at all, the raw filename
    is returned as the name.
    NOTE(review): an email containing underscores would split into extra
    parts and truncate at the first one — confirm upload naming rules.
    """
    try:
        # Strip the extension, then split the remaining fields.
        name_without_ext = os.path.splitext(filename)[0]
        parts = name_without_ext.split('_')
        if len(parts) >= 3:
            # e.g. "avinash_8235263572_someone@example.com.mp4"
            return parts[0].title(), parts[1], parts[2]
        if len(parts) == 2:
            name = parts[0].title()
            # The second field is only trusted as a mobile number when it
            # is exactly 10 digits.
            mobile = parts[1] if parts[1].isdigit() and len(parts[1]) == 10 else "Not Provided"
            return name, mobile, "Not Provided"
        return filename, "Not Provided", "Not Provided"
    except Exception:
        # Bug fix: was a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit.
        return filename, "Not Provided", "Not Provided"
# ==================== SINGLE PAGE A4 PDF GENERATION ====================
def create_pdf_report(report, filename):
    """Create modern single-page A4 PDF report - All content in one page.

    Renders, top to bottom: header banner, candidate info parsed from the
    video filename, an overall-confidence banner, a 2x3 key-metrics grid,
    a status table, a truncated transcription excerpt and up to three
    recommendations, then writes the PDF to `filename`.
    Returns True on success, False on any error.
    """
    try:
        print("\nπŸ“„ Creating single-page A4 PDF report...")
        # A4 size with minimal margins so everything fits on one page.
        doc = SimpleDocTemplate(filename, pagesize=A4,
                                topMargin=0.15*inch,
                                bottomMargin=0.15*inch,
                                leftMargin=0.3*inch,
                                rightMargin=0.3*inch)
        story = []
        styles = getSampleStyleSheet()
        # Compact Color Scheme (Tailwind-like hex palette).
        PRIMARY_COLOR = colors.HexColor('#1E40AF')
        SECONDARY_COLOR = colors.HexColor('#3B82F6')
        ACCENT_COLOR = colors.HexColor('#10B981')
        WARNING_COLOR = colors.HexColor('#F59E0B')
        DANGER_COLOR = colors.HexColor('#EF4444')
        LIGHT_BG = colors.HexColor('#F8FAFC')
        DARK_TEXT = colors.HexColor('#1F2937')
        LIGHT_TEXT = colors.HexColor('#6B7280')
        # Extract name, mobile and email from the video filename.
        candidate_name, mobile_number, email_id = extract_name_mobile_email(report['video_name'])
        # Ultra Compact Styles
        title_style = ParagraphStyle(
            'CompactTitle',
            parent=styles['Heading1'],
            fontSize=16,
            textColor=colors.white,
            alignment=TA_CENTER,
            fontName='Helvetica-Bold',
            spaceAfter=12
        )
        section_style = ParagraphStyle(
            'CompactSection',
            parent=styles['Heading2'],
            fontSize=11,
            textColor=PRIMARY_COLOR,
            fontName='Helvetica-Bold',
            spaceAfter=8,
            spaceBefore=12
        )
        metric_label_style = ParagraphStyle(
            'CompactMetricLabel',
            parent=styles['Normal'],
            fontSize=8,
            textColor=LIGHT_TEXT,
            alignment=TA_CENTER,
            fontName='Helvetica'
        )
        metric_value_style = ParagraphStyle(
            'CompactMetricValue',
            parent=styles['Normal'],
            fontSize=12,
            textColor=DARK_TEXT,
            alignment=TA_CENTER,
            fontName='Helvetica-Bold'
        )
        # ===== COMPACT HEADER =====
        header_data = [[
            Paragraph("INTERVIEW ANALYSIS REPORT", title_style)
        ]]
        header_table = Table(header_data, colWidths=[7.2*inch])
        header_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), PRIMARY_COLOR),
            ('VALIGN', (0, 0), (-1, 0), 'MIDDLE'),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('TOPPADDING', (0, 0), (-1, 0), 12),
        ]))
        story.append(header_table)
        story.append(Spacer(1, 0.05*inch))
        # ===== COMPACT CANDIDATE INFO =====
        # Two-column label/value layout; the long video name is truncated.
        candidate_data = [
            ['πŸ‘€ Candidate:', candidate_name, 'πŸ“± Mobile:', mobile_number],
            ['πŸ“§ Email:', email_id, '⏱️ Duration:', f"{report['duration_seconds']:.1f}s"],
            ['πŸŽ₯ Video:', report['video_name'][:25] + '...' if len(report['video_name']) > 25 else report['video_name'], '', ''],
        ]
        candidate_table = Table(candidate_data, colWidths=[1.2*inch, 2.2*inch, 1.2*inch, 2.2*inch])
        candidate_table.setStyle(TableStyle([
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('SPAN', (2, 2), (3, 2)),  # Span the last row for video name
        ]))
        story.append(candidate_table)
        story.append(Spacer(1, 0.1*inch))
        # ===== COMPACT CONFIDENCE SCORE =====
        # Banner color/text follow the score bands: >=0.8, >=0.6, below.
        confidence = report['confidence_score']
        if confidence >= 0.8:
            conf_color = ACCENT_COLOR
            conf_text = "EXCELLENT"
        elif confidence >= 0.6:
            conf_color = WARNING_COLOR
            conf_text = "GOOD"
        else:
            conf_color = DANGER_COLOR
            conf_text = "NEEDS WORK"
        confidence_data = [[
            Paragraph(f"Overall Score: {confidence:.2f}/1.00",
                      ParagraphStyle('ConfScore', fontSize=11, textColor=colors.white,
                                     alignment=TA_CENTER, fontName='Helvetica-Bold')),
            Paragraph(conf_text,
                      ParagraphStyle('ConfText', fontSize=10, textColor=colors.white,
                                     alignment=TA_CENTER, fontName='Helvetica'))
        ]]
        confidence_table = Table(confidence_data, colWidths=[4*inch, 2.8*inch])
        confidence_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), conf_color),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
            ('TOPPADDING', (0, 0), (-1, 0), 8),
        ]))
        story.append(confidence_table)
        story.append(Spacer(1, 0.15*inch))
        # ===== COMPACT METRICS GRID - 2x3 =====
        story.append(Paragraph("πŸ“Š KEY METRICS", section_style))
        # Get AI detection: label with the highest probability, and its value.
        ai_label = max(report['ai_detection'], key=report['ai_detection'].get)
        ai_conf = report['ai_detection'][ai_label]
        # Each cell is a small nested Table: label / value / unit.
        metrics_data = [
            [
                # Row 1: Speaking Metrics
                Table([
                    [Paragraph('SPEAKING PACE', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['words_per_minute']:.0f}", metric_value_style)],
                    [Paragraph('WPM', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),
                Table([
                    [Paragraph('FILLER WORDS', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['filler_ratio']:.1%}", metric_value_style)],
                    [Paragraph('Ratio', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),
                Table([
                    [Paragraph('VOCABULARY', metric_label_style)],
                    [Paragraph(f"{report['linguistic_features']['lexical_diversity']:.2f}", metric_value_style)],
                    [Paragraph('Diversity', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')])
            ],
            [
                # Row 2: Technical Metrics
                Table([
                    [Paragraph('VOICE STABILITY', metric_label_style)],
                    [Paragraph(f"{report['acoustic_features']['pitch_std']:.1f}", metric_value_style)],
                    [Paragraph('Std Dev', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),
                Table([
                    [Paragraph('VOICE ENERGY', metric_label_style)],
                    [Paragraph(f"{report['acoustic_features']['energy_mean']:.3f}", metric_value_style)],
                    [Paragraph('Level', metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')]),
                Table([
                    [Paragraph('AI DETECTION', metric_label_style)],
                    [Paragraph(f"{ai_conf:.1%}", metric_value_style)],
                    [Paragraph(ai_label, metric_label_style)]
                ], style=[('ALIGN', (0, 0), (-1, -1), 'CENTER')])
            ]
        ]
        metrics_table = Table(metrics_data, colWidths=[2.2*inch, 2.2*inch, 2.2*inch])
        metrics_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, -1), LIGHT_BG),
            ('BOX', (0, 0), (-1, -1), 1, colors.HexColor('#E5E7EB')),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 10),
            ('TOPPADDING', (0, 0), (-1, -1), 10),
        ]))
        story.append(metrics_table)
        story.append(Spacer(1, 0.15*inch))
        # ===== COMPACT STATUS INDICATORS =====
        story.append(Paragraph("πŸ“ˆ PERFORMANCE STATUS", section_style))
        # Traffic-light indicators: green/yellow/red per metric band.
        wpm_status = '🟒' if 120 <= report['linguistic_features']['words_per_minute'] <= 180 else '🟑' if 80 <= report['linguistic_features']['words_per_minute'] <= 220 else 'πŸ”΄'
        filler_status = '🟒' if report['linguistic_features']['filler_ratio'] <= 0.05 else '🟑' if report['linguistic_features']['filler_ratio'] <= 0.1 else 'πŸ”΄'
        vocab_status = '🟒' if report['linguistic_features']['lexical_diversity'] >= 0.7 else '🟑' if report['linguistic_features']['lexical_diversity'] >= 0.5 else 'πŸ”΄'
        ai_status = '🟒' if ai_label == 'Human' else 'πŸ”΄'
        status_data = [
            ['Speaking Pace', get_pace_status(report['linguistic_features']['words_per_minute']), wpm_status],
            ['Speech Fluency', get_fluency_status(report['linguistic_features']['filler_ratio']), filler_status],
            ['Vocabulary Range', get_vocab_status(report['linguistic_features']['lexical_diversity']), vocab_status],
            ['AI Detection', ai_label.upper(), ai_status],
        ]
        status_table = Table(status_data, colWidths=[2.5*inch, 3.5*inch, 0.6*inch])
        status_table.setStyle(TableStyle([
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#E5E7EB')),
        ]))
        story.append(status_table)
        story.append(Spacer(1, 0.15*inch))
        # ===== COMPACT TRANSCRIPTION =====
        story.append(Paragraph("πŸ’¬ TRANSCRIPTION", section_style))
        trans_text = report['transcription_preview']
        if len(trans_text) > 150:  # Even more compact
            trans_text = trans_text[:150] + "..."
        trans_style = ParagraphStyle(
            'CompactTranscription',
            parent=styles['Normal'],
            fontSize=9,
            textColor=DARK_TEXT,
            alignment=TA_JUSTIFY,
            backColor=LIGHT_BG,
            borderPadding=8,
            leftIndent=5,
            rightIndent=5
        )
        story.append(Paragraph(trans_text, trans_style))
        story.append(Spacer(1, 0.1*inch))
        # ===== COMPACT RECOMMENDATIONS =====
        # Build up to three bullet recommendations from the metric bands.
        story.append(Paragraph("πŸ’‘ KEY RECOMMENDATIONS", section_style))
        recommendations = []
        if report['linguistic_features']['filler_ratio'] > 0.1:
            recommendations.append("β€’ Reduce filler words")
        if report['linguistic_features']['words_per_minute'] < 120:
            recommendations.append("β€’ Increase speaking pace")
        elif report['linguistic_features']['words_per_minute'] > 200:
            recommendations.append("β€’ Slow down for clarity")
        if report['linguistic_features']['lexical_diversity'] < 0.6:
            recommendations.append("β€’ Expand vocabulary")
        if ai_label != 'Human':
            recommendations.append("β€’ Use natural speech patterns")
        if not recommendations:
            recommendations.append("β€’ Excellent! Maintain current performance")
        # Limit to 3 recommendations max
        if len(recommendations) > 3:
            recommendations = recommendations[:3]
        recommendations_text = "<br/>".join(recommendations)
        rec_style = ParagraphStyle(
            'CompactRecommendations',
            parent=styles['Normal'],
            fontSize=9,
            textColor=DARK_TEXT,
            alignment=TA_LEFT,
            backColor=colors.HexColor('#ECFDF5'),
            borderPadding=8,
            leftIndent=5
        )
        story.append(Paragraph(recommendations_text, rec_style))
        # ===== COMPACT FOOTER =====
        story.append(Spacer(1, 0.1*inch))
        footer_text = f"Interview Analysis System (Developed by Avinash Kumar) β€’ {report['timestamp']}"
        footer_style = ParagraphStyle(
            'CompactFooter',
            parent=styles['Normal'],
            fontSize=7,
            textColor=LIGHT_TEXT,
            alignment=TA_CENTER
        )
        story.append(Paragraph(footer_text, footer_style))
        # ===== BUILD PDF =====
        doc.build(story)
        print(f"βœ… PDF created: (unknown)")
        return True
    except Exception as e:
        print(f"❌ PDF creation error: {e}")
        import traceback
        traceback.print_exc()
        return False
# Compact helper functions
def get_pace_status(wpm):
    """Classify a speaking pace (words per minute) into a label."""
    if wpm < 80 or wpm > 220:
        return "Extreme"
    if wpm < 120:
        return "Slow"
    if wpm > 180:
        return "Fast"
    return "Optimal"
def get_fluency_status(filler_ratio):
    """Map a filler-word ratio to a fluency label."""
    for threshold, label in ((0.05, "Excellent"), (0.1, "Good")):
        if filler_ratio <= threshold:
            return label
    return "Needs Work"
def get_vocab_status(lexical_diversity):
    """Map lexical diversity (unique/total word ratio) to a label."""
    if lexical_diversity >= 0.7:
        return "Rich"
    return "Average" if lexical_diversity >= 0.5 else "Limited"
# ==================== SUPABASE STORAGE FUNCTIONS ====================
def upload_to_supabase(file_path, filename, bucket_name=REPORTS_BUCKET_NAME):
    """Upload a local PDF to Supabase storage and return its public URL.

    Content-type is forced to application/pdf (this helper is PDF-only).
    Returns None on any failure.
    """
    try:
        # Bug fix: both progress messages below printed the literal
        # "(unknown)" instead of the file name.
        print(f"πŸ“€ Uploading {filename} to Supabase...")
        with open(file_path, 'rb') as file:
            # Upload the file
            result = supabase.storage.from_(bucket_name).upload(
                file=file,
                path=filename,
                file_options={"content-type": "application/pdf"}
            )
        print(f"βœ… Uploaded to Supabase: {filename}")
        # Get public URL
        public_url = supabase.storage.from_(bucket_name).get_public_url(filename)
        print(f"🌐 Public URL: {public_url}")
        return public_url
    except Exception as e:
        print(f"❌ Upload error: {e}")
        return None
def store_analysis_data(report):
    """Persist one analysis report row into video_analysis_results.

    Returns True when the insert succeeds, False on any error.
    """
    try:
        print("πŸ’Ύ Storing analysis data in database...")
        # Copy exactly the report fields the table persists; created_at
        # records the database insert time, not the analysis time.
        persisted_fields = (
            "video_name", "timestamp", "duration_seconds", "confidence_score",
            "acoustic_features", "linguistic_features", "ai_detection",
            "transcription_preview", "full_transcription",
        )
        row = {field: report[field] for field in persisted_fields}
        row["created_at"] = datetime.now().isoformat()
        supabase.table("video_analysis_results").insert(row).execute()
        print(f"βœ… Stored analysis data for: {report['video_name']}")
        return True
    except Exception as e:
        print(f"❌ Database storage error: {e}")
        return False
def create_and_store_single_report(report):
    """Create the PDF for one analyzed video and store it in Supabase.

    The PDF is named after the video ("<video>_analysis_report.pdf").
    Analysis-data and metadata inserts are best-effort: their failure is
    logged but does not fail the call once the PDF is uploaded. Returns
    True when the PDF was created and uploaded.
    """
    try:
        print("\nπŸ“Š Creating and storing individual PDF report...")
        # Derive the PDF name from the video name (extension swapped for .pdf).
        video_name_without_ext = os.path.splitext(report['video_name'])[0]
        pdf_filename = f"{video_name_without_ext}_analysis_report.pdf"
        # 1. Create PDF report
        if not create_pdf_report(report, pdf_filename):
            print("❌ Failed to create PDF")
            return False
        try:
            # 2. Upload PDF to Supabase
            pdf_url = upload_to_supabase(pdf_filename, pdf_filename)
            if not pdf_url:
                print("❌ Failed to upload PDF")
                return False
            # 3. Store analysis data in database (best-effort)
            try:
                if not store_analysis_data(report):
                    print("⚠️ Failed to store analysis data, but PDF uploaded successfully")
            except Exception as e:
                print(f"⚠️ Database storage failed, but PDF uploaded: {e}")
            # 4. Store report metadata (best-effort)
            try:
                report_metadata = {
                    "pdf_url": pdf_url,
                    "video_name": report['video_name'],
                    "confidence_score": report["confidence_score"],
                    "timestamp": datetime.now().isoformat(),
                    "report_id": f"report_{video_name_without_ext}"
                }
                supabase.table("analysis_reports").insert(report_metadata).execute()
                print("βœ… Report metadata stored")
            except Exception as e:
                print(f"⚠️ Could not store report metadata: {e}")
            print(f"βœ… Individual PDF report stored successfully in Supabase!")
            print(f"πŸ“Ž PDF URL: {pdf_url}")
            return True
        finally:
            # Bug fix: the local PDF previously leaked when the upload
            # failed or an exception was raised; cleanup now always runs.
            if os.path.exists(pdf_filename):
                os.remove(pdf_filename)
    except Exception as e:
        print(f"❌ Report storage error: {e}")
        import traceback
        traceback.print_exc()
        return False
# ==================== REQUIRED DATABASE TABLES SETUP ====================
def setup_database_tables():
    """Print a startup reminder of the database tables the pipeline needs.

    The tables themselves must be created manually in the Supabase
    dashboard; this function performs no database calls.
    """
    try:
        print("πŸ”§ Checking database tables...")
        expected_tables = ["video_analysis_results", "analysis_reports"]
        print(f"πŸ“‹ Required tables: {expected_tables}")
        print("πŸ’‘ Note: Create these tables in Supabase Dashboard -> Table Editor")
    except Exception as e:
        print(f"❌ Database setup error: {e}")
# ==================== MAIN PROCESS - ONE VIDEO AT A TIME ====================
def process_videos():
    """Background worker: poll the bucket and process one video at a time.

    Loop: list new videos -> download -> analyze -> remove local and
    remote copies -> store the PDF report. A file is marked processed
    even when it fails, so one bad video cannot wedge the queue. Sleeps
    30s when idle or on error, 10s between videos. Never returns.
    """
    while True:
        try:
            print(f"\n{'='*60}")
            print(f"πŸ” CHECKING FOR NEW VIDEOS... ({datetime.now().strftime('%H:%M:%S')})")
            print(f"{'='*60}")
            videos = get_bucket_files()
            new_videos = [v for v in videos if v['name'] not in PROCESSED_FILES]
            if not new_videos:
                print("βœ… No new videos found. Waiting...")
                time.sleep(30)
                continue
            print(f"🎯 Found {len(new_videos)} new video(s) to process")
            print("πŸ”„ Processing ONE VIDEO AT A TIME...\n")
            # Process only the FIRST video; the next loop iteration picks
            # up the rest.
            video = new_videos[0]
            filename = video['name']
            # Bug fix: the progress messages below printed the literal
            # "(unknown)" instead of the actual file name/path.
            print(f"🎬 PROCESSING: {filename}")
            # Download
            video_path = download_video(filename)
            if not video_path:
                PROCESSED_FILES.add(filename)  # Mark as processed even if failed
                continue
            # Analyze
            report = analyze_video(video_path, filename)
            # Cleanup local file
            if os.path.exists(video_path):
                os.remove(video_path)
                print(f"πŸ—‘οΈ Deleted local: {video_path}")
            # Delete from Supabase
            delete_from_supabase(filename)
            # Mark as processed
            PROCESSED_FILES.add(filename)
            # Store individual PDF report in Supabase
            if report:
                print(f"\n{'='*60}")
                print(f"πŸ“Š ANALYSIS COMPLETE - Storing individual PDF report")
                print(f"{'='*60}")
                create_and_store_single_report(report)
            print(f"\nβœ… Video '{filename}' processing complete. Waiting 10 seconds for next video...\n")
            time.sleep(10)  # Small pause before checking for the next video
        except Exception as e:
            print(f"❌ Process error: {e}")
            import traceback
            traceback.print_exc()
            time.sleep(30)
# ==================== FLASK ROUTES ====================
@app.route('/')
def home():
    """Root endpoint: service identity plus processing statistics."""
    payload = {
        "status": "running",
        "service": "Video Analysis System",
        "processed_files": len(PROCESSED_FILES),
        "reports_bucket": REPORTS_BUCKET_NAME,
        "processing_mode": "ONE_VIDEO_AT_A_TIME",
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    return jsonify(payload)
@app.route('/health')
def health():
    """Lightweight liveness probe for orchestrators/load balancers."""
    body = jsonify({"status": "healthy"})
    return body, 200
@app.route('/storage-status')
def storage_status():
    """Report whether the reports bucket is reachable and count its PDFs."""
    try:
        entries = supabase.storage.from_(REPORTS_BUCKET_NAME).list()
        pdf_count = sum(1 for entry in entries if entry['name'].endswith('.pdf'))
        return jsonify({
            "status": "healthy",
            "reports_bucket": REPORTS_BUCKET_NAME,
            "total_files": len(entries),
            "pdf_files": pdf_count,
            "bucket_accessible": True
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "reports_bucket": REPORTS_BUCKET_NAME,
            "bucket_accessible": False,
            "error": str(e)
        }), 500
@app.route('/stats')
def stats():
    """Expose processed-file bookkeeping for debugging."""
    return jsonify(
        total_processed=len(PROCESSED_FILES),
        processed_files=list(PROCESSED_FILES),
        bucket=BUCKET_NAME,
        processing_mode="sequential",
    )
# ==================== MAIN ====================
# ==================== MAIN ====================
if __name__ == '__main__':
    # Startup banner describing the runtime configuration.
    print("\n" + "="*60)
    print("πŸš€ VIDEO ANALYSIS SYSTEM STARTING")
    print("="*60)
    print(f"πŸ“ Videos Bucket: {BUCKET_NAME}")
    print(f"πŸ“Š Reports Bucket: {REPORTS_BUCKET_NAME}")
    print(f"πŸ“„ Storage Type: PDF ONLY")
    print(f"🎯 Processing: ONE VIDEO AT A TIME")
    print(f"⏱️ Check interval: 30 seconds")
    print("="*60 + "\n")
    # Setup storage and database (best-effort: return values are ignored,
    # so the server starts even if provisioning fails).
    setup_storage()
    setup_database_tables()
    # Start background processor as a daemon thread so it dies with the
    # Flask process.
    processor = threading.Thread(target=process_videos, daemon=True)
    processor.start()
    # Start Flask server (PORT defaults to 7860, e.g. for HF Spaces).
    port = int(os.getenv("PORT", 7860))
    app.run(host='0.0.0.0', port=port, debug=False)