Spaces:
Sleeping
Sleeping
| import pymupdf | |
| import pytesseract | |
| from PIL import Image | |
| import os | |
| import numpy as np | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sentence_transformers import SentenceTransformer | |
| import gradio as gr | |
| from supabase import create_client, Client | |
| import uuid | |
| import hashlib | |
| from openai import OpenAI | |
| # ============================================================================= | |
| # CONNECTIONS: Read API keys from HF Secrets (environment variables) | |
| # Set these in your Space: Settings > Variables and secrets | |
| # ============================================================================= | |
# Both clients are configured purely from environment variables; a missing
# variable is passed through as None, exactly as os.getenv would do.
supabase: Client = create_client(
    os.environ.get("SUPABASE_URL"),
    os.environ.get("SUPABASE_ANON_KEY"),
)
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
| # ============================================================================= | |
| # MODEL: Load the sentence transformer for semantic search | |
| # This runs once on startup. It finds which text chunks are most relevant | |
| # to the user's question before sending them to GPT. | |
| # ============================================================================= | |
# Loaded once at import time and shared by every request.
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
model = SentenceTransformer(EMBEDDING_MODEL_NAME)
print("Model loaded!")
| # ============================================================================= | |
| # FILE PROCESSING: Extract raw text from uploaded PDFs and images | |
| # ============================================================================= | |
def extract_text_from_pdf(file_path):
    """Return the text of every page of a PDF as a single string.

    Args:
        file_path: Path of the PDF file to open.

    Returns:
        The text of all pages concatenated in page order, with no separator
        inserted between pages.
    """
    # The context manager closes the document (and its underlying file
    # handle) even when extraction fails partway through.
    with pymupdf.open(file_path) as doc:
        return "".join(page.get_text() for page in doc)
def extract_text_from_image(image_path):
    """Run Tesseract OCR over an image file and return the recognised text.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The OCR output with surrounding whitespace stripped. If opening the
        image or running OCR raises, a string beginning with
        "Error extracting text from image:" is returned instead of raising.
    """
    try:
        # Open the image in a context manager so the file handle is released
        # as soon as OCR is done, rather than whenever the object is collected.
        with Image.open(image_path) as img:
            extracted_text = pytesseract.image_to_string(img)
        return extracted_text.strip()
    except Exception as e:
        return f"Error extracting text from image: {e}"
| # ============================================================================= | |
| # TEXT CHUNKING: Break long documents into overlapping pieces | |
| # Overlap ensures we don't cut off a sentence right at a chunk boundary | |
| # ============================================================================= | |
def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into fixed-size chunks that overlap their neighbours.

    Each chunk starts ``chunk_size - overlap`` characters after the previous
    one, so consecutive chunks share ``overlap`` characters. The final chunk
    may be shorter than ``chunk_size``.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks; must be
            smaller than ``chunk_size``.

    Returns:
        A list of chunk strings, empty when ``text`` is empty.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    step = chunk_size - overlap
    # A non-positive step would never advance through the text (the original
    # loop spun forever in that case), so reject it up front.
    if step <= 0:
        raise ValueError("overlap must be smaller than chunk_size")
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
| # ============================================================================= | |
| # SEMANTIC SEARCH: Find the 3 most relevant chunks for the question | |
| # Uses cosine similarity between the question embedding and chunk embeddings | |
| # ============================================================================= | |
def search_relevant_chunks(query, chunks, embeddings, top_k=3):
    """Return the chunks most similar to the query, best match first.

    The query is embedded with the module-level sentence-transformer model
    and compared against ``embeddings`` using cosine similarity.

    Args:
        query: The user's question.
        chunks: The text chunks; ``chunks[i]`` corresponds to ``embeddings[i]``.
        embeddings: Embedding vectors for ``chunks``, in the same order.
        top_k: Maximum number of chunks to return (defaults to 3, the value
            that was previously hard-coded).

    Returns:
        Up to ``top_k`` chunks ordered from most to least similar. An empty
        list when there are no chunks or ``top_k`` is not positive.
    """
    # Nothing to rank: avoid calling cosine_similarity on an empty matrix,
    # and avoid the [-0:] slice, which would select every chunk.
    if top_k <= 0 or not chunks:
        return []
    query_vec = model.encode([query])
    similarities = cosine_similarity(query_vec, embeddings)[0]
    ranked = np.argsort(similarities)[::-1][:top_k]
    return [chunks[i] for i in ranked]
| # ============================================================================= | |
| # FILE HASHING: Create a unique fingerprint for each uploaded file | |
| # Used to track which file was used in a chat session | |
| # ============================================================================= | |
def get_file_hash(file_path):
    """Return the MD5 hex digest of a file's contents.

    The digest is used only as a fingerprint to identify uploads, not for
    any security purpose.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hex digest string, or None when the file cannot be opened or
        read (or the path is not a usable value).
    """
    try:
        digest = hashlib.md5()
        with open(file_path, "rb") as f:
            # Hash in fixed-size blocks so large uploads are never held in
            # memory all at once.
            for block in iter(lambda: f.read(65536), b""):
                digest.update(block)
    except (OSError, TypeError, ValueError):
        # Best effort: a missing hash must not break the chat flow. Unlike a
        # bare except, this lets KeyboardInterrupt/SystemExit propagate.
        return None
    return digest.hexdigest()
| # ============================================================================= | |
| # AI ANSWER: Send question + context to GPT-4o-mini | |
| # Uses Socratic method: guides the student rather than just giving answers | |
| # If no file is uploaded, answers from general knowledge | |
| # ============================================================================= | |
def generate_answer(question, context):
    """Ask gpt-4o-mini to answer a question in a Socratic/Feynman style.

    Args:
        question: The student's question.
        context: Either the retrieved document text to answer from, or a
            string containing "No document provided", which selects the
            general-knowledge tutor prompt.

    Returns:
        The model's reply with surrounding whitespace stripped, or an empty
        string when the API returns no text content.
    """
    if "No document provided" in context:
        system_prompt = "You are a helpful academic math tutor. Use the Socratic method to guide the student."
    else:
        system_prompt = f"You are an academic assistant. Based only on the following context, answer the question:\n{context}"
    prompt = f"""
{system_prompt}
Give me the output without latex format.
Use the socratic/feynman method for learning.
Question:
{question}
Answer:
"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2
    )
    # The API may return a message whose content is None (for example a
    # refusal); fall back to "" instead of raising on .strip().
    content = response.choices[0].message.content
    return (content or "").strip()
| # ============================================================================= | |
| # CHAT WITH FILE: Main RAG pipeline | |
| # Combines file reading, chunking, search, and answer generation | |
| # Falls back to general knowledge if no file is uploaded | |
| # ============================================================================= | |
def chat_with_file(question, file):
    """Answer a question, grounding it in an uploaded file when one is given.

    Without a file the question is answered from general knowledge. With a
    file, its text is extracted (PDF text layer or image OCR), chunked,
    ranked against the question, and the best chunks are passed to the model
    as context.

    Returns:
        The generated answer, or a short message when the file type is
        unsupported or no text could be extracted.
    """
    if file is None:
        return generate_answer(question, context="No document provided. Answer from general knowledge.")

    source_path = file.name
    suffix = os.path.splitext(source_path)[1].lower()
    image_suffixes = [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff"]
    is_pdf = suffix == ".pdf"

    # Reject anything that is neither a PDF nor a known image type.
    if not is_pdf and suffix not in image_suffixes:
        return "Unsupported file type. Please upload a PDF or image file."

    if is_pdf:
        text = extract_text_from_pdf(source_path)
    else:
        text = extract_text_from_image(source_path)

    if not text.strip():
        return "No text could be extracted from the file."

    pieces = chunk_text(text)
    piece_vectors = model.encode(pieces)
    best_pieces = search_relevant_chunks(question, pieces, piece_vectors)
    return generate_answer(question, "\n\n".join(best_pieces))
| # ============================================================================= | |
| # DATABASE: Save and load chat history from Supabase | |
| # Each message is stored with user_id, session_id, question, and answer | |
| # Sessions allow users to revisit past conversations | |
| # ============================================================================= | |
def save_chat_to_db(user_id, session_id, question, answer, file_name=None, file_hash=None):
    """Insert one question/answer exchange into the chat_history table.

    Returns:
        True when the insert succeeded, False when it raised (the error is
        printed rather than propagated).
    """
    record = {
        "user_id": user_id,
        "session_id": session_id,
        "question": question,
        "answer": answer,
        "file_name": file_name,
        "file_hash": file_hash
    }
    try:
        supabase.table("chat_history").insert(record).execute()
    except Exception as e:
        print(f"Error saving chat: {e}")
        return False
    return True
def load_chat_history(user_id, session_id=None, limit=50):
    """Fetch a user's stored exchanges as [question, answer] pairs.

    Rows are ordered by created_at ascending and capped at ``limit``; when
    ``session_id`` is given only that session's rows are returned.

    Returns:
        A list of [question, answer] lists, or an empty list when the query
        raises (the error is printed rather than propagated).
    """
    try:
        query = (
            supabase.table("chat_history")
            .select("*")
            .eq("user_id", user_id)
            .order("created_at", desc=False)
            .limit(limit)
        )
        if session_id:
            query = query.eq("session_id", session_id)
        rows = query.execute().data
        return [[row["question"], row["answer"]] for row in rows]
    except Exception as e:
        print(f"Error loading history: {e}")
        return []
def get_user_sessions(user_id, limit=10):
    """List a user's most recent sessions, one entry per session_id.

    Fetches up to ``limit * 5`` of the newest rows and keeps the first
    (newest) row seen for each session.

    Returns:
        Up to ``limit`` dicts with session_id, created_at and file_name
        keys, newest first, or an empty list when the query raises (the
        error is printed rather than propagated).
    """
    try:
        rows = (
            supabase.table("chat_history")
            .select("session_id, created_at, file_name")
            .eq("user_id", user_id)
            .order("created_at", desc=True)
            .limit(limit * 5)
            .execute()
            .data
        )
        newest_by_session = {}
        for row in rows:
            sid = row["session_id"]
            if sid in newest_by_session:
                # Rows arrive newest first, so later rows for a known
                # session are older and can be skipped.
                continue
            newest_by_session[sid] = {
                "session_id": sid,
                "created_at": row["created_at"],
                "file_name": row.get("file_name", "No file")
            }
        return list(newest_by_session.values())[:limit]
    except Exception as e:
        print(f"Error loading sessions: {e}")
        return []
| # ============================================================================= | |
| # AUTH MANAGER: Handles signup, login, and logout via Supabase Auth | |
| # Stores the current user and session ID in memory while the app is running | |
| # ============================================================================= | |
class AuthManager:
    """Wraps Supabase Auth signup/login/logout and remembers the current user.

    The logged-in user and a per-login session ID are held on the instance
    for as long as the process is running.
    """

    def __init__(self):
        # None means "nobody is logged in".
        self.current_user = None
        self.session_id = None

    def signup(self, email, password, username):
        """Create a new Supabase Auth user with the username in its metadata.

        Returns:
            A (success, message) tuple.
        """
        try:
            response = supabase.auth.sign_up({
                "email": email,
                "password": password,
                "options": {"data": {"username": username}}
            })
            if response.user:
                return True, "Account created! Please check your email to verify."
            else:
                return False, "Signup failed"
        except Exception as e:
            error_msg = str(e)
            # Map uniqueness violations to a friendlier message.
            if "duplicate" in error_msg.lower() or "unique" in error_msg.lower():
                return False, "Username or email already exists"
            return False, f"Error: {error_msg}"

    def login(self, email, password):
        """Sign in with email and password.

        On success the user is stored on the instance, a fresh session ID is
        generated, and the username is looked up in user_profiles (falling
        back to "User" when no profile row exists).

        Returns:
            A (success, message, user_id) tuple; user_id is None on failure.
        """
        try:
            response = supabase.auth.sign_in_with_password({
                "email": email,
                "password": password
            })
            if response.user:
                self.current_user = response.user
                self.session_id = str(uuid.uuid4())
                profile = (
                    supabase.table("user_profiles")
                    .select("username")
                    .eq("id", response.user.id)
                    .execute()
                )
                username = profile.data[0]["username"] if profile.data else "User"
                return True, f"Welcome back, {username}!", response.user.id
            else:
                return False, "Invalid credentials", None
        except Exception as e:
            return False, f"Login error: {str(e)}", None

    def logout(self):
        """Sign out of Supabase and clear the locally stored user.

        Returns:
            A (success, message) tuple describing the remote sign-out.
        """
        try:
            supabase.auth.sign_out()
            return True, "Logged out successfully"
        except Exception as e:
            return False, f"Logout error: {str(e)}"
        finally:
            # Always drop local state: a failed remote sign-out must not
            # leave the app treating the user as still logged in.
            self.current_user = None
            self.session_id = None

    def is_authenticated(self):
        """Return True if a user is currently logged in."""
        return self.current_user is not None
| # Create a single global auth manager instance | |
| auth = AuthManager() | |
| # ============================================================================= | |
| # CHAT HANDLER: Combines chat_with_file with database saving | |
| # Requires the user to be logged in before processing | |
| # ============================================================================= | |
def chat_with_file_and_save(question, file, history, user_id, session_id):
    """Answer a question, persist the exchange, and extend the chat display.

    Returns:
        A (history, "", None) tuple: the updated chat history, an empty
        string to clear the question box, and None to clear the file input.
        When nobody is logged in, a login prompt is appended to the history
        instead and nothing is processed or saved.
    """
    if not auth.is_authenticated():
        return history + [["", "Please login to use the chatbot."]], "", None

    answer = chat_with_file(question, file)

    # File metadata is only recorded when a file was attached.
    if file:
        file_name = os.path.basename(file.name)
        file_hash = get_file_hash(file.name)
    else:
        file_name = None
        file_hash = None

    save_chat_to_db(
        user_id=user_id,
        session_id=session_id,
        question=question,
        answer=answer,
        file_name=file_name,
        file_hash=file_hash,
    )
    return history + [[question, answer]], "", None
| # ============================================================================= | |
| # GRADIO INTERFACE: Full UI with two tabs | |
| # Tab 1: Login / Signup | |
| # Tab 2: Chat with file upload, session history, and session loader | |
| # ============================================================================= | |
def create_interface():
    """Build the Gradio Blocks app: a login/signup tab and a chat tab.

    Returns:
        The assembled gr.Blocks instance (not yet launched).
    """
    with gr.Blocks(title="Math Tutor Chatbot", theme=gr.themes.Soft()) as demo:
        # Hidden state: stores user ID and session ID across interactions
        user_id_state = gr.State(None)
        session_id_state = gr.State(None)
        gr.Markdown("# Math Tutor Chatbot")
        gr.Markdown("Create an account to save your chat history and get Socratic math tutoring!")
        with gr.Tabs() as tabs:
            # -- TAB 1: Login and Signup --------------------------------
            with gr.Tab("Login / Sign Up", id="login_tab"):
                with gr.Row():
                    # Left side: Login
                    with gr.Column():
                        gr.Markdown("### Login to Existing Account")
                        login_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        login_password = gr.Textbox(label="Password", type="password")
                        login_btn = gr.Button("Login", variant="primary", size="lg")
                        login_msg = gr.Markdown("")
                    # Right side: Signup
                    with gr.Column():
                        gr.Markdown("### Create New Account")
                        signup_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        signup_username = gr.Textbox(label="Username", placeholder="cool_username")
                        signup_password = gr.Textbox(label="Password", type="password")
                        signup_btn = gr.Button("Sign Up", variant="primary", size="lg")
                        signup_msg = gr.Markdown("")
            # -- TAB 2: Chat --------------------------------------------
            with gr.Tab("Chat", id="chat_tab"):
                gr.Markdown("### Upload a PDF or image and ask questions!")
                with gr.Row():
                    # Left: Chat area
                    with gr.Column(scale=3):
                        chatbot = gr.Chatbot(label="Conversation", height=500, type="tuples")
                        with gr.Row():
                            question_input = gr.Textbox(
                                show_label=False,
                                placeholder="Ask a math question or about your uploaded file...",
                                scale=6
                            )
                            file_input = gr.File(
                                label="Attach",
                                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                                scale=1
                            )
                            send_btn = gr.Button("Send", scale=1, variant="primary")
                        with gr.Row():
                            new_session_btn = gr.Button("New Session", size="sm")
                            clear_btn = gr.Button("Clear Chat", size="sm")
                            logout_btn = gr.Button("Logout", size="sm")
                    # Right: Session history panel
                    with gr.Column(scale=1):
                        gr.Markdown("### Your Past Sessions")
                        sessions_display = gr.Dataframe(
                            headers=["Date", "File"],
                            datatype=["str", "str"],
                            interactive=False,
                            row_count=5
                        )
                        refresh_sessions_btn = gr.Button("Refresh Sessions", size="sm")
                        gr.Markdown("**Load a Previous Session:**")
                        session_dropdown = gr.Dropdown(
                            label="Select Session",
                            choices=[],
                            interactive=True,
                            value=None
                        )
                        load_session_btn = gr.Button("Load Selected Session", size="sm", variant="primary")

        # -- EVENT HANDLERS ---------------------------------------------
        def handle_login(email, password):
            """Logs in and switches to the chat tab on success."""
            success, message, uid = auth.login(email, password)
            if success:
                return message, uid, str(uuid.uuid4()), gr.update(selected="chat_tab")
            else:
                return message, None, None, gr.update()

        def handle_signup(email, password, username):
            """Creates a new account and returns a status message."""
            success, message = auth.signup(email, password, username)
            return message

        def handle_send(question, file, history, user_id, session_id):
            """Sends the question through the RAG pipeline and saves result."""
            if not user_id:
                return history + [["", "Please login first!"]], "", None
            return chat_with_file_and_save(question, file, history, user_id, session_id)

        def handle_logout():
            """Logs out and switches back to the login tab."""
            auth.logout()
            return [], "Logged out successfully", None, None, gr.update(selected="login_tab")

        def handle_new_session(user_id):
            """Clears the chat and generates a fresh session ID."""
            return [], str(uuid.uuid4())

        def handle_refresh_sessions(user_id):
            """Loads recent sessions from DB and populates the dropdown."""
            # Every branch sends a proper choices update to the dropdown so
            # stale entries are removed; a bare list would be taken as the
            # dropdown's value instead.
            no_choices = gr.update(choices=[], value=None)
            if not user_id:
                return [["Login first", ""]], no_choices
            sessions = get_user_sessions(user_id, limit=20)
            if not sessions:
                return [["No sessions yet", ""]], no_choices
            df_data = [
                [s["created_at"][:19], s["file_name"] or "No file"]
                for s in sessions
            ]
            # Each choice is a (label, value) pair: the label is shown to
            # the user and the session_id is what the load handler receives,
            # so no fragile label parsing is needed later.
            dropdown_choices = [
                (
                    "{} - {}".format(s["created_at"][:19], (s["file_name"] or "No file")[:20]),
                    s["session_id"],
                )
                for s in sessions
            ]
            return df_data, gr.update(choices=dropdown_choices, value=None)

        def handle_load_session(user_id, selected_session_id, history, session_id):
            """Loads a previously selected session into the chat window.

            On any failure the current chat and session ID are returned
            unchanged, so later messages are never saved without a session.
            """
            if not user_id or not selected_session_id:
                return history, session_id, "Select a session first"
            loaded = load_chat_history(user_id, selected_session_id)
            if loaded:
                return loaded, selected_session_id, "Session loaded!"
            return history, session_id, "Session not found"

        # -- WIRE UP BUTTONS TO HANDLERS --------------------------------
        login_btn.click(
            fn=handle_login,
            inputs=[login_email, login_password],
            outputs=[login_msg, user_id_state, session_id_state, tabs]
        )
        signup_btn.click(
            fn=handle_signup,
            inputs=[signup_email, signup_password, signup_username],
            outputs=[signup_msg]
        )
        send_btn.click(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input]
        )
        question_input.submit(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input]
        )
        logout_btn.click(
            fn=handle_logout,
            outputs=[chatbot, login_msg, user_id_state, session_id_state, tabs]
        )
        new_session_btn.click(
            fn=handle_new_session,
            inputs=[user_id_state],
            outputs=[chatbot, session_id_state]
        )
        clear_btn.click(fn=lambda: [], outputs=[chatbot])
        refresh_sessions_btn.click(
            fn=handle_refresh_sessions,
            inputs=[user_id_state],
            outputs=[sessions_display, session_dropdown]
        )
        load_session_btn.click(
            fn=handle_load_session,
            inputs=[user_id_state, session_dropdown, chatbot, session_id_state],
            outputs=[chatbot, session_id_state, login_msg]
        )
    return demo
if __name__ == "__main__":
    # Build the Blocks app once, then start the Gradio server.
    demo = create_interface()
    demo.launch()