# Commit 98707da: "Add Historical Email Processing page - fetch ALL emails by
# date, preview, and batch process"
#
# NOTE(review): many emoji literals in this file appear mis-encoded (UTF-8
# bytes rendered as ISO-8859-7 text, e.g. "ποΈ", "βοΈ") — TODO confirm against
# the raw file bytes. The sidebar radio labels are compared verbatim by the
# page dispatch below, so any re-encoding must change all of them together.
"""
AI Construction Project Intelligence Hub
Main Streamlit Application - Self-Hosted Version

Fully automated email processing pipeline:
- Connects to the IMAP server on startup and starts background polling
  (when AUTO_START_POLLING is on and IMAP/Gemini credentials are set)
- Polls for new emails every 60 seconds (configurable via EMAIL_CHECK_INTERVAL)
- Sends emails + attachments to Gemini 3 Flash Preview for analysis
- Stores structured results in SQLite
- Displays real-time dashboard
- Process historical emails (read or unread) by date range

Pages: Dashboard, Documents, Historical, Search, Settings.

Built for China Railway 18th Bureau Group (CRCC) - DSC Hotel Project, Dubai.
"""
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, date
from io import BytesIO

import pandas as pd
import streamlit as st

# Project imports are kept in their original order in case any module has
# import-time side effects another one relies on.
from modules.database import (
    init_database, get_all_emails, get_email_by_id,
    get_attachments_for_email, get_dashboard_stats,
    get_all_emails_for_export, email_exists
)
from modules.pipeline import EmailPipeline
from modules.email_fetcher import EmailFetcher
from modules.gemini_processor import test_gemini_connection, GEMINI_MODEL
# Configure logging.
# LOG_LEVEL is matched case-insensitively ("debug" works like "DEBUG"). An
# unknown value falls back to INFO instead of crashing the app at import time,
# which is what getattr(logging, "bogus") would do (AttributeError).
_log_level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").strip().upper(), None)
if not isinstance(_log_level, int):
    # Also rejects non-level attributes of the logging module, e.g.
    # logging.BASIC_FORMAT, which is a string.
    _log_level = logging.INFO
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Initialize database on startup. This statement sits at module level, so it
# runs on every Streamlit script rerun; init_database() presumably has to be
# idempotent — confirm in modules.database.
init_database()
# ============================================================
# Page Configuration
# ============================================================
# Streamlit has historically required set_page_config to be the first
# Streamlit command in a script run, so keep it ahead of any st.* output.
st.set_page_config(
    page_title="CRCC Construction Intelligence Hub",
    # Construction-crane emoji (U+1F3D7 + variation selector U+FE0F). The
    # previous literal "ποΈ" was this emoji's UTF-8 bytes mis-decoded, which
    # is not a valid emoji favicon. Escapes keep it independent of file encoding.
    page_icon="\U0001F3D7\uFE0F",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Custom CSS: header classes used via raw HTML in the page titles, plus a
# card look for st.metric widgets.
st.markdown("""
<style>
.main-header {
    font-size: 2rem;
    font-weight: 700;
    color: #1B3A5C;
    margin-bottom: 0.5rem;
}
.sub-header {
    font-size: 1rem;
    color: #666;
    margin-bottom: 2rem;
}
div[data-testid="stMetric"] {
    background-color: #f0f2f6;
    border-radius: 10px;
    padding: 15px;
    border-left: 4px solid #1B3A5C;
}
</style>
""", unsafe_allow_html=True)
# ============================================================
# Session State & Auto-Start Polling
# ============================================================
# Spellings accepted as "on" for boolean environment flags.
_TRUTHY_ENV_VALUES = {"1", "true", "yes", "on"}

if "pipeline" not in st.session_state:
    # First run of this browser session: create the pipeline and, if fully
    # configured, start background polling exactly once for this session.
    st.session_state.pipeline = EmailPipeline()
    # Defaults to on. "true" still works; "1"/"yes"/"on" are accepted too.
    auto_start = os.getenv("AUTO_START_POLLING", "true").strip().lower() in _TRUTHY_ENV_VALUES
    has_imap = bool(os.getenv("IMAP_USER")) and bool(os.getenv("IMAP_PASSWORD"))
    has_gemini = bool(os.getenv("GOOGLE_API_KEY"))
    if auto_start and has_imap and has_gemini:
        # NOTE(review): st.session_state is per browser session, so every new
        # session builds its own EmailPipeline and starts its own poller here.
        # Unless EmailPipeline guards against this internally, several open
        # tabs will poll the same mailbox concurrently — consider sharing one
        # pipeline via st.cache_resource.
        st.session_state.pipeline.start_background_polling()
        logger.info("Auto-started background email polling")

# Headers fetched on the Historical page (None until the user fetches).
if "historical_emails" not in st.session_state:
    st.session_state.historical_emails = None
if "processing_historical" not in st.session_state:
    st.session_state.processing_historical = False
# ============================================================
# Sidebar
# ============================================================
with st.sidebar:
    st.markdown("## ποΈ CRCC Intelligence Hub")
    st.markdown("**DSC Hotel Project**")
    st.markdown("Dubai Studio City β’ G+8")
    st.markdown(f"*Model: `{GEMINI_MODEL}`*")
    st.markdown("---")
    # Navigation. The selected label drives the page dispatch further down,
    # so these strings must match the `page == ...` comparisons exactly.
    page = st.radio(
        "Navigation",
        ["π Dashboard", "π Documents", "π₯ Historical", "π Search", "βοΈ Settings"],
        label_visibility="collapsed"
    )
    st.markdown("---")
    st.markdown("### β‘ Controls")
    # Pipeline status: a snapshot taken once per script run. The captions at
    # the bottom of the sidebar reuse it.
    status = st.session_state.pipeline.get_status()
    if status["is_running"]:
        st.success("π’ Auto-polling: ACTIVE")
        if st.button("βΈοΈ Stop Polling", use_container_width=True):
            st.session_state.pipeline.stop_background_polling()
            # Rerun so the status badge above reflects the new state.
            st.rerun()
    else:
        st.error("π΄ Auto-polling: STOPPED")
        if st.button("βΆοΈ Start Polling", use_container_width=True, type="primary"):
            st.session_state.pipeline.start_background_polling()
            st.rerun()
    # Manual trigger: runs one fetch-and-process cycle synchronously in this
    # script run.
    # NOTE(review): the counters shown below come from `status`, read before
    # this button ran, so they do not include emails processed by this click
    # until the next rerun.
    if st.button("π¬ Check New", use_container_width=True):
        with st.spinner("Fetching & processing..."):
            try:
                count = st.session_state.pipeline.check_and_process_new_emails()
                if count > 0:
                    st.success(f"β {count} new email(s)!")
                else:
                    st.info("No new unread emails")
            except Exception as e:
                st.error(f"Error: {e}")
    st.markdown("---")
    st.caption(f"π§ Processed: **{status['emails_processed']}**")
    st.caption(f"β Errors: **{status['errors']}**")
    if status["last_check_time"]:
        # [:19] assumes an ISO-style timestamp string (YYYY-MM-DDTHH:MM:SS...)
        # — TODO confirm get_status() never returns a datetime object here.
        st.caption(f"π Last: {status['last_check_time'][:19]}")
# ============================================================
# Dashboard Page
# ============================================================
if page == "π Dashboard":
    # Page title and subtitle, styled by the .main-header/.sub-header CSS.
    st.markdown('<p class="main-header">π Project Intelligence Dashboard</p>', unsafe_allow_html=True)
    st.markdown('<p class="sub-header">Real-time automated email processing β’ DSC Hotel G+8 β’ CRCC Γ MIMAR</p>', unsafe_allow_html=True)

    stats = get_dashboard_stats()

    # Headline KPIs: one st.metric per column, rendered left to right.
    kpi_columns = st.columns(5)
    status_counts = stats["by_status"]
    kpis = (
        ("π Total Processed", stats["total_processed"]),
        ("β³ Pending", stats["pending"]),
        ("β Failed", stats["failed"]),
        ("β Approved",
         status_counts.get("Approved", 0) + status_counts.get("Approved with Comments", 0)),
        ("π¨ Action Needed",
         status_counts.get("Rejected", 0) + status_counts.get("Resubmit", 0)),
    )
    for kpi_column, (kpi_label, kpi_value) in zip(kpi_columns, kpis):
        with kpi_column:
            st.metric(kpi_label, kpi_value)

    st.markdown("---")

    # Two side-by-side bar charts: (column, heading, stats key, index label,
    # message shown when that breakdown is empty).
    chart_left, chart_right = st.columns(2)
    chart_specs = (
        (chart_left, "#### π By Document Type", "by_document_type", "Type", "No data yet."),
        (chart_right, "#### π By Status", "by_status", "Status", "Waiting for data..."),
    )
    for chart_column, heading, stats_key, index_label, empty_message in chart_specs:
        with chart_column:
            st.markdown(heading)
            breakdown = stats[stats_key]
            if breakdown:
                frame = pd.DataFrame(list(breakdown.items()), columns=[index_label, "Count"])
                st.bar_chart(frame.set_index(index_label))
            else:
                st.info(empty_message)

    # One metric per discipline; a falsy discipline name is shown as "Unassigned".
    disciplines = stats["by_discipline"]
    if disciplines:
        st.markdown("#### π’ By Discipline")
        for discipline_column, (discipline, discipline_count) in zip(
            st.columns(len(disciplines)), disciplines.items()
        ):
            with discipline_column:
                st.metric(discipline or "Unassigned", discipline_count)

    st.markdown("---")

    # Automation banner reflects the live state of this session's pipeline.
    if not st.session_state.pipeline.get_status()["is_running"]:
        st.warning("β οΈ **Automation Stopped** β Click 'Start Polling' in sidebar")
    else:
        st.success(f"π€ **Automation Active** β Checking `{os.getenv('IMAP_USER', 'N/A')}` every {os.getenv('EMAIL_CHECK_INTERVAL', '60')}s")
| # ============================================================ | |
| # Documents Page | |
| # ============================================================ | |
| elif page == "π Documents": | |
| st.markdown('<p class="main-header">π Document Registry</p>', unsafe_allow_html=True) | |
| f1, f2, f3, f4 = st.columns(4) | |
| with f1: | |
| doc_type_filter = st.selectbox("Type", ["All", "RFI", "WIR", "MIR", "NCR", "Shop Drawing", "IFC", "Payment/IPA", "General"]) | |
| with f2: | |
| status_filter = st.selectbox("Status", ["All", "Approved", "Approved with Comments", "Rejected", "Resubmit", "Pending/Info"]) | |
| with f3: | |
| discipline_filter = st.selectbox("Discipline", ["All", "Civil", "MEP", "Structural", "Architectural"]) | |
| with f4: | |
| search_query = st.text_input("π Search", placeholder="Reference, subject...") | |
| filters = { | |
| "document_type": doc_type_filter if doc_type_filter != "All" else None, | |
| "status": status_filter if status_filter != "All" else None, | |
| "discipline": discipline_filter if discipline_filter != "All" else None, | |
| "search_query": search_query if search_query else None, | |
| } | |
| emails = get_all_emails(**filters) | |
| if emails: | |
| df = pd.DataFrame(emails) | |
| display_cols = ["id", "date_received", "document_type", "document_reference_number", | |
| "status", "assigned_discipline", "subject", "action_required"] | |
| display_cols = [c for c in display_cols if c in df.columns] | |
| df_display = df[display_cols].rename(columns={ | |
| "id": "ID", "date_received": "Date", "document_type": "Type", | |
| "document_reference_number": "Reference", "status": "Status", | |
| "assigned_discipline": "Discipline", "subject": "Subject", | |
| "action_required": "Action Required" | |
| }) | |
| st.markdown(f"**{len(df_display)} document(s)**") | |
| st.dataframe(df_display, use_container_width=True, hide_index=True) | |
| # Export | |
| st.markdown("---") | |
| exp1, exp2, _ = st.columns([1, 1, 4]) | |
| export_data = get_all_emails_for_export(document_type=filters["document_type"], status=filters["status"], discipline=filters["discipline"]) | |
| if export_data: | |
| csv_df = pd.DataFrame(export_data) | |
| with exp1: | |
| st.download_button("β¬οΈ CSV", data=csv_df.to_csv(index=False).encode("utf-8"), | |
| file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.csv", mime="text/csv", type="primary") | |
| with exp2: | |
| buf = BytesIO() | |
| csv_df.to_excel(buf, index=False, sheet_name="Documents") | |
| st.download_button("β¬οΈ Excel", data=buf.getvalue(), | |
| file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.xlsx", | |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") | |
| # Detail view | |
| st.markdown("---") | |
| st.markdown("### π Detail View") | |
| selected_id = st.selectbox("Select document", options=[e["id"] for e in emails], | |
| format_func=lambda x: next((f"#{e['id']} β {e.get('document_reference_number') or e.get('subject', 'N/A')}" for e in emails if e["id"] == x), str(x))) | |
| if selected_id: | |
| detail = get_email_by_id(selected_id) | |
| if detail: | |
| c1, c2 = st.columns([2, 1]) | |
| with c1: | |
| st.markdown(f"**Subject:** {detail.get('subject', 'N/A')}") | |
| st.markdown(f"**From:** {detail.get('sender', 'N/A')}") | |
| st.markdown(f"**Date:** {detail.get('date_received', 'N/A')}") | |
| st.markdown(f"**Reference:** `{detail.get('document_reference_number', 'N/A')}`") | |
| with c2: | |
| s = detail.get("status", "N/A") | |
| emoji = {"Approved": "β ", "Approved with Comments": "βοΈ", "Rejected": "β", "Resubmit": "π", "Pending/Info": "β³"}.get(s, "β") | |
| st.markdown(f"### {emoji} {s}") | |
| st.markdown(f"**Type:** {detail.get('document_type', 'N/A')}") | |
| st.markdown(f"**Discipline:** {detail.get('assigned_discipline', 'N/A')}") | |
| st.markdown("---") | |
| st.markdown("#### π¬ Consultant Comments") | |
| st.info(detail.get("consultant_comments", "No comments extracted")) | |
| st.markdown("#### β‘ Action Required") | |
| st.warning(detail.get("action_required", "No action identified")) | |
| attachments = get_attachments_for_email(selected_id) | |
| if attachments: | |
| st.markdown("#### π Attachments") | |
| for att in attachments: | |
| icon = "β " if att.get("processed_by_ai") else "βοΈ" | |
| size_mb = (att.get("file_size", 0) or 0) / (1024 * 1024) | |
| st.markdown(f"- {icon} **{att['filename']}** ({att.get('mime_type', '?')}, {size_mb:.1f} MB)") | |
| with st.expander("π€ Raw AI JSON"): | |
| if detail.get("ai_raw_response"): | |
| try: | |
| st.json(json.loads(detail["ai_raw_response"])) | |
| except: | |
| st.code(detail["ai_raw_response"]) | |
| with st.expander("π§ Original Email Body"): | |
| st.text(detail.get("body_text", "No body")) | |
| else: | |
| st.info("No processed documents yet. Use 'π₯ Historical' to process old emails, or wait for new ones.") | |
| # ============================================================ | |
| # HISTORICAL EMAIL PROCESSING PAGE (NEW) | |
| # ============================================================ | |
| elif page == "π₯ Historical": | |
| st.markdown('<p class="main-header">π₯ Process Historical Emails</p>', unsafe_allow_html=True) | |
| st.markdown('<p class="sub-header">Fetch ALL emails (read & unread) from your server by date range, preview them, then process with AI</p>', unsafe_allow_html=True) | |
| st.info(""" | |
| **Why this page exists:** The auto-polling only catches NEW unread emails. | |
| Your email client already marked older emails as "read", so they were skipped. | |
| Use this page to fetch and process ALL emails from any date β regardless of read status. | |
| """) | |
| # Date picker | |
| col_date, col_btn = st.columns([2, 1]) | |
| with col_date: | |
| since_date = st.date_input( | |
| "Fetch emails since:", | |
| value=date(2025, 1, 1), | |
| min_value=date(2024, 1, 1), | |
| max_value=date.today(), | |
| ) | |
| with col_btn: | |
| st.markdown("<br>", unsafe_allow_html=True) | |
| fetch_clicked = st.button("π Fetch Email List", type="primary", use_container_width=True) | |
| if fetch_clicked: | |
| # Format date for IMAP: DD-Mon-YYYY | |
| imap_date = since_date.strftime("%d-%b-%Y") | |
| with st.spinner(f"Connecting to server and fetching headers since {imap_date}..."): | |
| try: | |
| fetcher = EmailFetcher() | |
| if fetcher.connect(): | |
| headers = fetcher.fetch_email_headers_by_date(imap_date) | |
| fetcher.disconnect() | |
| st.session_state.historical_emails = headers | |
| st.success(f"β Found **{len(headers)}** emails since {since_date}") | |
| else: | |
| st.error("β Failed to connect to email server") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| # Display fetched emails | |
| if st.session_state.historical_emails: | |
| headers = st.session_state.historical_emails | |
| # Show which are already in database | |
| already_processed = [] | |
| new_emails = [] | |
| for h in headers: | |
| if email_exists(h["message_id"]): | |
| already_processed.append(h) | |
| else: | |
| new_emails.append(h) | |
| st.markdown("---") | |
| st.markdown(f"### π Results: {len(headers)} total emails found") | |
| col_a, col_b = st.columns(2) | |
| with col_a: | |
| st.metric("β Already Processed", len(already_processed)) | |
| with col_b: | |
| st.metric("π Not Yet Processed", len(new_emails)) | |
| if new_emails: | |
| # Show preview table | |
| st.markdown("### π Emails Ready to Process") | |
| preview_df = pd.DataFrame(new_emails) | |
| display_cols = ["date_received", "sender", "subject"] | |
| display_cols = [c for c in display_cols if c in preview_df.columns] | |
| preview_display = preview_df[display_cols].rename(columns={ | |
| "date_received": "Date", "sender": "From", "subject": "Subject" | |
| }) | |
| st.dataframe(preview_display, use_container_width=True, hide_index=True) | |
| # Process buttons | |
| st.markdown("---") | |
| st.markdown("### π Process These Emails with AI") | |
| st.warning(f"β οΈ This will process **{len(new_emails)}** emails through Gemini AI. " | |
| f"Each email takes ~3-5 seconds. Estimated time: **{len(new_emails) * 4 // 60} min {len(new_emails) * 4 % 60} sec**") | |
| process_col1, process_col2 = st.columns(2) | |
| with process_col1: | |
| process_all = st.button( | |
| f"π€ Summarise All {len(new_emails)} Emails", | |
| type="primary", | |
| use_container_width=True | |
| ) | |
| with process_col2: | |
| batch_size = st.selectbox("Or process in batches of:", [10, 25, 50, 100], index=0) | |
| process_batch = st.button( | |
| f"π€ Process Next {min(batch_size, len(new_emails))} Emails", | |
| use_container_width=True | |
| ) | |
| # Execute processing | |
| emails_to_process = [] | |
| if process_all: | |
| emails_to_process = new_emails | |
| elif process_batch: | |
| emails_to_process = new_emails[:batch_size] | |
| if emails_to_process: | |
| st.markdown("---") | |
| st.markdown("### β³ Processing...") | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| success_count = 0 | |
| error_count = 0 | |
| # Connect and fetch full emails | |
| fetcher = EmailFetcher() | |
| if fetcher.connect(): | |
| # Open in readonly mode | |
| fetcher.connection.select("INBOX", readonly=True) | |
| pipeline = st.session_state.pipeline | |
| for i, header in enumerate(emails_to_process): | |
| progress = (i + 1) / len(emails_to_process) | |
| progress_bar.progress(progress) | |
| status_text.markdown( | |
| f"**Processing {i+1}/{len(emails_to_process)}:** {header.get('subject', 'N/A')[:60]}..." | |
| ) | |
| try: | |
| # Fetch full email | |
| email_data = fetcher._fetch_single_email(header["msg_id"], mark_read=False) | |
| if email_data: | |
| result = pipeline.process_single_email(email_data) | |
| if result is not None: | |
| success_count += 1 | |
| # Rate limit for Gemini API | |
| time.sleep(2) | |
| except Exception as e: | |
| error_count += 1 | |
| logger.error(f"Error processing: {e}") | |
| time.sleep(1) | |
| fetcher.disconnect() | |
| progress_bar.progress(1.0) | |
| status_text.empty() | |
| st.success(f"β **Done!** Processed: {success_count} | Errors: {error_count}") | |
| st.balloons() | |
| # Clear cache so the list updates | |
| st.session_state.historical_emails = None | |
| st.rerun() | |
| else: | |
| st.error("β Failed to connect to email server") | |
| elif already_processed and not new_emails: | |
| st.success("β All emails in this date range are already processed! Check the π Documents page.") | |
| # ============================================================ | |
| # Search Page | |
| # ============================================================ | |
| elif page == "π Search": | |
| st.markdown('<p class="main-header">π Search</p>', unsafe_allow_html=True) | |
| q = st.text_input("Search all documents", placeholder="Reference number, subject, keywords...") | |
| if q: | |
| results = get_all_emails(search_query=q, limit=50) | |
| if results: | |
| st.success(f"{len(results)} result(s)") | |
| for r in results: | |
| with st.expander(f"π {r.get('document_reference_number', 'N/A')} β {r.get('subject', '')}"): | |
| c1, c2, c3 = st.columns(3) | |
| c1.markdown(f"**Type:** {r.get('document_type')}") | |
| c2.markdown(f"**Status:** {r.get('status')}") | |
| c3.markdown(f"**Date:** {(r.get('date_received') or '')[:10]}") | |
| st.markdown(f"**Comments:** {r.get('consultant_comments', 'N/A')}") | |
| st.markdown(f"**Action:** {r.get('action_required', 'N/A')}") | |
| else: | |
| st.warning("No results found.") | |
# ============================================================
# Settings Page
# ============================================================
elif page == "βοΈ Settings":
    st.markdown('<p class="main-header">βοΈ Settings</p>', unsafe_allow_html=True)
    tab1, tab2, tab3 = st.tabs(["π§ Email", "π€ AI", "π Status"])
    # Email tab: IMAP settings read straight from the environment. The password
    # value is never displayed, only whether it is set.
    with tab1:
        st.markdown("### IMAP Configuration")
        st.markdown(f"""
| Setting | Value |
|---------|-------|
| Host | `{os.getenv('IMAP_HOST', 'NOT SET')}` |
| Port | `{os.getenv('IMAP_PORT', '993')}` |
| User | `{os.getenv('IMAP_USER', 'NOT SET')}` |
| Password | `{'β Set' if os.getenv('IMAP_PASSWORD') else 'β NOT SET'}` |
| Check Interval | `{os.getenv('EMAIL_CHECK_INTERVAL', '60')}s` |
""")
        if st.button("π Test IMAP Connection"):
            with st.spinner("Connecting..."):
                fetcher = EmailFetcher()
                # test_connection() returns an (ok, message) pair.
                ok, msg = fetcher.test_connection()
                if ok:
                    st.success(f"β {msg}")
                else:
                    st.error(f"β {msg}")
    # AI tab: model name from modules.gemini_processor plus presence of the
    # API key (the key itself is not shown).
    with tab2:
        st.markdown("### Gemini AI")
        st.markdown(f"""
| Setting | Value |
|---------|-------|
| Model | `{GEMINI_MODEL}` |
| API Key | `{'β Set' if os.getenv('GOOGLE_API_KEY') else 'β NOT SET'}` |
| Max Attachment | `{os.getenv('MAX_ATTACHMENT_SIZE_MB', '50')} MB` |
""")
        if st.button("π§ͺ Test Gemini"):
            with st.spinner("Testing..."):
                ok, msg = test_gemini_connection()
                if ok:
                    st.success(f"β {msg}")
                else:
                    st.error(f"β {msg}")
    # Status tab: "Session *" rows come from this session's pipeline object;
    # "DB *" rows come from get_dashboard_stats().
    with tab3:
        st.markdown("### System Status")
        ps = st.session_state.pipeline.get_status()
        stats = get_dashboard_stats()
        st.markdown(f"""
| Metric | Value |
|--------|-------|
| Polling | {'π’ Active' if ps['is_running'] else 'π΄ Stopped'} |
| Last Check | {ps['last_check_time'] or 'Never'} |
| Session Processed | {ps['emails_processed']} |
| Session Errors | {ps['errors']} |
| DB Total Processed | {stats['total_processed']} |
| DB Pending | {stats['pending']} |
| DB Failed | {stats['failed']} |
""")
# Footer: rendered after whichever page is selected, on every page.
st.markdown("---")
footer_html = (
    '<div style="text-align:center;color:#888;font-size:0.8rem">'
    'ποΈ CRCC Construction Intelligence Hub v1.1 β’ DSC Hotel, Dubai Studio City β’ '
    f'Powered by {GEMINI_MODEL}</div>'
)
st.markdown(footer_html, unsafe_allow_html=True)