"""Streamlit app: a retrieval-augmented (RAG) learning system.

A document is split into chunks and indexed in a Chroma vector store. A Groq
hosted LLM then turns retrieved chunks into story-style explanations and into
quiz material (multiple choice, fill-in-the-blank, matching). Quiz results are
kept in ``st.session_state`` and summarised on a dashboard page.
"""
import json
import os
import random
from datetime import datetime

import docx
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import PyPDF2
import streamlit as st
from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings  # Use HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate

# Free Hugging Face model used for embeddings.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Groq chat model used for every completion request.
GROQ_MODEL_NAME = "llama3-8b-8192"
# Document bundled with the app; processed by the "Process Document" page.
DEFAULT_DOCUMENT_PATH = "ragdatascience.pdf"

# Prompt templates. Literal JSON braces are doubled because PromptTemplate
# treats single braces as placeholders.
STORY_TEMPLATE = """
Based on the following context from the book, explain {topic} as an engaging story.
Make it educational yet entertaining, using metaphors, analogies, and narrative elements.

Context: {context}

Create a story explanation for {topic}:
"""

MCQ_TEMPLATE = """
Based on this context about {topic}, create 3 multiple choice questions.
Format as JSON with structure:
{{
    "questions": [
        {{
            "question": "Question text",
            "options": ["A. Option 1", "B. Option 2", "C. Option 3", "D. Option 4"],
            "correct": "A",
            "explanation": "Why this answer is correct"
        }}
    ]
}}

Context: {context}
"""

FILL_BLANK_TEMPLATE = """
Based on this context about {topic}, create 3 fill-in-the-blank questions.
Format as JSON with structure:
{{
    "questions": [
        {{
            "question": "Question with _____ blank",
            "answer": "correct answer",
            "hint": "helpful hint"
        }}
    ]
}}

Context: {context}
"""

MATCH_TEMPLATE = """
Based on this context about {topic}, create a matching exercise with 4 pairs.
Format as JSON with structure:
{{
    "left_items": ["Item 1", "Item 2", "Item 3", "Item 4"],
    "right_items": ["Match A", "Match B", "Match C", "Match D"],
    "correct_matches": {{"Item 1": "Match A", "Item 2": "Match B", "Item 3": "Match C", "Item 4": "Match D"}}
}}

Context: {context}
"""


# Class Definitions (Combined)
class DocumentProcessor:
    """Extract text from PDF/DOCX files and index it in a Chroma vector store."""

    def __init__(self):
        # Use a free Hugging Face model for embeddings
        self.embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF file.

        Pages are joined with a newline so the last word of one page is not
        glued to the first word of the next. A page for which the reader
        returns nothing contributes an empty string instead of raising.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

    def extract_text_from_docx(self, docx_path):
        """Extract text from DOCX file, one paragraph per line."""
        doc = docx.Document(docx_path)
        return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    def process_document(self, file_path, file_type):
        """Process document and create vector store.

        Args:
            file_path: Path of the document to read.
            file_type: File extension, case-insensitive, with or without a
                leading dot ('pdf', 'docx' or 'doc').

        Returns:
            A ``(vectorstore, chunk_count)`` tuple.

        Raises:
            ValueError: If the file type is unsupported or the document
                yields no text to index.
        """
        kind = file_type.lower().lstrip(".")
        if kind == 'pdf':
            text = self.extract_text_from_pdf(file_path)
        elif kind in ('docx', 'doc'):
            # NOTE(review): 'doc' is routed to the DOCX reader as before;
            # confirm it can actually open legacy .doc files.
            text = self.extract_text_from_docx(file_path)
        else:
            raise ValueError("Unsupported file type")

        chunks = self.text_splitter.split_text(text)
        if not chunks:
            # Fail with a clear message rather than handing the vector store
            # an empty list of texts.
            raise ValueError("No extractable text found in the document")

        vectorstore = Chroma.from_texts(
            texts=chunks,
            embedding=self.embeddings,
        )
        return vectorstore, len(chunks)


class RAGLearningSystem:
    """Generate explanations and quiz content from retrieved document chunks."""

    def __init__(self, vectorstore):
        # Initialize Groq client with API key from environment variable
        if "GROQ_API_KEY" not in os.environ:
            st.error("Groq API key is required for generating responses.")
            st.stop()

        self.llm = Groq(api_key=os.environ["GROQ_API_KEY"])
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

        prompt_inputs = ["context", "topic"]
        # Story explanation prompt
        self.story_prompt = PromptTemplate(
            input_variables=prompt_inputs, template=STORY_TEMPLATE
        )
        # Question generation prompts
        self.mcq_prompt = PromptTemplate(
            input_variables=prompt_inputs, template=MCQ_TEMPLATE
        )
        self.fill_blank_prompt = PromptTemplate(
            input_variables=prompt_inputs, template=FILL_BLANK_TEMPLATE
        )
        self.match_prompt = PromptTemplate(
            input_variables=prompt_inputs, template=MATCH_TEMPLATE
        )

    def _build_context(self, topic):
        """Return the retrieved chunks for ``topic`` joined by newlines."""
        docs = self.retriever.invoke(topic)
        return "\n".join(doc.page_content for doc in docs)

    def _ask(self, prompt, topic, json_mode=False):
        """Send ``prompt`` filled with retrieved context; return the reply text."""
        request = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt.format(
                        context=self._build_context(topic), topic=topic
                    ),
                }
            ],
            "model": GROQ_MODEL_NAME,
        }
        if json_mode:
            request["response_format"] = {"type": "json_object"}
        response = self.llm.chat.completions.create(**request)
        return response.choices[0].message.content

    def _ask_json(self, prompt, topic, fallback):
        """Like ``_ask`` in JSON mode; return ``fallback`` on an unusable reply."""
        raw = self._ask(prompt, topic, json_mode=True)
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a reply whose content is not a string.
            return fallback
        # Callers use ``.get`` on the result, so anything but an object is
        # treated the same as a parse failure.
        return parsed if isinstance(parsed, dict) else fallback

    def get_story_explanation(self, topic):
        """Return a story-style explanation of ``topic`` as plain text."""
        return self._ask(self.story_prompt, topic)

    def generate_mcq_questions(self, topic):
        """Return a dict with a 'questions' list (empty if parsing fails)."""
        return self._ask_json(self.mcq_prompt, topic, {"questions": []})

    def generate_fill_blank_questions(self, topic):
        """Return a dict with a 'questions' list (empty if parsing fails)."""
        return self._ask_json(self.fill_blank_prompt, topic, {"questions": []})

    def generate_matching_questions(self, topic):
        """Return a dict of matching items (empty collections if parsing fails)."""
        return self._ask_json(
            self.match_prompt,
            topic,
            {"left_items": [], "right_items": [], "correct_matches": {}},
        )


class LearningGames:
    """Render the quiz games and record their scores in the session state.

    Every ``play_*`` method returns the percentage score on the run in which
    its form was submitted, and ``None`` otherwise. The optional
    ``key_suffix`` is appended to the form and widget keys so a caller can
    give each freshly generated quiz its own widget state.
    """

    def __init__(self):
        self.init_session_state()

    def init_session_state(self):
        """Create the score log and current topic entries if they are missing."""
        if 'game_scores' not in st.session_state:
            st.session_state.game_scores = {
                'mcq': [],
                'fill_blank': [],
                'matching': [],
            }
        if 'current_topic' not in st.session_state:
            st.session_state.current_topic = ""

    def _record_score(self, game_type, topic, score, total):
        """Show the final score, append it to the score log, return the percentage."""
        percentage = (score / total) * 100
        st.write(f"**Final Score: {score}/{total} ({percentage:.1f}%)**")
        st.session_state.game_scores[game_type].append({
            'topic': topic,
            'score': percentage,
            'timestamp': datetime.now(),
            'questions_attempted': total,
        })
        return percentage

    def play_mcq_game(self, questions, topic, key_suffix=""):
        """Render a multiple choice quiz and score it when submitted."""
        st.subheader(f"🎯 Multiple Choice Quiz: {topic}")

        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        with st.form(f"mcq_form{key_suffix}"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                answers[i] = st.radio(
                    f"Select answer for Q{i+1}:",
                    [str(option) for option in q.get('options', [])],
                    key=f"mcq_{i}{key_suffix}",
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Quiz")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(items):
            selected = answers[i]
            correct = str(q.get('correct', '')).strip()
            # An empty ``correct`` must never match: ``startswith("")`` is
            # true for every string.
            is_right = (
                bool(correct)
                and isinstance(selected, str)
                and selected.strip().casefold().startswith(correct.casefold())
            )
            if is_right:
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {correct}")
            st.info(f"Explanation: {q.get('explanation', 'No explanation provided')}")

        return self._record_score('mcq', topic, score, len(items))

    def play_fill_blank_game(self, questions, topic, key_suffix=""):
        """Render a fill-in-the-blank quiz and score it when submitted."""
        st.subheader(f"📝 Fill in the Blanks: {topic}")

        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        with st.form(f"fill_blank_form{key_suffix}"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                st.write(f"💡 Hint: {q.get('hint', 'No hint available')}")
                answers[i] = st.text_input(
                    f"Your answer for Q{i+1}:",
                    key=f"fill_{i}{key_suffix}",
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Answers")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(items):
            expected = str(q.get('answer', ''))
            # Comparison ignores case and surrounding whitespace.
            if answers[i].strip().lower() == expected.strip().lower():
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {expected}")

        return self._record_score('fill_blank', topic, score, len(items))

    def play_matching_game(self, questions, topic, key_suffix=""):
        """Render a matching exercise and score it when submitted."""
        st.subheader(f"🔗 Match the Following: {topic}")

        left_items = questions.get('left_items')
        right_items = questions.get('right_items')
        if not left_items or not right_items:
            st.error("No matching pairs available for this topic.")
            return None

        correct_matches = questions.get('correct_matches')
        if not isinstance(correct_matches, dict):
            correct_matches = {}

        # Shuffle with a seed derived from the items themselves: the script
        # reruns when the form is submitted, and a fresh random order would
        # change the select-box options between rendering and scoring.
        shuffled = list(right_items)
        seed = "\x1f".join(str(item) for item in right_items)
        random.Random(seed).shuffle(shuffled)

        with st.form(f"matching_form{key_suffix}"):
            st.write("Match each item on the left with the correct item on the right:")
            # A list of pairs (not a dict) so duplicated left items are each
            # scored, keeping the count consistent with len(left_items).
            matches = [
                (
                    left_item,
                    st.selectbox(
                        f"**{left_item}** matches with:",
                        ["Select..."] + shuffled,
                        key=f"match_{i}{key_suffix}",
                    ),
                )
                for i, left_item in enumerate(left_items)
            ]
            submitted = st.form_submit_button("Submit Matches")

        if not submitted:
            return None

        score = 0
        for left_item, user_match in matches:
            correct_match = correct_matches.get(left_item, "")
            if user_match == correct_match:
                score += 1
                st.success(f"✅ {left_item} → {user_match} (Correct!)")
            else:
                st.error(f"❌ {left_item} → {user_match} (Wrong! Correct: {correct_match})")

        return self._record_score('matching', topic, score, len(left_items))


class LearningDashboard:
    """Charts and summaries built from the scores stored in the session state."""

    def __init__(self):
        pass

    @staticmethod
    def _score_log():
        """Return the per-game score lists, or an empty dict if none exist yet."""
        return st.session_state.get('game_scores', {})

    def _all_scores(self):
        """Flatten the score log into one list of row dicts."""
        return [
            {
                'game_type': game_type,
                'score': entry['score'],
                'topic': entry['topic'],
                'timestamp': entry['timestamp'],
            }
            for game_type, entries in self._score_log().items()
            for entry in entries
        ]

    def _game_averages(self):
        """Average score per game type, skipping game types never played."""
        return {
            game_type: sum(entry['score'] for entry in entries) / len(entries)
            for game_type, entries in self._score_log().items()
            if entries
        }

    def _topic_averages(self):
        """Average score per topic across all game types."""
        by_topic = {}
        for row in self._all_scores():
            by_topic.setdefault(row['topic'], []).append(row['score'])
        return {topic: sum(scores) / len(scores) for topic, scores in by_topic.items()}

    def show_dashboard(self):
        """Render the whole dashboard, or a hint when no game was played yet."""
        st.title("📊 Learning Analytics Dashboard")

        if not any(self._score_log().values()):
            st.info("No learning data available yet. Complete some games to see your analytics!")
            return

        self.show_overall_stats()

        col1, col2 = st.columns(2)
        with col1:
            self.show_game_type_performance()
        with col2:
            self.show_topic_performance()

        self.show_progress_over_time()
        self.show_strengths_weaknesses()

    def show_overall_stats(self):
        """Show average score, games played, best score and topic count."""
        st.subheader("🎯 Overall Performance")

        rows = self._all_scores()
        if not rows:
            return

        df = pd.DataFrame(rows)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Average Score", f"{df['score'].mean():.1f}%")
        with col2:
            st.metric("Games Played", len(df))
        with col3:
            st.metric("Best Score", f"{df['score'].max():.1f}%")
        with col4:
            st.metric("Topics Studied", df['topic'].nunique())

    def show_game_type_performance(self):
        """Bar chart of the average score per game type."""
        st.subheader("🎮 Performance by Game Type")

        game_averages = self._game_averages()
        if not game_averages:
            return

        fig = go.Figure(data=[
            go.Bar(
                x=list(game_averages.keys()),
                y=list(game_averages.values()),
                marker_color=['#FF6B6B', '#4ECDC4', '#45B7D1'],
            )
        ])
        fig.update_layout(
            title="Average Score by Game Type",
            xaxis_title="Game Type",
            yaxis_title="Average Score (%)",
            showlegend=False,
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_topic_performance(self):
        """Bar chart of the average score per topic."""
        st.subheader("📚 Performance by Topic")

        topic_averages = self._topic_averages()
        if not topic_averages:
            return

        fig = go.Figure(data=[
            go.Bar(
                x=list(topic_averages.keys()),
                y=list(topic_averages.values()),
                marker_color='#96CEB4',
            )
        ])
        fig.update_layout(
            title="Average Score by Topic",
            xaxis_title="Topic",
            yaxis_title="Average Score (%)",
            showlegend=False,
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_progress_over_time(self):
        """Line chart of scores ordered by timestamp, one line per game type."""
        st.subheader("📈 Progress Over Time")

        rows = self._all_scores()
        if not rows:
            return

        df = pd.DataFrame(rows).sort_values('timestamp')
        fig = px.line(
            df,
            x='timestamp',
            y='score',
            color='game_type',
            title="Score Progress Over Time",
            labels={'timestamp': 'Time', 'score': 'Score (%)'},
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_strengths_weaknesses(self):
        """Show best/weakest game type and topic plus a recommendation."""
        st.subheader("💪 Strengths & Areas for Improvement")

        game_averages = self._game_averages()
        topic_averages = self._topic_averages()

        col1, col2 = st.columns(2)
        with col1:
            st.write("**🎯 Strengths:**")
            if game_averages:
                best_game = max(game_averages, key=game_averages.get)
                st.success(f"• Excellent at {best_game} games ({game_averages[best_game]:.1f}% avg)")
            if topic_averages:
                best_topic = max(topic_averages, key=topic_averages.get)
                st.success(f"• Strong understanding of {best_topic} ({topic_averages[best_topic]:.1f}% avg)")

        with col2:
            st.write("**📈 Areas for Improvement:**")
            # Weak spots are only reported when their average is below 80%.
            if game_averages:
                weak_game = min(game_averages, key=game_averages.get)
                if game_averages[weak_game] < 80:
                    st.warning(f"• Practice {weak_game} games more ({game_averages[weak_game]:.1f}% avg)")
            if topic_averages:
                weak_topic = min(topic_averages, key=topic_averages.get)
                if topic_averages[weak_topic] < 80:
                    st.warning(f"• Review {weak_topic} concepts ({topic_averages[weak_topic]:.1f}% avg)")

        st.subheader("🎓 Personalized Recommendations")
        if game_averages:
            # Mean of the per-game averages, not of the individual games.
            overall_avg = sum(game_averages.values()) / len(game_averages)
            if overall_avg >= 90:
                st.success("🌟 Excellent performance! You're mastering the material well.")
            elif overall_avg >= 75:
                st.info("👍 Good progress! Focus on your weaker areas to improve further.")
            else:
                st.warning("📚 Keep practicing! Consider reviewing the story explanations before attempting games.")


# Streamlit App Pages (Combined)
def upload_and_process_page(doc_processor, file_path=DEFAULT_DOCUMENT_PATH, file_type=None):
    """Page that indexes the bundled document and stores the vector store.

    Args:
        doc_processor: Object whose ``process_document(path, type)`` returns
            ``(vectorstore, chunk_count)``.
        file_path: Document to process; defaults to the bundled file.
        file_type: File extension; derived from ``file_path`` when omitted.
    """
    st.header("📂 Process Your Learning Material")

    file_extension = file_type or os.path.splitext(file_path)[1].lstrip(".")
    st.info(f"Processing the pre-uploaded file: `{file_path}`")

    if not st.button("Process Document"):
        return

    with st.spinner("Processing document..."):
        try:
            vectorstore, chunk_count = doc_processor.process_document(
                file_path, file_extension
            )
        except Exception as e:  # page boundary: report instead of crashing
            st.error(f"Error processing document: {str(e)}")
            return

        st.session_state.vectorstore = vectorstore
        st.session_state.document_name = file_path
        st.success(f"Document processed successfully! Created {chunk_count} text chunks.")
        st.info("You can now go to 'Learn Topic' to start learning!")


def learn_topic_page(rag_system):
    """Page that shows a story explanation for a topic typed by the user."""
    st.header("📖 Learn About Any Topic")

    topic = st.text_input(
        "What would you like to learn about?",
        placeholder="e.g., machine learning algorithms, statistics, data visualization",
    )

    if not (st.button("Get Story Explanation") and topic):
        return

    with st.spinner("Generating story explanation..."):
        try:
            story = rag_system.get_story_explanation(topic)
        except Exception as e:  # page boundary: report instead of crashing
            st.error(f"Error generating explanation: {str(e)}")
            return

        # Remembered so the games page can pre-fill its topic field.
        st.session_state.current_topic = topic
        st.subheader(f"📝 Story: {topic}")
        st.write(story)
        st.success("Story generated! Now you can test your understanding with games.")


def play_games_page(rag_system, games):
    """Page that generates a quiz for a topic and lets the user play it.

    The generated quiz is kept in ``st.session_state.active_game``. The
    "Generate Questions" button is true only for the rerun caused by the
    click, so a quiz rendered solely inside that branch would disappear, and
    never be scored, as soon as its form is submitted.
    """
    st.header("🎮 Test Your Knowledge")

    topic = st.text_input("Enter topic to test:", value=st.session_state.get('current_topic', ''))
    if not topic:
        return

    generators = {
        "Multiple Choice": rag_system.generate_mcq_questions,
        "Fill in the Blanks": rag_system.generate_fill_blank_questions,
        "Matching": rag_system.generate_matching_questions,
    }
    players = {
        "Multiple Choice": games.play_mcq_game,
        "Fill in the Blanks": games.play_fill_blank_game,
        "Matching": games.play_matching_game,
    }

    game_type = st.selectbox("Choose game type:", list(generators))

    if st.button("Generate Questions"):
        with st.spinner("Generating questions..."):
            try:
                questions = generators[game_type](topic)
            except Exception as e:  # page boundary: report instead of crashing
                st.error(f"Error generating questions: {str(e)}")
                return

        # A new round number gives the new quiz fresh widget keys, so answers
        # typed into an earlier quiz do not reappear.
        round_number = st.session_state.get('game_round', 0) + 1
        st.session_state.game_round = round_number
        st.session_state.active_game = {
            'game_type': game_type,
            'topic': topic,
            'questions': questions,
            'round': round_number,
        }

    active = st.session_state.get('active_game')
    if not active:
        return

    try:
        players[active['game_type']](
            active['questions'],
            active['topic'],
            key_suffix=f"_{active['round']}",
        )
    except Exception as e:  # malformed quiz payloads should not crash the page
        st.error(f"Error displaying questions: {str(e)}")


# Main function to run the app
def main():
    """Configure the page, check prerequisites and route to the chosen page."""
    st.set_page_config(
        page_title="RAG Learning System",
        page_icon="🤖",
        layout="wide",
    )

    st.title("🤖 RAG Learning System")
    st.write("Upload your learning materials and start your interactive learning journey!")

    # Check for the API key from Hugging Face secrets before proceeding.
    # Only the Groq key is checked: nothing in this file reads a Cohere key.
    if "GROQ_API_KEY" not in os.environ:
        st.error("API key not found. Please add `GROQ_API_KEY` as a secret in the Hugging Face Space settings.")
        st.stop()

    # Building DocumentProcessor instantiates the embedding model; keep one
    # instance per session instead of rebuilding it on every rerun.
    if 'doc_processor' not in st.session_state:
        st.session_state.doc_processor = DocumentProcessor()
    doc_processor = st.session_state.doc_processor

    games = LearningGames()
    dashboard = LearningDashboard()

    st.sidebar.title("Navigation")
    page = st.sidebar.selectbox(
        "Choose a page:",
        ["Process Document", "Learn Topic", "Play Games", "Dashboard"],
    )

    if page == "Process Document":
        upload_and_process_page(doc_processor)
    elif page == "Dashboard":
        dashboard.show_dashboard()
    elif 'vectorstore' not in st.session_state:
        # Both remaining pages need an indexed document.
        st.warning("Please process a document first!")
    elif page == "Learn Topic":
        learn_topic_page(RAGLearningSystem(st.session_state.vectorstore))
    elif page == "Play Games":
        play_games_page(RAGLearningSystem(st.session_state.vectorstore), games)


if __name__ == "__main__":
    main()