# MeetGeniusAI / app.py
# (Hugging Face Space file header: uploaded by LovnishVerma, commit 0152b21, verified)
import os
import re
import uuid
import shutil
import threading
import torch
from io import BytesIO
import numpy as np
import subprocess # Replacing os.system for stability
import time
from flask import Flask, render_template, request, jsonify, send_file
import yt_dlp
import whisper
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from docx import Document
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.utils import simpleSplit
app = Flask(__name__)
# Global lock: serialises all heavy processing so only one job runs at a time.
LOCK = threading.Lock()

# ---- CONFIGURATION ----
JOB_TTL_SECONDS = 30 * 60  # 30 minutes
CLEANUP_INTERVAL = 10 * 60  # 10 minutes
BASE_DIR = "jobs"  # per-job working directories are created under this folder
os.makedirs(BASE_DIR, exist_ok=True)
# In-memory results store: job_id -> {summary, notes, conclusion, vectors, created_at}
JOB_STORE = {}
# --- DEVICE SETUP ---
# Prefer CUDA when available; all models below are loaded onto this device.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"๐Ÿš€ Device selected: {DEVICE}")
if DEVICE == "cuda":
    print(f"๐ŸŽฎ GPU: {torch.cuda.get_device_name(0)}")
    torch.cuda.empty_cache()  # Clear old cache
# --- FFMPEG AUTO-DETECTION (FIXED) ---
# Auto-detects where FFmpeg is installed if it is not already on PATH.
if shutil.which("ffmpeg") is None:
    print("โš ๏ธ CRITICAL ERROR: FFmpeg is not installed or not in PATH!")
    print("๐Ÿ‘‰ Please install FFmpeg from https://ffmpeg.org/download.html and add to System Variables.")
    # Fallback for Windows default path if user forgot to add to PATH
    possible_path = r"C:\ffmpeg\bin"
    if os.path.exists(possible_path):
        # Append to PATH for this process so subprocess calls can find ffmpeg.
        os.environ["PATH"] += os.pathsep + possible_path
        print(f"โœ… Found FFmpeg at {possible_path}, added to path.")
    else:
        print("โŒ FFmpeg not found. Audio processing will fail.")
# --- LOAD MODELS ---
# Whisper for speech-to-text, MiniLM for retrieval embeddings,
# FLAN-T5 for abstractive conclusions and question answering.
print("โณ Loading AI models (One-time setup)...")
try:
    whisper_model = whisper.load_model("base", device=DEVICE)
    embedder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
    qa_model_name = "google/flan-t5-base"
    tokenizer = AutoTokenizer.from_pretrained(qa_model_name)
    qa_model = AutoModelForSeq2SeqLM.from_pretrained(qa_model_name).to(DEVICE)
    print("โœ… All Models loaded successfully")
except Exception as e:
    print(f"โŒ Model Loading Error: {e}")
    # Use SystemExit directly instead of the site-module `exit()` helper,
    # which is not guaranteed to exist when Python runs without `site`.
    raise SystemExit(1)
# --- HELPER FUNCTIONS ---
def extract_audio(input_path, output_path):
    """Convert any audio/video file to 16 kHz mono WAV for Whisper.

    Uses subprocess for safe execution (no shell). Raises ``Exception`` with
    ffmpeg's stderr output on failure so callers can show a useful message,
    and also when the ffmpeg binary itself is missing (previously this leaked
    a raw ``FileNotFoundError``).
    """
    command = [
        "ffmpeg", "-y", "-i", input_path,
        "-ac", "1", "-ar", "16000", "-vn", output_path,
    ]
    try:
        # Capture stderr instead of discarding it: it carries the only useful
        # diagnostics when the conversion fails.
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
    except FileNotFoundError:
        # ffmpeg binary not on PATH — see the startup auto-detection block.
        raise Exception("FFmpeg conversion failed: ffmpeg executable not found in PATH")
    except subprocess.CalledProcessError as e:
        detail = e.stderr.decode(errors="replace").strip() if e.stderr else e
        raise Exception(f"FFmpeg conversion failed: {detail}")
def clean_sentences(text):
    """Split raw transcript text into a list of cleaned sentences.

    Collapses any word repeated three or more times in a row into a single
    occurrence, splits on sentence-ending punctuation, normalises internal
    whitespace, and drops fragments of two words or fewer.
    """
    deduped = re.sub(r'\b(\w+)( \1\b){2,}', r'\1', text, flags=re.I)
    pieces = re.split(r'(?<=[.!?]) +', deduped)
    cleaned = []
    for piece in pieces:
        if len(piece.split()) > 2:
            cleaned.append(re.sub(r'\s+', ' ', piece.strip()))
    return cleaned
def to_paragraphs(sentences, n=4):
    """Group *sentences* into paragraphs of *n*, separated by blank lines."""
    paragraphs = []
    for start in range(0, len(sentences), n):
        paragraphs.append(" ".join(sentences[start:start + n]))
    return "\n\n".join(paragraphs)
def summarize(sentences):
    """Return an (extractive summary, abstractive conclusion) pair.

    The summary is the three sentences whose embeddings are closest to the
    document centroid, re-ordered chronologically so the result reads in the
    order the speaker said them. The conclusion is generated by the seq2seq
    model from the first 20 sentences.
    """
    if not sentences:
        return "No content.", "No conclusion."

    # --- Extractive summary: score sentences against the mean embedding ---
    with torch.no_grad():
        embeddings = embedder.encode(sentences, normalize_embeddings=True, batch_size=8)
    centroid = np.mean(embeddings, axis=0)
    # Keep the original index alongside each score so we can restore order.
    ranked = [(np.dot(vec, centroid), idx, sent)
              for idx, (sent, vec) in enumerate(zip(sentences, embeddings))]
    best_three = sorted(ranked, key=lambda item: item[0], reverse=True)[:3]
    # Re-sort the winners by index: earliest sentence first, latest last.
    in_time_order = sorted(best_three, key=lambda item: item[1])
    summary = " ".join(sent for _, _, sent in in_time_order)

    # --- Abstractive conclusion from the opening context ---
    opening = " ".join(sentences[:20])
    prompt = f"Summarize the main point of this text in one professional paragraph:\n\n{opening}"
    input_ids = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True).input_ids.to(DEVICE)
    with torch.no_grad():
        generated = qa_model.generate(input_ids, max_length=150, num_beams=4, early_stopping=True)
    conclusion = tokenizer.decode(generated[0], skip_special_tokens=True)
    return summary, conclusion
def build_vectors(job_id):
    """Embed the job's notes paragraph-by-paragraph for later Q&A retrieval."""
    job = JOB_STORE[job_id]
    job["vectors"] = []
    chunks = job["notes"].split("\n\n")
    # Empty notes split to [''] — nothing worth embedding.
    if not chunks or chunks == ['']:
        return
    with torch.no_grad():
        embeddings = embedder.encode(chunks, normalize_embeddings=True)
    for chunk, vec in zip(chunks, embeddings):
        job["vectors"].append({"text": chunk, "vector": vec})
def transcribe(path):
    """Run Whisper on the audio file at *path* and return the transcript text."""
    with torch.no_grad():
        result = whisper_model.transcribe(path, fp16=(DEVICE == "cuda"), verbose=False)
    if DEVICE == "cuda":
        # Transcription can leave large buffers on the GPU; reclaim VRAM now.
        torch.cuda.empty_cache()
    return " ".join(segment["text"] for segment in result["segments"])
def generate_ai_answer(question, context):
    """Answer *question* grounded in *context* using the FLAN-T5 model."""
    prompt = f"answer based on context: {context} question: {question}"
    input_ids = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True).input_ids.to(DEVICE)
    with torch.no_grad():
        generated = qa_model.generate(input_ids, max_length=200, num_beams=4, early_stopping=True)
    return tokenizer.decode(generated[0], skip_special_tokens=True)
# --- FLASK ROUTES ---
@app.route("/")
def index():
return render_template("landing.html")
def new_job():
    """Create a fresh entry in JOB_STORE and return its job id.

    Caps the store at 20 jobs by evicting the oldest entry first, so the
    in-memory store cannot grow without bound.
    """
    if len(JOB_STORE) >= 20:
        oldest = min(JOB_STORE, key=lambda jid: JOB_STORE[jid]['created_at'])
        JOB_STORE.pop(oldest)
    job_id = str(uuid.uuid4())
    JOB_STORE[job_id] = {
        "summary": "",
        "notes": "",
        "conclusion": "",
        "vectors": [],
        "created_at": time.time(),
    }
    return job_id
@app.route("/dashboard", methods=["GET", "POST"])
def dashboard():
active_tab = request.args.get('tab', 'youtube')
context = {}
if request.method == "POST":
# Changed to Blocking Lock: Wait instead of error
with LOCK:
job_id = new_job()
job_dir = os.path.join(BASE_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
try:
url = request.form.get("youtube_url")
if not url: raise Exception("URL is missing")
print(f"๐Ÿ“ฅ Processing: {url}")
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': os.path.join(job_dir, 'audio.%(ext)s'),
'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'wav'}],
'quiet': True,
'nocheckcertificate': True, # Ignore SSL errors
'socket_timeout': 10, # Retry quickly if stuck
'retries': 10, # Retry more times
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# Find the wav file
audio_files = [f for f in os.listdir(job_dir) if f.endswith(".wav")]
if not audio_files: raise Exception("Download failed, no audio file found.")
audio_path = os.path.join(job_dir, audio_files[0])
# Transcribe & Process
text = transcribe(audio_path)
sents = clean_sentences(text)
if not sents: raise Exception("Audio transcribed but text was empty.")
summary, conclusion = summarize(sents)
notes = to_paragraphs(sents)
JOB_STORE[job_id].update({"summary": summary, "notes": notes, "conclusion": conclusion})
build_vectors(job_id)
context = {
"job_id": job_id, "summary": summary,
"transcript": notes, "conclusion": conclusion
}
except Exception as e:
print(f"โŒ Error: {e}")
context["error"] = f"Processing Failed: {str(e)}"
finally:
shutil.rmtree(job_dir, ignore_errors=True)
return render_template("dashboard.html", active_tab=active_tab, **context)
@app.route("/enterprise", methods=["POST"])
def enterprise_submit():
with LOCK: # Blocking Lock
file = request.files.get("audio_file")
if not file:
return render_template("dashboard.html", active_tab="enterprise", error="No file uploaded")
job_id = new_job()
job_dir = os.path.join(BASE_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
try:
original_path = os.path.join(job_dir, file.filename)
file.save(original_path)
wav_path = os.path.join(job_dir, "clean.wav")
# Convert to safe format
extract_audio(original_path, wav_path)
text = transcribe(wav_path)
sents = clean_sentences(text)
if not sents: raise Exception("No speech detected.")
summary, conclusion = summarize(sents)
notes = to_paragraphs(sents)
JOB_STORE[job_id].update({"summary": summary, "notes": notes, "conclusion": conclusion})
build_vectors(job_id)
return render_template("dashboard.html", active_tab="enterprise", job_id=job_id, summary=summary, transcript=notes, conclusion=conclusion)
except Exception as e:
return render_template("dashboard.html", active_tab="enterprise", error=str(e))
finally:
shutil.rmtree(job_dir, ignore_errors=True)
@app.route("/live_submit", methods=["POST"])
def live_submit():
with LOCK: # Locking taaki dusra process clash na kare
file = request.files.get("audio_file")
if not file:
return jsonify({"error": "No audio received"}), 400
job_id = new_job()
job_dir = os.path.join(BASE_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
try:
# Live recording aksar .webm ya .ogg format mein hoti hai
original_path = os.path.join(job_dir, "live_input.webm")
file.save(original_path)
wav_path = os.path.join(job_dir, "clean.wav")
# Convert WebM/OGG to WAV for Whisper
extract_audio(original_path, wav_path)
# --- Process Audio (Same as Enterprise) ---
text = transcribe(wav_path)
sents = clean_sentences(text)
if not sents: raise Exception("No speech detected in live recording.")
summary, conclusion = summarize(sents)
notes = to_paragraphs(sents)
JOB_STORE[job_id].update({"summary": summary, "notes": notes, "conclusion": conclusion})
build_vectors(job_id)
# JSON response return karein kyuki ye AJAX call hoga
return jsonify({
"job_id": job_id,
"summary": summary,
"transcript": notes,
"conclusion": conclusion,
"status": "success"
})
except Exception as e:
print(f"โŒ Live Error: {e}")
return jsonify({"error": str(e)}), 500
finally:
shutil.rmtree(job_dir, ignore_errors=True)
@app.route("/ask", methods=["POST"])
def ask():
data = request.json if request.is_json else request.form
jid = data.get("job_id")
question = data.get("question")
if not jid or jid not in JOB_STORE:
return jsonify({"answer": "Session expired or invalid. Please upload again."})
store = JOB_STORE[jid].get("vectors", [])
if not store:
return jsonify({"answer": "No content available to answer from."})
try:
q_vec = embedder.encode(question, normalize_embeddings=True)
scored = sorted([(np.dot(q_vec, i["vector"]), i["text"]) for i in store], reverse=True)[:3]
context_text = " ".join([t for _, t in scored])
answer = generate_ai_answer(question, context_text)
return jsonify({"answer": answer})
except Exception as e:
return jsonify({"answer": "I couldn't generate an answer due to an error."})
# --- DOWNLOAD ROUTES (UNCHANGED BUT SAFE) ---
@app.route("/download/word/<job_id>")
def download_word(job_id):
d = JOB_STORE.get(job_id)
if not d: return "Expired ID", 404
doc = Document()
doc.add_heading("AI Summary Report", 0)
doc.add_paragraph(f"Generated on: {time.ctime()}")
doc.add_heading("Summary", 1)
doc.add_paragraph(d["summary"])
doc.add_heading("Conclusion", 1)
doc.add_paragraph(d["conclusion"])
doc.add_heading("Full Transcript", 1)
doc.add_paragraph(d["notes"])
path = f"{job_id}.docx"
doc.save(path)
return send_file(path, as_attachment=True)
@app.route("/download/pdf/<job_id>")
def download_pdf(job_id):
d = JOB_STORE.get(job_id)
if not d:
return "Expired ID", 404
pdf_path = f"{job_id}_MeetGenius_Report.pdf"
c = canvas.Canvas(pdf_path, pagesize=A4)
width, height = A4
x_margin = 40
y = height - 50
def draw_text(title, text):
nonlocal y
c.setFont("Helvetica-Bold", 14)
c.drawString(x_margin, y, title)
y -= 25
c.setFont("Helvetica", 11)
lines = simpleSplit(text, "Helvetica", 11, width - 80)
for line in lines:
if y < 50:
c.showPage()
y = height - 50
c.setFont("Helvetica", 11)
c.drawString(x_margin, y, line)
y -= 15
y -= 20
draw_text("AI Summary Report", f"Generated on: {time.ctime()}")
draw_text("Summary", d["summary"])
draw_text("Conclusion", d["conclusion"])
draw_text("Full Transcript", d["notes"])
c.save()
return send_file(
pdf_path,
as_attachment=True,
download_name="MeetGenius_AI_Report.pdf",
mimetype="application/pdf"
)
# ---- CLEANUP THREAD ----
def cleanup_old_jobs():
    """Background loop: drop expired jobs and stale report files.

    Runs forever (started as a daemon thread). Every CLEANUP_INTERVAL seconds
    it removes JOB_STORE entries older than JOB_TTL_SECONDS and deletes
    leftover .pdf/.docx/.wav files in the working directory past the same TTL.
    """
    while True:
        time.sleep(CLEANUP_INTERVAL)
        now = time.time()
        # Clean the in-memory store under the lock — request handlers mutate it.
        with LOCK:
            keys_to_remove = [k for k, v in JOB_STORE.items()
                              if now - v['created_at'] > JOB_TTL_SECONDS]
            for k in keys_to_remove:
                del JOB_STORE[k]
        # Sweep report files written to the CWD by the download handlers.
        for f in os.listdir("."):
            if f.endswith((".pdf", ".docx", ".wav")):
                try:
                    if now - os.path.getmtime(f) > JOB_TTL_SECONDS:
                        os.remove(f)
                except OSError:
                    # File vanished or is in use — best-effort cleanup; catch
                    # only OS errors rather than the old bare `except:`.
                    pass
        print("๐Ÿงน Cleanup cycle completed.")
if __name__ == "__main__":
t = threading.Thread(target=cleanup_old_jobs, daemon=True)
t.start()
app.run(host="0.0.0.0", port=7860)