import hashlib
import os
import uuid

import gradio as gr
import numpy as np
import pymupdf
import pytesseract
from openai import OpenAI
from PIL import Image
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from supabase import Client, create_client

# =============================================================================
# CONNECTIONS: Read API keys from HF Secrets (environment variables)
# Set these in your Space: Settings > Variables and secrets
# =============================================================================
supabase: Client = create_client(
    os.getenv("SUPABASE_URL"),
    os.getenv("SUPABASE_ANON_KEY"),
)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# =============================================================================
# MODEL: Load the sentence transformer for semantic search
# This runs once on startup. It finds which text chunks are most relevant
# to the user's question before sending them to GPT.
# =============================================================================
model = SentenceTransformer("all-MiniLM-L6-v2")
print("Model loaded!")

# Context string passed to generate_answer() when no file is attached.
NO_DOCUMENT_CONTEXT = "No document provided. Answer from general knowledge."

# File extensions routed to OCR by chat_with_file().
IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff")


# =============================================================================
# FILE PROCESSING: Extract raw text from uploaded PDFs and images
# =============================================================================
def extract_text_from_pdf(file_path):
    """Open a PDF and return the text of all pages concatenated.

    The document is opened in a ``with`` block so the file handle is released
    even if text extraction raises part-way through.
    """
    with pymupdf.open(file_path) as doc:
        return "".join(page.get_text() for page in doc)


def extract_text_from_image(image_path):
    """Run Tesseract OCR on an image file and return the stripped text.

    Returns an empty string when the image cannot be opened or OCR fails.
    The error is printed rather than returned, so an error message is never
    mistaken for document text by the caller.
    """
    try:
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img).strip()
    except Exception as e:  # OCR boundary: PIL/Tesseract raise many types
        print(f"Error extracting text from image: {e}")
        return ""


# =============================================================================
# TEXT CHUNKING: Break long documents into overlapping pieces
# Overlap ensures we don't cut off a sentence right at a chunk boundary
# =============================================================================
def chunk_text(text, chunk_size=1000, overlap=200):
    """Split ``text`` into chunks of ``chunk_size`` characters.

    Consecutive chunks share ``overlap`` characters. Chunking stops as soon
    as a chunk reaches the end of the text, so no trailing chunk made purely
    of already-covered overlap is produced.

    Raises:
        ValueError: if ``chunk_size`` is not positive, or ``overlap`` is not
            in ``[0, chunk_size)`` (a step of zero or less would never
            advance through the text).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start += step
    return chunks


# =============================================================================
# SEMANTIC SEARCH: Find the most relevant chunks for the question
# Uses cosine similarity between the question embedding and chunk embeddings
# =============================================================================
def search_relevant_chunks(query, chunks, embeddings, top_k=3):
    """Return up to ``top_k`` chunks most similar to ``query``, best first.

    ``embeddings`` must hold one row per entry of ``chunks``, in the same
    order. Returns an empty list when there are no chunks or ``top_k`` is
    not positive.
    """
    if not chunks or top_k <= 0:
        return []
    query_vec = model.encode([query])
    similarities = cosine_similarity(query_vec, embeddings)[0]
    top_indices = np.argsort(similarities)[-top_k:][::-1]
    return [chunks[i] for i in top_indices]


# =============================================================================
# FILE HASHING: Create a unique fingerprint for each uploaded file
# Used to track which file was used in a chat session
# =============================================================================
def get_file_hash(file_path):
    """Return the MD5 hex digest of the file contents, or None if unreadable.

    The file is read in fixed-size blocks so large uploads are not loaded
    into memory at once.
    """
    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            for block in iter(lambda: f.read(65536), b""):
                digest.update(block)
    except (OSError, TypeError, ValueError):
        return None
    return digest.hexdigest()


def _resolve_file_path(file):
    """Return the path of an upload given either a path or an object with ``.name``."""
    if file is None:
        return None
    return getattr(file, "name", file)


# =============================================================================
# AI ANSWER: Send question + context to GPT-4o-mini
# Uses Socratic method: guides the student rather than just giving answers
# If no file is uploaded, answers from general knowledge
# =============================================================================
def generate_answer(question, context):
    """Generate a Socratic/Feynman-style answer using GPT-4o-mini.

    When ``context`` is empty or starts with "No document provided", the
    general tutor prompt is used. Otherwise the model is told to answer only
    from ``context``. The check is a prefix match so that a real document
    which merely contains that phrase is still used as context.

    Returns the stripped reply text, or an empty string if the API returned
    no message content.
    """
    if not context or context.startswith("No document provided"):
        system_prompt = (
            "You are a helpful academic math tutor. "
            "Use the Socratic method to guide the student."
        )
    else:
        system_prompt = (
            "You are an academic assistant. Based only on the following "
            f"context, answer the question:\n{context}"
        )

    prompt = (
        f"\n{system_prompt}\n"
        "Give me the output without latex format.\n"
        "Use the socratic/feynman method for learning.\n"
        "\n"
        f"Question: {question}\n"
        "Answer:\n"
    )

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
    )
    # message.content may be None, so guard before stripping.
    return (response.choices[0].message.content or "").strip()


# =============================================================================
# CHAT WITH FILE: Main RAG pipeline
# Combines file reading, chunking, search, and answer generation
# Falls back to general knowledge if no file is uploaded
# =============================================================================
def chat_with_file(question, file):
    """Run the full RAG pipeline: extract, chunk, search, answer.

    ``file`` may be None (general-knowledge answer), a path string, or an
    object exposing the path as ``.name``. Returns the answer text, or a
    user-facing message when the file type is unsupported or no text could
    be extracted.
    """
    if file is None:
        return generate_answer(question, context=NO_DOCUMENT_CONTEXT)

    file_path = _resolve_file_path(file)
    file_extension = os.path.splitext(file_path)[1].lower()

    if file_extension == ".pdf":
        try:
            text = extract_text_from_pdf(file_path)
        except Exception as e:  # UI boundary: report instead of crashing
            print(f"Error extracting text from PDF: {e}")
            text = ""
    elif file_extension in IMAGE_EXTENSIONS:
        text = extract_text_from_image(file_path)
    else:
        return "Unsupported file type. Please upload a PDF or image file."

    if not text.strip():
        return "No text could be extracted from the file."

    chunks = chunk_text(text)
    embeddings = model.encode(chunks)
    top_chunks = search_relevant_chunks(question, chunks, embeddings)
    combined_context = "\n\n".join(top_chunks)
    return generate_answer(question, combined_context)


# =============================================================================
# DATABASE: Save and load chat history from Supabase
# Each message is stored with user_id, session_id, question, and answer
# Sessions allow users to revisit past conversations
# =============================================================================
def save_chat_to_db(user_id, session_id, question, answer,
                    file_name=None, file_hash=None):
    """Insert one Q&A exchange into ``chat_history``.

    Returns True on success. On any error the exception is printed and
    False is returned.
    """
    try:
        supabase.table("chat_history").insert({
            "user_id": user_id,
            "session_id": session_id,
            "question": question,
            "answer": answer,
            "file_name": file_name,
            "file_hash": file_hash,
        }).execute()
        return True
    except Exception as e:
        print(f"Error saving chat: {e}")
        return False


def load_chat_history(user_id, session_id=None, limit=50):
    """Load a user's chat history, optionally filtered by session.

    Returns the most recent ``limit`` exchanges as ``[question, answer]``
    pairs in chronological order, or an empty list on error.
    """
    try:
        query = (
            supabase.table("chat_history")
            .select("*")
            .eq("user_id", user_id)
        )
        if session_id:
            query = query.eq("session_id", session_id)
        # Fetch newest-first so the limit keeps the latest messages, then
        # reverse below so the chat reads oldest-to-newest.
        response = query.order("created_at", desc=True).limit(limit).execute()
        return [[msg["question"], msg["answer"]]
                for msg in reversed(response.data)]
    except Exception as e:
        print(f"Error loading history: {e}")
        return []


def get_user_sessions(user_id, limit=10):
    """Return up to ``limit`` recent sessions for a user, newest first.

    Each entry is a dict with ``session_id``, ``created_at`` and
    ``file_name``, taken from the newest row seen for that session.
    Returns an empty list on error.
    """
    try:
        response = (
            supabase.table("chat_history")
            .select("session_id, created_at, file_name")
            .eq("user_id", user_id)
            .order("created_at", desc=True)
            # Over-fetch rows because several rows share one session.
            .limit(limit * 5)
            .execute()
        )
        sessions = {}
        for msg in response.data:
            sid = msg["session_id"]
            if sid not in sessions:
                sessions[sid] = {
                    "session_id": sid,
                    "created_at": msg["created_at"],
                    "file_name": msg.get("file_name", "No file"),
                }
        return list(sessions.values())[:limit]
    except Exception as e:
        print(f"Error loading sessions: {e}")
        return []


# =============================================================================
# AUTH MANAGER: Handles signup, login, and logout via Supabase Auth
# Stores the current user and session ID in memory while the app is running
# =============================================================================
class AuthManager:
    """Thin wrapper around Supabase Auth holding the last signed-in user.

    NOTE(review): the single module-level instance and the module-level
    Supabase client are shared by every visitor of the app. Per-visitor
    identity is tracked in Gradio state, not here.
    """

    def __init__(self):
        self.current_user = None
        self.session_id = None

    def signup(self, email, password, username):
        """Create a Supabase Auth user with ``username`` in its metadata.

        Returns ``(success, message)``.
        """
        try:
            response = supabase.auth.sign_up({
                "email": email,
                "password": password,
                "options": {"data": {"username": username}},
            })
            if response.user:
                return True, "Account created! Please check your email to verify."
            return False, "Signup failed"
        except Exception as e:
            error_msg = str(e)
            lowered = error_msg.lower()
            if "duplicate" in lowered or "unique" in lowered:
                return False, "Username or email already exists"
            return False, f"Error: {error_msg}"

    def login(self, email, password):
        """Sign in with email and password.

        Returns ``(success, message, user_id)``; ``user_id`` is None on
        failure. The username lookup in ``user_profiles`` is best-effort:
        if it fails, login still succeeds and the greeting uses "User".
        """
        try:
            response = supabase.auth.sign_in_with_password({
                "email": email,
                "password": password,
            })
        except Exception as e:
            return False, f"Login error: {str(e)}", None

        if not response.user:
            return False, "Invalid credentials", None

        self.current_user = response.user
        self.session_id = str(uuid.uuid4())

        username = "User"
        try:
            profile = (
                supabase.table("user_profiles")
                .select("username")
                .eq("id", response.user.id)
                .execute()
            )
            if profile.data:
                username = profile.data[0].get("username") or "User"
        except Exception as e:
            print(f"Error loading profile: {e}")

        return True, f"Welcome back, {username}!", response.user.id

    def logout(self):
        """Sign out and clear local user state. Returns ``(success, message)``."""
        try:
            supabase.auth.sign_out()
            self.current_user = None
            self.session_id = None
            return True, "Logged out successfully"
        except Exception as e:
            return False, f"Logout error: {str(e)}"

    def is_authenticated(self):
        """Return True if a user is currently logged in."""
        return self.current_user is not None


# Create a single global auth manager instance
auth = AuthManager()


# =============================================================================
# CHAT HANDLER: Combines chat_with_file with database saving
# Requires the user to be logged in before processing
# =============================================================================
def chat_with_file_and_save(question, file, history, user_id, session_id):
    """Answer a question, save the exchange, and return the updated chat.

    Returns ``(history, "", None)``; the last two values clear the question
    box and the file input.

    Access is gated on ``user_id`` rather than the global ``auth`` object,
    because ``auth`` is shared by all visitors and one visitor's login or
    logout would otherwise change access for everyone.
    """
    history = history or []
    if not user_id:
        return history + [["", "Please login to use the chatbot."]], "", None

    answer = chat_with_file(question, file)

    file_path = _resolve_file_path(file)
    file_name = os.path.basename(file_path) if file_path else None
    file_hash = get_file_hash(file_path) if file_path else None

    save_chat_to_db(
        user_id=user_id,
        session_id=session_id,
        question=question,
        answer=answer,
        file_name=file_name,
        file_hash=file_hash,
    )

    return history + [[question, answer]], "", None


# =============================================================================
# GRADIO INTERFACE: Full UI with two tabs
# Tab 1: Login / Signup
# Tab 2: Chat with file upload, session history, and session loader
# =============================================================================
def create_interface():
    """Build and return the Gradio Blocks app (not yet launched)."""
    with gr.Blocks(title="Math Tutor Chatbot", theme=gr.themes.Soft()) as demo:
        # Hidden state: stores user ID and session ID across interactions
        user_id_state = gr.State(None)
        session_id_state = gr.State(None)

        gr.Markdown("# Math Tutor Chatbot")
        gr.Markdown("Create an account to save your chat history and get Socratic math tutoring!")

        with gr.Tabs() as tabs:
            # ── TAB 1: Login and Signup ────────────────────────────────────
            with gr.Tab("Login / Sign Up", id="login_tab"):
                with gr.Row():
                    # Left side: Login
                    with gr.Column():
                        gr.Markdown("### Login to Existing Account")
                        login_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        login_password = gr.Textbox(label="Password", type="password")
                        login_btn = gr.Button("Login", variant="primary", size="lg")
                        login_msg = gr.Markdown("")

                    # Right side: Signup
                    with gr.Column():
                        gr.Markdown("### Create New Account")
                        signup_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        signup_username = gr.Textbox(label="Username", placeholder="cool_username")
                        signup_password = gr.Textbox(label="Password", type="password")
                        signup_btn = gr.Button("Sign Up", variant="primary", size="lg")
                        signup_msg = gr.Markdown("")

            # ── TAB 2: Chat ────────────────────────────────────────────────
            with gr.Tab("Chat", id="chat_tab"):
                gr.Markdown("### Upload a PDF or image and ask questions!")

                with gr.Row():
                    # Left: Chat area
                    with gr.Column(scale=3):
                        chatbot = gr.Chatbot(label="Conversation", height=500, type="tuples")

                        with gr.Row():
                            question_input = gr.Textbox(
                                show_label=False,
                                placeholder="Ask a math question or about your uploaded file...",
                                scale=6,
                            )
                            file_input = gr.File(
                                label="Attach",
                                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                                scale=1,
                            )
                            send_btn = gr.Button("Send", scale=1, variant="primary")

                        with gr.Row():
                            new_session_btn = gr.Button("New Session", size="sm")
                            clear_btn = gr.Button("Clear Chat", size="sm")
                            logout_btn = gr.Button("Logout", size="sm")

                    # Right: Session history panel
                    with gr.Column(scale=1):
                        gr.Markdown("### Your Past Sessions")
                        sessions_display = gr.Dataframe(
                            headers=["Date", "File"],
                            datatype=["str", "str"],
                            interactive=False,
                            row_count=5,
                        )
                        refresh_sessions_btn = gr.Button("Refresh Sessions", size="sm")

                        gr.Markdown("**Load a Previous Session:**")
                        session_dropdown = gr.Dropdown(
                            label="Select Session",
                            choices=[],
                            interactive=True,
                            value=None,
                        )
                        load_session_btn = gr.Button(
                            "Load Selected Session", size="sm", variant="primary"
                        )
                        # Status line on the chat tab, so session-loading
                        # feedback is visible where the action happens.
                        chat_status = gr.Markdown("")

        # ── EVENT HANDLERS ─────────────────────────────────────────────────
        def handle_login(email, password):
            """Log in and switch to the chat tab on success."""
            success, message, uid = auth.login(email, password)
            if success:
                return message, uid, str(uuid.uuid4()), gr.update(selected="chat_tab")
            return message, None, None, gr.update()

        def handle_signup(email, password, username):
            """Create a new account and return a status message."""
            _success, message = auth.signup(email, password, username)
            return message

        def handle_send(question, file, history, user_id, session_id):
            """Send the question through the RAG pipeline and save the result."""
            history = history or []
            if not user_id:
                return history + [["", "Please login first!"]], "", None
            if not (question or "").strip():
                # Nothing to ask: leave the chat, textbox and file untouched.
                return history, gr.update(), gr.update()
            return chat_with_file_and_save(question, file, history, user_id, session_id)

        def handle_logout():
            """Log out and switch back to the login tab."""
            auth.logout()
            return [], "Logged out successfully", None, None, gr.update(selected="login_tab")

        def handle_new_session(user_id):
            """Clear the chat and generate a fresh session ID."""
            return [], str(uuid.uuid4())

        def handle_refresh_sessions(user_id):
            """Load recent sessions and populate the table and dropdown.

            Dropdown choices are ``(label, session_id)`` pairs, so the
            selected value identifies the session directly.
            """
            empty_dropdown = gr.update(choices=[], value=None)
            if not user_id:
                return [["Login first", ""]], empty_dropdown

            sessions = get_user_sessions(user_id, limit=20)
            if not sessions:
                return [["No sessions yet", ""]], empty_dropdown

            df_data = []
            dropdown_choices = []
            for s in sessions:
                created = (s["created_at"] or "")[:19]
                file_label = s["file_name"] or "No file"
                df_data.append([created, file_label])
                dropdown_choices.append(
                    ("{} - {}".format(created, file_label[:20]), s["session_id"])
                )
            return df_data, gr.update(choices=dropdown_choices, value=None)

        def handle_load_session(user_id, selected_session_dropdown, current_session_id=None):
            """Load the selected session into the chat window.

            On failure the current session ID is returned unchanged, so
            later messages are still saved under a valid session.
            """
            if not user_id or not selected_session_dropdown:
                return [], current_session_id, "Select a session first"

            # load_chat_history filters on user_id as well as session_id.
            history = load_chat_history(user_id, selected_session_dropdown)
            if history:
                return history, selected_session_dropdown, "Session loaded!"
            return [], current_session_id, "Session not found"

        # ── WIRE UP BUTTONS TO HANDLERS ────────────────────────────────────
        login_btn.click(
            fn=handle_login,
            inputs=[login_email, login_password],
            outputs=[login_msg, user_id_state, session_id_state, tabs],
        )
        signup_btn.click(
            fn=handle_signup,
            inputs=[signup_email, signup_password, signup_username],
            outputs=[signup_msg],
        )
        send_btn.click(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input],
        )
        question_input.submit(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input],
        )
        logout_btn.click(
            fn=handle_logout,
            outputs=[chatbot, login_msg, user_id_state, session_id_state, tabs],
        )
        new_session_btn.click(
            fn=handle_new_session,
            inputs=[user_id_state],
            outputs=[chatbot, session_id_state],
        )
        clear_btn.click(fn=lambda: [], outputs=[chatbot])
        refresh_sessions_btn.click(
            fn=handle_refresh_sessions,
            inputs=[user_id_state],
            outputs=[sessions_display, session_dropdown],
        )
        load_session_btn.click(
            fn=handle_load_session,
            inputs=[user_id_state, session_dropdown, session_id_state],
            outputs=[chatbot, session_id_state, chat_status],
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()