import sys
import os
import json

import streamlit as st
import pandas as pd

def check_environment():
    """Check environment variable status for deployment debugging."""
    openai_key = os.getenv("OPENAI_API_KEY")

    if not openai_key:
        st.error("""
        🚨 **Missing OPENAI_API_KEY Environment Variable**

        Task extraction requires an OpenAI API key to function.

        **For Hugging Face Spaces:**
        1. Go to your Space settings
        2. Click on "Repository secrets"
        3. Add `OPENAI_API_KEY` with your OpenAI API key
        4. Restart your Space

        **For local development:**
        - Create a `.env` file with `OPENAI_API_KEY=your_key_here`
        - Or set the environment variable in your shell
        """)
        return False
    return True
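

# Best-effort .env loading so the local-development hint above works without
# extra shell setup. A minimal sketch assuming the optional python-dotenv
# package; if it is not installed, plain environment variables are used as-is.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass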


# Raise the upload limit (in MB) before any uploader widget is created;
# st._config is a private Streamlit API, so failures are silently ignored.
try:
    st._config.set_option('server.maxUploadSize', 1024)
except Exception:
    pass

# set_page_config must be the first Streamlit page command and may only be
# called once per app run.
st.set_page_config(
    page_title="Automated Task Manager",
    page_icon="📧",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Ensure the app directory and its parent are importable (the working
# directory can differ on hosted deployments such as Hugging Face Spaces).
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)

parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

# Startup diagnostics (visible in the server logs)
print(f"Current working directory: {os.getcwd()}")
print(f"Script directory: {current_dir}")
print(f"Python path: {sys.path[:3]}")
print(f"Utils directory exists: {os.path.exists(os.path.join(current_dir, 'utils'))}")
print(f"Utils __init__.py exists: {os.path.exists(os.path.join(current_dir, 'utils', '__init__.py'))}")

try:
    from utils.langgraph_dag import (
        run_extraction_pipeline,
        resume_extraction_pipeline_with_correction
    )
    from utils.email_parser import parse_uploaded_file_with_filters_safe
    from utils.embedding import embed_dataframe

    print("✅ Successfully imported utils modules")
except ImportError as e:
    print(f"❌ Import error: {e}")
    st.error(f"Import error: {e}")
    st.error("Please ensure all utils files are properly uploaded to your Hugging Face Space")
    st.stop()

env_status = check_environment()
if not env_status:
    st.warning("⚠️ Application may not work correctly without proper API keys")
    st.info("You can still upload and parse emails, but extraction will fail")

st.sidebar.markdown("---")
debug_mode = st.sidebar.checkbox("🐛 Enable Debug Mode", value=False)
if debug_mode:
    st.sidebar.info("Debug mode will show detailed extraction logs")
    st.session_state.debug_mode = True
else:
    st.session_state.debug_mode = False

st.sidebar.markdown("---")
st.sidebar.markdown("### 🗄️ Database Management")

if st.sidebar.button("🗑️ Clear Cached Data"):
    for key in ["parsed_emails", "extracted_tasks", "parsing_complete",
                "processing_complete", "uploaded_file_name", "parse_limit",
                "process_limit", "extracted_graphs"]:
        if key in st.session_state:
            del st.session_state[key]
    st.sidebar.success("✅ Cached data cleared!")
    st.rerun()

if st.button("Load Emails"):
    from utils.database import PostgreSQLDatabase

    db = PostgreSQLDatabase()
    emails = db.get_parsed_emails(limit=500)
    df_emails = pd.DataFrame(emails)
    st.session_state['parsed_emails'] = df_emails
    st.session_state['parsing_complete'] = True
    st.session_state['uploaded_file_name'] = 'Loaded from PostgreSQL'
    st.success(f"Loaded {len(df_emails)} emails from PostgreSQL database!")

if st.sidebar.button("🗑️ Clear PostgreSQL Database"):
    try:
        from utils.database import PostgreSQLDatabase

        db = PostgreSQLDatabase()
        if db.connect():
            with db.connection.cursor() as cursor:
                # Delete child rows first to satisfy foreign-key constraints
                cursor.execute("DELETE FROM task_collaborators")
                cursor.execute("DELETE FROM tasks")
                # Reset the auto-increment counters
                cursor.execute("ALTER SEQUENCE tasks_id_seq RESTART WITH 1")
                cursor.execute("ALTER SEQUENCE task_collaborators_id_seq RESTART WITH 1")
            # Commit explicitly in case the connection is not in autocommit mode
            db.connection.commit()
            db.close()
            st.sidebar.success("✅ Tasks and collaborators cleared. Parsed emails remain in database.")
        else:
            st.sidebar.error("❌ Failed to connect to PostgreSQL")
    except Exception as e:
        st.sidebar.error(f"❌ Error clearing PostgreSQL: {e}")

if st.sidebar.button("🗑️ Clear Neo4j Database"):
    try:
        from utils.neo4j_graph_writer import Neo4jGraphWriter

        neo4j_writer = Neo4jGraphWriter()
        if neo4j_writer.connect():
            neo4j_writer.clear_graph()
            neo4j_writer.close()
            st.sidebar.success("✅ Neo4j database cleared!")
        else:
            st.sidebar.error("❌ Failed to connect to Neo4j")
    except Exception as e:
        st.sidebar.error(f"❌ Error clearing Neo4j: {e}")


def flatten_extractions(json_list):
    """
    Flatten extracted JSON data into a user-friendly table format.

    Args:
        json_list: List of validated JSON objects from extraction

    Returns:
        pandas.DataFrame: Flattened task data
    """
    rows = []
    for item in json_list:
        # Items may arrive as JSON strings; decode before processing
        if isinstance(item, str):
            try:
                item = json.loads(item)
            except (json.JSONDecodeError, TypeError):
                continue

        if not isinstance(item, dict):
            continue

        # Unwrap pipeline results that carry the payload under "validated_json"
        if "validated_json" in item:
            item = item["validated_json"]

        topic = item.get("Topic", {})
        topic_name = (topic.get("name", "Unknown Topic")
                      if isinstance(topic, dict) else "Unknown Topic")

        tasks = topic.get("tasks", []) if isinstance(topic, dict) else []
        for task_obj in tasks:
            if not isinstance(task_obj, dict):
                continue

            task = task_obj.get("task", {})
            if not isinstance(task, dict):
                continue

            # Owner may be a nested dict or a plain string
            owner = task.get("owner", {})
            if isinstance(owner, dict):
                owner_name = owner.get("name", "Unknown")
                owner_role = owner.get("role", "")
                owner_dept = owner.get("department", "")
                owner_org = owner.get("organization", "")
            else:
                owner_name = str(owner) if owner else "Unknown"
                owner_role = owner_dept = owner_org = ""

            rows.append({
                "Topic": topic_name,
                "Task Name": task.get("name", "Unnamed Task"),
                "Summary": task.get("summary", ""),
                "Sent Date": task.get("sent_date", ""),
                "Due Date": task.get("due_date", ""),
                "Owner Name": owner_name,
                "Owner Role": owner_role,
                "Owner Department": owner_dept,
                "Owner Organization": owner_org,
                "Email Index": task_obj.get("email_index", "")
            })
    return pd.DataFrame(rows)
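

# Example call (hypothetical data, for illustration only):
#   flatten_extractions([{"Topic": {"name": "Budget", "tasks": [
#       {"task": {"name": "Send Q3 numbers", "owner": {"name": "Alice"}},
#        "email_index": "msg-001"}]}}])
# returns a one-row DataFrame with Topic="Budget", Task Name="Send Q3 numbers",
# Owner Name="Alice", Email Index="msg-001".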


st.title("📬 Automated Task Manager")

st.markdown('Click "Load Emails" to load a sample dataset (Enron emails) and explore the demo features.')


# Initialize session state with demo defaults
if 'parsing_complete' not in st.session_state:
    st.session_state['parsing_complete'] = False
if 'parsed_emails' not in st.session_state:
    st.session_state['parsed_emails'] = None
if 'processing_complete' not in st.session_state:
    st.session_state['processing_complete'] = False
if 'extracted_tasks' not in st.session_state:
    st.session_state['extracted_tasks'] = []
if 'uploaded_file_name' not in st.session_state:
    st.session_state['uploaded_file_name'] = 'enron_sample.parquet'
if 'process_limit' not in st.session_state:
    st.session_state['process_limit'] = 5

# Display parsed emails once available
if st.session_state.parsing_complete and st.session_state.parsed_emails is not None:
    df_emails = st.session_state.parsed_emails

    st.subheader("📊 Parsed Email Data")
    filter_settings = st.session_state.get('filter_settings', {})
    max_limit = filter_settings.get('max_emails_limit', 'unknown')
    st.success(
        f"📊 Parsed {len(df_emails)} emails from "
        f"{st.session_state.uploaded_file_name} "
        f"(filtered from {max_limit} max)"
    )

    with st.expander("📋 Email Data Details", expanded=False):
        col1, col2 = st.columns(2)

        with col1:
            st.write(f"**Total emails:** {len(df_emails)}")
            st.write(f"**Columns:** {len(df_emails.columns)}")

            # Report missing dates, which can break calendar views downstream
            none_dates = df_emails['date_received'].isna().sum()
            valid_dates = len(df_emails) - none_dates
            st.write(f"**Valid dates:** {valid_dates}/{len(df_emails)}")
            if none_dates > 0:
                st.warning(f"⚠️ {none_dates} emails have missing dates")

            st.write("**Column names:**")
            for col in df_emails.columns:
                st.write(f"  • {col}")

        with col2:
            st.write("**Sample emails:**")
            if all(col in df_emails.columns for col in ['from_email', 'subject', 'date_received']):
                sample_df = df_emails[['from_email', 'subject', 'date_received']].head(3)
                st.dataframe(sample_df, width='stretch')
            else:
                st.warning("Sample columns not found in loaded emails.")

    st.header("🧠 LLM Task Extraction")

    col1, col2 = st.columns(2)
    with col1:
        process_limit = st.number_input(
            "Max emails to process with LLM",
            min_value=1, max_value=100,
            value=st.session_state.process_limit,
            help="LLM processing is slower and more expensive"
        )
        st.session_state.process_limit = process_limit

    with col2:
        # Rough estimate of ~3 seconds of LLM latency per email
        estimated_time = process_limit * 3
        st.info(f"⏱️ Estimated time: ~{estimated_time} seconds")

    if st.button("🚀 Start LLM Processing"):
        emails_to_process = df_emails.head(process_limit)

        with st.spinner(f"Processing {len(emails_to_process)} emails with LLM..."):
            try:
                with st.status("Creating embeddings..."):
                    index, all_chunks = embed_dataframe(emails_to_process)

                outputs = []
                progress_bar = st.progress(0)

                with st.status("Running LLM extraction...") as status:
                    for i, (_, email_row) in enumerate(emails_to_process.iterrows()):
                        status.update(
                            label=f"Processing email {i+1}/{len(emails_to_process)}: "
                                  f"{email_row.get('Subject', 'No Subject')[:50]}..."
                        )

                        # Work with a plain dict rather than a pandas Series
                        email_row = emails_to_process.iloc[i].to_dict()

                        # Build a stable fallback ID when Message-ID is absent
                        message_id = email_row.get('Message-ID')
                        if not message_id:
                            from_addr = email_row.get('From', 'unknown')
                            subject = email_row.get('Subject', 'no-subject')
                            date = email_row.get('Date', '1970-01-01')
                            base_id = f"{from_addr}_{subject}_{date}"
                            message_id = base_id.replace(' ', '_')[:100]
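                            # Example result: "kate@enron.com_Budget_review_2001-05-14"
                            # (illustrative only; parts depend on the email fields)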

                        try:
                            print(f"🚀 DEBUG: Starting extraction for email {i+1}")
                            print(f"📧 DEBUG: Subject: {email_row.get('Subject', 'No Subject')}")
                            print(f"📧 DEBUG: Message ID: {message_id}")

                            result = run_extraction_pipeline(
                                email_row, index, all_chunks, message_id
                            )

                            print(f"✅ DEBUG: Extraction completed for email {i+1}")
                            print(f"📊 DEBUG: Result keys: {list(result.keys())}")
                            print(f"📊 DEBUG: Status: {result.get('status', 'unknown')}")
                            print(f"📊 DEBUG: Valid: {result.get('valid', 'not set')}")
                            print(f"📊 DEBUG: Has validated_json: {'validated_json' in result}")

                            if st.session_state.get('debug_mode', False):
                                st.info(f"Email {i+1}: Status={result.get('status')}, Valid={result.get('valid')}")

                            outputs.append(result)
                        except Exception as e:
                            # On failure, queue a placeholder template for manual review
                            email_subject = email_row.get('Subject', 'No Subject')
                            email_content = email_row.get('Body', email_row.get('Content', ''))

                            template_json = {
                                "Topic": {
                                    "name": f"Manual Review: {email_subject[:50]}",
                                    "tasks": [{
                                        "task": {
                                            "name": "Please extract task from email content",
                                            "summary": f"Email content: {email_content[:200]}...",
                                            "sent_date": "",
                                            "due_date": "",
                                            "owner": {
                                                "name": "Unknown",
                                                "role": "Unknown",
                                                "department": "Unknown",
                                                "organization": "Unknown"
                                            },
                                            "collaborators": []
                                        },
                                        "email_index": message_id
                                    }]
                                }
                            }

                            outputs.append({
                                "email_index": message_id,
                                "status": "error",
                                "error": str(e),
                                "extracted_json": template_json,
                                "correctable_json": json.dumps(template_json, indent=2),
                                "valid": False,
                                "needs_user_review": True,
                                "email_content": email_content[:500],
                                "email_subject": email_subject
                            })

                        progress_bar.progress((i + 1) / len(emails_to_process))

                # Persist results so they survive Streamlit reruns
                st.session_state.extracted_tasks = outputs
                st.session_state.processing_complete = True

                st.success(
                    f"✅ Completed processing {len(emails_to_process)} emails! "
                    f"Generated {len(outputs)} extraction results."
                )

            except Exception as e:
                st.error(f"❌ Error during LLM processing: {str(e)}")


# Show extraction results whenever they exist in session state
if st.session_state.processing_complete and st.session_state.extracted_tasks:
    outputs = st.session_state.extracted_tasks

    st.header("📋 Extraction Results (Persistent)")

    st.subheader("🔍 Debug: Result Categorization")

    with st.expander("🔍 Detailed Result Analysis", expanded=False):
        st.write(f"**Total results:** {len(outputs)}")

        for i, res in enumerate(outputs):
            st.write(f"\n**Result {i+1}:**")
            st.write(f"- Status: `{res.get('status', 'unknown')}`")
            st.write(f"- Valid: `{res.get('valid', 'not set')}`")
            st.write(f"- Has validated_json: `{'validated_json' in res}`")
            st.write(f"- Has graph: `{'graph' in res}`")
            st.write(f"- Needs user review: `{res.get('needs_user_review', False)}`")
            st.write(f"- Error: `{res.get('error', 'none')}`")

            if 'validated_json' in res:
                st.write(f"- Validated JSON preview: `{str(res['validated_json'])[:100]}...`")
            elif 'extracted_json' in res:
                st.write(f"- Extracted JSON preview: `{str(res['extracted_json'])[:100]}...`")

    # Sort results into valid, paused (needs review), and invalid buckets
    valid_tasks = []
    invalid_results = []
    paused_results = []
    graphs = []

    for i, res in enumerate(outputs):
        if "graph" in res:
            graphs.append(res["graph"])

        if (res.get("status") in ["paused", "awaiting_user_review"] or
                res.get("needs_user_review", False)):
            paused_results.append((i, res))
        elif "validated_json" in res and res.get("valid", False):
            valid_tasks.append(res["validated_json"])
        else:
            invalid_results.append({
                "email_index": i + 1,
                "raw_json": res.get("extracted_json", "No JSON extracted"),
                "email_id": res.get("email_index", "Unknown"),
                "status": res.get("status", "unknown")
            })

    if valid_tasks:
        st.success(f"✅ Successfully extracted {len(valid_tasks)} valid tasks from {len(outputs)} processed emails.")
    if paused_results:
        st.warning(f"⚠️ {len(paused_results)} emails require manual review. See the Human Validation Required section below.")

    # Combine auto-extracted tasks with any human-validated corrections
    all_final_tasks = valid_tasks.copy()
    if hasattr(st.session_state, 'valid_extraction_results'):
        for validated_result in st.session_state.valid_extraction_results:
            if "validated_json" in validated_result:
                all_final_tasks.append(validated_result["validated_json"])

    if st.button("Store Data"):
        from utils.database import store_validated_tasks

        task_ids = store_validated_tasks(all_final_tasks)
        st.success(f"All reviewed tasks ({len(task_ids)}) have been stored in the database!")

    if graphs or valid_tasks:
        st.info(
            f"💾 Task data has been stored in Neo4j graph database and PostgreSQL. "
            f"Processed {len(valid_tasks)} valid tasks for chatbot queries."
        )

    # Human-in-the-loop review for paused/failed extractions
    if paused_results:
        st.subheader("⏸️ Human Validation Required")

        for idx, result in paused_results:
            st.markdown(f"**Email {idx + 1}** - Validation needed:")

            if result.get("email_subject"):
                st.info(f"📧 **Subject:** {result['email_subject']}")
            if result.get("email_content"):
                with st.expander("📄 View Email Content", expanded=False):
                    st.text(result["email_content"])

            col1, col2 = st.columns([1, 1])

            with col1:
                st.markdown("**Current JSON Template:**")
                original_json = result.get("extracted_json", {})
                st.json(original_json)

            with col2:
                st.markdown("**Edit JSON (extract task details):**")
                json_key = f"json_edit_{idx}"

                correctable_template = result.get("correctable_json",
                                                  result.get("extracted_json", "{}"))

                if json_key not in st.session_state:
                    if isinstance(correctable_template, str):
                        st.session_state[json_key] = correctable_template
                    else:
                        st.session_state[json_key] = json.dumps(
                            correctable_template, indent=2
                        )

                corrected_json = st.text_area(
                    "Corrected JSON",
                    value=st.session_state[json_key],
                    height=300,
                    key=f"correction_{idx}",
                    help="Edit the JSON to extract the actual task from the email"
                )

                if st.button("✅ Validate & Continue", key=f"validate_{idx}"):
                    try:
                        parsed_correction = json.loads(corrected_json)

                        st.session_state.extracted_tasks[idx].update({
                            "validated_json": parsed_correction,
                            "valid": True,
                            "status": "validated",
                            "user_corrected_json": corrected_json
                        })

                        if "valid_extraction_results" not in st.session_state:
                            st.session_state.valid_extraction_results = []

                        st.session_state.valid_extraction_results.append(
                            st.session_state.extracted_tasks[idx]
                        )

                        st.success("✅ Task validated and moved to 'Extracted Valid Tasks'")
                        st.info("🔄 Refreshing page to show updated results...")
                        st.rerun()

                    except json.JSONDecodeError as e:
                        st.error(f"❌ Invalid JSON format: {str(e)}")
                        st.info("💡 Tip: Check for missing commas, quotes, or brackets")
                    except Exception as e:
                        st.error(f"❌ Error during validation: {str(e)}")

                if st.button("🚫 Reject", key=f"reject_{idx}"):
                    # Mark as rejected without validating the edited JSON
                    st.session_state.extracted_tasks[idx].update({
                        "valid": False,
                        "status": "rejected",
                        "user_corrected_json": corrected_json
                    })
                    st.warning("🚫 Task rejected and not added to valid tasks.")
                    st.info("🔄 Refreshing page to show updated results...")
                    st.rerun()

    # Merge auto-extracted tasks with human-validated ones for display
    all_valid_tasks = valid_tasks.copy()
    if hasattr(st.session_state, 'valid_extraction_results'):
        for validated_result in st.session_state.valid_extraction_results:
            if "validated_json" in validated_result:
                all_valid_tasks.append(validated_result["validated_json"])

    if all_valid_tasks:
        st.subheader("✅ Extracted Valid Tasks")

        flattened_df = flatten_extractions(all_valid_tasks)
        st.dataframe(flattened_df, width='stretch')

        original_count = len(valid_tasks)
        validated_count = len(all_valid_tasks) - original_count

        if validated_count > 0:
            st.info(
                f"📊 Total: {len(flattened_df)} tasks "
                f"({original_count} auto-extracted + {validated_count} human-validated) "
                f"from {len(outputs)} processed emails"
            )
        else:
            st.info(
                f"📊 Successfully extracted {len(flattened_df)} tasks "
                f"from {len(outputs)} processed emails"
            )

        if graphs:
            st.session_state.extracted_graphs = graphs
    else:
        st.warning("⚠️ No valid tasks were extracted from processed emails.")

    if invalid_results:
        with st.expander("❌ Invalid/Failed Extractions", expanded=False):
            st.write("These emails failed to extract valid tasks:")
            invalid_df = pd.DataFrame(invalid_results)
            st.dataframe(invalid_df, width='stretch')


# Footer: usage guide
st.markdown("---")
st.markdown('''
📖 **How to use:**
- Click **Load Emails** to load a sample dataset (Enron emails)
- Process with LLM to extract tasks
- Validate any flagged JSON manually (Human-in-the-Loop)
- Explore: Graph visualization, Calendar view, and AI Chatbot
- Data persists across page navigation! 🎉

🔧 **Features:**
- One-click demo: No upload or Gmail Takeout required
- Human-in-the-Loop Validation: Review and correct extracted tasks
- Persistent State: Data stays available across page navigation
- Graph Visualization: See task relationships and dependencies
- Calendar View: Time-based view of extracted tasks
- AI Chatbot: Ask natural language questions about your tasks

🛠️ **Tech Stack:**
- LangGraph + GPT: Advanced reasoning pipeline
- FAISS: Vector similarity search for relevant email chunks
- Neo4j: Graph-based task relationship modeling
- Streamlit: Interactive web interface
''')