# Fix deprecated use_container_width and improve chatbot error handling
# (upstream commit 8637201; scraped web-page artifacts removed from this header)
import sys
import os
import streamlit as st
import pandas as pd
import json
def check_environment():
    """Verify that the deployment has the required environment variables.

    Renders a Streamlit error with platform-specific setup instructions when
    ``OPENAI_API_KEY`` is absent.

    Returns:
        bool: True when OPENAI_API_KEY is set, False otherwise.
    """
    if os.getenv("OPENAI_API_KEY"):
        return True
    # Key is missing: show actionable setup instructions for both
    # Hugging Face Spaces and local development.
    st.error("""
🚨 **Missing OPENAI_API_KEY Environment Variable**
The task extraction requires an OpenAI API key to function.
**For Hugging Face Spaces:**
1. Go to your Space settings
2. Click on "Repository secrets"
3. Add `OPENAI_API_KEY` with your OpenAI API key
4. Restart your Space
**For local development:**
- Create a `.env` file with `OPENAI_API_KEY=your_key_here`
- Or set the environment variable in your shell
""")
    return False
# --- Streamlit configuration -------------------------------------------------
# Raise the upload cap BEFORE any other Streamlit operation, otherwise the
# default 200 MB limit applies to the file uploader.
try:
    st._config.set_option('server.maxUploadSize', 1024)  # 1GB in MB
except Exception:
    # `_config` is a private API; fall back to the default limit if it changes.
    pass

st.set_page_config(
    page_title="Automated Task Manager",
    page_icon="πŸ“§",
    layout="wide"
)

# --- Import-path setup -------------------------------------------------------
# Make the script directory and its parent importable so the `utils` package
# resolves both on Hugging Face Spaces and in local development.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
for _extra_path in (current_dir, parent_dir):
    if _extra_path not in sys.path:
        sys.path.insert(0, _extra_path)

# Diagnostics for deployment debugging (server log only).
print(f"Current working directory: {os.getcwd()}")
print(f"Script directory: {current_dir}")
print(f"Python path: {sys.path[:3]}")  # First 3 entries
print(f"Utils directory exists: {os.path.exists(os.path.join(current_dir, 'utils'))}")
print(f"Utils __init__.py exists: {os.path.exists(os.path.join(current_dir, 'utils', '__init__.py'))}")
# Try importing with error handling: a missing or partially uploaded `utils`
# package should produce an actionable error message in the UI (and in the
# server log) instead of a blank page, then halt the script via st.stop().
try:
    from utils.langgraph_dag import (
        run_extraction_pipeline,
        resume_extraction_pipeline_with_correction
    )
    from utils.email_parser import parse_uploaded_file_with_filters_safe
    from utils.embedding import embed_dataframe
    print("βœ… Successfully imported utils modules")
except ImportError as e:
    print(f"❌ Import error: {e}")
    st.error(f"Import error: {e}")
    st.error("Please ensure all utils files are properly uploaded to your Hugging Face Space")
    st.stop()
# --- Environment check & sidebar controls ------------------------------------
env_status = check_environment()
if not env_status:
    st.warning("⚠️ Application may not work correctly without proper API keys")
    st.info("You can still upload and parse emails, but extraction will fail")

# Debug toggle, persisted in session state so the extraction loop can read it.
st.sidebar.markdown("---")
debug_mode = st.sidebar.checkbox("πŸ› Enable Debug Mode", value=False)
if debug_mode:
    st.sidebar.info("Debug mode will show detailed extraction logs")
st.session_state.debug_mode = bool(debug_mode)

# Database management section.
st.sidebar.markdown("---")
st.sidebar.markdown("### πŸ—ƒοΈ Database Management")

# Drop every cached artefact of a previous run from the session.
if st.sidebar.button("πŸ—‘οΈ Clear Cached Data"):
    _cached_keys = (
        "parsed_emails", "extracted_tasks", "parsing_complete",
        "processing_complete", "uploaded_file_name", "parse_limit",
        "process_limit", "extracted_graphs",
    )
    for _cached_key in _cached_keys:
        st.session_state.pop(_cached_key, None)
    st.sidebar.success("βœ… Cached data cleared!")
    st.rerun()

# Fetch previously parsed emails from PostgreSQL (main-area button).
if st.button("Load Emails"):
    from utils.database import PostgreSQLDatabase
    db = PostgreSQLDatabase()
    loaded_emails = db.get_parsed_emails(limit=500)
    df_emails = pd.DataFrame(loaded_emails)
    st.session_state['parsed_emails'] = df_emails
    st.session_state['parsing_complete'] = True
    st.session_state['uploaded_file_name'] = 'Loaded from PostgreSQL'
    st.success(f"Loaded {len(df_emails)} emails from PostgreSQL database!")

# Clear extracted tasks from PostgreSQL while keeping parsed emails intact.
if st.sidebar.button("πŸ—‘οΈ Clear PostgreSQL Database"):
    try:
        from utils.database import PostgreSQLDatabase
        db = PostgreSQLDatabase()
        if db.connect():
            with db.connection.cursor() as cursor:
                # Only tasks and their collaborators are removed so parsed
                # emails can be re-processed later.
                cursor.execute("DELETE FROM task_collaborators")
                cursor.execute("DELETE FROM tasks")
                # Reset id sequences back to 1.
                cursor.execute("ALTER SEQUENCE tasks_id_seq RESTART WITH 1")
                cursor.execute("ALTER SEQUENCE task_collaborators_id_seq RESTART WITH 1")
            # NOTE(review): no explicit commit before close() — confirm that
            # PostgreSQLDatabase autocommits, otherwise these deletes may
            # be rolled back when the connection closes.
            db.close()
            st.sidebar.success("βœ… Tasks and collaborators cleared. Parsed emails remain in database.")
        else:
            st.sidebar.error("❌ Failed to connect to PostgreSQL")
    except Exception as e:
        st.sidebar.error(f"❌ Error clearing PostgreSQL: {e}")

# Wipe the Neo4j task graph entirely.
if st.sidebar.button("πŸ—‘οΈ Clear Neo4j Database"):
    try:
        from utils.neo4j_graph_writer import Neo4jGraphWriter
        neo4j_writer = Neo4jGraphWriter()
        if neo4j_writer.connect():
            neo4j_writer.clear_graph()
            neo4j_writer.close()
            st.sidebar.success("βœ… Neo4j database cleared!")
        else:
            st.sidebar.error("❌ Failed to connect to Neo4j")
    except Exception as e:
        st.sidebar.error(f"❌ Error clearing Neo4j: {e}")
def flatten_extractions(json_list):
    """
    Flatten extracted JSON data into a user-friendly table format.

    Entries may be dicts, JSON strings, or wrapped in a ``validated_json``
    key (produced by HITL validation); anything undecodable is skipped.

    Args:
        json_list: List of validated JSON objects from extraction

    Returns:
        pandas.DataFrame: Flattened task data, one row per task
    """
    rows = []
    for entry in json_list:
        # Entries may arrive as serialized JSON strings; decode them first.
        if isinstance(entry, str):
            try:
                entry = json.loads(entry)
            except (json.JSONDecodeError, TypeError):
                continue  # undecodable entry — skip
        if not isinstance(entry, dict):
            continue  # None or other non-dict payloads
        # HITL validation wraps the payload under "validated_json".
        if "validated_json" in entry:
            entry = entry["validated_json"]

        topic = entry.get("Topic", {})
        if isinstance(topic, dict):
            topic_name = topic.get("name", "Unknown Topic")
            task_wrappers = topic.get("tasks", [])
        else:
            topic_name = "Unknown Topic"
            task_wrappers = []

        for wrapper in task_wrappers:
            if not isinstance(wrapper, dict):
                continue
            task = wrapper.get("task", {})
            if not isinstance(task, dict):
                continue

            # Owner may be a structured dict or a bare name string.
            owner = task.get("owner", {})
            if isinstance(owner, dict):
                owner_name = owner.get("name", "Unknown")
                owner_role = owner.get("role", "")
                owner_dept = owner.get("department", "")
                owner_org = owner.get("organization", "")
            else:
                owner_name = str(owner) if owner else "Unknown"
                owner_role = owner_dept = owner_org = ""

            rows.append({
                "Topic": topic_name,
                "Task Name": task.get("name", "Unnamed Task"),
                "Summary": task.get("summary", ""),
                "Sent Date": task.get("sent_date", ""),
                "Due Date": task.get("due_date", ""),
                "Owner Name": owner_name,
                "Owner Role": owner_role,
                "Owner Department": owner_dept,
                "Owner Organization": owner_org,
                "Email Index": wrapper.get("email_index", ""),
            })
    return pd.DataFrame(rows)
# NOTE: A second st.set_page_config() call (with a conflicting title/icon) and
# a duplicate upload-size configuration used to live here. Streamlit requires
# set_page_config() to be called once, as the first Streamlit command, so the
# duplicate call raised StreamlitAPIException at runtime. Page setup is done
# once at the top of this script; the redundant calls were removed.
st.title("πŸ“¬ Automated Task Manager")

# Demo workflow: load the bundled Enron sample from PostgreSQL, then extract
# tasks with the LLM pipeline — no upload or Gmail Takeout step required.
st.markdown('Click "Load Email" to load a sample dataset (Enron emails) and explore the demo features.')

# Seed session-state defaults once per browser session so the rest of the
# script can read them unconditionally.
_session_defaults = {
    'parsing_complete': False,
    'parsed_emails': None,
    'processing_complete': False,
    'extracted_tasks': [],
    'uploaded_file_name': 'enron_sample.parquet',
    'process_limit': 5,
}
for _state_key, _default_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
# =============================================================================
# Parsed-email display + LLM task extraction
# =============================================================================
if st.session_state.parsing_complete and st.session_state.parsed_emails is not None:
    df_emails = st.session_state.parsed_emails
    st.subheader("πŸ“Š Parsed Email Data")
    filter_settings = st.session_state.get('filter_settings', {})
    max_limit = filter_settings.get('max_emails_limit', 'unknown')
    st.success(
        f"πŸ“Š Parsed {len(df_emails)} emails from "
        f"{st.session_state.uploaded_file_name} "
        f"(filtered from {max_limit} max)"
    )

    with st.expander("πŸ“Š Email Data Details", expanded=False):
        col1, col2 = st.columns(2)
        with col1:
            st.write(f"**Total emails:** {len(df_emails)}")
            st.write(f"**Columns:** {len(df_emails.columns)}")
            # Surface rows whose date could not be parsed.
            none_dates = df_emails['date_received'].isna().sum()
            valid_dates = len(df_emails) - none_dates
            st.write(f"**Valid dates:** {valid_dates}/{len(df_emails)}")
            if none_dates > 0:
                st.warning(f"⚠️ {none_dates} emails have missing dates")
            st.write("**Column names:**")
            for col in df_emails.columns:
                st.write(f" β€’ {col}")
        with col2:
            st.write("**Sample emails:**")
            # Preview the first few rows using the expected column names.
            _sample_cols = ['from_email', 'subject', 'date_received']
            if all(c in df_emails.columns for c in _sample_cols):
                sample_df = df_emails[_sample_cols].head(3)
                st.dataframe(sample_df, width='stretch')
            else:
                st.warning("Sample columns not found in loaded emails.")

    # ----------------------------------------------------------------- LLM --
    st.header("🧠 LLM Task Extraction")
    col1, col2 = st.columns(2)
    with col1:
        process_limit = st.number_input(
            "Max emails to process with LLM",
            min_value=1, max_value=100,
            value=st.session_state.process_limit,
            help="LLM processing is slower and more expensive"
        )
        # Persist the limit across reruns.
        st.session_state.process_limit = process_limit
    with col2:
        estimated_time = process_limit * 3  # rough ~3 s per email
        st.info(f"⏱️ Estimated time: ~{estimated_time} seconds")

    if st.button("πŸš€ Start LLM Processing"):
        emails_to_process = df_emails.head(process_limit)
        with st.spinner(f"Processing {len(emails_to_process)} emails with LLM..."):
            try:
                # Build the similarity-search index over the selected emails.
                with st.status("Creating embeddings..."):
                    index, all_chunks = embed_dataframe(emails_to_process)

                outputs = []
                progress_bar = st.progress(0)
                with st.status("Running LLM extraction...") as status:
                    for i, (_, email_row) in enumerate(emails_to_process.iterrows()):
                        status.update(
                            label=f"Processing email {i+1}/{len(emails_to_process)}: "
                            f"{email_row.get('Subject', 'No Subject')[:50]}..."
                        )
                        # Re-fetch the row as a plain dict for the pipeline.
                        email_row = emails_to_process.iloc[i].to_dict()

                        # Prefer Message-ID; otherwise synthesize a stable id
                        # from sender/subject/date.
                        message_id = email_row.get('Message-ID')
                        if not message_id:
                            from_addr = email_row.get('From', 'unknown')
                            subject = email_row.get('Subject', 'no-subject')
                            date = email_row.get('Date', '1970-01-01')
                            base_id = f"{from_addr}_{subject}_{date}"
                            message_id = base_id.replace(' ', '_')[:100]

                        try:
                            print(f"πŸš€ DEBUG: Starting extraction for email {i+1}")
                            print(f"πŸ“§ DEBUG: Subject: {email_row.get('Subject', 'No Subject')}")
                            print(f"πŸ“§ DEBUG: Message ID: {message_id}")
                            result = run_extraction_pipeline(
                                email_row, index, all_chunks, message_id
                            )
                            print(f"βœ… DEBUG: Extraction completed for email {i+1}")
                            print(f"πŸ“Š DEBUG: Result keys: {list(result.keys())}")
                            print(f"πŸ“Š DEBUG: Status: {result.get('status', 'unknown')}")
                            print(f"πŸ“Š DEBUG: Valid: {result.get('valid', 'not set')}")
                            print(f"πŸ“Š DEBUG: Has validated_json: {'validated_json' in result}")
                            # Optional in-app debug trace.
                            if st.session_state.get('debug_mode', False):
                                st.info(f"Email {i+1}: Status={result.get('status')}, Valid={result.get('valid')}")
                            outputs.append(result)
                        except Exception as e:
                            # Extraction failed: queue a hand-editable template
                            # so the user can fill the task in via HITL review.
                            email_subject = email_row.get('Subject', 'No Subject')
                            email_content = email_row.get('Body', email_row.get('Content', ''))
                            template_json = {
                                "Topic": {
                                    "name": f"Manual Review: {email_subject[:50]}",
                                    "tasks": [{
                                        "task": {
                                            "name": "Please extract task from email content",
                                            "summary": f"Email content: {email_content[:200]}...",
                                            "sent_date": "",
                                            "due_date": "",
                                            "owner": {
                                                "name": "Unknown",
                                                "role": "Unknown",
                                                "department": "Unknown",
                                                "organization": "Unknown"
                                            },
                                            "collaborators": []
                                        },
                                        "email_index": message_id
                                    }]
                                }
                            }
                            outputs.append({
                                "email_index": message_id,
                                "status": "error",
                                "error": str(e),
                                "extracted_json": template_json,  # template instead of {}
                                "correctable_json": json.dumps(template_json, indent=2),
                                "valid": False,
                                "needs_user_review": True,  # triggers HITL below
                                "email_content": email_content[:500],  # reviewer context
                                "email_subject": email_subject
                            })

                        progress_bar.progress((i + 1) / len(emails_to_process))

                # Persist results so they survive page navigation.
                st.session_state.extracted_tasks = outputs
                st.session_state.processing_complete = True
                st.success(
                    f"βœ… Completed processing {len(emails_to_process)} emails! "
                    f"Generated {len(outputs)} extraction results."
                )
            except Exception as e:
                st.error(f"❌ Error during LLM processing: {str(e)}")
# =============================================================================
# Extraction results: categorization, HITL validation, and final task table
# =============================================================================
if st.session_state.processing_complete and st.session_state.extracted_tasks:
    outputs = st.session_state.extracted_tasks
    st.header("πŸ“‹ Extraction Results (Persistent)")

    # Debug view of how each result will be categorized below.
    st.subheader("πŸ” Debug: Result Categorization")
    with st.expander("πŸ“Š Detailed Result Analysis", expanded=False):
        st.write(f"**Total results:** {len(outputs)}")
        for i, res in enumerate(outputs):
            st.write(f"\n**Result {i+1}:**")
            st.write(f"- Status: `{res.get('status', 'unknown')}`")
            st.write(f"- Valid: `{res.get('valid', 'not set')}`")
            st.write(f"- Has validated_json: `{'validated_json' in res}`")
            st.write(f"- Has graph: `{'graph' in res}`")
            st.write(f"- Needs user review: `{res.get('needs_user_review', False)}`")
            st.write(f"- Error: `{res.get('error', 'none')}`")
            if 'validated_json' in res:
                st.write(f"- Validated JSON preview: `{str(res['validated_json'])[:100]}...`")
            elif 'extracted_json' in res:
                st.write(f"- Extracted JSON preview: `{str(res['extracted_json'])[:100]}...`")

    # Categorize results: paused (needs human review), valid, or invalid.
    valid_tasks = []
    invalid_results = []
    paused_results = []
    graphs = []
    for i, res in enumerate(outputs):
        if "graph" in res:
            graphs.append(res["graph"])
        if (res.get("status") in ["paused", "awaiting_user_review"] or
                res.get("needs_user_review", False)):
            paused_results.append((i, res))
        elif "validated_json" in res and res.get("valid", False):
            valid_tasks.append(res["validated_json"])
        else:
            invalid_results.append({
                "email_index": i + 1,
                "raw_json": res.get("extracted_json", "No JSON extracted"),
                "email_id": res.get("email_index", "Unknown"),
                "status": res.get("status", "unknown")
            })

    if valid_tasks:
        st.success(f"βœ… Successfully extracted {len(valid_tasks)} valid tasks from {len(outputs)} processed emails.")
    if paused_results:
        st.warning(f"⚠️ {len(paused_results)} emails require manual review. See the Human Validation Required section below.")

    # DB storage is deferred: collect every reviewed task, store on demand.
    all_final_tasks = valid_tasks.copy()
    if hasattr(st.session_state, 'valid_extraction_results'):
        for validated_result in st.session_state.valid_extraction_results:
            if "validated_json" in validated_result:
                all_final_tasks.append(validated_result["validated_json"])

    if st.button("Store Data"):
        from utils.database import store_validated_tasks
        task_ids = store_validated_tasks(all_final_tasks)
        st.success(f"All reviewed tasks ({len(task_ids)}) have been stored in the database!")

    # Graph storage happens automatically during processing (Neo4j writer).
    if graphs or valid_tasks:
        st.info(
            f"πŸ’Ύ Task data has been stored in Neo4j graph database and PostgreSQL. "
            f"Processed {len(valid_tasks)} valid tasks for chatbot queries."
        )

    # ------------------------------------------------------------- HITL -----
    if paused_results:
        st.subheader("⏸️ Human Validation Required")
        for idx, result in paused_results:
            st.markdown(f"**Email {idx + 1}** - Validation needed:")
            # Show email context so the reviewer can extract the task.
            if result.get("email_subject"):
                st.info(f"πŸ“§ **Subject:** {result['email_subject']}")
            if result.get("email_content"):
                with st.expander("πŸ“„ View Email Content", expanded=False):
                    st.text(result["email_content"])

            col1, col2 = st.columns([1, 1])
            with col1:
                st.markdown("**Current JSON Template:**")
                original_json = result.get("extracted_json", {})
                st.json(original_json)
            with col2:
                st.markdown("**Edit JSON (extract task details):**")
                json_key = f"json_edit_{idx}"
                # Prefer the pre-serialized correctable_json when present.
                correctable_template = result.get("correctable_json",
                                                  result.get("extracted_json", "{}"))
                if json_key not in st.session_state:
                    if isinstance(correctable_template, str):
                        st.session_state[json_key] = correctable_template
                    else:
                        st.session_state[json_key] = json.dumps(
                            correctable_template, indent=2
                        )
                corrected_json = st.text_area(
                    "Corrected JSON",
                    value=st.session_state[json_key],
                    height=300,
                    key=f"correction_{idx}",
                    help="Edit the JSON to extract the actual task from the email"
                )

            if st.button("βœ… Validate & Continue", key=f"validate_{idx}"):
                try:
                    parsed_correction = json.loads(corrected_json)
                    st.session_state.extracted_tasks[idx].update({
                        "validated_json": parsed_correction,
                        "valid": True,
                        "status": "validated",
                        "user_corrected_json": corrected_json
                    })
                    # Move to valid tasks immediately.
                    if "valid_extraction_results" not in st.session_state:
                        st.session_state.valid_extraction_results = []
                    st.session_state.valid_extraction_results.append(
                        st.session_state.extracted_tasks[idx]
                    )
                    st.success("βœ… Task validated and moved to 'Extracted Valid Tasks'")
                    st.info("πŸ”„ Refreshing page to show updated results...")  # was a mojibake glyph
                    st.rerun()
                except json.JSONDecodeError as e:
                    st.error(f"❌ Invalid JSON format: {str(e)}")
                    st.info("πŸ’‘ Tip: Check for missing commas, quotes, or brackets")
                except Exception as e:
                    st.error(f"❌ Error during validation: {str(e)}")

            if st.button("🚫 Reject", key=f"reject_{idx}"):
                # BUG FIX: the original referenced `parsed_correction`, which is
                # only bound inside the Validate handler's try block, so
                # clicking Reject raised NameError. A rejected task keeps the
                # user's raw edit but gets no validated JSON.
                st.session_state.extracted_tasks[idx].update({
                    "valid": False,
                    "status": "rejected",
                    "user_corrected_json": corrected_json
                })
                st.warning("βœ… Task rejected and not added to valid tasks.")
                st.info(" Refreshing page to show updated results...")
                st.rerun()

    # ------------------------------------------------- final task table -----
    # Merge auto-extracted valid tasks with any human-validated ones.
    all_valid_tasks = valid_tasks.copy()
    if hasattr(st.session_state, 'valid_extraction_results'):
        for validated_result in st.session_state.valid_extraction_results:
            if "validated_json" in validated_result:
                all_valid_tasks.append(validated_result["validated_json"])

    if all_valid_tasks:
        st.subheader("βœ… Extracted Valid Tasks")
        flattened_df = flatten_extractions(all_valid_tasks)
        st.dataframe(flattened_df, width='stretch')
        # Summary stats: auto-extracted vs. human-validated counts.
        original_count = len(valid_tasks)
        validated_count = len(all_valid_tasks) - original_count
        if validated_count > 0:
            st.info(
                f"πŸ“Š Total: {len(flattened_df)} tasks "
                f"({original_count} auto-extracted + {validated_count} human-validated) "
                f"from {len(outputs)} processed emails"
            )
        else:
            st.info(
                f"πŸ“Š Successfully extracted {len(flattened_df)} tasks "
                f"from {len(outputs)} processed emails"
            )
        # Persist graphs across page navigation.
        if graphs:
            st.session_state.extracted_graphs = graphs
    else:
        warning_msg = "⚠️ No valid tasks were extracted from processed emails."
        st.warning(warning_msg)

    if invalid_results:
        with st.expander("❌ Invalid/Failed Extractions", expanded=False):
            st.write("These emails failed to extract valid tasks:")
            invalid_df = pd.DataFrame(invalid_results)
            st.dataframe(invalid_df, width='stretch')
# --- Footer ------------------------------------------------------------------
st.markdown("---")
_FOOTER_MARKDOWN = '''
πŸ“– **How to use:**
- Click **Parse Email** to load a sample dataset (Enron emails)
- Process with LLM to extract tasks
- Validate any flagged JSON manually (Human-in-the-Loop)
- Explore: Graph visualization, Calendar view, and AI Chatbot
- Data persists across page navigation! πŸŽ‰
πŸ”§ **Features:**
- One-click demo: No upload or Gmail Takeout required
- Human-in-the-Loop Validation: Review and correct extracted tasks
- Persistent State: Data stays available across page navigation
- Graph Visualization: See task relationships and dependencies
- Calendar View: Time-based view of extracted tasks
- AI Chatbot: Ask natural language questions about your tasks
πŸš€ **Tech Stack:**
- LangGraph + GPT: Advanced reasoning pipeline
- FAISS: Vector similarity search for relevant email chunks
- Neo4j: Graph-based task relationship modeling
- Streamlit: Interactive web interface
'''
st.markdown(_FOOTER_MARKDOWN)