# Provenance: app.py uploaded by vivicake666 with huggingface_hub (commit 4c0ad61, verified).
import pymupdf
import pytesseract
from PIL import Image
import os
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import gradio as gr
from supabase import create_client, Client
import uuid
import hashlib
from openai import OpenAI
# =============================================================================
# CONNECTIONS: Read API keys from HF Secrets (environment variables)
# Set these in your Space: Settings > Variables and secrets
# =============================================================================
# Both clients are built once at import time from environment variables.
# NOTE: os.getenv returns None for an unset variable; nothing here checks for
# that, so a missing secret is handed straight to create_client / OpenAI.
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_ANON_KEY")
)
# OpenAI client used by generate_answer for chat completions.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# =============================================================================
# MODEL: Load the sentence transformer for semantic search
# This runs once on startup. It finds which text chunks are most relevant
# to the user's question before sending them to GPT.
# =============================================================================
# Module-level embedding model shared by chat_with_file (chunk embeddings)
# and search_relevant_chunks (query embedding).
model = SentenceTransformer("all-MiniLM-L6-v2")
# Startup marker printed once the constructor above has returned.
print("Model loaded!")
# =============================================================================
# FILE PROCESSING: Extract raw text from uploaded PDFs and images
# =============================================================================
def extract_text_from_pdf(file_path):
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        file_path: Path of the PDF to read; passed directly to
            ``pymupdf.open``.

    Returns:
        One string made of each page's ``get_text()`` output, joined with no
        separator. The string is empty when no page yields any text.
    """
    # The context manager closes the document even if a page fails to parse;
    # the handle was previously left open for the garbage collector.
    with pymupdf.open(file_path) as doc:
        # join() avoids rebuilding the accumulated string once per page.
        return "".join(page.get_text() for page in doc)
def extract_text_from_image(image_path):
    """Run Tesseract OCR on an image file and return the recognised text.

    Args:
        image_path: Path of the image to open with PIL.

    Returns:
        The OCR output with leading/trailing whitespace stripped. On any
        failure the exception is NOT raised: a string starting with
        ``"Error extracting text from image: "`` is returned instead, so
        callers receive the error message in place of document text.
    """
    try:
        # Open inside a context manager so the underlying file handle is
        # released as soon as OCR finishes (or fails).
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img).strip()
    except Exception as e:
        return f"Error extracting text from image: {e}"
# =============================================================================
# TEXT CHUNKING: Break long documents into overlapping pieces
# Overlap ensures we don't cut off a sentence right at a chunk boundary
# =============================================================================
def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into fixed-size character windows that overlap.

    Each window starts ``chunk_size - overlap`` characters after the previous
    one, so consecutive chunks share ``overlap`` characters. The final chunk
    may be shorter than ``chunk_size``.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Characters shared between neighbouring chunks; must be
            smaller than ``chunk_size``.

    Returns:
        A list of substrings in document order; empty when ``text`` is empty.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    step = chunk_size - overlap
    if step <= 0:
        # A non-positive step would never advance the window: the original
        # while-loop spun forever (and grew the list without bound) here.
        raise ValueError("overlap must be smaller than chunk_size")
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
# =============================================================================
# SEMANTIC SEARCH: Find the 3 most relevant chunks for the question
# Uses cosine similarity between the question embedding and chunk embeddings
# =============================================================================
def search_relevant_chunks(query, chunks, embeddings, top_k=3):
    """Return the chunks whose embeddings are most similar to the query.

    The query is embedded with the module-level ``model`` and compared to
    ``embeddings`` by cosine similarity.

    Args:
        query: The question text to embed.
        chunks: Sequence of text chunks.
        embeddings: Embeddings for ``chunks``; indices into the similarity
            scores are used to index ``chunks``, so the two must be in the
            same order.
        top_k: How many chunks to return at most (defaults to 3, the value
            that used to be hard-coded).

    Returns:
        Up to ``top_k`` chunks, best match first. Empty when there are no
        chunks or ``top_k`` is not positive.
    """
    # Nothing to rank; also avoids calling cosine_similarity on empty input.
    # (top_k <= 0 is rejected because a slice of [-0:] would return everything.)
    if len(chunks) == 0 or top_k <= 0:
        return []
    query_vec = model.encode([query])
    similarities = cosine_similarity(query_vec, embeddings)[0]
    # argsort is ascending: take the last top_k indices, then reverse them so
    # the highest similarity comes first.
    top_indices = np.argsort(similarities)[-top_k:][::-1]
    return [chunks[i] for i in top_indices]
# =============================================================================
# FILE HASHING: Create a unique fingerprint for each uploaded file
# Used to track which file was used in a chat session
# =============================================================================
def get_file_hash(file_path):
    """Return the MD5 hex digest of a file's contents, or None on failure.

    The digest is used as a fingerprint for an uploaded file, not for any
    security purpose.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The 32-character hexadecimal MD5 digest, or ``None`` when the file
        cannot be opened or read (missing file, bad path type, and so on).
    """
    try:
        digest = hashlib.md5()
        with open(file_path, "rb") as f:
            # Hash in 1 MiB blocks so a large upload is never held in memory
            # all at once.
            for block in iter(lambda: f.read(1 << 20), b""):
                digest.update(block)
    except (OSError, TypeError, ValueError):
        # OSError: missing/unreadable file. TypeError/ValueError: a path that
        # is None or otherwise invalid. Unlike the previous bare ``except:``,
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
    return digest.hexdigest()
# =============================================================================
# AI ANSWER: Send question + context to GPT-4o-mini
# Uses Socratic method: guides the student rather than just giving answers
# If no file is uploaded, answers from general knowledge
# =============================================================================
def generate_answer(question, context):
    """Ask gpt-4o-mini to answer a question in a Socratic/Feynman style.

    Args:
        question: The student's question, inserted verbatim into the prompt.
        context: Either retrieved document text, or a string containing
            ``"No document provided"``, which selects the general-knowledge
            tutor prompt instead of the context-restricted one.

    Returns:
        The model's reply with surrounding whitespace stripped; an empty
        string if the API returns no text content.
    """
    # NOTE(review): this is a substring test, so a real document that happens
    # to contain the phrase would also take the general-knowledge branch.
    if "No document provided" in context:
        system_prompt = "You are a helpful academic math tutor. Use the Socratic method to guide the student."
    else:
        system_prompt = f"You are an academic assistant. Based only on the following context, answer the question:\n{context}"
    # The instructions are sent as part of a single user message; the prompt
    # text below is emitted verbatim, so it is kept flush-left on purpose.
    prompt = f"""
{system_prompt}
Give me the output without latex format.
Use the socratic/feynman method for learning.
Question:
{question}
Answer:
"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2
    )
    # message.content may be None (e.g. a refusal or filtered reply); guard
    # so .strip() cannot raise AttributeError.
    content = response.choices[0].message.content
    return (content or "").strip()
# =============================================================================
# CHAT WITH FILE: Main RAG pipeline
# Combines file reading, chunking, search, and answer generation
# Falls back to general knowledge if no file is uploaded
# =============================================================================
def chat_with_file(question, file):
    """Answer a question, grounding it in the uploaded file when one exists.

    Without a file the question goes straight to ``generate_answer`` with the
    "no document" context. With a file, its text is extracted (PDF or image),
    chunked, embedded, searched for the most relevant chunks, and those
    chunks are passed as context.

    Args:
        question: The user's question.
        file: Uploaded file object exposing a ``.name`` path, or ``None``.

    Returns:
        The generated answer, or a fixed message when the file type is
        unsupported or no text could be extracted.
    """
    if file is None:
        return generate_answer(question, context="No document provided. Answer from general knowledge.")

    path = file.name
    suffix = os.path.splitext(path)[1].lower()
    image_suffixes = [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff"]

    # Choose the extractor from the (lower-cased) file extension.
    if suffix == ".pdf":
        extract = extract_text_from_pdf
    elif suffix in image_suffixes:
        extract = extract_text_from_image
    else:
        return "Unsupported file type. Please upload a PDF or image file."

    text = extract(path)
    if not text.strip():
        return "No text could be extracted from the file."

    # Retrieval step: embed every chunk, then keep only the best matches.
    pieces = chunk_text(text)
    piece_vectors = model.encode(pieces)
    best_pieces = search_relevant_chunks(question, pieces, piece_vectors)
    return generate_answer(question, "\n\n".join(best_pieces))
# =============================================================================
# DATABASE: Save and load chat history from Supabase
# Each message is stored with user_id, session_id, question, and answer
# Sessions allow users to revisit past conversations
# =============================================================================
def save_chat_to_db(user_id, session_id, question, answer, file_name=None, file_hash=None):
    """Insert one question/answer exchange into the ``chat_history`` table.

    Args:
        user_id: Owner of the message.
        session_id: Chat session the message belongs to.
        question: The user's question text.
        answer: The generated answer text.
        file_name: Name of the attached file, if any.
        file_hash: Fingerprint of the attached file, if any.

    Returns:
        ``True`` when the insert executed without raising; ``False`` when any
        exception occurred (the error is printed, not re-raised).
    """
    row = {
        "user_id": user_id,
        "session_id": session_id,
        "question": question,
        "answer": answer,
        "file_name": file_name,
        "file_hash": file_hash
    }
    try:
        supabase.table("chat_history").insert(row).execute()
    except Exception as e:
        print(f"Error saving chat: {e}")
        return False
    return True
def load_chat_history(user_id, session_id=None, limit=50):
    """Fetch stored exchanges for a user as ``[question, answer]`` pairs.

    Args:
        user_id: Only rows with this ``user_id`` are returned.
        session_id: When truthy, additionally restricts rows to this session.
        limit: Maximum number of rows requested.

    Returns:
        A list of ``[question, answer]`` lists ordered by ``created_at``
        ascending, or an empty list if anything raises (the error is
        printed).
    """
    try:
        request = (
            supabase.table("chat_history")
            .select("*")
            .eq("user_id", user_id)
            .order("created_at", desc=False)
            .limit(limit)
        )
        # The session filter is optional and is appended after the limit,
        # exactly as before.
        if session_id:
            request = request.eq("session_id", session_id)
        rows = request.execute().data
        return [[row["question"], row["answer"]] for row in rows]
    except Exception as e:
        print(f"Error loading history: {e}")
        return []
def get_user_sessions(user_id, limit=10):
    """Return the user's most recent chat sessions, one entry per session.

    Rows are read newest-first and only the first row seen for each
    ``session_id`` is kept, so every entry carries the timestamp and file
    name of that session's latest message.

    Args:
        user_id: Only rows with this ``user_id`` are considered.
        limit: Maximum number of sessions returned. ``limit * 5`` rows are
            fetched so that sessions with several messages do not crowd out
            older sessions before deduplication.

    Returns:
        A list of dicts with keys ``session_id``, ``created_at`` and
        ``file_name`` (``"No file"`` when the row has none), newest first.
        An empty list is returned if anything raises (the error is printed).
    """
    try:
        response = (
            supabase.table("chat_history")
            .select("session_id, created_at, file_name")
            .eq("user_id", user_id)
            .order("created_at", desc=True)
            .limit(limit * 5)
            .execute()
        )
        sessions = {}
        for msg in response.data:
            sid = msg["session_id"]
            if sid not in sessions:
                sessions[sid] = {
                    "session_id": sid,
                    "created_at": msg["created_at"],
                    # The column is always selected, so the key exists even
                    # when its value is null; ``.get(key, default)`` never
                    # applied the default. ``or`` covers both cases.
                    "file_name": msg.get("file_name") or "No file"
                }
        return list(sessions.values())[:limit]
    except Exception as e:
        print(f"Error loading sessions: {e}")
        return []
# =============================================================================
# AUTH MANAGER: Handles signup, login, and logout via Supabase Auth
# Stores the current user and session ID in memory while the app is running
# =============================================================================
class AuthManager:
    """Wrapper around Supabase Auth that remembers the signed-in user.

    State is kept on the instance (``current_user`` and ``session_id``).
    NOTE(review): the module creates a single shared instance, so this state
    is per-process rather than per-browser-session; confirm that is intended
    if more than one person can use the app at once.
    """

    def __init__(self):
        # User object from the last successful sign-in; None when logged out.
        self.current_user = None
        # UUID string generated at login; None when logged out.
        self.session_id = None

    def signup(self, email, password, username):
        """Create a Supabase Auth user with the username stored as metadata.

        Args:
            email: Email address for the new account.
            password: Password for the new account.
            username: Saved under ``options.data.username``.

        Returns:
            ``(success, message)`` where ``message`` is suitable for display.
        """
        try:
            response = supabase.auth.sign_up({
                "email": email,
                "password": password,
                "options": {"data": {"username": username}}
            })
            if response.user:
                return True, "Account created! Please check your email to verify."
            else:
                return False, "Signup failed"
        except Exception as e:
            error_msg = str(e)
            # Map uniqueness violations to a friendlier message.
            if "duplicate" in error_msg.lower() or "unique" in error_msg.lower():
                return False, "Username or email already exists"
            return False, f"Error: {error_msg}"

    def login(self, email, password):
        """Sign in with email and password.

        Args:
            email: Account email address.
            password: Account password.

        Returns:
            ``(success, message, user_id)``; ``user_id`` is ``None`` on
            failure.
        """
        try:
            response = supabase.auth.sign_in_with_password({
                "email": email,
                "password": password
            })
            if not response.user:
                return False, "Invalid credentials", None
        except Exception as e:
            return False, f"Login error: {str(e)}", None

        self.current_user = response.user
        self.session_id = str(uuid.uuid4())
        # The username only decorates the greeting. It is looked up outside
        # the sign-in try-block so a profile-table error can no longer report
        # "login failed" after current_user has already been set.
        username = self._lookup_username(response.user.id)
        return True, f"Welcome back, {username}!", response.user.id

    def _lookup_username(self, user_id):
        """Return the profile username for ``user_id``, or "User" if unavailable."""
        try:
            profile = (
                supabase.table("user_profiles")
                .select("username")
                .eq("id", user_id)
                .execute()
            )
            if profile.data:
                return profile.data[0]["username"] or "User"
        except Exception as e:
            print(f"Error loading profile: {e}")
        return "User"

    def logout(self):
        """Sign out of Supabase and clear the locally remembered user.

        Returns:
            ``(success, message)``. Local state is cleared even when the
            remote sign-out fails, so the app never keeps treating the user
            as authenticated after a logout attempt.
        """
        try:
            supabase.auth.sign_out()
            return True, "Logged out successfully"
        except Exception as e:
            return False, f"Logout error: {str(e)}"
        finally:
            self.current_user = None
            self.session_id = None

    def is_authenticated(self):
        """Return True if a user is currently logged in."""
        return self.current_user is not None
# Create a single global auth manager instance.
# chat_with_file_and_save and the login/logout UI handlers all read and write
# this one object, so its current_user / session_id are shared by every caller
# in the process.
auth = AuthManager()
# =============================================================================
# CHAT HANDLER: Combines chat_with_file with database saving
# Requires the user to be logged in before processing
# =============================================================================
def chat_with_file_and_save(question, file, history, user_id, session_id):
    """Answer a question, persist the exchange, and extend the chat display.

    Args:
        question: The user's question.
        file: Uploaded file object (with a ``.name`` path) or a falsy value.
        history: Current chat display as a list of ``[question, answer]``.
        user_id: Stored with the saved row.
        session_id: Stored with the saved row.

    Returns:
        ``(new_history, "", None)`` - the updated chat, an empty string to
        clear the question box, and ``None`` to clear the file input. When
        nobody is logged in, the chat gets a login reminder instead and
        nothing is saved.
    """
    if not auth.is_authenticated():
        return history + [["", "Please login to use the chatbot."]], "", None

    answer = chat_with_file(question, file)

    # File metadata is recorded only when a file was attached.
    if file:
        attached_name = os.path.basename(file.name)
        attached_hash = get_file_hash(file.name)
    else:
        attached_name = None
        attached_hash = None

    # The boolean result of the save is intentionally not inspected here.
    save_chat_to_db(
        user_id=user_id,
        session_id=session_id,
        question=question,
        answer=answer,
        file_name=attached_name,
        file_hash=attached_hash
    )
    return history + [[question, answer]], "", None
# =============================================================================
# GRADIO INTERFACE: Full UI with two tabs
# Tab 1: Login / Signup
# Tab 2: Chat with file upload, session history, and session loader
# =============================================================================
def create_interface():
    """Build the Gradio UI and wire every control to its handler.

    The layout has two tabs: "Login / Sign Up" and "Chat" (conversation,
    question box, file attachment, and a side panel listing past sessions).
    The logged-in user id and the active session id are kept in ``gr.State``.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title="Math Tutor Chatbot", theme=gr.themes.Soft()) as demo:
        # Hidden state: stores user ID and session ID across interactions
        user_id_state = gr.State(None)
        session_id_state = gr.State(None)
        gr.Markdown("# Math Tutor Chatbot")
        gr.Markdown("Create an account to save your chat history and get Socratic math tutoring!")
        with gr.Tabs() as tabs:
            # ── TAB 1: Login and Signup ────────────────────────────────────
            with gr.Tab("Login / Sign Up", id="login_tab"):
                with gr.Row():
                    # Left side: Login
                    with gr.Column():
                        gr.Markdown("### Login to Existing Account")
                        login_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        login_password = gr.Textbox(label="Password", type="password")
                        login_btn = gr.Button("Login", variant="primary", size="lg")
                        login_msg = gr.Markdown("")
                    # Right side: Signup
                    with gr.Column():
                        gr.Markdown("### Create New Account")
                        signup_email = gr.Textbox(label="Email", placeholder="you@example.com")
                        signup_username = gr.Textbox(label="Username", placeholder="cool_username")
                        signup_password = gr.Textbox(label="Password", type="password")
                        signup_btn = gr.Button("Sign Up", variant="primary", size="lg")
                        signup_msg = gr.Markdown("")
            # ── TAB 2: Chat ────────────────────────────────────────────────
            with gr.Tab("Chat", id="chat_tab"):
                gr.Markdown("### Upload a PDF or image and ask questions!")
                with gr.Row():
                    # Left: Chat area
                    with gr.Column(scale=3):
                        chatbot = gr.Chatbot(label="Conversation", height=500, type="tuples")
                        with gr.Row():
                            question_input = gr.Textbox(
                                show_label=False,
                                placeholder="Ask a math question or about your uploaded file...",
                                scale=6
                            )
                            file_input = gr.File(
                                label="Attach",
                                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                                scale=1
                            )
                            send_btn = gr.Button("Send", scale=1, variant="primary")
                        with gr.Row():
                            new_session_btn = gr.Button("New Session", size="sm")
                            clear_btn = gr.Button("Clear Chat", size="sm")
                            logout_btn = gr.Button("Logout", size="sm")
                    # Right: Session history panel
                    with gr.Column(scale=1):
                        gr.Markdown("### Your Past Sessions")
                        sessions_display = gr.Dataframe(
                            headers=["Date", "File"],
                            datatype=["str", "str"],
                            interactive=False,
                            row_count=5
                        )
                        refresh_sessions_btn = gr.Button("Refresh Sessions", size="sm")
                        gr.Markdown("**Load a Previous Session:**")
                        session_dropdown = gr.Dropdown(
                            label="Select Session",
                            choices=[],
                            interactive=True,
                            value=None
                        )
                        load_session_btn = gr.Button("Load Selected Session", size="sm", variant="primary")

        # ── EVENT HANDLERS ─────────────────────────────────────────────────
        def handle_login(email, password):
            """Logs in and switches to the chat tab on success."""
            success, message, uid = auth.login(email, password)
            if success:
                # A fresh session id is generated for the UI state on login.
                return message, uid, str(uuid.uuid4()), gr.update(selected="chat_tab")
            else:
                return message, None, None, gr.update()

        def handle_signup(email, password, username):
            """Creates a new account and returns a status message."""
            success, message = auth.signup(email, password, username)
            return message

        def handle_send(question, file, history, user_id, session_id):
            """Sends the question through the RAG pipeline and saves result."""
            # Tolerate a missing history value so list concatenation below
            # (and in chat_with_file_and_save) cannot fail on None.
            history = history or []
            if not user_id:
                return history + [["", "Please login first!"]], "", None
            return chat_with_file_and_save(question, file, history, user_id, session_id)

        def handle_logout():
            """Logs out and switches back to the login tab."""
            auth.logout()
            return [], "Logged out successfully", None, None, gr.update(selected="login_tab")

        def handle_new_session(user_id):
            """Clears the chat and generates a fresh session ID."""
            return [], str(uuid.uuid4())

        def handle_refresh_sessions(user_id):
            """Loads recent sessions from DB and populates the dropdown."""
            # Always answer the dropdown with an update object so stale
            # choices are cleared instead of assigning a raw [] as its value.
            no_choices = gr.update(choices=[], value=None)
            if not user_id:
                return [["Login first", ""]], no_choices
            sessions = get_user_sessions(user_id, limit=20)
            if not sessions:
                return [["No sessions yet", ""]], no_choices
            df_data = [
                [s["created_at"][:19], s["file_name"] or "No file"]
                for s in sessions
            ]
            # Each choice is a (label, value) pair: the label is what the user
            # sees, the value handed to handle_load_session is the session id.
            # Using .format() instead of f-strings to avoid quote conflicts
            dropdown_choices = [
                (
                    "{} - {}".format(s["created_at"][:19], (s["file_name"] or "No file")[:20]),
                    s["session_id"],
                )
                for s in sessions
            ]
            return df_data, gr.update(choices=dropdown_choices, value=None)

        def handle_load_session(user_id, selected_session_dropdown, history, session_id):
            """Loads a previously selected session into the chat window.

            ``selected_session_dropdown`` is the session id carried by the
            chosen dropdown entry. Sessions used to be re-fetched and matched
            on the label's truncated timestamp, which broke when two sessions
            shared a second or when a session gained a message after the list
            was refreshed. On failure the current chat and session id are
            returned unchanged instead of being wiped.
            """
            if not user_id or not selected_session_dropdown:
                return history, session_id, "Select a session first"
            # load_chat_history filters on user_id as well, so only the
            # caller's own rows can be returned for a given session id.
            loaded = load_chat_history(user_id, selected_session_dropdown)
            if loaded:
                return loaded, selected_session_dropdown, "Session loaded!"
            return history, session_id, "Session not found"

        # ── WIRE UP BUTTONS TO HANDLERS ────────────────────────────────────
        login_btn.click(
            fn=handle_login,
            inputs=[login_email, login_password],
            outputs=[login_msg, user_id_state, session_id_state, tabs]
        )
        signup_btn.click(
            fn=handle_signup,
            inputs=[signup_email, signup_password, signup_username],
            outputs=[signup_msg]
        )
        send_btn.click(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input]
        )
        # Pressing Enter in the question box behaves like the Send button.
        question_input.submit(
            fn=handle_send,
            inputs=[question_input, file_input, chatbot, user_id_state, session_id_state],
            outputs=[chatbot, question_input, file_input]
        )
        logout_btn.click(
            fn=handle_logout,
            outputs=[chatbot, login_msg, user_id_state, session_id_state, tabs]
        )
        new_session_btn.click(
            fn=handle_new_session,
            inputs=[user_id_state],
            outputs=[chatbot, session_id_state]
        )
        clear_btn.click(fn=lambda: [], outputs=[chatbot])
        refresh_sessions_btn.click(
            fn=handle_refresh_sessions,
            inputs=[user_id_state],
            outputs=[sessions_display, session_dropdown]
        )
        # The current chat and session id are passed in so a failed load can
        # hand them back untouched.
        load_session_btn.click(
            fn=handle_load_session,
            inputs=[user_id_state, session_dropdown, chatbot, session_id_state],
            outputs=[chatbot, session_id_state, login_msg]
        )
    return demo
# Script entry point: build the UI and start the Gradio server only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()