""" AI Construction Project Intelligence Hub Main Streamlit Application - Self-Hosted Version Fully automated email processing pipeline: - Connects to IMAP server on startup - Polls for new emails every 60 seconds (configurable) - Sends emails + attachments to Gemini 3 Flash Preview for analysis - Stores structured results in SQLite - Displays real-time dashboard - Process historical emails (read or unread) by date range Built for China Railway 18th Bureau Group (CRCC) - DSC Hotel Project, Dubai. """ import streamlit as st import pandas as pd import os import sys import json import time import logging from datetime import datetime, timedelta, date from io import BytesIO from modules.database import ( init_database, get_all_emails, get_email_by_id, get_attachments_for_email, get_dashboard_stats, get_all_emails_for_export, email_exists ) from modules.pipeline import EmailPipeline from modules.email_fetcher import EmailFetcher from modules.gemini_processor import test_gemini_connection, GEMINI_MODEL # Configure logging logging.basicConfig( level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize database on startup init_database() # ============================================================ # Page Configuration # ============================================================ st.set_page_config( page_title="CRCC Construction Intelligence Hub", page_icon="ποΈ", layout="wide", initial_sidebar_state="expanded", ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) # ============================================================ # Session State & Auto-Start Polling # ============================================================ if "pipeline" not in st.session_state: st.session_state.pipeline = EmailPipeline() auto_start = os.getenv("AUTO_START_POLLING", "true").lower() == "true" has_imap = bool(os.getenv("IMAP_USER")) and bool(os.getenv("IMAP_PASSWORD")) has_gemini = bool(os.getenv("GOOGLE_API_KEY")) if auto_start and has_imap and has_gemini: st.session_state.pipeline.start_background_polling() logger.info("Auto-started background email polling") if "historical_emails" not in st.session_state: st.session_state.historical_emails = None if "processing_historical" not in st.session_state: st.session_state.processing_historical = False # ============================================================ # Sidebar # ============================================================ with st.sidebar: st.markdown("## ποΈ CRCC Intelligence Hub") st.markdown("**DSC Hotel Project**") st.markdown("Dubai Studio City β’ G+8") st.markdown(f"*Model: `{GEMINI_MODEL}`*") st.markdown("---") # Navigation page = st.radio( "Navigation", ["π Dashboard", "π Documents", "π₯ Historical", "π Search", "βοΈ Settings"], label_visibility="collapsed" ) st.markdown("---") st.markdown("### β‘ Controls") # Pipeline status status = st.session_state.pipeline.get_status() if status["is_running"]: st.success("π’ Auto-polling: ACTIVE") if st.button("βΈοΈ Stop Polling", use_container_width=True): st.session_state.pipeline.stop_background_polling() st.rerun() else: st.error("π΄ Auto-polling: STOPPED") if st.button("βΆοΈ Start Polling", use_container_width=True, type="primary"): st.session_state.pipeline.start_background_polling() st.rerun() # Manual trigger if st.button("π¬ Check New", use_container_width=True): with st.spinner("Fetching & processing..."): try: count = st.session_state.pipeline.check_and_process_new_emails() if count > 0: st.success(f"β {count} new email(s)!") else: st.info("No new unread emails") except Exception as e: st.error(f"Error: {e}") st.markdown("---") st.caption(f"π§ Processed: **{status['emails_processed']}**") st.caption(f"β Errors: **{status['errors']}**") if status["last_check_time"]: st.caption(f"π Last: {status['last_check_time'][:19]}") # ============================================================ # Dashboard Page # ============================================================ if page == "π Dashboard": st.markdown('
π Project Intelligence Dashboard
', unsafe_allow_html=True) st.markdown('Real-time automated email processing β’ DSC Hotel G+8 β’ CRCC Γ MIMAR
', unsafe_allow_html=True) stats = get_dashboard_stats() col1, col2, col3, col4, col5 = st.columns(5) with col1: st.metric("π Total Processed", stats["total_processed"]) with col2: st.metric("β³ Pending", stats["pending"]) with col3: st.metric("β Failed", stats["failed"]) with col4: approved = stats["by_status"].get("Approved", 0) + stats["by_status"].get("Approved with Comments", 0) st.metric("β Approved", approved) with col5: action_needed = stats["by_status"].get("Rejected", 0) + stats["by_status"].get("Resubmit", 0) st.metric("π¨ Action Needed", action_needed) st.markdown("---") col_left, col_right = st.columns(2) with col_left: st.markdown("#### π By Document Type") if stats["by_document_type"]: st.bar_chart(pd.DataFrame(list(stats["by_document_type"].items()), columns=["Type", "Count"]).set_index("Type")) else: st.info("No data yet.") with col_right: st.markdown("#### π By Status") if stats["by_status"]: st.bar_chart(pd.DataFrame(list(stats["by_status"].items()), columns=["Status", "Count"]).set_index("Status")) else: st.info("Waiting for data...") if stats["by_discipline"]: st.markdown("#### π’ By Discipline") cols = st.columns(len(stats["by_discipline"])) for i, (disc, cnt) in enumerate(stats["by_discipline"].items()): with cols[i]: st.metric(disc or "Unassigned", cnt) st.markdown("---") ps = st.session_state.pipeline.get_status() if ps["is_running"]: st.success(f"π€ **Automation Active** β Checking `{os.getenv('IMAP_USER', 'N/A')}` every {os.getenv('EMAIL_CHECK_INTERVAL', '60')}s") else: st.warning("β οΈ **Automation Stopped** β Click 'Start Polling' in sidebar") # ============================================================ # Documents Page # ============================================================ elif page == "π Documents": st.markdown('π Document Registry
', unsafe_allow_html=True) f1, f2, f3, f4 = st.columns(4) with f1: doc_type_filter = st.selectbox("Type", ["All", "RFI", "WIR", "MIR", "NCR", "Shop Drawing", "IFC", "Payment/IPA", "General"]) with f2: status_filter = st.selectbox("Status", ["All", "Approved", "Approved with Comments", "Rejected", "Resubmit", "Pending/Info"]) with f3: discipline_filter = st.selectbox("Discipline", ["All", "Civil", "MEP", "Structural", "Architectural"]) with f4: search_query = st.text_input("π Search", placeholder="Reference, subject...") filters = { "document_type": doc_type_filter if doc_type_filter != "All" else None, "status": status_filter if status_filter != "All" else None, "discipline": discipline_filter if discipline_filter != "All" else None, "search_query": search_query if search_query else None, } emails = get_all_emails(**filters) if emails: df = pd.DataFrame(emails) display_cols = ["id", "date_received", "document_type", "document_reference_number", "status", "assigned_discipline", "subject", "action_required"] display_cols = [c for c in display_cols if c in df.columns] df_display = df[display_cols].rename(columns={ "id": "ID", "date_received": "Date", "document_type": "Type", "document_reference_number": "Reference", "status": "Status", "assigned_discipline": "Discipline", "subject": "Subject", "action_required": "Action Required" }) st.markdown(f"**{len(df_display)} document(s)**") st.dataframe(df_display, use_container_width=True, hide_index=True) # Export st.markdown("---") exp1, exp2, _ = st.columns([1, 1, 4]) export_data = get_all_emails_for_export(document_type=filters["document_type"], status=filters["status"], discipline=filters["discipline"]) if export_data: csv_df = pd.DataFrame(export_data) with exp1: st.download_button("β¬οΈ CSV", data=csv_df.to_csv(index=False).encode("utf-8"), file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.csv", mime="text/csv", type="primary") with exp2: buf = BytesIO() csv_df.to_excel(buf, index=False, sheet_name="Documents") st.download_button("β¬οΈ Excel", data=buf.getvalue(), file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") # Detail view st.markdown("---") st.markdown("### π Detail View") selected_id = st.selectbox("Select document", options=[e["id"] for e in emails], format_func=lambda x: next((f"#{e['id']} β {e.get('document_reference_number') or e.get('subject', 'N/A')}" for e in emails if e["id"] == x), str(x))) if selected_id: detail = get_email_by_id(selected_id) if detail: c1, c2 = st.columns([2, 1]) with c1: st.markdown(f"**Subject:** {detail.get('subject', 'N/A')}") st.markdown(f"**From:** {detail.get('sender', 'N/A')}") st.markdown(f"**Date:** {detail.get('date_received', 'N/A')}") st.markdown(f"**Reference:** `{detail.get('document_reference_number', 'N/A')}`") with c2: s = detail.get("status", "N/A") emoji = {"Approved": "β ", "Approved with Comments": "βοΈ", "Rejected": "β", "Resubmit": "π", "Pending/Info": "β³"}.get(s, "β") st.markdown(f"### {emoji} {s}") st.markdown(f"**Type:** {detail.get('document_type', 'N/A')}") st.markdown(f"**Discipline:** {detail.get('assigned_discipline', 'N/A')}") st.markdown("---") st.markdown("#### π¬ Consultant Comments") st.info(detail.get("consultant_comments", "No comments extracted")) st.markdown("#### β‘ Action Required") st.warning(detail.get("action_required", "No action identified")) attachments = get_attachments_for_email(selected_id) if attachments: st.markdown("#### π Attachments") for att in attachments: icon = "β " if att.get("processed_by_ai") else "βοΈ" size_mb = (att.get("file_size", 0) or 0) / (1024 * 1024) st.markdown(f"- {icon} **{att['filename']}** ({att.get('mime_type', '?')}, {size_mb:.1f} MB)") with st.expander("π€ Raw AI JSON"): if detail.get("ai_raw_response"): try: st.json(json.loads(detail["ai_raw_response"])) except: st.code(detail["ai_raw_response"]) with st.expander("π§ Original Email Body"): st.text(detail.get("body_text", "No body")) else: st.info("No processed documents yet. Use 'π₯ Historical' to process old emails, or wait for new ones.") # ============================================================ # HISTORICAL EMAIL PROCESSING PAGE (NEW) # ============================================================ elif page == "π₯ Historical": st.markdown('π₯ Process Historical Emails
', unsafe_allow_html=True) st.markdown('Fetch ALL emails (read & unread) from your server by date range, preview them, then process with AI
', unsafe_allow_html=True) st.info(""" **Why this page exists:** The auto-polling only catches NEW unread emails. Your email client already marked older emails as "read", so they were skipped. Use this page to fetch and process ALL emails from any date β regardless of read status. """) # Date picker col_date, col_btn = st.columns([2, 1]) with col_date: since_date = st.date_input( "Fetch emails since:", value=date(2025, 1, 1), min_value=date(2024, 1, 1), max_value=date.today(), ) with col_btn: st.markdown("π Search
', unsafe_allow_html=True) q = st.text_input("Search all documents", placeholder="Reference number, subject, keywords...") if q: results = get_all_emails(search_query=q, limit=50) if results: st.success(f"{len(results)} result(s)") for r in results: with st.expander(f"π {r.get('document_reference_number', 'N/A')} β {r.get('subject', '')}"): c1, c2, c3 = st.columns(3) c1.markdown(f"**Type:** {r.get('document_type')}") c2.markdown(f"**Status:** {r.get('status')}") c3.markdown(f"**Date:** {(r.get('date_received') or '')[:10]}") st.markdown(f"**Comments:** {r.get('consultant_comments', 'N/A')}") st.markdown(f"**Action:** {r.get('action_required', 'N/A')}") else: st.warning("No results found.") # ============================================================ # Settings Page # ============================================================ elif page == "βοΈ Settings": st.markdown('βοΈ Settings
', unsafe_allow_html=True) tab1, tab2, tab3 = st.tabs(["π§ Email", "π€ AI", "π Status"]) with tab1: st.markdown("### IMAP Configuration") st.markdown(f""" | Setting | Value | |---------|-------| | Host | `{os.getenv('IMAP_HOST', 'NOT SET')}` | | Port | `{os.getenv('IMAP_PORT', '993')}` | | User | `{os.getenv('IMAP_USER', 'NOT SET')}` | | Password | `{'β Set' if os.getenv('IMAP_PASSWORD') else 'β NOT SET'}` | | Check Interval | `{os.getenv('EMAIL_CHECK_INTERVAL', '60')}s` | """) if st.button("π Test IMAP Connection"): with st.spinner("Connecting..."): fetcher = EmailFetcher() ok, msg = fetcher.test_connection() if ok: st.success(f"β {msg}") else: st.error(f"β {msg}") with tab2: st.markdown("### Gemini AI") st.markdown(f""" | Setting | Value | |---------|-------| | Model | `{GEMINI_MODEL}` | | API Key | `{'β Set' if os.getenv('GOOGLE_API_KEY') else 'β NOT SET'}` | | Max Attachment | `{os.getenv('MAX_ATTACHMENT_SIZE_MB', '50')} MB` | """) if st.button("π§ͺ Test Gemini"): with st.spinner("Testing..."): ok, msg = test_gemini_connection() if ok: st.success(f"β {msg}") else: st.error(f"β {msg}") with tab3: st.markdown("### System Status") ps = st.session_state.pipeline.get_status() stats = get_dashboard_stats() st.markdown(f""" | Metric | Value | |--------|-------| | Polling | {'π’ Active' if ps['is_running'] else 'π΄ Stopped'} | | Last Check | {ps['last_check_time'] or 'Never'} | | Session Processed | {ps['emails_processed']} | | Session Errors | {ps['errors']} | | DB Total Processed | {stats['total_processed']} | | DB Pending | {stats['pending']} | | DB Failed | {stats['failed']} | """) # Footer st.markdown("---") st.markdown( f'