""" Gradio app for AI Project Assistant. """ import gradio as gr from pathlib import Path import os import hashlib import time from datetime import datetime from dotenv import load_dotenv from src.rag import ProjectRAG from src.agent import ProjectAgent from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import SystemMessage, HumanMessage # Load environment variables load_dotenv() # Response Cache with TTL class ResponseCache: """Simple in-memory cache with time-to-live for LLM responses.""" def __init__(self, ttl_seconds: int = 300): """Initialize cache with TTL in seconds (default 5 minutes).""" self.cache = {} self.ttl = ttl_seconds def _make_key(self, query: str, project: str, provider: str) -> str: """Create a unique cache key.""" key_str = f"{query}|{project}|{provider}" return hashlib.md5(key_str.encode()).hexdigest() def get(self, query: str, project: str, provider: str) -> str | None: """Get cached response if exists and not expired.""" key = self._make_key(query, project, provider) if key in self.cache: entry = self.cache[key] if time.time() - entry["timestamp"] < self.ttl: return entry["response"] else: del self.cache[key] return None def set(self, query: str, project: str, provider: str, response: str): """Cache a response.""" key = self._make_key(query, project, provider) self.cache[key] = { "response": response, "timestamp": time.time() } def clear(self): """Clear all cached responses.""" self.cache = {} # Initialize response cache (5 minute TTL) response_cache = ResponseCache(ttl_seconds=300) # LangSmith Observability - Enable tracing if API key is set if os.getenv("LANGCHAIN_API_KEY"): os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_PROJECT"] = os.getenv("LANGCHAIN_PROJECT", "sherlock") print("LangSmith tracing enabled") # Global state - Initialize RAG only (not agent) rag = None def initialize_rag(): """Initialize RAG system 
(embeddings only, no LLM needed).""" global rag data_dir = Path("./data") if not data_dir.exists(): return False try: rag = ProjectRAG(data_dir) rag.load_and_index() return True except Exception as e: print(f"RAG initialization error: {e}") return False # Initialize RAG on module load initialize_rag() def chat(message, history, project_filter, provider, api_token, use_streaming=True): """Process chat message with streaming and caching support.""" if not api_token or api_token.strip() == "": if provider == "HuggingFace (Free)": yield "⚠️ Please enter your HuggingFace token first (get one at https://huggingface.co/settings/tokens)" else: yield "⚠️ Please enter your Google API key first (get one at https://aistudio.google.com/apikey)" return if not rag: yield "⚠️ System not initialized. Please check the data directory." return try: # Check cache first project_key = project_filter if project_filter and project_filter != "All Projects" else "all" provider_key = "hf" if provider == "HuggingFace (Free)" else "google" cached_response = response_cache.get(message, project_key, provider_key) if cached_response: yield f"{cached_response}\n\n_⚡ Cached response_" return # Set token in environment for this request if provider == "HuggingFace (Free)": os.environ["HF_TOKEN"] = api_token.strip() agent = ProjectAgent(rag, provider="huggingface") else: os.environ["GOOGLE_API_KEY"] = api_token.strip() agent = ProjectAgent(rag, provider="google") # Add project context if specified if project_filter and project_filter != "All Projects": enhanced_prompt = f"[Project: {project_filter}] {message}" else: enhanced_prompt = message # Use streaming if enabled if use_streaming: final_response = "" for response_chunk in agent.stream_query(enhanced_prompt): final_response = response_chunk yield response_chunk # Cache the final response response_cache.set(message, project_key, provider_key, final_response) else: response = agent.query(enhanced_prompt) response_cache.set(message, project_key, 
provider_key, response) yield response except Exception as e: error_msg = str(e).lower() if "401" in error_msg or "unauthorized" in error_msg or "invalid" in error_msg: yield "❌ **Invalid API Token**\n\nYour token appears to be invalid or expired. Please check:\n- Token is correctly copied (no extra spaces)\n- Token has proper permissions\n- Token is not expired" elif "403" in error_msg or "forbidden" in error_msg: yield "❌ **Access Denied**\n\nYour token doesn't have permission to access this model. Please ensure:\n- HuggingFace: Token has 'Read' permission\n- Google: API is enabled in your project" elif "rate" in error_msg or "quota" in error_msg or "limit" in error_msg: yield "❌ **Rate Limit Exceeded**\n\nYou've hit the API rate limit. Please:\n- Wait a few minutes and try again\n- Consider upgrading to a paid plan" elif "timeout" in error_msg or "timed out" in error_msg: yield "❌ **Request Timeout**\n\nThe request took too long. Please try again." else: yield f"❌ **Error**: {str(e)}\n\nPlease verify your API token is valid and try again." 
def get_projects():
    """Return the project choices for the UI dropdowns, "All Projects" first."""
    if not rag:
        return ["All Projects"]
    return ["All Projects", *rag.get_all_projects()]


def _pdf_safe_text(text):
    """Return *text* with every character outside Latin-1 replaced by "?".

    The export uses the built-in Helvetica font, which only covers Latin-1;
    chat replies routinely contain emoji that would otherwise abort the export.
    """
    return str(text).encode("latin-1", errors="replace").decode("latin-1")


def export_chat_to_pdf(chat_history, project):
    """Export chat history to PDF format and return as downloadable file.

    Args:
        chat_history: Sequence of (user_message, bot_message) pairs.
        project: Project label shown in the header and used in the filename.

    Returns:
        Path of the written PDF as a string, or None when there is no history.
    """
    if not chat_history:
        return None

    import tempfile

    from fpdf import FPDF

    # Create PDF
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()

    # Title
    pdf.set_font("Helvetica", "B", 16)
    pdf.cell(0, 10, "Sherlock Chat Export", ln=True, align="C")
    pdf.ln(5)

    # Metadata
    pdf.set_font("Helvetica", "", 10)
    pdf.cell(0, 6, _pdf_safe_text(f"Project: {project}"), ln=True)
    pdf.cell(0, 6, f"Exported: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", ln=True)
    pdf.ln(10)

    # Chat content
    for i, (user_msg, bot_msg) in enumerate(chat_history, 1):
        # Question header
        pdf.set_font("Helvetica", "B", 11)
        pdf.set_fill_color(230, 230, 250)
        pdf.multi_cell(0, 8, _pdf_safe_text(f"Q{i}: {user_msg}"), fill=True)
        pdf.ln(2)

        # Answer
        pdf.set_font("Helvetica", "", 10)
        # Clean up markdown formatting for PDF
        clean_response = (bot_msg or "").replace("**", "").replace("##", "").replace("- ", " * ")
        pdf.multi_cell(0, 6, _pdf_safe_text(clean_response))
        pdf.ln(5)

        # Separator line
        pdf.set_draw_color(200, 200, 200)
        pdf.line(10, pdf.get_y(), 200, pdf.get_y())
        pdf.ln(5)

    # Save to the platform's temp directory rather than a hard-coded /tmp
    filename = f"sherlock_chat_{project.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
    filepath = Path(tempfile.gettempdir()) / filename
    pdf.output(str(filepath))

    return str(filepath)


def _create_llm(provider, api_token, max_new_tokens, **google_kwargs):
    """Build the chat model for *provider*.

    ``max_new_tokens`` applies to the HuggingFace endpoint only;
    ``google_kwargs`` are forwarded to ``ChatGoogleGenerativeAI`` only.
    """
    token = api_token.strip()
    if provider == "HuggingFace (Free)":
        endpoint = HuggingFaceEndpoint(
            repo_id="meta-llama/Llama-3.2-3B-Instruct",
            temperature=0.3,
            max_new_tokens=max_new_tokens,
            huggingfacehub_api_token=token,
        )
        return ChatHuggingFace(llm=endpoint)
    return ChatGoogleGenerativeAI(
        model="gemini-2.5-flash-lite",
        temperature=0.3,
        google_api_key=token,
        **google_kwargs,
    )


def _meeting_error_message(exc):
    """Map an exception from meeting structuring to a user-facing message."""
    import re

    detail = str(exc)
    lowered = detail.lower()
    if "401" in lowered or "unauthorized" in lowered or "invalid" in lowered:
        return "❌ **Invalid API Token**\n\nYour token appears to be invalid or expired."
    if "403" in lowered or "forbidden" in lowered:
        return "❌ **Access Denied**\n\nYour token doesn't have permission."
    # Match "rate" as a whole word only: a plain substring test also fires on
    # "generate" / "text-generation" and mislabels unrelated errors.
    if (
        "429" in lowered
        or re.search(r"\brate\b", lowered)
        or "quota" in lowered
        or "limit" in lowered
    ):
        return "❌ **Rate Limit Exceeded**\n\nPlease wait a few minutes and try again."
    return f"❌ **Error**: {detail}\n\nPlease verify your API token."


def _is_safe_project_name(name):
    """Return True if *name* is a single path component (no traversal)."""
    if name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


def _safe_filename_stem(stem):
    """Replace each character that is not a word char, dot or dash with "-"."""
    import re

    return re.sub(r"[^\w.-]", "-", stem)


def structure_meeting(project_name, meeting_title, meeting_date, participants, meeting_text, provider, api_token):
    """Structure meeting notes using AI.

    Sends the raw notes to the selected LLM, then writes the returned markdown
    to ``data/<project_name>/meetings/<date>-<title-slug>.md``.

    Args:
        project_name: Target project; must be a single directory name.
        meeting_title: Optional title; defaults to "Meeting"/"meeting".
        meeting_date: Date text used in the prompt and the filename.
        participants: Optional participant list text.
        meeting_text: Raw, unstructured meeting notes.
        provider: Provider label from the UI dropdown.
        api_token: API token for the selected provider.

    Returns:
        A markdown string: a "✅"-prefixed success message followed by the
        structured notes, or a "❌"-prefixed error message. Never raises.
    """
    if not api_token or api_token.strip() == "":
        return "❌ Please enter your API token first"

    if not project_name or not project_name.strip() or not meeting_text:
        return "❌ Please provide both project name and meeting notes"

    # The project name becomes a directory under data/. Reject anything that
    # could escape it, and do so before spending an LLM call.
    project_name = project_name.strip()
    if not _is_safe_project_name(project_name):
        return "❌ Invalid project name: it must not contain path separators"

    try:
        # Create LLM based on provider
        llm = _create_llm(
            provider,
            api_token,
            1024,
            convert_system_message_to_human=True,
        )

        system_prompt = """You are a meeting notes structuring assistant. Convert unstructured meeting notes into a well-formatted markdown document with these sections:

1. # Meeting: [title]
2. Date: [date]
3. Participants: [list]
4. ## Discussion (key points discussed)
5. ## Decisions (decisions made)
6. ## Action Items (as checkboxes with assignee and deadline if mentioned)
7. ## Blockers (any blockers or issues raised)

Format action items as: - [ ] Person: Task description by deadline
or - [ ] Task description (if no person/deadline mentioned)

Extract all relevant information from the raw notes."""

        user_prompt = f"""Structure these meeting notes:

Raw Notes:
{meeting_text}

Meeting Details:
- Title: {meeting_title or 'Meeting'}
- Date: {meeting_date}
- Participants: {participants or 'Not specified'}
"""

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt),
        ]

        response = llm.invoke(messages)
        structured_md = response.content

        # Save to file
        project_dir = Path("data") / project_name / "meetings"
        project_dir.mkdir(parents=True, exist_ok=True)

        title_slug = meeting_title.lower().replace(' ', '-') if meeting_title else 'meeting'
        # Title and date are free text; neutralize separators and other
        # characters that are not valid inside a single filename.
        filename = _safe_filename_stem(f"{meeting_date}-{title_slug}") + ".md"
        file_path = project_dir / filename

        # Explicit UTF-8: model output may contain characters the platform's
        # default encoding cannot represent.
        with open(file_path, 'w', encoding="utf-8") as f:
            f.write(structured_md)

        return f"✅ Meeting structured and saved to `{file_path}`\n\n---\n\n{structured_md}"

    except Exception as e:
        return _meeting_error_message(e)


# Create Gradio interface with custom CSS
custom_css = """
.chatbot-container {
    background-color: #f7f7f8;
    border-radius: 8px;
    padding: 10px;
}
.example-panel {
    background-color: #f0f2f6;
    border-radius: 8px;
    padding: 15px;
    height: 100%;
    color: #1f2937 !important;
}
.example-panel h3 {
    color: #1f2937 !important;
}
.example-panel p, .example-panel li {
    color: #374151 !important;
}
.token-status {
    padding: 10px;
    border-radius: 5px;
    margin-top: 10px;
    font-weight: bold;
}
.token-accepted {
    background-color: #d1fae5;
    color: #065f46;
}
/* Mobile responsiveness */
@media (max-width: 768px) {
    .row {
        flex-direction: column !important;
    }
    .chatbot-container {
        margin-top: 10px;
    }
    .example-panel {
        color: #1f2937 !important;
    }
}
"""

# NOTE(review): this literal is blank in the reviewed copy; any <link> tags it
# is meant to inject into the page head need to be restored here.
favicon_head = ''' '''

with gr.Blocks(
    title="Sherlock: AI Project Assistant",
    theme=gr.themes.Soft(),
    css=custom_css,
    head=favicon_head
) as demo:
    # Header with logo
    # NOTE(review): the markup around these three text lines (logo image,
    # heading, tagline) is missing in the reviewed copy — restore the tags.
    gr.HTML("""
Sherlock Logo

Sherlock: AI Project Assistant

Your intelligent assistant for managing multiple projects through meeting summaries.

""")

    # Global Authentication
    gr.Markdown("### 🔑 Authentication")

    with gr.Row():
        with gr.Column(scale=1):
            provider_dropdown = gr.Dropdown(
                label="Select Provider",
                choices=["HuggingFace (Free)", "Google AI (Paid)"],
                value="HuggingFace (Free)",
                interactive=True
            )
        with gr.Column(scale=2):
            api_token_global = gr.Textbox(
                label="API Token (Required)",
                placeholder="Enter your HuggingFace token",
                type="password"
            )
        with gr.Column(scale=2):
            provider_info = gr.Markdown("""
**HuggingFace (Free):**
1. Visit [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
2. Click "New token" → Select "Read"
""")

    with gr.Row():
        submit_token_btn = gr.Button("Submit Token", variant="primary")
        token_status = gr.Markdown("", elem_classes="token-status")

    def update_provider_ui(provider):
        """Update UI based on selected provider. Also clears token and status.

        Returns a 3-tuple for (token textbox, provider help text, status).
        """
        if provider == "HuggingFace (Free)":
            return (
                gr.update(placeholder="Enter your HuggingFace token", value=""),
                """**HuggingFace (Free):**
1. Visit [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
2. Click "New token" → Select "Read"
""",
                ""  # Clear status
            )
        else:
            return (
                gr.update(placeholder="Enter your Google API key", value=""),
                """**Google AI (Paid):**
1. Visit [aistudio.google.com/apikey](https://aistudio.google.com/apikey)
2. Create an API key
""",
                ""  # Clear status
            )

    provider_dropdown.change(
        fn=update_provider_ui,
        inputs=[provider_dropdown],
        outputs=[api_token_global, provider_info, token_status]
    )

    def validate_token(token, provider):
        """Validate token - simplified without heavy API call.

        Only the token's format is checked. Returns (status text, token value);
        the token value is "" when the format check fails, which clears the
        textbox.
        """
        # NOTE(review): the status strings below start and end with a newline
        # where wrapper markup appears to have been lost — restore if needed.
        if not token or not token.strip():
            return "\n❌ Please enter a token\n", ""

        token_value = token.strip()

        # Simple format validation
        if provider == "HuggingFace (Free)":
            # HF tokens start with "hf_"
            if token_value.startswith("hf_") and len(token_value) > 10:
                return "\n✅ Token format valid - will verify on first query\n", token_value
            else:
                return "\n❌ Invalid HuggingFace token format (should start with hf_)\n", ""
        else:
            # Google API keys are typically 39 chars
            if len(token_value) >= 30:
                return "\n✅ API key format valid - will verify on first query\n", token_value
            else:
                return "\n❌ Invalid Google API key format\n", ""

    submit_token_btn.click(
        fn=validate_token,
        inputs=[api_token_global, provider_dropdown],
        outputs=[token_status, api_token_global]
    )

    # Main tabs
    with gr.Tabs():
        # Chat tab
        with gr.Tab("💬 Chat"):
            gr.Markdown("### Ask questions about your projects")

            # State for per-project chat histories
            chat_histories = gr.State({})  # {project_name: [(user_msg, bot_msg), ...]}
            current_project = gr.State("All Projects")

            # Project selection dropdown
            project_dropdown = gr.Dropdown(
                label="Select Project",
                choices=get_projects(),
                value="All Projects",
                interactive=True
            )

            # Chat interface with example queries on the side
            with gr.Row(elem_classes="row"):
                # Left panel - Example queries (same width as right panel chat box)
                with gr.Column(scale=1, elem_classes="example-panel"):
                    gr.Markdown("""
### 📖 How to Use
1. Select the project you want to query from the dropdown above
2. Type your question in the chat box or use one of the examples below
3. Press Enter or click Send

### 💡 Example Queries
- What are the open action items?
- What blockers do we have?
- What decisions were made?
- What should I focus on next?
- Summarize the project status
""")

                # Right panel - Chat (same width as left panel)
                with gr.Column(scale=1, elem_classes="chatbot-container"):
                    chatbot = gr.Chatbot(
                        label="Chat",
                        height=350,
                        show_label=False
                    )
                    msg = gr.Textbox(
                        label="Your Message",
                        placeholder="What are the open action items?",
                        lines=1,
                        show_label=False
                    )
                    with gr.Row():
                        submit_btn = gr.Button("Send", variant="primary", scale=1)
                        clear_btn = gr.Button("Clear", scale=1)
                        export_btn = gr.Button("📥 Export", scale=1)
                    export_file = gr.File(label="Download", visible=False)

            def respond(message, chat_history, histories, project, provider, token):
                """Stream the bot reply into the chat and save it per project.

                Yields (chat history, cleared textbox value, histories dict).
                """
                if not message:
                    yield chat_history, "", histories
                    return

                # Add user message with empty bot response placeholder
                chat_history = chat_history + [(message, "")]

                # Stream bot response
                for response_chunk in chat(message, chat_history, project, provider, token):
                    # Update the last message with streaming response
                    chat_history[-1] = (message, response_chunk)
                    yield chat_history, "", histories

                # Save final history to per-project histories
                histories[project] = chat_history.copy()
                yield chat_history, "", histories

            def respond_non_streaming(message, chat_history, histories, project, provider, token):
                """Non-streaming version for fallback."""
                if not message:
                    return chat_history, "", histories

                # Get bot response
                bot_message = ""
                for response_chunk in chat(message, chat_history, project, provider, token, use_streaming=False):
                    bot_message = response_chunk

                # Add to history as tuple
                chat_history.append((message, bot_message))

                # Save to per-project histories
                histories[project] = chat_history.copy()

                return chat_history, "", histories

            def switch_project(new_project, current_chat, histories, old_project):
                """Save the visible chat under the old project, load the new one."""
                # Save current chat to old project
                if current_chat:
                    histories[old_project] = current_chat.copy()

                # Load chat history for new project (or empty if none)
                new_chat = histories.get(new_project, [])

                return new_chat, histories, new_project

            def clear_chat(project, histories):
                """Clear the current project's history and the visible chat."""
                histories[project] = []
                return [], histories

            submit_btn.click(
                fn=respond,
                inputs=[msg, chatbot, chat_histories, project_dropdown, provider_dropdown, api_token_global],
                outputs=[chatbot, msg, chat_histories]
            )
            msg.submit(
                fn=respond,
                inputs=[msg, chatbot, chat_histories, project_dropdown, provider_dropdown, api_token_global],
                outputs=[chatbot, msg, chat_histories]
            )
            clear_btn.click(
                fn=clear_chat,
                inputs=[project_dropdown, chat_histories],
                outputs=[chatbot, chat_histories]
            )

            def handle_export(chat_history, project):
                """Handle export button click."""
                if not chat_history:
                    return gr.update(visible=False, value=None)
                filepath = export_chat_to_pdf(chat_history, project)
                return gr.update(visible=True, value=filepath)

            export_btn.click(
                fn=handle_export,
                inputs=[chatbot, project_dropdown],
                outputs=[export_file]
            )

            # Switch project: save current, load new
            project_dropdown.change(
                fn=switch_project,
                inputs=[project_dropdown, chatbot, chat_histories, current_project],
                outputs=[chatbot, chat_histories, current_project]
            )

        # Upload Meeting tab
        with gr.Tab("📤 Upload Meeting"):
            gr.Markdown("### Upload plain text meeting notes and let AI structure them")

            # Project selection with toggle
            with gr.Row():
                with gr.Column():
                    project_mode = gr.Radio(
                        choices=["Use Existing Project", "Create New Project"],
                        value="Use Existing Project",
                        label="Project Selection"
                    )
                    # Existing project dropdown (shown when "Use Existing" is selected)
                    existing_project = gr.Dropdown(
                        label="Select Existing Project",
                        choices=get_projects()[1:],  # Exclude "All Projects"
                        visible=True
                    )
                    # New project textbox (shown when "Create New" is selected)
                    new_project = gr.Textbox(
                        label="New Project Name",
                        placeholder="e.g., mobile_app_redesign",
                        visible=False
                    )
                    upload_title = gr.Textbox(
                        label="Meeting Title",
                        placeholder="e.g., Sprint Planning"
                    )
                with gr.Column():
                    upload_date = gr.Textbox(
                        label="Meeting Date (YYYY-MM-DD)",
                        value=datetime.now().strftime("%Y-%m-%d"),
                        placeholder="2025-01-15"
                    )
                    upload_participants = gr.Textbox(
                        label="Participants (comma-separated)",
                        placeholder="e.g., Alice, Bob, Charlie"
                    )

            # Toggle visibility based on project mode
            def toggle_project_input(mode):
                """Show the dropdown or the textbox depending on *mode*."""
                if mode == "Use Existing Project":
                    return gr.update(visible=True), gr.update(visible=False)
                else:
                    return gr.update(visible=False), gr.update(visible=True)

            project_mode.change(
                fn=toggle_project_input,
                inputs=[project_mode],
                outputs=[existing_project, new_project]
            )

            upload_text = gr.Textbox(
                label="Meeting Notes (plain text)",
                placeholder="""Example:
We discussed the new feature requirements.
Alice will implement the login page by next Friday.
Bob raised a concern about the database migration.
We decided to use PostgreSQL instead of MySQL.
Charlie is blocked waiting for API credentials.""",
                lines=10
            )

            structure_btn = gr.Button("🤖 Structure Meeting with AI", variant="primary")
            structure_output = gr.Markdown(label="Structured Output")

            def structure_meeting_wrapper(mode, existing_proj, new_proj, title, date, participants, text, provider, token):
                """Wrapper to handle both project modes.

                Returns (result markdown, chat project dropdown update,
                existing-project dropdown update, insights dropdown update).
                """
                # Determine which project name to use
                project_name = existing_proj if mode == "Use Existing Project" else new_proj

                result = structure_meeting(project_name, title, date, participants, text, provider, token)

                # If successful, re-index RAG and update project lists
                if result.startswith("✅"):
                    # Re-initialize RAG to pick up new project/meeting
                    initialize_rag()

                    # Get updated project list
                    updated_projects = get_projects()
                    updated_existing = updated_projects[1:]  # Exclude "All Projects"

                    return (
                        result,
                        gr.update(choices=updated_projects, value="All Projects"),
                        gr.update(choices=updated_existing),
                        gr.update(choices=updated_existing),
                    )

                return result, gr.update(), gr.update(), gr.update()

        # Insights tab
        with gr.Tab("📊 Insights"):
            gr.Markdown("### Project Insights & Analytics")

            insights_project = gr.Dropdown(
                label="Select Project",
                choices=get_projects()[1:],  # Exclude "All Projects"
                interactive=True
            )

            with gr.Row():
                with gr.Column():
                    gr.Markdown("#### 📝 Meeting Summary")
                    gr.Markdown("Generate a comprehensive summary with key takeaways from all meetings.")
                    summary_btn = gr.Button("Generate Summary", variant="primary")
                    summary_output = gr.Markdown(label="Summary")

                with gr.Column():
                    gr.Markdown("#### 📈 Trend Analysis")
                    gr.Markdown("Analyze patterns across meetings: recurring topics, blocker trends, action item progress.")
                    trends_btn = gr.Button("Analyze Trends", variant="primary")
                    trends_output = gr.Markdown(label="Trends")

            def generate_summary(project, provider, token):
                """Generate a summary with key takeaways for a project."""
                if not token or token.strip() == "":
                    return "❌ Please enter your API token first"

                if not project:
                    return "❌ Please select a project"

                if not rag:
                    return "❌ System not initialized"

                try:
                    # Get all meeting content for the project
                    meetings = rag.get_project_documents(project)

                    if not meetings:
                        return f"❌ No meetings found for project: {project}"

                    meeting_content = "\n\n---\n\n".join([doc.page_content for doc in meetings])

                    # Create LLM
                    llm = _create_llm(provider, token, 1500)

                    prompt = f"""Analyze these meeting notes and provide a comprehensive project summary with key takeaways.

Meeting Notes:
{meeting_content}

Provide:

## Project Summary
A brief overview of the project status and progress.

## Key Takeaways
- List the most important points and insights
- Highlight critical decisions made
- Note significant achievements

## Open Items
- List pending action items
- Note unresolved blockers

## Recommendations
- Suggest next steps based on the meeting content
"""

                    messages = [HumanMessage(content=prompt)]
                    response = llm.invoke(messages)
                    return response.content

                except Exception as e:
                    return f"❌ Error: {str(e)}"

            def analyze_trends(project, provider, token):
                """Analyze trends across meetings for a project."""
                if not token or token.strip() == "":
                    return "❌ Please enter your API token first"

                if not project:
                    return "❌ Please select a project"

                if not rag:
                    return "❌ System not initialized"

                try:
                    # Get all meeting content for the project
                    meetings = rag.get_project_documents(project)

                    if not meetings:
                        return f"❌ No meetings found for project: {project}"

                    if len(meetings) < 2:
                        return "⚠️ Need at least 2 meetings to analyze trends"

                    meeting_content = "\n\n---\n\n".join([doc.page_content for doc in meetings])

                    # Create LLM
                    llm = _create_llm(provider, token, 1500)

                    prompt = f"""Analyze these meeting notes chronologically and identify trends and patterns.

Meeting Notes:
{meeting_content}

Provide a trend analysis with:

## Topic Evolution
How have discussion topics evolved across meetings?

## Recurring Themes
What topics or issues keep coming up repeatedly?

## Blocker Patterns
- Are there recurring blockers?
- How quickly are blockers typically resolved?
- Are there systemic issues causing repeated blockers?

## Action Item Trends
- Are action items being completed on time?
- Who are the most frequently assigned team members?
- Are there patterns in delayed items?

## Team Dynamics
- Who are the key contributors?
- Are there communication patterns worth noting?

## Progress Trajectory
Is the project on track? Accelerating or slowing down?
"""

                    messages = [HumanMessage(content=prompt)]
                    response = llm.invoke(messages)
                    return response.content

                except Exception as e:
                    return f"❌ Error: {str(e)}"

            summary_btn.click(
                fn=generate_summary,
                inputs=[insights_project, provider_dropdown, api_token_global],
                outputs=summary_output
            )
            trends_btn.click(
                fn=analyze_trends,
                inputs=[insights_project, provider_dropdown, api_token_global],
                outputs=trends_output
            )

    # Wired up only after every tab exists, so that a successful upload can
    # also refresh the Insights project dropdown (defined in the last tab).
    structure_btn.click(
        fn=structure_meeting_wrapper,
        inputs=[project_mode, existing_project, new_project, upload_title, upload_date, upload_participants, upload_text, provider_dropdown, api_token_global],
        outputs=[structure_output, project_dropdown, existing_project, insights_project]
    )

# Launch
if __name__ == "__main__":
    import socket

    def find_free_port(start_port=7860, max_attempts=10):
        """Find an available port starting from start_port.

        Returns the first port in the range that can be bound, or None.
        """
        for port in range(start_port, start_port + max_attempts):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(('', port))
                    return port
            except OSError:
                continue
        return None

    port = find_free_port()
    if port:
        print(f"Starting on port {port}")
        demo.launch(
            server_name="0.0.0.0",
            server_port=port,
            favicon_path="assets/favicon/favicon.ico",
            allowed_paths=["assets/"]
        )
    else:
        print("Error: Could not find an available port in range 7860-7869")