import gradio as gr from transformers import pipeline import PyPDF2 import re import os import io import random import time import hashlib import secrets import sqlite3 import tempfile from datetime import datetime, timedelta from pathlib import Path from groq import Groq # Try to import audio libraries (optional) try: import speech_recognition as sr SPEECH_AVAILABLE = True except ImportError: SPEECH_AVAILABLE = False try: from gtts import gTTS TTS_AVAILABLE = True except ImportError: TTS_AVAILABLE = False # ==================== CONFIGURATION ==================== try: from google import genai GENAI_AVAILABLE = True except ImportError: GENAI_AVAILABLE = False genai = None # Environment variables hf_token = os.environ.get("HF_TOKEN") gemini_key = os.environ.get("GEMINI_API_KEY") groq_key = os.environ.get("GROQ_API_KEY") dev_password = os.environ.get("DEV_PASSWORD", "dev123") # Database path DATA_DIR = Path("/tmp/data") if os.path.exists("/tmp") else Path("./data") DATA_DIR.mkdir(exist_ok=True) DB_PATH = DATA_DIR / "student_facilitator.db" # API Clients gemini_client = None groq_client = None if gemini_key and GENAI_AVAILABLE: try: gemini_client = genai.Client(api_key=gemini_key) except: try: import google.generativeai as old_genai old_genai.configure(api_key=gemini_key) gemini_client = old_genai except: pass if groq_key: try: groq_client = Groq(api_key=groq_key) except: pass # Lazy load summarizer summarizer = None def load_summarizer(): global summarizer if summarizer is None: try: summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6", device=-1) except: pass return summarizer # ==================== DATABASE MANAGER ==================== class DatabaseManager: def __init__(self, db_path): self.db_path = db_path self.init_database() def get_connection(self): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn def init_database(self): conn = self.get_connection() cursor = conn.cursor() cursor.execute("PRAGMA foreign_keys = ON") cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, role TEXT DEFAULT 'student', created_at TEXT DEFAULT (datetime('now')), last_login TEXT, is_active INTEGER DEFAULT 1, is_dev INTEGER DEFAULT 0 ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, session_token TEXT UNIQUE NOT NULL, created_at TEXT DEFAULT (datetime('now')), expires_at TEXT NOT NULL, is_active INTEGER DEFAULT 1, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS quiz_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, score INTEGER NOT NULL, total_questions INTEGER NOT NULL, percentage REAL NOT NULL, material_summary TEXT, timestamp TEXT DEFAULT (datetime('now')), FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS activity_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, action TEXT NOT NULL, tool_used TEXT, timestamp TEXT DEFAULT (datetime('now')), FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL ) """) conn.commit() conn.close() print(f"Database initialized at {self.db_path}") def create_user(self, username, password_hash, name, email, is_dev=False): try: conn = self.get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO users (username, password_hash, name, email, is_dev) VALUES (?, ?, ?, ?, ?) """, (username, password_hash, name, email, is_dev)) user_id = cursor.lastrowid conn.commit() conn.close() return True, user_id except sqlite3.IntegrityError as e: if "username" in str(e).lower(): return False, "Username already exists" elif "email" in str(e).lower(): return False, "Email already registered" return False, str(e) except Exception as e: return False, str(e) def get_user_by_username(self, username): conn = self.get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ? AND is_active = 1", (username,)) user = cursor.fetchone() conn.close() return dict(user) if user else None def update_last_login(self, user_id): conn = self.get_connection() cursor = conn.cursor() cursor.execute("UPDATE users SET last_login = datetime('now') WHERE id = ?", (user_id,)) conn.commit() conn.close() def create_session(self, user_id, session_token, expires_at): conn = self.get_connection() cursor = conn.cursor() cursor.execute("UPDATE sessions SET is_active = 0 WHERE user_id = ?", (user_id,)) cursor.execute(""" INSERT INTO sessions (user_id, session_token, expires_at) VALUES (?, ?, ?) """, (user_id, session_token, expires_at)) conn.commit() conn.close() return True def get_session(self, session_token): conn = self.get_connection() cursor = conn.cursor() cursor.execute(""" SELECT s.*, u.username, u.name, u.role, u.is_dev, u.id as user_id FROM sessions s JOIN users u ON s.user_id = u.id WHERE s.session_token = ? AND s.is_active = 1 AND datetime(s.expires_at) > datetime('now') AND u.is_active = 1 """, (session_token,)) session = cursor.fetchone() conn.close() return dict(session) if session else None def invalidate_session(self, session_token): conn = self.get_connection() cursor = conn.cursor() cursor.execute("UPDATE sessions SET is_active = 0 WHERE session_token = ?", (session_token,)) conn.commit() conn.close() return True def save_quiz_record(self, user_id, score, total, percentage, material_summary=None): conn = self.get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO quiz_records (user_id, score, total_questions, percentage, material_summary) VALUES (?, ?, ?, ?, ?) """, (user_id, score, total, percentage, material_summary)) conn.commit() conn.close() return True def get_user_quiz_stats(self, user_id): conn = self.get_connection() cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) as total_quizzes, AVG(percentage) as avg_score, MAX(percentage) as best_score, MIN(percentage) as worst_score FROM quiz_records WHERE user_id = ? """, (user_id,)) stats = cursor.fetchone() cursor.execute(""" SELECT score, total_questions, percentage, timestamp FROM quiz_records WHERE user_id = ? ORDER BY timestamp DESC LIMIT 10 """, (user_id,)) history = [dict(row) for row in cursor.fetchall()] conn.close() return dict(stats) if stats else None, history def get_database_stats(self): conn = self.get_connection() cursor = conn.cursor() stats = {} cursor.execute("SELECT COUNT(*) FROM users WHERE is_dev = 0") stats['total_users'] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM sessions WHERE is_active = 1") stats['active_sessions'] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM quiz_records") stats['total_quizzes'] = cursor.fetchone()[0] conn.close() return stats db = DatabaseManager(DB_PATH) # ==================== AUTHENTICATION ==================== class UserAuth: def __init__(self, database): self.db = database self.failed_attempts = {} self._init_dev_account() def _hash(self, password): return hashlib.sha256(password.encode()).hexdigest() def _init_dev_account(self): existing = self.db.get_user_by_username("admin") if not existing and dev_password: self.db.create_user( username="admin", password_hash=self._hash(dev_password), name="Developer", email="dev@studentfacilitator.local", is_dev=True ) print("Developer account created") def validate_email(self, email): if not email or len(email) < 12: return False, "Email must be at least 12 characters" if "@" not in email: return False, "Email must contain @" if "." not in email.split("@")[-1]: return False, "Email must have valid domain" if email.count("@") != 1: return False, "Email must contain exactly one @" local, domain = email.split("@") if len(local) < 1 or len(domain) < 3: return False, "Invalid email format" return True, "Valid" def validate_username(self, username): if not username or len(username) < 3: return False, "Username must be at least 3 characters" if not username.isalnum(): return False, "Username must be alphanumeric only" return True, "Valid" def register(self, username, password, name, email): valid, msg = self.validate_username(username) if not valid: return False, msg valid, msg = self.validate_email(email) if not valid: return False, msg if not password or len(password) < 6: return False, "Password must be at least 6 characters" if not name or len(name.strip()) < 2: return False, "Please enter your full name" password_hash = self._hash(password) success, result = self.db.create_user(username, password_hash, name, email) if success: return True, "Account created successfully!" else: return False, result def login(self, username, password): if not username or not password: return None, "Please enter both username and password" if username in self.failed_attempts: if self.failed_attempts[username]["count"] >= 5: last = self.failed_attempts[username]["last_attempt"] if datetime.now() - last < timedelta(minutes=15): return None, "Too many failed attempts. Try again in 15 minutes." else: del self.failed_attempts[username] user = self.db.get_user_by_username(username) if not user: self._record_failed_attempt(username) return None, "Invalid username or password" if user['password_hash'] != self._hash(password): self._record_failed_attempt(username) return None, "Invalid username or password" self.db.update_last_login(user['id']) if username in self.failed_attempts: del self.failed_attempts[username] session_token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(hours=24) self.db.create_session(user['id'], session_token, expires_at) return session_token, { 'user_id': user['id'], 'username': user['username'], 'name': user['name'], 'role': user['role'], 'is_dev': user['is_dev'] } def _record_failed_attempt(self, username): if username not in self.failed_attempts: self.failed_attempts[username] = {"count": 0, "last_attempt": datetime.now()} self.failed_attempts[username]["count"] += 1 self.failed_attempts[username]["last_attempt"] = datetime.now() def validate_session(self, session_token): if not session_token: return None return self.db.get_session(session_token) def logout(self, session_token): if session_token: self.db.invalidate_session(session_token) return True auth_system = UserAuth(db) # ==================== AUDIO FUNCTIONS ==================== def speech_to_text(audio_file): if not SPEECH_AVAILABLE: return "Speech recognition not available" if not audio_file: return "No audio recorded" try: recognizer = sr.Recognizer() with sr.AudioFile(audio_file) as source: audio = recognizer.record(source) text = recognizer.recognize_google(audio) return text except sr.UnknownValueError: return "Could not understand audio" except sr.RequestError: return "Speech recognition service unavailable" except Exception as e: return f"Error: {str(e)}" def process_audio_input(audio_file, current_text): if not audio_file: return current_text text = speech_to_text(audio_file) if text.startswith("Error:") or text.startswith("Speech") or text.startswith("No audio") or text.startswith("Could not"): return current_text return current_text + " " + text if current_text else text def text_to_speech(text, lang='en'): if not TTS_AVAILABLE: return None, "Text-to-speech not available" if not text: return None, "No text to speak" try: tts = gTTS(text=text, lang=lang, slow=False) with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f: tts.save(f.name) return f.name, "Audio generated" except Exception as e: return None, f"TTS error: {str(e)}" def speak_text(text, lang='en'): if not text: return None audio_file, error = text_to_speech(text, lang) return audio_file # ==================== CORE FUNCTIONS ==================== def extract_text_from_pdf(pdf_file): if pdf_file is None: return None, "Please upload a PDF file." try: if isinstance(pdf_file, str): with open(pdf_file, 'rb') as f: pdf_reader = PyPDF2.PdfReader(f) text = "" for page in pdf_reader.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" else: if hasattr(pdf_file, 'read'): pdf_bytes = pdf_file.read() if hasattr(pdf_file, 'seek'): pdf_file.seek(0) else: pdf_bytes = pdf_file if isinstance(pdf_bytes, bytes): pdf_stream = io.BytesIO(pdf_bytes) else: pdf_stream = io.BytesIO(pdf_bytes.encode() if isinstance(pdf_bytes, str) else pdf_bytes) pdf_reader = PyPDF2.PdfReader(pdf_stream) text = "" for page in pdf_reader.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" text = re.sub(r'\s+', ' ', text).strip() if len(text) < 50: return None, "Could not extract text. PDF may be image-based or scanned." return text, None except Exception as e: return None, f"Error reading PDF: {str(e)}" def summarize_with_gemini(text, max_length, min_length): if not gemini_client: return None try: if hasattr(gemini_client, 'models'): prompt = f"Summarize the following text in {min_length}-{max_length} words:\n\n{text[:15000]}" try: response = gemini_client.models.generate_content( model="gemini-2.5-flash", contents=prompt ) return response.text except: if hasattr(gemini_client, 'GenerativeModel'): model = gemini_client.GenerativeModel('gemini-2.5-flash') response = model.generate_content(prompt) return response.text elif hasattr(gemini_client, 'GenerativeModel'): model = gemini_client.GenerativeModel('gemini-2.5-flash') prompt = f"Summarize the following text in {min_length}-{max_length} words:\n\n{text[:15000]}" response = model.generate_content(prompt) return response.text except Exception as e: print(f"Gemini error: {e}") return None def summarize_pdf(pdf_file, max_length, min_length): text, error = extract_text_from_pdf(pdf_file) if error: return error gemini_result = summarize_with_gemini(text, max_length, min_length) if gemini_result: return gemini_result summ = load_summarizer() if summ: try: result = summ(text[:3500], max_length=max_length, min_length=min_length, do_sample=False) return result[0]['summary_text'] except Exception as e: return f"Error: {str(e)}" return "Error: No summarization available" def generate_essay_with_gemini(prompt, essay_type, word_count, tone): if not gemini_client: return None try: full_prompt = f"""Write a {essay_type} essay in {tone} tone (~{word_count} words). Topic: {prompt} Requirements: Engaging intro, structured body, strong conclusion.""" if hasattr(gemini_client, 'models'): response = gemini_client.models.generate_content( model="gemini-2.5-flash", contents=full_prompt ) essay = response.text.strip() else: model = gemini_client.GenerativeModel('gemini-2.5-flash') response = model.generate_content(full_prompt) essay = response.text.strip() word_count_actual = len(essay.split()) return f"""# {essay_type} Essay: {prompt[:50]}{'...' if len(prompt) > 50 else ''} {essay} --- *~{word_count_actual} words | {tone} tone*""" except Exception as e: print(f"Essay error: {e}") return None def generate_essay(prompt, essay_type, word_count, tone): if not prompt or len(prompt.strip()) < 10: return "Please provide a detailed prompt (at least 10 characters)." result = generate_essay_with_gemini(prompt, essay_type, word_count, tone) if result: return result return "❌ Essay generation failed. Please check Gemini API configuration." def summarize_text(text, max_length, min_length): if len(text.strip()) < 100: return "Please provide at least 100 characters." gemini_result = summarize_with_gemini(text, max_length, min_length) if gemini_result: return gemini_result summ = load_summarizer() if summ: try: result = summ(text[:3500], max_length=max_length, min_length=min_length, do_sample=False) return result[0]['summary_text'] except Exception as e: return f"Error: {str(e)}" return "Error: No summarization available" # ==================== QUIZ FUNCTIONS - FIXED ==================== def extract_sentences(text): if not text or len(text.strip()) < 50: return [] text = re.sub(r'\s+', ' ', text.strip()) sentences = re.split(r'[.!?]+', text) valid_sentences = [] for s in sentences: s = s.strip() words = s.split() if 8 <= len(words) <= 20: if s and s[0].isupper(): valid_sentences.append(s) return valid_sentences[:20] def create_quiz(text, num_questions): sentences = extract_sentences(text) if len(sentences) < num_questions: num_questions = max(1, len(sentences)) if num_questions == 0: return [] selected = random.sample(sentences, num_questions) quiz_data = [] for sentence in selected: words = sentence.split() if len(words) < 5: continue candidates = [w for w in words[2:-2] if len(w) > 3 and w.isalpha()] if not candidates: candidates = [w for w in words[2:-2] if len(w) > 2] if not candidates: continue keyword = random.choice(candidates) question = sentence.replace(keyword, "________", 1) all_words = list(set([w for w in text.split() if len(w) > 3 and w.isalpha() and w.lower() != keyword.lower()])) if len(all_words) < 3: wrong = ["alternative", "option", "choice", "selection"][:3] else: wrong = random.sample(all_words, min(3, len(all_words))) wrong = [w for w in wrong if w.lower() != keyword.lower()] while len(wrong) < 3: wrong.append(f"option_{len(wrong)+1}") options = wrong[:3] + [keyword] random.shuffle(options) quiz_data.append({ "question": question, "options": options, "answer": keyword, "full_sentence": sentence }) return quiz_data def start_quiz(text, num_questions, timer_minutes): if not text or not text.strip(): return ( "⚠️ Please enter study material first!", gr.update(choices=[], visible=False), "", None, 0, 0, None, "⏳ --:--", gr.update(visible=False), "" ) if len(text.strip()) < 100: return ( "⚠️ Please provide at least 100 characters!", gr.update(choices=[], visible=False), "", None, 0, 0, None, "⏳ --:--", gr.update(visible=False), "" ) quiz = create_quiz(text, num_questions) if not quiz or len(quiz) == 0: return ( "⚠️ Could not generate quiz. Add more detailed material!", gr.update(choices=[], visible=False), "", None, 0, 0, None, "⏳ --:--", gr.update(visible=False), "" ) actual_questions = len(quiz) end_time = time.time() + (timer_minutes * 60) material_summary = text[:300] + "..." if len(text) > 300 else text return show_question(quiz, 0, 0, end_time, actual_questions, material_summary) def show_question(quiz, index, score, end_time, total_questions, material_summary): remaining = int(end_time - time.time()) if remaining <= 0: return finish_quiz(quiz, score, index, total_questions, material_summary, time_up=True) if index >= len(quiz): return finish_quiz(quiz, score, len(quiz), total_questions, material_summary) q = quiz[index] mins = remaining // 60 secs = remaining % 60 timer_text = f"⏳ {mins:02d}:{secs:02d}" progress = (index / total_questions) * 20 bar = "█" * int(progress) + "░" * (20 - int(progress)) question_html = f"""
{message}
Your AI-powered academic companion
""") with gr.Row(): login_tab = gr.Button("Sign In", variant="primary") signup_tab = gr.Button("Create Account") with gr.Column(visible=True) as login_form: login_username = gr.Textbox(label="Username", placeholder="Enter username") login_password = gr.Textbox(label="Password", type="password", placeholder="Enter password") login_btn = gr.Button("Sign In", variant="primary") login_message = gr.Markdown(visible=False) with gr.Column(visible=False) as signup_form: signup_name = gr.Textbox(label="Full Name", placeholder="Your name") signup_email = gr.Textbox(label="Email", placeholder="your@email.com") signup_username = gr.Textbox(label="Username", placeholder="Choose username (min 3 chars)") signup_password = gr.Textbox(label="Password", type="password", placeholder="Min 6 characters") signup_btn = gr.Button("Create Account", variant="primary") signup_message = gr.Markdown(visible=False) gr.Markdown("""