Spaces:
Build error
Build error
import html
import io
import logging
import os
import queue
import re
import subprocess
import tempfile
import threading
import time
from datetime import datetime
from pathlib import Path

import docx
import PyPDF2
import streamlit as st
from fpdf import FPDF
# Set up logging
# Records at INFO level and above are written both to the file
# "streamlit_app.log" and to a stream handler (stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("streamlit_app.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by the helper functions in this file.
logger = logging.getLogger(__name__)
# Set page configuration
# Wide layout with the sidebar expanded; the "About" menu entry carries a
# short Markdown description of the app.
st.set_page_config(
    page_title="B.Tech Assignment Solution Generator",
    page_icon="🎓",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'About': "# B.Tech Assignment Solution Generator\nThis app processes B.Tech assignments and generates comprehensive solutions."
    }
)
# Apply dark theme
# Injects a <style> block into the page. The .log-container and
# .<type>-message classes are the ones main() emits when it renders the
# processing log; .pdf-container and .download-button are not referenced
# elsewhere in the visible source.
# NOTE(review): the .css-* selectors look like auto-generated class names and
# are presumably tied to one Streamlit release — verify they still match.
st.markdown("""
<style>
.main {
background-color: #0E1117;
color: white;
}
.stButton>button {
background-color: #4CAF50;
color: white;
border-radius: 8px;
padding: 10px 24px;
font-weight: bold;
border: none;
transition-duration: 0.4s;
}
.stButton>button:hover {
background-color: #45a049;
}
.stTextInput>div>div>input {
background-color: #1E2126;
color: white;
}
.stMarkdown {
color: white;
}
.css-1cpxqw2 {
background-color: #262730;
border-radius: 10px;
padding: 20px;
margin-bottom: 20px;
}
.css-1v3fvcr {
background-color: #0E1117;
}
.css-18e3th9 {
padding-top: 2rem;
}
.css-1kyxreq {
justify-content: center;
align-items: center;
}
.stProgress > div > div > div > div {
background-color: #4CAF50;
}
.log-container {
background-color: #1E2126;
color: #B0B0B0;
padding: 15px;
border-radius: 8px;
height: 300px;
overflow-y: auto;
font-family: monospace;
margin-bottom: 20px;
}
.success-message {
color: #4CAF50;
font-weight: bold;
}
.error-message {
color: #FF5252;
font-weight: bold;
}
.info-message {
color: #2196F3;
}
.warning-message {
color: #FFC107;
}
.pdf-container {
background-color: #262730;
padding: 20px;
border-radius: 10px;
margin-top: 20px;
}
.download-button {
background-color: #2196F3;
color: white;
padding: 10px 15px;
border-radius: 5px;
text-decoration: none;
display: inline-block;
margin-top: 10px;
}
</style>
""", unsafe_allow_html=True)
def extract_text_from_pdf(file):
    """Return the text of every page of a PDF, one newline after each page.

    ``file`` is handed straight to ``PyPDF2.PdfReader``. If anything goes
    wrong the error is logged, shown in the Streamlit UI, and ``None`` is
    returned instead of raising.
    """
    try:
        reader = PyPDF2.PdfReader(file)
        page_texts = [page.extract_text() + "\n" for page in reader.pages]
        return "".join(page_texts)
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        st.error(f"Error extracting text from PDF: {e}")
        return None
def extract_text_from_docx(file):
    """Return the text of every paragraph of a DOCX, one newline after each.

    ``file`` is handed straight to ``docx.Document``. If anything goes wrong
    the error is logged, shown in the Streamlit UI, and ``None`` is returned
    instead of raising.
    """
    try:
        document = docx.Document(file)
        paragraph_texts = [paragraph.text + "\n" for paragraph in document.paragraphs]
        return "".join(paragraph_texts)
    except Exception as e:
        logger.error(f"Error extracting text from DOCX: {e}")
        st.error(f"Error extracting text from DOCX: {e}")
        return None
def _latin1_safe(text):
    """Return ``text`` with characters outside Latin-1 replaced by '?'."""
    return text.encode("latin-1", "replace").decode("latin-1")


def create_pdf(content, filename="solution.pdf"):
    """Create a PDF file from the solution content.

    The document gets a centred title followed by ``content`` split on
    newlines. A non-empty line shorter than 100 characters that is entirely
    upper-case is drawn as a bold heading; every other line is drawn as a
    wrapped paragraph.

    Args:
        content: Solution text to render.
        filename: Accepted for interface compatibility; the PDF is written to
            an in-memory buffer, so this value is not used.

    Returns:
        An ``io.BytesIO`` positioned at the start of the PDF data, or ``None``
        if PDF generation failed (the error is logged and shown in the UI).
    """
    try:
        pdf = FPDF()
        pdf.add_page()
        pdf.set_auto_page_break(auto=True, margin=15)
        # Set font for title
        pdf.set_font("Arial", "B", 16)
        pdf.cell(0, 10, "B.Tech Assignment Solution", ln=True, align="C")
        pdf.ln(5)
        # Set font for content
        pdf.set_font("Arial", "", 12)
        # Process content by paragraphs
        for para in content.split('\n'):
            # The built-in "Arial" font can only encode Latin-1. Without this
            # a single smart quote or math symbol aborts the whole document.
            safe_para = _latin1_safe(para)
            stripped = para.strip()
            # Check if this is a heading (simple heuristic)
            if stripped and len(stripped) < 100 and stripped.isupper():
                pdf.set_font("Arial", "B", 14)
                pdf.cell(0, 10, safe_para, ln=True)
                pdf.set_font("Arial", "", 12)
            else:
                # Handle regular paragraph
                pdf.multi_cell(0, 10, safe_para)
                pdf.ln(2)
        # Save PDF to a bytes buffer
        pdf_buffer = io.BytesIO()
        pdf.output(pdf_buffer)
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error creating PDF: {e}")
        st.error(f"Error creating PDF: {e}")
        return None
# Matches records written with the format
# '%(asctime)s - %(name)s - %(levelname)s - %(message)s'.
# Group 1 is the level name, group 2 the message. The logger name may be
# dotted (e.g. "package.module").
_LOG_RECORD_RE = re.compile(
    r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - [\w.]+ - (\w+) - (.*)'
)


def process_log_line(line):
    """Classify a log line and strip its timestamp/logger/level prefix.

    When the line follows the standard log format, the severity comes from
    its level field, so an INFO record that merely mentions the word "error"
    stays "info". Lines without that prefix (for example plain ``print``
    output) fall back to a case-insensitive keyword check.

    Args:
        line: One raw line of log or process output.

    Returns:
        ``(msg_type, message)`` where ``msg_type`` is "error", "warning" or
        "info", or ``(None, None)`` if the line is blank.
    """
    line = line.strip()
    if not line:
        return None, None

    match = _LOG_RECORD_RE.search(line)
    if match:
        level = match.group(1).upper()
        message = match.group(2)
        if level in ("ERROR", "CRITICAL", "FATAL"):
            return "error", message
        if level in ("WARNING", "WARN"):
            return "warning", message
        return "info", message

    # Unstructured output: guess the severity from keywords.
    lowered = line.lower()
    if "error" in lowered:
        msg_type = "error"
    elif "warning" in lowered:
        msg_type = "warning"
    else:
        msg_type = "info"
    return msg_type, line
def monitor_log_file(log_file, message_queue, stop_event):
    """Follow ``log_file`` and queue each new line until told to stop.

    If the file does not exist yet, the function waits for it to appear
    instead of failing, then reads it from the beginning because everything
    in it is new. If the file already exists, only lines appended after the
    call are reported.

    Args:
        log_file: Path of the log file to follow.
        message_queue: Queue receiving ``(msg_type, message)`` tuples.
        stop_event: ``threading.Event``; setting it ends the monitoring loop.
    """
    try:
        existed_at_start = os.path.exists(log_file)
        # Wait for the writer to create the file. Event.wait doubles as the
        # sleep, so a stop request is honoured immediately.
        while not os.path.exists(log_file):
            if stop_event.wait(0.1):
                return
        # errors="replace": a stray undecodable byte must not kill the thread.
        with open(log_file, 'r', errors='replace') as f:
            if existed_at_start:
                # Move to the end of the file so old records are skipped
                f.seek(0, 2)
            while not stop_event.is_set():
                line = f.readline()
                if line:
                    msg_type, message = process_log_line(line)
                    if msg_type and message:
                        message_queue.put((msg_type, message))
                else:
                    # No new line, wait a bit
                    stop_event.wait(0.1)
    except Exception as e:
        logger.error(f"Error monitoring log file: {e}")
        message_queue.put(("error", f"Error monitoring log file: {e}"))
def run_assignment_processor(input_file_path, message_queue):
    """Run ``streamlit_main.py`` on the input file and relay its output.

    The script is looked up next to this file and run with the interpreter
    from a sibling ``venv`` if one exists, otherwise with ``python3``. Each
    stdout line is classified with ``process_log_line`` and queued.

    Args:
        input_file_path: Path passed to the script as its only argument.
        message_queue: Queue receiving ``(msg_type, message)`` tuples.

    Returns:
        ``True`` if the process exited with status 0, ``False`` if it failed
        or could not be started (an "error" message is queued in both cases).
    """
    try:
        # Get the directory of the current script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # Construct the path to streamlit_main.py
        main_script = os.path.join(script_dir, "streamlit_main.py")
        # Use the virtual environment interpreter if available
        venv_python = os.path.join(script_dir, "venv", "bin", "python")
        python_cmd = venv_python if os.path.exists(venv_python) else "python3"
        # Run the assignment processor
        cmd = [python_cmd, main_script, input_file_path]
        # Add log message
        message_queue.put(("info", "Starting assignment processing..."))
        # Execute the command
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        # Drain stderr concurrently. If it were only read after stdout hit
        # EOF, a child that fills the stderr pipe would block forever and
        # stdout would never close.
        stderr_chunks = []

        def _drain_stderr():
            stderr_chunks.append(process.stderr.read())

        stderr_reader = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_reader.start()

        # Capture and process output
        for line in process.stdout:
            msg_type, message = process_log_line(line)
            if msg_type and message:
                message_queue.put((msg_type, message))
        # Wait for process to complete
        return_code = process.wait()
        stderr_reader.join()
        if return_code != 0:
            # Report the captured error output if the process failed
            error_output = "".join(stderr_chunks)
            message_queue.put(("error", f"Process failed with return code {return_code}: {error_output}"))
            return False
        # Add completion message
        message_queue.put(("success", "COMPLETED B.TECH ASSIGNMENT SOLUTION PIPELINE"))
        return True
    except Exception as e:
        logger.error(f"Error running assignment processor: {e}")
        message_queue.put(("error", f"Error running assignment processor: {e}"))
        return False
def main():
    """Render the page and drive one solution-generation run.

    Flow:
      1. Let the user upload a PDF or DOCX and extract its text.
      2. On "Generate Solution", write the text to a temporary file, start a
         log-monitoring thread and a processing thread, and poll their shared
         queue to update the progress bar and the log panel.
      3. When the completion message arrives, read ``solution.txt``, display
         it and offer it as a PDF download.

    The temporary file is always removed, and the loop gives up after
    30 minutes.
    """
    # Header
    st.title("B.Tech Assignment Solution Generator")
    st.markdown("Upload your assignment file (PDF or DOCX) and get comprehensive solutions.")
    # File uploader
    uploaded_file = st.file_uploader("Choose an assignment file", type=["pdf", "docx"])
    if uploaded_file is None:
        return

    st.success(f"File uploaded: {uploaded_file.name}")
    # Extract text from the uploaded file. The extension is compared
    # case-insensitively so "REPORT.PDF" is not rejected as unsupported.
    lowered_name = uploaded_file.name.lower()
    if lowered_name.endswith('.pdf'):
        text = extract_text_from_pdf(uploaded_file)
    elif lowered_name.endswith('.docx'):
        text = extract_text_from_docx(uploaded_file)
    else:
        st.error("Unsupported file format. Please upload a PDF or DOCX file.")
        return
    if text is None:
        st.error("Failed to extract text from the file.")
        return
    # Preview the extracted text
    with st.expander("Preview Extracted Text", expanded=False):
        st.text_area("Extracted Text", text, height=200)
    # Process button
    if not st.button("Generate Solution"):
        return

    # Create a temporary file to save the extracted text
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
        temp_file.write(text)
        temp_file_path = temp_file.name
    try:
        # Set up progress tracking
        progress_bar = st.progress(0)
        status_text = st.empty()
        # Create a container for log messages
        st.subheader("Processing Status")
        log_container = st.empty()
        log_messages = []
        # Create a queue for messages
        message_queue = queue.Queue()
        # Create an event to signal thread to stop
        stop_event = threading.Event()
        # Start a thread to monitor the log file
        log_file = "btech_solution_system.log"
        log_thread = threading.Thread(
            target=monitor_log_file,
            args=(log_file, message_queue, stop_event)
        )
        log_thread.daemon = True
        log_thread.start()
        # Start processing in a separate thread
        processing_thread = threading.Thread(
            target=run_assignment_processor,
            args=(temp_file_path, message_queue)
        )
        processing_thread.daemon = True
        processing_thread.start()
        # Update progress and display log messages
        start_time = time.time()
        solution_content = None
        timed_out = False
        # Define key stages and their progress values
        stages = {
            "ANALYZING QUESTIONS": 10,
            "RESEARCHING INFORMATION": 30,
            "DEVELOPING INITIAL SOLUTIONS": 50,
            "VERIFYING SOLUTIONS": 70,
            "REVISING SOLUTIONS": 85,
            "FORMATTING FINAL REPORT": 95,
            "COMPLETED": 100
        }
        current_progress = 0
        # Update status until processing is complete
        while processing_thread.is_alive() or not message_queue.empty():
            # Check for new messages
            try:
                while not message_queue.empty():
                    msg_type, message = message_queue.get(block=False)
                    # Update progress based on message content
                    for stage, progress_value in stages.items():
                        if stage in message:
                            current_progress = progress_value
                            break
                    # Messages come from the child process and may echo text
                    # from the uploaded file, so escape them before they are
                    # rendered as raw HTML.
                    formatted_msg = f"<div class='{msg_type}-message'>{html.escape(message)}</div>"
                    log_messages.append(formatted_msg)
                    # Keep only the last 20 messages to avoid cluttering
                    if len(log_messages) > 20:
                        log_messages = log_messages[-20:]
                    # Update log display
                    log_container.markdown(
                        f"<div class='log-container'>{''.join(log_messages)}</div>",
                        unsafe_allow_html=True
                    )
                    # Check if solution is ready
                    if "COMPLETED B.TECH ASSIGNMENT SOLUTION PIPELINE" in message:
                        # Try to read the solution file
                        try:
                            solution_file = "solution.txt"
                            if os.path.exists(solution_file):
                                with open(solution_file, 'r') as f:
                                    solution_content = f.read()
                        except Exception as e:
                            logger.error(f"Error reading solution file: {e}")
                            log_messages.append(
                                f"<div class='error-message'>Error reading solution file: {html.escape(str(e))}</div>"
                            )
            except queue.Empty:
                pass
            # Update progress bar
            progress_bar.progress(current_progress)
            status_text.text(f"Processing... ({current_progress}%)")
            # Check if processing is complete
            if current_progress >= 100:
                break
            # Check timeout (30 minutes)
            if time.time() - start_time > 1800:
                timed_out = True
                status_text.error("Processing timed out after 30 minutes.")
                break
            time.sleep(0.5)
        # Stop the log monitoring thread
        stop_event.set()
        # Check if solution was found
        if solution_content:
            # Update progress to 100%
            progress_bar.progress(100)
            status_text.success("Solution generated successfully!")
            # Display the solution
            st.subheader("Generated Solution")
            st.markdown(solution_content)
            # Create PDF
            pdf_buffer = create_pdf(solution_content)
            if pdf_buffer:
                # Provide download button for PDF
                st.download_button(
                    label="Download Solution as PDF",
                    data=pdf_buffer,
                    file_name="btech_assignment_solution.pdf",
                    mime="application/pdf",
                    key="solution-pdf",
                    help="Click to download the solution as a PDF file"
                )
        elif not timed_out:
            # Keep the more specific timeout message when that was the cause.
            status_text.error("Failed to generate solution.")
    finally:
        # Clean up temporary file
        if os.path.exists(temp_file_path):
            os.unlink(temp_file_path)
# Run the app only when this file is executed as the main script.
if __name__ == "__main__":
    main()