# learn / app.py
# Sazzz02's picture
# Update app.py
# 69f23de verified
import streamlit as st
import os
import PyPDF2
import docx
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings # Use HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from groq import Groq
from langchain_core.prompts import PromptTemplate
import json
import random
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from datetime import datetime
# Class Definitions (Combined)
class DocumentProcessor:
    """Extract text from PDF/DOCX files and index it in a Chroma vector store.

    The text is split into overlapping chunks (1000 characters, 200 overlap)
    and embedded with the ``sentence-transformers/all-MiniLM-L6-v2`` model.
    """

    def __init__(self):
        # Use a free Hugging Face model for embeddings
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    @staticmethod
    def _join_page_text(pages):
        """Join the text of page-like objects, one page per line.

        Each element must expose ``extract_text()``. A page that yields
        ``None`` (e.g. an image-only page) contributes an empty string
        instead of raising ``TypeError`` during concatenation.
        """
        # A newline between pages keeps the last word of one page from being
        # fused with the first word of the next.
        return "\n".join(page.extract_text() or "" for page in pages)

    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF file.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The text of all pages, separated by newlines.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # Read the pages while the file handle is still open.
            return self._join_page_text(pdf_reader.pages)

    def extract_text_from_docx(self, docx_path):
        """Extract text from DOCX file.

        Args:
            docx_path: Path of the DOCX file to read.

        Returns:
            Every paragraph's text, each followed by a newline.
        """
        doc = docx.Document(docx_path)
        return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)

    def process_document(self, file_path, file_type):
        """Process document and create vector store.

        Args:
            file_path: Path of the document on disk.
            file_type: ``'pdf'``, ``'docx'`` or ``'doc'`` (case-insensitive,
                an optional leading dot is ignored).

        Returns:
            A ``(vectorstore, chunk_count)`` tuple.

        Raises:
            ValueError: If the file type is unsupported or the document
                contains no extractable text.
        """
        kind = file_type.lower().lstrip(".")
        if kind == 'pdf':
            text = self.extract_text_from_pdf(file_path)
        elif kind in ('docx', 'doc'):
            # NOTE(review): 'doc' is routed to the DOCX reader; confirm that
            # legacy binary .doc files are actually readable that way.
            text = self.extract_text_from_docx(file_path)
        else:
            raise ValueError("Unsupported file type")

        chunks = self.text_splitter.split_text(text)
        if not chunks:
            # Fail with a clear message instead of handing an empty list to
            # the vector store.
            raise ValueError("No extractable text found in document")

        vectorstore = Chroma.from_texts(
            texts=chunks,
            embedding=self.embeddings,
        )
        return vectorstore, len(chunks)
class RAGLearningSystem:
    """Generate story explanations and quiz questions grounded in a vector store.

    For every request the three most similar chunks are retrieved for the
    topic, inserted into a prompt template, and sent to a Groq chat model.
    """

    # Groq model used when the caller does not choose one.
    # NOTE(review): confirm this model id is still served by Groq.
    DEFAULT_MODEL = "llama3-8b-8192"

    def __init__(self, vectorstore, model=DEFAULT_MODEL):
        """Create the Groq client, the retriever and the prompt templates.

        Args:
            vectorstore: Vector store exposing ``as_retriever``.
            model: Groq chat model id used for every completion.

        Shows a Streamlit error and stops the script when ``GROQ_API_KEY``
        is not set in the environment.
        """
        # Initialize Groq client with API key from environment variable
        if "GROQ_API_KEY" not in os.environ:
            st.error("Groq API key is required for generating responses.")
            st.stop()
        self.llm = Groq(api_key=os.environ["GROQ_API_KEY"])
        self.model = model
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
        # Story explanation prompt
        self.story_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
Based on the following context from the book, explain {topic} as an engaging story.
Make it educational yet entertaining, using metaphors, analogies, and narrative elements.
Context: {context}
Create a story explanation for {topic}:
"""
        )
        # Question generation prompts
        self.mcq_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
Based on this context about {topic}, create 3 multiple choice questions.
Format as JSON with structure:
{{
"questions": [
{{
"question": "Question text",
"options": ["A. Option 1", "B. Option 2", "C. Option 3", "D. Option 4"],
"correct": "A",
"explanation": "Why this answer is correct"
}}
]
}}
Context: {context}
"""
        )
        self.fill_blank_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
Based on this context about {topic}, create 3 fill-in-the-blank questions.
Format as JSON with structure:
{{
"questions": [
{{
"question": "Question with _____ blank",
"answer": "correct answer",
"hint": "helpful hint"
}}
]
}}
Context: {context}
"""
        )
        self.match_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
Based on this context about {topic}, create a matching exercise with 4 pairs.
Format as JSON with structure:
{{
"left_items": ["Item 1", "Item 2", "Item 3", "Item 4"],
"right_items": ["Match A", "Match B", "Match C", "Match D"],
"correct_matches": {{"Item 1": "Match A", "Item 2": "Match B", "Item 3": "Match C", "Item 4": "Match D"}}
}}
Context: {context}
"""
        )

    def _retrieve_context(self, topic):
        """Return the retrieved chunks for ``topic`` joined by newlines."""
        # `invoke` is the Runnable entry point that supersedes the
        # deprecated `get_relevant_documents`.
        docs = self.retriever.invoke(topic)
        return "\n".join(doc.page_content for doc in docs)

    def _complete(self, prompt, json_mode=False):
        """Send ``prompt`` as a single user message and return the reply text.

        When ``json_mode`` is true the API is asked for a JSON object.
        """
        extra = {"response_format": {"type": "json_object"}} if json_mode else {}
        response = self.llm.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=self.model,
            **extra,
        )
        return response.choices[0].message.content

    @staticmethod
    def _parse_json(raw, fallback):
        """Decode ``raw`` as a JSON object, returning ``fallback`` on failure.

        ``fallback`` is returned when ``raw`` is not a string (e.g. ``None``),
        is not valid JSON, or decodes to something other than a dict, so
        callers can always use ``.get`` on the result.
        """
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return fallback
        return parsed if isinstance(parsed, dict) else fallback

    def _ask_json(self, prompt_template, topic, fallback):
        """Run one retrieval + JSON completion cycle for ``topic``."""
        context = self._retrieve_context(topic)
        prompt = prompt_template.format(context=context, topic=topic)
        return self._parse_json(self._complete(prompt, json_mode=True), fallback)

    def get_story_explanation(self, topic):
        """Return a story-style explanation of ``topic`` as plain text."""
        context = self._retrieve_context(topic)
        return self._complete(self.story_prompt.format(context=context, topic=topic))

    def generate_mcq_questions(self, topic):
        """Return multiple-choice questions as a dict with a ``questions`` list.

        Falls back to ``{"questions": []}`` when the reply is not a JSON object.
        """
        return self._ask_json(self.mcq_prompt, topic, {"questions": []})

    def generate_fill_blank_questions(self, topic):
        """Return fill-in-the-blank questions as a dict with a ``questions`` list.

        Falls back to ``{"questions": []}`` when the reply is not a JSON object.
        """
        return self._ask_json(self.fill_blank_prompt, topic, {"questions": []})

    def generate_matching_questions(self, topic):
        """Return a matching exercise dict.

        Falls back to empty ``left_items``, ``right_items`` and
        ``correct_matches`` when the reply is not a JSON object.
        """
        return self._ask_json(
            self.match_prompt,
            topic,
            {"left_items": [], "right_items": [], "correct_matches": {}},
        )
class LearningGames:
    """Render the three quiz games in Streamlit and record their scores.

    Scores are appended to ``st.session_state.game_scores`` under the keys
    ``'mcq'``, ``'fill_blank'`` and ``'matching'``.
    """

    def __init__(self):
        self.init_session_state()

    def init_session_state(self):
        """Create the score history and current-topic entries if missing."""
        if 'game_scores' not in st.session_state:
            st.session_state.game_scores = {
                'mcq': [],
                'fill_blank': [],
                'matching': []
            }
        if 'current_topic' not in st.session_state:
            st.session_state.current_topic = ""

    @staticmethod
    def _split_option(text):
        """Split ``"A. Paris"`` style text into ``("A", "Paris")``.

        A label is a single ASCII letter, optionally preceded by ``(``, that
        is either the whole text or followed by ``.``, ``)`` or ``:``.
        Returns ``(None, stripped_text)`` when no such label is present.
        """
        stripped = str(text).strip()
        candidate = stripped.lstrip("(")
        if candidate and "A" <= candidate[0].upper() <= "Z":
            rest = candidate[1:]
            if not rest:
                return candidate[0].upper(), ""
            if rest[0] in ".):":
                return candidate[0].upper(), rest[1:].strip()
        return None, stripped

    @staticmethod
    def _is_mcq_correct(selected, correct, options=None):
        """Decide whether the chosen option matches the answer key.

        Handles a missing selection, an empty key, keys given as a letter
        (any case, with or without punctuation) or as full option text, and
        unlabelled options (the key letter is then used as an index into
        ``options``).
        """
        if not selected or not correct:
            return False
        sel_label, sel_body = LearningGames._split_option(selected)
        key_label, key_body = LearningGames._split_option(correct)
        if sel_label and key_label:
            return sel_label == key_label
        if key_label and options:
            index = ord(key_label) - ord("A")
            if 0 <= index < len(options):
                return str(options[index]).strip() == str(selected).strip()
        # Fall back to comparing the option text itself.
        return bool(key_body) and sel_body.casefold() == key_body.casefold()

    @staticmethod
    def _normalize_answer(value):
        """Return ``value`` as text with collapsed whitespace, case-folded."""
        return " ".join(str(value).split()).casefold()

    @staticmethod
    def _shuffled_options(items):
        """Return a shuffled copy of ``items`` that is stable across reruns.

        The shuffle is seeded from the items themselves, so the same exercise
        always shows its choices in the same order; a fresh random order on
        every Streamlit rerun would change the select boxes under the user.
        """
        shuffled = list(items)
        random.Random("|".join(map(str, shuffled))).shuffle(shuffled)
        return shuffled

    def _record_score(self, game_type, topic, percentage, attempted):
        """Append one finished game to the score history in session state."""
        st.session_state.game_scores[game_type].append({
            'topic': topic,
            'score': percentage,
            'timestamp': datetime.now(),
            'questions_attempted': attempted
        })

    def play_mcq_game(self, questions, topic):
        """Show a multiple-choice quiz and grade it on submission.

        Args:
            questions: Dict with a ``questions`` list; each entry carries
                ``question``, ``options``, ``correct`` and optionally
                ``explanation``.
            topic: Topic name shown in the heading and stored with the score.

        Returns:
            The score as a percentage once the form is submitted, else ``None``.
        """
        st.subheader(f"🎯 Multiple Choice Quiz: {topic}")
        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        total_questions = len(items)
        with st.form("mcq_form"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                answers[i] = st.radio(
                    f"Select answer for Q{i+1}:",
                    q.get('options', []),
                    key=f"mcq_{i}"
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Quiz")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(items):
            correct = q.get('correct', '')
            if self._is_mcq_correct(answers[i], correct, q.get('options')):
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {correct}")
            st.info(f"Explanation: {q.get('explanation', 'No explanation provided')}")

        percentage = (score / total_questions) * 100
        st.write(f"**Final Score: {score}/{total_questions} ({percentage:.1f}%)**")
        self._record_score('mcq', topic, percentage, total_questions)
        return percentage

    def play_fill_blank_game(self, questions, topic):
        """Show fill-in-the-blank questions and grade them on submission.

        Answers are compared after trimming, collapsing whitespace and
        case-folding; an empty response is never counted as correct.

        Args:
            questions: Dict with a ``questions`` list; each entry carries
                ``question``, ``answer`` and optionally ``hint``.
            topic: Topic name shown in the heading and stored with the score.

        Returns:
            The score as a percentage once the form is submitted, else ``None``.
        """
        st.subheader(f"📝 Fill in the Blanks: {topic}")
        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        total_questions = len(items)
        with st.form("fill_blank_form"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                st.write(f"💡 Hint: {q.get('hint', 'No hint available')}")
                answers[i] = st.text_input(
                    f"Your answer for Q{i+1}:",
                    key=f"fill_{i}"
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Answers")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(items):
            expected = q.get('answer', '')
            user_answer = self._normalize_answer(answers[i])
            if user_answer and user_answer == self._normalize_answer(expected):
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {expected}")

        percentage = (score / total_questions) * 100
        st.write(f"**Final Score: {score}/{total_questions} ({percentage:.1f}%)**")
        self._record_score('fill_blank', topic, percentage, total_questions)
        return percentage

    def play_matching_game(self, questions, topic):
        """Show a matching exercise and grade it on submission.

        Args:
            questions: Dict with ``left_items``, ``right_items`` and
                ``correct_matches`` (left item -> right item).
            topic: Topic name shown in the heading and stored with the score.

        Returns:
            The score as a percentage once the form is submitted, else ``None``.
        """
        st.subheader(f"🔗 Match the Following: {topic}")
        if not questions.get('left_items') or not questions.get('right_items'):
            st.error("No matching pairs available for this topic.")
            return None

        left_items = questions['left_items']
        right_items = self._shuffled_options(questions['right_items'])
        correct_matches = questions.get('correct_matches') or {}
        total_pairs = len(left_items)

        with st.form("matching_form"):
            matches = {}
            st.write("Match each item on the left with the correct item on the right:")
            for i, left_item in enumerate(left_items):
                matches[left_item] = st.selectbox(
                    f"**{left_item}** matches with:",
                    ["Select..."] + right_items,
                    key=f"match_{i}"
                )
            submitted = st.form_submit_button("Submit Matches")

        if not submitted:
            return None

        score = 0
        for left_item, user_match in matches.items():
            correct_match = correct_matches.get(left_item, "")
            if user_match == correct_match:
                score += 1
                st.success(f"✅ {left_item} → {user_match} (Correct!)")
            else:
                st.error(f"❌ {left_item} → {user_match} (Wrong! Correct: {correct_match})")

        percentage = (score / total_pairs) * 100
        st.write(f"**Final Score: {score}/{total_pairs} ({percentage:.1f}%)**")
        self._record_score('matching', topic, percentage, total_pairs)
        return percentage
class LearningDashboard:
    """Streamlit analytics views over the score history in session state.

    The history is read from ``st.session_state.game_scores``: a dict mapping
    a game type to a list of result dicts with ``topic``, ``score`` and
    ``timestamp`` keys.
    """

    # One fixed colour per game type so a bar keeps its colour no matter
    # which other game types have data.
    GAME_COLORS = {
        'mcq': '#FF6B6B',
        'fill_blank': '#4ECDC4',
        'matching': '#45B7D1',
    }
    DEFAULT_COLOR = '#96CEB4'

    def __init__(self):
        pass

    @staticmethod
    def _scores():
        """Return the score history, or an empty dict if none was created yet."""
        return st.session_state.get('game_scores', {})

    @staticmethod
    def _collect_rows(game_scores):
        """Flatten the per-game history into one list of row dicts."""
        return [
            {
                'game_type': game_type,
                'score': entry['score'],
                'topic': entry['topic'],
                'timestamp': entry['timestamp'],
            }
            for game_type, entries in game_scores.items()
            for entry in entries
        ]

    @staticmethod
    def _average_by_game(game_scores):
        """Return the mean score per game type, skipping types with no results."""
        return {
            game_type: sum(entry['score'] for entry in entries) / len(entries)
            for game_type, entries in game_scores.items()
            if entries
        }

    @staticmethod
    def _average_by_topic(game_scores):
        """Return the mean score per topic across all game types."""
        by_topic = {}
        for entries in game_scores.values():
            for entry in entries:
                by_topic.setdefault(entry['topic'], []).append(entry['score'])
        return {topic: sum(values) / len(values) for topic, values in by_topic.items()}

    def show_dashboard(self):
        """Render the full dashboard, or a hint when no game was played yet."""
        st.title("📊 Learning Analytics Dashboard")
        if not any(self._scores().values()):
            st.info("No learning data available yet. Complete some games to see your analytics!")
            return
        self.show_overall_stats()
        col1, col2 = st.columns(2)
        with col1:
            self.show_game_type_performance()
        with col2:
            self.show_topic_performance()
        self.show_progress_over_time()
        self.show_strengths_weaknesses()

    def show_overall_stats(self):
        """Show average score, games played, best score and topic count."""
        st.subheader("🎯 Overall Performance")
        all_scores = self._collect_rows(self._scores())
        if not all_scores:
            return
        df = pd.DataFrame(all_scores)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            avg_score = df['score'].mean()
            st.metric("Average Score", f"{avg_score:.1f}%")
        with col2:
            total_games = len(df)
            st.metric("Games Played", total_games)
        with col3:
            best_score = df['score'].max()
            st.metric("Best Score", f"{best_score:.1f}%")
        with col4:
            unique_topics = df['topic'].nunique()
            st.metric("Topics Studied", unique_topics)

    def show_game_type_performance(self):
        """Show a bar chart of the average score per game type."""
        st.subheader("🎮 Performance by Game Type")
        game_averages = self._average_by_game(self._scores())
        if not game_averages:
            return
        fig = go.Figure(data=[
            go.Bar(
                x=list(game_averages.keys()),
                y=list(game_averages.values()),
                marker_color=[
                    self.GAME_COLORS.get(game_type, self.DEFAULT_COLOR)
                    for game_type in game_averages
                ]
            )
        ])
        fig.update_layout(
            title="Average Score by Game Type",
            xaxis_title="Game Type",
            yaxis_title="Average Score (%)",
            showlegend=False
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_topic_performance(self):
        """Show a bar chart of the average score per topic."""
        st.subheader("📚 Performance by Topic")
        topic_averages = self._average_by_topic(self._scores())
        if not topic_averages:
            return
        fig = go.Figure(data=[
            go.Bar(
                x=list(topic_averages.keys()),
                y=list(topic_averages.values()),
                marker_color='#96CEB4'
            )
        ])
        fig.update_layout(
            title="Average Score by Topic",
            xaxis_title="Topic",
            yaxis_title="Average Score (%)",
            showlegend=False
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_progress_over_time(self):
        """Show a line chart of scores ordered by timestamp, one line per game type."""
        st.subheader("📈 Progress Over Time")
        all_data = self._collect_rows(self._scores())
        if not all_data:
            return
        df = pd.DataFrame(all_data)
        df = df.sort_values('timestamp')
        fig = px.line(df, x='timestamp', y='score',
                      color='game_type',
                      title="Score Progress Over Time",
                      labels={'timestamp': 'Time', 'score': 'Score (%)'})
        st.plotly_chart(fig, use_container_width=True)

    def show_strengths_weaknesses(self):
        """Show best/weakest game type and topic plus an overall recommendation.

        A weakest entry is only listed when its average is below 80%. The
        recommendation is based on the mean of the per-game-type averages.
        """
        st.subheader("💪 Strengths & Areas for Improvement")
        game_scores = self._scores()
        game_averages = self._average_by_game(game_scores)
        topic_averages = self._average_by_topic(game_scores)

        col1, col2 = st.columns(2)
        with col1:
            st.write("**🎯 Strengths:**")
            if game_averages:
                best_game = max(game_averages, key=game_averages.get)
                st.success(f"• Excellent at {best_game} games ({game_averages[best_game]:.1f}% avg)")
            if topic_averages:
                best_topic = max(topic_averages, key=topic_averages.get)
                st.success(f"• Strong understanding of {best_topic} ({topic_averages[best_topic]:.1f}% avg)")
        with col2:
            st.write("**📈 Areas for Improvement:**")
            if game_averages:
                weak_game = min(game_averages, key=game_averages.get)
                if game_averages[weak_game] < 80:
                    st.warning(f"• Practice {weak_game} games more ({game_averages[weak_game]:.1f}% avg)")
            if topic_averages:
                weak_topic = min(topic_averages, key=topic_averages.get)
                if topic_averages[weak_topic] < 80:
                    st.warning(f"• Review {weak_topic} concepts ({topic_averages[weak_topic]:.1f}% avg)")

        st.subheader("🎓 Personalized Recommendations")
        if game_averages:
            overall_avg = sum(game_averages.values()) / len(game_averages)
            if overall_avg >= 90:
                st.success("🌟 Excellent performance! You're mastering the material well.")
            elif overall_avg >= 75:
                st.info("👍 Good progress! Focus on your weaker areas to improve further.")
            else:
                st.warning("📚 Keep practicing! Consider reviewing the story explanations before attempting games.")
# Streamlit App Pages (Combined)
def upload_and_process_page(doc_processor, file_path="ragdatascience.pdf"):
    """Render the page that indexes the bundled document.

    Args:
        doc_processor: Object exposing ``process_document(path, file_type)``
            that returns ``(vectorstore, chunk_count)``.
        file_path: Document to process; defaults to the pre-uploaded PDF.

    On success the vector store and document name are saved in
    ``st.session_state`` for the other pages.
    """
    st.header("📂 Process Your Learning Material")
    # Derive the type from the path so a non-default file is routed correctly.
    file_extension = os.path.splitext(file_path)[1].lstrip(".").lower()
    st.info(f"Processing the pre-uploaded file: `{file_path}`")

    if not os.path.isfile(file_path):
        # Report the problem up front instead of after the button is pressed.
        st.error(f"Error processing document: file `{file_path}` was not found.")
        return

    if st.button("Process Document"):
        with st.spinner("Processing document..."):
            try:
                vectorstore, chunk_count = doc_processor.process_document(
                    file_path, file_extension
                )
            except Exception as e:
                # UI boundary: show any failure to the user rather than crash.
                st.error(f"Error processing document: {str(e)}")
            else:
                st.session_state.vectorstore = vectorstore
                st.session_state.document_name = file_path
                st.success(f"Document processed successfully! Created {chunk_count} text chunks.")
                st.info("You can now go to 'Learn Topic' to start learning!")
def learn_topic_page(rag_system):
    """Render the page that generates a story explanation for a topic.

    Args:
        rag_system: Object exposing ``get_story_explanation(topic)``.

    The explained topic is stored in ``st.session_state.current_topic``.
    """
    st.header("📖 Learn About Any Topic")
    topic = st.text_input("What would you like to learn about?",
                          placeholder="e.g., machine learning algorithms, statistics, data visualization")
    # Treat whitespace-only input as empty.
    topic = topic.strip()

    if not st.button("Get Story Explanation"):
        return
    if not topic:
        # Give feedback instead of silently ignoring the click.
        st.warning("Please enter a topic first.")
        return

    with st.spinner("Generating story explanation..."):
        try:
            story = rag_system.get_story_explanation(topic)
        except Exception as e:
            # UI boundary: show any failure to the user rather than crash.
            st.error(f"Error generating explanation: {str(e)}")
            return

    st.session_state.current_topic = topic
    st.subheader(f"📝 Story: {topic}")
    st.write(story)
    st.success("Story generated! Now you can test your understanding with games.")
def play_games_page(rag_system, games):
    """Render the page that generates quiz questions and runs the chosen game.

    Args:
        rag_system: Object exposing ``generate_mcq_questions``,
            ``generate_fill_blank_questions`` and
            ``generate_matching_questions``.
        games: Object exposing ``play_mcq_game``, ``play_fill_blank_game``
            and ``play_matching_game``.

    Generated questions are kept in ``st.session_state.active_game``. The
    button is only true on the run right after the click, so a quiz rendered
    solely inside the button branch would disappear when its form is
    submitted, before the answers could be graded.
    """
    st.header("🎮 Test Your Knowledge")
    topic = st.text_input("Enter topic to test:",
                          value=st.session_state.get('current_topic', ''))
    if not topic:
        return

    game_type = st.selectbox("Choose game type:",
                             ["Multiple Choice", "Fill in the Blanks", "Matching"])

    # Game type -> (question generator, game renderer).
    handlers = {
        "Multiple Choice": (rag_system.generate_mcq_questions, games.play_mcq_game),
        "Fill in the Blanks": (rag_system.generate_fill_blank_questions, games.play_fill_blank_game),
        "Matching": (rag_system.generate_matching_questions, games.play_matching_game),
    }
    generate, play = handlers[game_type]

    if st.button("Generate Questions"):
        with st.spinner("Generating questions..."):
            try:
                st.session_state.active_game = {
                    'topic': topic,
                    'game_type': game_type,
                    'questions': generate(topic),
                }
            except Exception as e:
                # Drop any stale quiz so it is not shown for the failed request.
                st.session_state.pop('active_game', None)
                st.error(f"Error generating questions: {str(e)}")
                return

    active = st.session_state.get('active_game')
    # Only show a stored quiz that belongs to the current topic and game type.
    if not active or active['topic'] != topic or active['game_type'] != game_type:
        return

    try:
        play(active['questions'], topic)
    except Exception as e:
        # Malformed question data surfaces here; report it instead of crashing.
        st.error(f"Error generating questions: {str(e)}")
# Main function to run the app
def main():
    """Configure the Streamlit page and route to the selected app page.

    Stops with an error when ``GROQ_API_KEY`` is missing from the
    environment. The "Learn Topic" and "Play Games" pages require a vector
    store in ``st.session_state`` (created on "Process Document").
    """
    st.set_page_config(
        page_title="RAG Learning System",
        page_icon="🤖",
        layout="wide"
    )
    st.title("🤖 RAG Learning System")
    st.write("Upload your learning materials and start your interactive learning journey!")

    # Check for the API key from Hugging Face secrets before proceeding.
    # Only the Groq key is needed: embeddings come from a local Hugging Face
    # model, so demanding a Cohere key would block the app for no reason.
    if "GROQ_API_KEY" not in os.environ:
        st.error("API key not found. Please add `GROQ_API_KEY` as a secret in the Hugging Face Space settings.")
        st.stop()

    doc_processor = DocumentProcessor()
    games = LearningGames()
    dashboard = LearningDashboard()

    st.sidebar.title("Navigation")
    page = st.sidebar.selectbox("Choose a page:",
                                ["Process Document", "Learn Topic", "Play Games", "Dashboard"])

    if page == "Process Document":
        upload_and_process_page(doc_processor)
    elif page == "Learn Topic":
        if 'vectorstore' in st.session_state:
            learn_topic_page(RAGLearningSystem(st.session_state.vectorstore))
        else:
            st.warning("Please process a document first!")
    elif page == "Play Games":
        if 'vectorstore' in st.session_state:
            play_games_page(RAGLearningSystem(st.session_state.vectorstore), games)
        else:
            st.warning("Please process a document first!")
    elif page == "Dashboard":
        dashboard.show_dashboard()
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()