# Ultronprime's picture
# Add Historical Email Processing page - fetch ALL emails by date, preview, and batch process
# 98707da verified
"""
AI Construction Project Intelligence Hub
Main Streamlit Application - Self-Hosted Version
Fully automated email processing pipeline:
- Connects to IMAP server on startup
- Polls for new emails every 60 seconds (configurable)
- Sends emails + attachments to Gemini 3 Flash Preview for analysis
- Stores structured results in SQLite
- Displays real-time dashboard
- Process historical emails (read or unread) by date range
Built for China Railway 18th Bureau Group (CRCC) - DSC Hotel Project, Dubai.
"""
import streamlit as st
import pandas as pd
import os
import sys
import json
import time
import logging
from datetime import datetime, timedelta, date
from io import BytesIO
from modules.database import (
init_database, get_all_emails, get_email_by_id,
get_attachments_for_email, get_dashboard_stats,
get_all_emails_for_export, email_exists
)
from modules.pipeline import EmailPipeline
from modules.email_fetcher import EmailFetcher
from modules.gemini_processor import test_gemini_connection, GEMINI_MODEL
# Configure logging
def _resolve_log_level(raw, default=logging.INFO):
    """Translate a LOG_LEVEL string into a numeric ``logging`` level.

    The lookup ignores case and surrounding whitespace, so ``debug`` and
    ``DEBUG`` are equivalent. Anything that does not name an integer level
    constant on the ``logging`` module falls back to ``default`` instead of
    raising while the app is starting.

    Args:
        raw: Level name as read from the environment (may be ``None``).
        default: Numeric level returned when ``raw`` is not recognised.

    Returns:
        The numeric logging level to pass to ``logging.basicConfig``.
    """
    candidate = getattr(logging, str(raw or "").strip().upper(), None)
    # Upper-casing matters: ``logging.debug`` / ``logging.info`` are functions,
    # only the upper-case names are the integer level constants.
    if isinstance(candidate, int):
        return candidate
    return default


logging.basicConfig(
    level=_resolve_log_level(os.getenv("LOG_LEVEL", "INFO")),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize database on startup
# This is a top-level statement, so it executes each time the script runs and
# before any of the page sections below query the database.
# NOTE(review): presumably idempotent (only creates missing tables) — confirm
# in modules.database.
init_database()
# ============================================================
# Page Configuration
# ============================================================
# Browser-tab title/icon and overall layout for the app.
_PAGE_SETTINGS = {
    "page_title": "CRCC Construction Intelligence Hub",
    "page_icon": "πŸ—οΈ",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# Custom CSS: defines the .main-header / .sub-header classes referenced by the
# page titles and restyles metric widgets as bordered cards.
_CUSTOM_CSS = """
<style>
.main-header {
font-size: 2rem;
font-weight: 700;
color: #1B3A5C;
margin-bottom: 0.5rem;
}
.sub-header {
font-size: 1rem;
color: #666;
margin-bottom: 2rem;
}
div[data-testid="stMetric"] {
background-color: #f0f2f6;
border-radius: 10px;
padding: 15px;
border-left: 4px solid #1B3A5C;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# ============================================================
# Session State & Auto-Start Polling
# ============================================================
if "pipeline" not in st.session_state:
    # First script run for this session: create the pipeline once and, when
    # configuration allows, start its background polling. Keeping the
    # auto-start inside this branch means a later "Stop Polling" click is not
    # undone by the next rerun.
    st.session_state.pipeline = EmailPipeline()
    polling_requested = os.getenv("AUTO_START_POLLING", "true").lower() == "true"
    credentials_present = all(
        bool(os.getenv(name))
        for name in ("IMAP_USER", "IMAP_PASSWORD", "GOOGLE_API_KEY")
    )
    if polling_requested and credentials_present:
        st.session_state.pipeline.start_background_polling()
        logger.info("Auto-started background email polling")

# Per-session defaults for the historical-processing state keys.
for _state_key, _state_default in (
    ("historical_emails", None),
    ("processing_historical", False),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# ============================================================
# Sidebar
# ============================================================
# Renders branding, page navigation and the polling controls. The selected
# navigation label is bound to ``page``, which the page sections below
# compare against to decide what to render.
with st.sidebar:
    st.markdown("## πŸ—οΈ CRCC Intelligence Hub")
    st.markdown("**DSC Hotel Project**")
    st.markdown("Dubai Studio City β€’ G+8")
    st.markdown(f"*Model: `{GEMINI_MODEL}`*")
    st.markdown("---")

    # Navigation
    page = st.radio(
        "Navigation",
        ["πŸ“Š Dashboard", "πŸ“‹ Documents", "πŸ“₯ Historical", "πŸ” Search", "βš™οΈ Settings"],
        label_visibility="collapsed"
    )
    st.markdown("---")
    st.markdown("### ⚑ Controls")

    # Pipeline status
    status = st.session_state.pipeline.get_status()
    if status["is_running"]:
        st.success("🟒 Auto-polling: ACTIVE")
        if st.button("⏸️ Stop Polling", use_container_width=True):
            st.session_state.pipeline.stop_background_polling()
            st.rerun()
    else:
        st.error("πŸ”΄ Auto-polling: STOPPED")
        if st.button("▢️ Start Polling", use_container_width=True, type="primary"):
            st.session_state.pipeline.start_background_polling()
            st.rerun()

    # Manual trigger
    if st.button("πŸ“¬ Check New", use_container_width=True):
        with st.spinner("Fetching & processing..."):
            try:
                count = st.session_state.pipeline.check_and_process_new_emails()
                if count > 0:
                    st.success(f"βœ… {count} new email(s)!")
                else:
                    st.info("No new unread emails")
            except Exception as e:
                # UI boundary: show the problem to the user, but keep the
                # traceback in the log instead of discarding it.
                logger.exception("Manual email check failed")
                st.error(f"Error: {e}")
        # The snapshot above was taken before this run; re-read it so the
        # counters below include what was just processed.
        status = st.session_state.pipeline.get_status()

    st.markdown("---")
    st.caption(f"πŸ“§ Processed: **{status['emails_processed']}**")
    st.caption(f"❌ Errors: **{status['errors']}**")
    if status["last_check_time"]:
        # str() keeps the slice safe if the pipeline reports a datetime
        # object rather than an ISO string.
        st.caption(f"πŸ• Last: {str(status['last_check_time'])[:19]}")
# ============================================================
# Dashboard Page
# ============================================================
if page == "πŸ“Š Dashboard":
    st.markdown('<p class="main-header">πŸ“Š Project Intelligence Dashboard</p>', unsafe_allow_html=True)
    st.markdown('<p class="sub-header">Real-time automated email processing β€’ DSC Hotel G+8 β€’ CRCC Γ— MIMAR</p>', unsafe_allow_html=True)

    stats = get_dashboard_stats()
    status_counts = stats["by_status"]

    # Headline metrics: one (label, value) pair per column, left to right.
    headline_metrics = [
        ("πŸ“„ Total Processed", stats["total_processed"]),
        ("⏳ Pending", stats["pending"]),
        ("❌ Failed", stats["failed"]),
        ("βœ… Approved",
         status_counts.get("Approved", 0) + status_counts.get("Approved with Comments", 0)),
        ("🚨 Action Needed",
         status_counts.get("Rejected", 0) + status_counts.get("Resubmit", 0)),
    ]
    for metric_col, (metric_label, metric_value) in zip(st.columns(5), headline_metrics):
        metric_col.metric(metric_label, metric_value)

    st.markdown("---")

    # Two side-by-side bar charts built from the count dictionaries in stats.
    left_col, right_col = st.columns(2)
    chart_specs = (
        (left_col, "#### πŸ“ By Document Type", "by_document_type", "Type", "No data yet."),
        (right_col, "#### πŸ“Š By Status", "by_status", "Status", "Waiting for data..."),
    )
    for chart_col, heading, stats_key, index_name, empty_message in chart_specs:
        with chart_col:
            st.markdown(heading)
            counts = stats[stats_key]
            if counts:
                frame = pd.DataFrame(list(counts.items()), columns=[index_name, "Count"])
                st.bar_chart(frame.set_index(index_name))
            else:
                st.info(empty_message)

    # One metric per discipline; a falsy discipline key is shown as "Unassigned".
    discipline_counts = stats["by_discipline"]
    if discipline_counts:
        st.markdown("#### 🏒 By Discipline")
        discipline_cols = st.columns(len(discipline_counts))
        for disc_col, (disc, cnt) in zip(discipline_cols, discipline_counts.items()):
            disc_col.metric(disc or "Unassigned", cnt)

    st.markdown("---")

    # Automation banner reflecting the pipeline's current polling state.
    ps = st.session_state.pipeline.get_status()
    if not ps["is_running"]:
        st.warning("⚠️ **Automation Stopped** β€” Click 'Start Polling' in sidebar")
    else:
        st.success(f"πŸ€– **Automation Active** β€” Checking `{os.getenv('IMAP_USER', 'N/A')}` every {os.getenv('EMAIL_CHECK_INTERVAL', '60')}s")
# ============================================================
# Documents Page
# ============================================================
elif page == "πŸ“‹ Documents":
st.markdown('<p class="main-header">πŸ“‹ Document Registry</p>', unsafe_allow_html=True)
f1, f2, f3, f4 = st.columns(4)
with f1:
doc_type_filter = st.selectbox("Type", ["All", "RFI", "WIR", "MIR", "NCR", "Shop Drawing", "IFC", "Payment/IPA", "General"])
with f2:
status_filter = st.selectbox("Status", ["All", "Approved", "Approved with Comments", "Rejected", "Resubmit", "Pending/Info"])
with f3:
discipline_filter = st.selectbox("Discipline", ["All", "Civil", "MEP", "Structural", "Architectural"])
with f4:
search_query = st.text_input("πŸ” Search", placeholder="Reference, subject...")
filters = {
"document_type": doc_type_filter if doc_type_filter != "All" else None,
"status": status_filter if status_filter != "All" else None,
"discipline": discipline_filter if discipline_filter != "All" else None,
"search_query": search_query if search_query else None,
}
emails = get_all_emails(**filters)
if emails:
df = pd.DataFrame(emails)
display_cols = ["id", "date_received", "document_type", "document_reference_number",
"status", "assigned_discipline", "subject", "action_required"]
display_cols = [c for c in display_cols if c in df.columns]
df_display = df[display_cols].rename(columns={
"id": "ID", "date_received": "Date", "document_type": "Type",
"document_reference_number": "Reference", "status": "Status",
"assigned_discipline": "Discipline", "subject": "Subject",
"action_required": "Action Required"
})
st.markdown(f"**{len(df_display)} document(s)**")
st.dataframe(df_display, use_container_width=True, hide_index=True)
# Export
st.markdown("---")
exp1, exp2, _ = st.columns([1, 1, 4])
export_data = get_all_emails_for_export(document_type=filters["document_type"], status=filters["status"], discipline=filters["discipline"])
if export_data:
csv_df = pd.DataFrame(export_data)
with exp1:
st.download_button("⬇️ CSV", data=csv_df.to_csv(index=False).encode("utf-8"),
file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.csv", mime="text/csv", type="primary")
with exp2:
buf = BytesIO()
csv_df.to_excel(buf, index=False, sheet_name="Documents")
st.download_button("⬇️ Excel", data=buf.getvalue(),
file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
# Detail view
st.markdown("---")
st.markdown("### πŸ“– Detail View")
selected_id = st.selectbox("Select document", options=[e["id"] for e in emails],
format_func=lambda x: next((f"#{e['id']} β€” {e.get('document_reference_number') or e.get('subject', 'N/A')}" for e in emails if e["id"] == x), str(x)))
if selected_id:
detail = get_email_by_id(selected_id)
if detail:
c1, c2 = st.columns([2, 1])
with c1:
st.markdown(f"**Subject:** {detail.get('subject', 'N/A')}")
st.markdown(f"**From:** {detail.get('sender', 'N/A')}")
st.markdown(f"**Date:** {detail.get('date_received', 'N/A')}")
st.markdown(f"**Reference:** `{detail.get('document_reference_number', 'N/A')}`")
with c2:
s = detail.get("status", "N/A")
emoji = {"Approved": "βœ…", "Approved with Comments": "β˜‘οΈ", "Rejected": "❌", "Resubmit": "πŸ”„", "Pending/Info": "⏳"}.get(s, "❓")
st.markdown(f"### {emoji} {s}")
st.markdown(f"**Type:** {detail.get('document_type', 'N/A')}")
st.markdown(f"**Discipline:** {detail.get('assigned_discipline', 'N/A')}")
st.markdown("---")
st.markdown("#### πŸ’¬ Consultant Comments")
st.info(detail.get("consultant_comments", "No comments extracted"))
st.markdown("#### ⚑ Action Required")
st.warning(detail.get("action_required", "No action identified"))
attachments = get_attachments_for_email(selected_id)
if attachments:
st.markdown("#### πŸ“Ž Attachments")
for att in attachments:
icon = "βœ…" if att.get("processed_by_ai") else "⏭️"
size_mb = (att.get("file_size", 0) or 0) / (1024 * 1024)
st.markdown(f"- {icon} **{att['filename']}** ({att.get('mime_type', '?')}, {size_mb:.1f} MB)")
with st.expander("πŸ€– Raw AI JSON"):
if detail.get("ai_raw_response"):
try:
st.json(json.loads(detail["ai_raw_response"]))
except:
st.code(detail["ai_raw_response"])
with st.expander("πŸ“§ Original Email Body"):
st.text(detail.get("body_text", "No body"))
else:
st.info("No processed documents yet. Use 'πŸ“₯ Historical' to process old emails, or wait for new ones.")
# ============================================================
# HISTORICAL EMAIL PROCESSING PAGE (NEW)
# ============================================================
elif page == "πŸ“₯ Historical":
st.markdown('<p class="main-header">πŸ“₯ Process Historical Emails</p>', unsafe_allow_html=True)
st.markdown('<p class="sub-header">Fetch ALL emails (read & unread) from your server by date range, preview them, then process with AI</p>', unsafe_allow_html=True)
st.info("""
**Why this page exists:** The auto-polling only catches NEW unread emails.
Your email client already marked older emails as "read", so they were skipped.
Use this page to fetch and process ALL emails from any date β€” regardless of read status.
""")
# Date picker
col_date, col_btn = st.columns([2, 1])
with col_date:
since_date = st.date_input(
"Fetch emails since:",
value=date(2025, 1, 1),
min_value=date(2024, 1, 1),
max_value=date.today(),
)
with col_btn:
st.markdown("<br>", unsafe_allow_html=True)
fetch_clicked = st.button("πŸ” Fetch Email List", type="primary", use_container_width=True)
if fetch_clicked:
# Format date for IMAP: DD-Mon-YYYY
imap_date = since_date.strftime("%d-%b-%Y")
with st.spinner(f"Connecting to server and fetching headers since {imap_date}..."):
try:
fetcher = EmailFetcher()
if fetcher.connect():
headers = fetcher.fetch_email_headers_by_date(imap_date)
fetcher.disconnect()
st.session_state.historical_emails = headers
st.success(f"βœ… Found **{len(headers)}** emails since {since_date}")
else:
st.error("❌ Failed to connect to email server")
except Exception as e:
st.error(f"Error: {e}")
# Display fetched emails
if st.session_state.historical_emails:
headers = st.session_state.historical_emails
# Show which are already in database
already_processed = []
new_emails = []
for h in headers:
if email_exists(h["message_id"]):
already_processed.append(h)
else:
new_emails.append(h)
st.markdown("---")
st.markdown(f"### πŸ“Š Results: {len(headers)} total emails found")
col_a, col_b = st.columns(2)
with col_a:
st.metric("βœ… Already Processed", len(already_processed))
with col_b:
st.metric("πŸ†• Not Yet Processed", len(new_emails))
if new_emails:
# Show preview table
st.markdown("### πŸ†• Emails Ready to Process")
preview_df = pd.DataFrame(new_emails)
display_cols = ["date_received", "sender", "subject"]
display_cols = [c for c in display_cols if c in preview_df.columns]
preview_display = preview_df[display_cols].rename(columns={
"date_received": "Date", "sender": "From", "subject": "Subject"
})
st.dataframe(preview_display, use_container_width=True, hide_index=True)
# Process buttons
st.markdown("---")
st.markdown("### πŸš€ Process These Emails with AI")
st.warning(f"⚠️ This will process **{len(new_emails)}** emails through Gemini AI. "
f"Each email takes ~3-5 seconds. Estimated time: **{len(new_emails) * 4 // 60} min {len(new_emails) * 4 % 60} sec**")
process_col1, process_col2 = st.columns(2)
with process_col1:
process_all = st.button(
f"πŸ€– Summarise All {len(new_emails)} Emails",
type="primary",
use_container_width=True
)
with process_col2:
batch_size = st.selectbox("Or process in batches of:", [10, 25, 50, 100], index=0)
process_batch = st.button(
f"πŸ€– Process Next {min(batch_size, len(new_emails))} Emails",
use_container_width=True
)
# Execute processing
emails_to_process = []
if process_all:
emails_to_process = new_emails
elif process_batch:
emails_to_process = new_emails[:batch_size]
if emails_to_process:
st.markdown("---")
st.markdown("### ⏳ Processing...")
progress_bar = st.progress(0)
status_text = st.empty()
success_count = 0
error_count = 0
# Connect and fetch full emails
fetcher = EmailFetcher()
if fetcher.connect():
# Open in readonly mode
fetcher.connection.select("INBOX", readonly=True)
pipeline = st.session_state.pipeline
for i, header in enumerate(emails_to_process):
progress = (i + 1) / len(emails_to_process)
progress_bar.progress(progress)
status_text.markdown(
f"**Processing {i+1}/{len(emails_to_process)}:** {header.get('subject', 'N/A')[:60]}..."
)
try:
# Fetch full email
email_data = fetcher._fetch_single_email(header["msg_id"], mark_read=False)
if email_data:
result = pipeline.process_single_email(email_data)
if result is not None:
success_count += 1
# Rate limit for Gemini API
time.sleep(2)
except Exception as e:
error_count += 1
logger.error(f"Error processing: {e}")
time.sleep(1)
fetcher.disconnect()
progress_bar.progress(1.0)
status_text.empty()
st.success(f"βœ… **Done!** Processed: {success_count} | Errors: {error_count}")
st.balloons()
# Clear cache so the list updates
st.session_state.historical_emails = None
st.rerun()
else:
st.error("❌ Failed to connect to email server")
elif already_processed and not new_emails:
st.success("βœ… All emails in this date range are already processed! Check the πŸ“‹ Documents page.")
# ============================================================
# Search Page
# ============================================================
elif page == "πŸ” Search":
st.markdown('<p class="main-header">πŸ” Search</p>', unsafe_allow_html=True)
q = st.text_input("Search all documents", placeholder="Reference number, subject, keywords...")
if q:
results = get_all_emails(search_query=q, limit=50)
if results:
st.success(f"{len(results)} result(s)")
for r in results:
with st.expander(f"πŸ“„ {r.get('document_reference_number', 'N/A')} β€” {r.get('subject', '')}"):
c1, c2, c3 = st.columns(3)
c1.markdown(f"**Type:** {r.get('document_type')}")
c2.markdown(f"**Status:** {r.get('status')}")
c3.markdown(f"**Date:** {(r.get('date_received') or '')[:10]}")
st.markdown(f"**Comments:** {r.get('consultant_comments', 'N/A')}")
st.markdown(f"**Action:** {r.get('action_required', 'N/A')}")
else:
st.warning("No results found.")
# ============================================================
# Settings Page
# ============================================================
elif page == "βš™οΈ Settings":
st.markdown('<p class="main-header">βš™οΈ Settings</p>', unsafe_allow_html=True)
tab1, tab2, tab3 = st.tabs(["πŸ“§ Email", "πŸ€– AI", "πŸ“Š Status"])
with tab1:
st.markdown("### IMAP Configuration")
st.markdown(f"""
| Setting | Value |
|---------|-------|
| Host | `{os.getenv('IMAP_HOST', 'NOT SET')}` |
| Port | `{os.getenv('IMAP_PORT', '993')}` |
| User | `{os.getenv('IMAP_USER', 'NOT SET')}` |
| Password | `{'βœ… Set' if os.getenv('IMAP_PASSWORD') else '❌ NOT SET'}` |
| Check Interval | `{os.getenv('EMAIL_CHECK_INTERVAL', '60')}s` |
""")
if st.button("πŸ”Œ Test IMAP Connection"):
with st.spinner("Connecting..."):
fetcher = EmailFetcher()
ok, msg = fetcher.test_connection()
if ok:
st.success(f"βœ… {msg}")
else:
st.error(f"❌ {msg}")
with tab2:
st.markdown("### Gemini AI")
st.markdown(f"""
| Setting | Value |
|---------|-------|
| Model | `{GEMINI_MODEL}` |
| API Key | `{'βœ… Set' if os.getenv('GOOGLE_API_KEY') else '❌ NOT SET'}` |
| Max Attachment | `{os.getenv('MAX_ATTACHMENT_SIZE_MB', '50')} MB` |
""")
if st.button("πŸ§ͺ Test Gemini"):
with st.spinner("Testing..."):
ok, msg = test_gemini_connection()
if ok:
st.success(f"βœ… {msg}")
else:
st.error(f"❌ {msg}")
with tab3:
st.markdown("### System Status")
ps = st.session_state.pipeline.get_status()
stats = get_dashboard_stats()
st.markdown(f"""
| Metric | Value |
|--------|-------|
| Polling | {'🟒 Active' if ps['is_running'] else 'πŸ”΄ Stopped'} |
| Last Check | {ps['last_check_time'] or 'Never'} |
| Session Processed | {ps['emails_processed']} |
| Session Errors | {ps['errors']} |
| DB Total Processed | {stats['total_processed']} |
| DB Pending | {stats['pending']} |
| DB Failed | {stats['failed']} |
""")
# Footer
# Rendered on every page: a divider followed by a centred product line that
# names the configured Gemini model.
st.markdown("---")
_footer_html = (
    '<div style="text-align:center;color:#888;font-size:0.8rem">'
    'πŸ—οΈ CRCC Construction Intelligence Hub v1.1 β€’ DSC Hotel, Dubai Studio City β€’ '
    f'Powered by {GEMINI_MODEL}</div>'
)
st.markdown(_footer_html, unsafe_allow_html=True)