Spaces:
Build error
Build error
| import streamlit as st | |
| import pandas as pd | |
| import os | |
| import tempfile | |
| from typing import List, Optional, Dict, Any, Union | |
| import json | |
| from datetime import datetime | |
| from llama_cpp import Llama | |
| from langchain.output_parsers import PydanticOutputParser | |
| from langchain.prompts import ChatPromptTemplate | |
| from langchain.schema import HumanMessage, SystemMessage | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.schema.runnable import RunnablePassthrough | |
| from langchain.prompts.prompt import PromptTemplate | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.chains import LLMChain | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain_community.vectorstores import Chroma # Fixed import | |
| from pydantic import BaseModel, Field | |
| from Ingestion.ingest import process_document, get_processor_for_file | |
| import warnings | |
| warnings.filterwarnings("ignore", category=RuntimeWarning) | |
# Set page configuration.
# NOTE: st.set_page_config must be the first Streamlit command executed in the
# script, which is why it sits directly after the imports.
st.set_page_config(
    page_title="DocMind AI: AI-Powered Document Analysis",
    page_icon="🧠",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Custom CSS for better dark/light mode compatibility.
# Injected raw into the page via unsafe_allow_html; the class names defined
# here (.card, .highlight-container, .chat-user, .chat-ai, .doc-list,
# .status-success, .status-error) are referenced by the HTML snippets rendered
# further down in this script.
st.markdown("""
<style>
/* Common styles for both modes */
.stApp {
max-width: 1200px;
margin: 0 auto;
}
/* Card styling for results */
.card {
border-radius: 5px;
padding: 1.5rem;
margin-bottom: 1rem;
border: 1px solid rgba(128, 128, 128, 0.2);
}
/* Dark mode specific */
@media (prefers-color-scheme: dark) {
.card {
background-color: rgba(255, 255, 255, 0.05);
}
.highlight-container {
background-color: rgba(255, 255, 255, 0.05);
border-left: 3px solid #4CAF50;
}
.chat-user {
background-color: rgba(0, 0, 0, 0.2);
}
.chat-ai {
background-color: rgba(76, 175, 80, 0.1);
}
}
/* Light mode specific */
@media (prefers-color-scheme: light) {
.card {
background-color: rgba(0, 0, 0, 0.02);
}
.highlight-container {
background-color: rgba(0, 0, 0, 0.03);
border-left: 3px solid #4CAF50;
}
.chat-user {
background-color: rgba(240, 240, 240, 0.7);
}
.chat-ai {
background-color: rgba(76, 175, 80, 0.05);
}
}
/* Chat message styling */
.chat-container {
margin-bottom: 1rem;
}
.chat-message {
padding: 1rem;
border-radius: 5px;
margin-bottom: 0.5rem;
}
/* Highlight sections */
.highlight-container {
padding: 1rem;
margin: 1rem 0;
border-radius: 4px;
}
/* Status indicators */
.status-success {
color: #4CAF50;
}
.status-error {
color: #F44336;
}
/* Document list */
.doc-list {
list-style-type: none;
padding-left: 0;
}
.doc-list li {
padding: 0.5rem 0;
border-bottom: 1px solid rgba(128, 128, 128, 0.2);
}
</style>
""", unsafe_allow_html=True)
# Define the output structures using Pydantic
class DocumentAnalysis(BaseModel):
    """Structured analysis result the LLM is instructed to emit as JSON.

    PydanticOutputParser builds its format instructions from this schema and
    parses the model's response back into it in run_analysis().
    """
    summary: str = Field(description="A concise summary of the document")
    key_insights: List[str] = Field(description="A list of key insights from the document")
    # The two optional sections default to None when the model omits them;
    # the display code checks for them with .get()/truthiness before rendering.
    action_items: Optional[List[str]] = Field(None, description="A list of action items derived from the document")
    open_questions: Optional[List[str]] = Field(None, description="A list of open questions or areas needing clarification")
# Function to clean up LLM responses for better parsing
def clean_llm_response(response):
    """Extract the text content of an LLM response and strip markdown fences.

    Args:
        response: Either a llama-cpp chat-completion dict (containing a
            ``choices`` list) or any object whose ``str()`` is the raw
            model output.

    Returns:
        The inner content string with any ```...``` or `json ...` fencing
        removed, a leading ``json``/``JSON`` language tag stripped, and
        surrounding whitespace trimmed — ready for the Pydantic parser.
    """
    # Pull the message text out of a chat-completion style dict if given one.
    if isinstance(response, dict) and 'choices' in response:
        content = response['choices'][0]['message']['content']
    else:
        content = str(response)
    # Remove markdown code block formatting if present
    if '```' in content:
        parts = content.split('```')
        # parts[1] is the fenced body. Accepting len(parts) >= 2 (instead of
        # requiring >= 3) also handles an unterminated fence, which small
        # models frequently emit; the old code returned such output unchanged.
        if len(parts) >= 2:
            content = parts[1]
            # Remove a leading "json"/"JSON" language tag, tolerating
            # whitespace between the fence and the tag (e.g. "``` json").
            stripped = content.lstrip()
            if stripped[:4].lower() == 'json':
                content = stripped[4:].lstrip()
    elif '`json' in content:
        # Handle the single-backtick `json ...` variant.
        parts = content.split('`json')
        if len(parts) >= 2:
            content = parts[1]
            if '`' in content:
                content = content.split('`')[0]
    # Strip any leading/trailing whitespace
    return content.strip()
# Initialize LLM without widgets in the cached function
def load_model():
    """Download and instantiate the quantized Gemma GGUF model.

    Returns:
        The Llama instance on success, or None on any failure — the sidebar
        code treats None as fatal and stops the app.
    """
    try:
        return Llama.from_pretrained(
            repo_id="stduhpf/google-gemma-3-1b-it-qat-q4_0-gguf-small",
            filename="gemma-3-1b-it-q4_0_s.gguf",
        )
    except Exception:
        # Errors are swallowed here on purpose; the caller reports failure
        # to the user via the sidebar status message.
        return None
# Initialize embeddings without widgets in the cached function
def load_embeddings():
    """Build the CPU-based MiniLM sentence-transformer embeddings for retrieval."""
    # Imported lazily so the heavy embedding stack is only loaded when the
    # chat/vector-store path actually needs it.
    from langchain_community.embeddings import HuggingFaceEmbeddings
    return HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={'device': 'cpu'},
    )
# Sidebar Configuration with improved styling
st.sidebar.markdown("<div style='text-align: center;'><h1>🧠 DocMind AI</h1></div>", unsafe_allow_html=True)
st.sidebar.markdown("<div style='text-align: center;'>AI-Powered Document Analysis</div>", unsafe_allow_html=True)
st.sidebar.markdown("---")
# Load LLM - Move spinner outside the cached function
with st.sidebar:
    with st.spinner("Loading model..."):
        llm = load_model()
    # load_model() returns None on failure; surface the outcome and halt the
    # whole script when the model is unavailable.
    if llm is not None:
        st.markdown("<div class='status-success'>✅ Model loaded successfully!</div>", unsafe_allow_html=True)
    else:
        st.markdown("<div class='status-error'>❌ Error loading model. Check logs for details.</div>", unsafe_allow_html=True)
        st.stop()
# Mode Selection
with st.sidebar:
    st.markdown("### Analysis Configuration")
    # Whether each uploaded document gets its own analysis or all documents
    # are analyzed in one combined prompt (read by run_analysis()).
    analysis_mode = st.radio(
        "Analysis Mode",
        ["Analyze each document separately", "Combine analysis for all documents"]
    )
# Prompt Selection
# Display label -> prompt text; the "Custom Prompt" sentinel key reveals a
# free-text area instead.
prompt_options = {
    "Comprehensive Document Analysis": "Analyze the provided document comprehensively. Generate a summary, extract key insights, identify action items, and list open questions.",
    "Extract Key Insights and Action Items": "Extract key insights and action items from the provided document.",
    "Summarize and Identify Open Questions": "Summarize the provided document and identify any open questions that need clarification.",
    "Custom Prompt": "Enter a custom prompt below:"
}
with st.sidebar:
    st.markdown("### Prompt Settings")
    selected_prompt_option = st.selectbox("Select Prompt", list(prompt_options.keys()))
    custom_prompt = ""
    if selected_prompt_option == "Custom Prompt":
        custom_prompt = st.text_area("Enter Custom Prompt", height=100)
# Tone Selection
tone_options = [
    "Professional", "Academic", "Informal", "Creative", "Neutral",
    "Direct", "Empathetic", "Humorous", "Authoritative", "Inquisitive"
]
with st.sidebar:
    selected_tone = st.selectbox("Select Tone", tone_options)
# Instructions Selection
# Display label -> system-message persona; "Custom Instructions" works like
# the custom prompt sentinel above.
instruction_options = {
    "General Assistant": "Act as a helpful assistant.",
    "Researcher": "Act as a researcher providing in-depth analysis.",
    "Software Engineer": "Act as a software engineer focusing on code and technical details.",
    "Product Manager": "Act as a product manager considering strategy and user experience.",
    "Data Scientist": "Act as a data scientist emphasizing data analysis.",
    "Business Analyst": "Act as a business analyst considering strategic aspects.",
    "Technical Writer": "Act as a technical writer creating clear documentation.",
    "Marketing Specialist": "Act as a marketing specialist focusing on branding.",
    "HR Manager": "Act as an HR manager considering people aspects.",
    "Legal Advisor": "Act as a legal advisor providing legal perspective.",
    "Custom Instructions": "Enter custom instructions below:"
}
with st.sidebar:
    st.markdown("### Assistant Behavior")
    selected_instruction = st.selectbox("Select Instructions", list(instruction_options.keys()))
    custom_instruction = ""
    if selected_instruction == "Custom Instructions":
        custom_instruction = st.text_area("Enter Custom Instructions", height=100)
# Length/Detail Selection
length_options = ["Concise", "Detailed", "Comprehensive", "Bullet Points"]
with st.sidebar:
    st.markdown("### Response Format")
    selected_length = st.selectbox("Select Length/Detail", length_options)
# Main Area
st.markdown("<h1 style='text-align: center;'>📄 DocMind AI: Document Analysis</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align: center;'>Upload documents and analyze them using the Gemma model</p>", unsafe_allow_html=True)
# File Upload with improved UI
# Accepted extensions must have a matching processor in Ingestion.ingest —
# presumably get_processor_for_file() covers this list; verify when adding types.
uploaded_files = st.file_uploader(
    "Upload Documents",
    accept_multiple_files=True,
    type=["pdf", "docx", "txt", "xlsx", "md", "json", "xml", "rtf", "csv", "msg", "pptx", "odt", "epub",
          "py", "js", "java", "ts", "tsx", "c", "cpp", "h", "html", "css", "sql", "rb", "go", "rs", "php"]
)
# Display uploaded files with better visual indication
if uploaded_files:
    st.markdown("<div class='highlight-container'>", unsafe_allow_html=True)
    st.markdown("### Uploaded Documents")
    st.markdown("<ul class='doc-list'>", unsafe_allow_html=True)
    for file in uploaded_files:
        st.markdown(f"<li>📄 {file.name}</li>", unsafe_allow_html=True)
    st.markdown("</ul>", unsafe_allow_html=True)
    st.markdown("</div>", unsafe_allow_html=True)
# Function to process the documents and run analysis
def run_analysis():
    """Run the end-to-end analysis pipeline for the uploaded documents.

    Steps:
      1. Persist the Streamlit uploads to a temp directory.
      2. Extract text from each file via the Ingestion helpers.
      3. Build prompt/system-message text from the sidebar selections.
      4. Ask the LLM for an analysis — once per document or once for all
         documents combined, depending on ``analysis_mode`` — and render the
         parsed (or raw, if parsing fails) result.
      5. Chunk the extracted text, embed it into a Chroma store, and stash a
         ConversationalRetrievalChain in ``st.session_state['qa_chain']`` for
         the chat section.

    Reads the module-level widget values (uploaded_files, analysis_mode,
    selected_* and custom_* variables, llm); returns None and renders all
    output directly into the Streamlit page.
    """
    if not uploaded_files:
        st.error("Please upload at least one document.")
        return
    # Save uploaded files to temporary directory
    # NOTE(review): the temp dir is never cleaned up; consider
    # tempfile.TemporaryDirectory if disk usage becomes a concern.
    temp_dir = tempfile.mkdtemp()
    file_paths = []
    for uploaded_file in uploaded_files:
        file_path = os.path.join(temp_dir, uploaded_file.name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        file_paths.append(file_path)
    # Process documents
    with st.spinner("Processing documents..."):
        all_texts = []
        processed_docs = []
        progress_bar = st.progress(0)
        for i, file_path in enumerate(file_paths):
            # Files with no registered processor are silently skipped.
            processor = get_processor_for_file(file_path)
            if processor:
                try:
                    doc_data = process_document(file_path)
                    if doc_data is not None and len(doc_data.strip()) > 0:  # Ensure we have content
                        all_texts.append(doc_data)
                        processed_docs.append({"name": os.path.basename(file_path), "data": doc_data})
                except Exception as e:
                    st.error(f"Error processing {os.path.basename(file_path)}: {str(e)}")
            progress_bar.progress((i + 1) / len(file_paths))
        if not all_texts:
            st.error("No documents could be processed. Please check the file formats and try again.")
            return
    # Build the prompt
    if selected_prompt_option == "Custom Prompt":
        prompt_text = custom_prompt
    else:
        prompt_text = prompt_options[selected_prompt_option]
    if selected_instruction == "Custom Instructions":
        instruction_text = custom_instruction
    else:
        instruction_text = instruction_options[selected_instruction]
    # Add tone guidance
    tone_guidance = f"Use a {selected_tone.lower()} tone in your response."
    # Add length guidance
    length_guidance = ""
    if selected_length == "Concise":
        length_guidance = "Keep your response brief and to the point."
    elif selected_length == "Detailed":
        length_guidance = "Provide a detailed response with thorough explanations."
    elif selected_length == "Comprehensive":
        length_guidance = "Provide a comprehensive in-depth analysis covering all aspects."
    elif selected_length == "Bullet Points":
        length_guidance = "Format your response primarily using bullet points for clarity."
    # Set up the output parser
    output_parser = PydanticOutputParser(pydantic_object=DocumentAnalysis)
    format_instructions = output_parser.get_format_instructions()
    if analysis_mode == "Analyze each document separately":
        results = []
        for doc in processed_docs:
            with st.spinner(f"Analyzing {doc['name']}..."):
                # Create system message with combined instructions
                system_message = f"{instruction_text} {tone_guidance} {length_guidance} Format your response according to these instructions: {format_instructions}"
                # NOTE(review): the full document text is inlined into the
                # prompt with no truncation — large documents may exceed the
                # model's context window; confirm acceptable limits.
                prompt = f"""
{prompt_text}
Document: {doc['name']}
Content: {doc['data']}
"""
                # Get response from LLM
                try:
                    response = llm.create_chat_completion(
                        messages = [
                            {
                                "role": "system",
                                "content": system_message
                            },
                            {
                                "role": "user",
                                "content": prompt
                            }
                        ]
                    )
                    # Try to parse the response into the pydantic model
                    try:
                        # Clean the response before parsing
                        cleaned_response = clean_llm_response(response)
                        parsed_response = output_parser.parse(cleaned_response)
                        results.append({
                            "document_name": doc['name'],
                            "analysis": parsed_response.dict()
                        })
                    except Exception as e:
                        # If parsing fails, include the raw response
                        if isinstance(response, dict) and 'choices' in response:
                            raw_response = response['choices'][0]['message']['content']
                        else:
                            raw_response = str(response)
                        results.append({
                            "document_name": doc['name'],
                            "analysis": raw_response,
                            "parsing_error": str(e)
                        })
                except Exception as e:
                    st.error(f"Error analyzing {doc['name']}: {str(e)}")
        # Display results with card-based UI
        for result in results:
            st.markdown(f"<div class='card'>", unsafe_allow_html=True)
            st.markdown(f"<h3>Analysis for: {result['document_name']}</h3>", unsafe_allow_html=True)
            # Structured dict => render sections; otherwise show raw text.
            if isinstance(result['analysis'], dict) and 'parsing_error' not in result:
                # Structured output
                st.markdown("<div class='highlight-container'>", unsafe_allow_html=True)
                st.markdown("### Summary")
                st.write(result['analysis']['summary'])
                st.markdown("</div>", unsafe_allow_html=True)
                st.markdown("### Key Insights")
                for insight in result['analysis']['key_insights']:
                    st.markdown(f"- {insight}")
                if result['analysis'].get('action_items'):
                    st.markdown("<div class='highlight-container'>", unsafe_allow_html=True)
                    st.markdown("### Action Items")
                    for item in result['analysis']['action_items']:
                        st.markdown(f"- {item}")
                    st.markdown("</div>", unsafe_allow_html=True)
                if result['analysis'].get('open_questions'):
                    st.markdown("### Open Questions")
                    for question in result['analysis']['open_questions']:
                        st.markdown(f"- {question}")
            else:
                # Raw output
                st.markdown(result['analysis'])
                if 'parsing_error' in result:
                    st.info(f"Note: The response could not be parsed into the expected format. Error: {result['parsing_error']}")
            st.markdown("</div>", unsafe_allow_html=True)
    else:
        with st.spinner("Analyzing all documents together..."):
            # Combine all documents
            combined_content = "\n\n".join([f"Document: {doc['name']}\n\nContent: {doc['data']}" for doc in processed_docs])
            # Create system message with combined instructions
            system_message = f"{instruction_text} {tone_guidance} {length_guidance} Format your response according to these instructions: {format_instructions}"
            # Create the prompt template for HuggingFace models
            prompt = f"""
{prompt_text}
{combined_content}
"""
            # Get response from LLM
            try:
                response = llm.create_chat_completion(
                    messages = [
                        {
                            "role": "system",
                            "content": system_message
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                )
                # Try to parse the response into the pydantic model
                try:
                    # Clean the response before parsing
                    cleaned_response = clean_llm_response(response)
                    parsed_response = output_parser.parse(cleaned_response)
                    st.markdown("<div class='card'>", unsafe_allow_html=True)
                    st.markdown("<h3>Combined Analysis for All Documents</h3>", unsafe_allow_html=True)
                    st.markdown("<div class='highlight-container'>", unsafe_allow_html=True)
                    st.markdown("### Summary")
                    st.write(parsed_response.summary)
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("### Key Insights")
                    for insight in parsed_response.key_insights:
                        st.markdown(f"- {insight}")
                    if parsed_response.action_items:
                        st.markdown("<div class='highlight-container'>", unsafe_allow_html=True)
                        st.markdown("### Action Items")
                        for item in parsed_response.action_items:
                            st.markdown(f"- {item}")
                        st.markdown("</div>", unsafe_allow_html=True)
                    if parsed_response.open_questions:
                        st.markdown("### Open Questions")
                        for question in parsed_response.open_questions:
                            st.markdown(f"- {question}")
                    st.markdown("</div>", unsafe_allow_html=True)
                except Exception as e:
                    # If parsing fails, return the raw response
                    st.markdown("<div class='card'>", unsafe_allow_html=True)
                    st.markdown("<h3>Combined Analysis for All Documents</h3>", unsafe_allow_html=True)
                    # Get raw content from response
                    if isinstance(response, dict) and 'choices' in response:
                        raw_response = response['choices'][0]['message']['content']
                    else:
                        raw_response = str(response)
                    st.markdown(raw_response)
                    st.info(f"Note: The response could not be parsed into the expected format. Error: {str(e)}")
                    st.markdown("</div>", unsafe_allow_html=True)
            except Exception as e:
                st.error(f"Error analyzing documents: {str(e)}")
    # Create text chunks for embeddings
    with st.spinner("Setting up document chat..."):
        try:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
            all_chunks = []
            for doc in processed_docs:
                if doc['data'] and len(doc['data'].strip()) > 0:  # Verify data exists and is not empty
                    chunks = text_splitter.split_text(doc['data'])
                    all_chunks.extend(chunks)
            # Only create embeddings if we have chunks
            if all_chunks and len(all_chunks) > 0:
                # Load embeddings - moving spinner outside
                embeddings = load_embeddings()
                # Using 'None' as namespace to avoid unique ID issues with Chroma
                vectorstore = Chroma.from_texts(
                    texts=all_chunks,
                    embedding=embeddings,
                    collection_name="docmind_collection",
                    collection_metadata={"timestamp": datetime.now().isoformat()}
                )
                retriever = vectorstore.as_retriever()
                # Set up conversation memory
                memory = ConversationBufferMemory(
                    memory_key="chat_history",
                    return_messages=True
                )
                # Create conversational chain
                # NOTE(review): `llm` here is a raw llama_cpp.Llama object, not a
                # LangChain LLM wrapper — confirm ConversationalRetrievalChain
                # accepts it; this may be why chat errors are caught below.
                qa_chain = ConversationalRetrievalChain.from_llm(
                    llm=llm,
                    retriever=retriever,
                    memory=memory
                )
                st.session_state['qa_chain'] = qa_chain
                st.session_state['chat_history'] = []
                st.success("Document chat is ready! Ask questions about your documents below.")
            else:
                st.warning("No text chunks were created from the documents. Chat functionality is unavailable.")
        except Exception as e:
            st.error(f"Error setting up document chat: {str(e)}")
            # For debugging purposes
            st.exception(e)
# Initialize chat history
# (run_analysis() resets this when a new qa_chain is built.)
if 'chat_history' not in st.session_state:
    st.session_state['chat_history'] = []
# Chat Interface with improved styling
st.markdown("---")
st.markdown("<h2 style='text-align: center;'>💬 Chat with your Documents</h2>", unsafe_allow_html=True)
st.markdown("<p style='text-align: center;'>Ask follow-up questions about the analyzed documents.</p>", unsafe_allow_html=True)
# Process the analysis if button is clicked
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
    if st.button("Extract and Analyze", use_container_width=True):
        run_analysis()
# Chat input and display
# Only shown once run_analysis() has stored a retrieval chain in session state.
if 'qa_chain' in st.session_state:
    st.markdown("<div class='card'>", unsafe_allow_html=True)
    user_question = st.text_input("Ask a question about your documents:")
    if user_question:
        with st.spinner("Generating response..."):
            try:
                response = st.session_state['qa_chain'].invoke({"question": user_question})
                st.session_state['chat_history'].append({"question": user_question, "answer": response['answer']})
            except Exception as e:
                st.error(f"Error generating response: {str(e)}")
    # Display chat history with improved styling
    for exchange in st.session_state['chat_history']:
        st.markdown("<div class='chat-container'>", unsafe_allow_html=True)
        st.markdown(f"<div class='chat-message chat-user'><strong>You:</strong> {exchange['question']}</div>", unsafe_allow_html=True)
        st.markdown(f"<div class='chat-message chat-ai'><strong>DocMind AI:</strong> {exchange['answer']}</div>", unsafe_allow_html=True)
        st.markdown("</div>", unsafe_allow_html=True)
    st.markdown("</div>", unsafe_allow_html=True)
# Footer
st.markdown("---")
st.markdown(
    """
<div style="text-align: center">
<p>Built with ❤️ using Streamlit, LangChain, and Gemma model</p>
<p>DocMind AI - AI-Powered Document Analysis</p>
</div>
""",
    unsafe_allow_html=True
)