""" AI Construction Project Intelligence Hub Main Streamlit Application - Self-Hosted Version Fully automated email processing pipeline: - Connects to IMAP server on startup - Polls for new emails every 60 seconds (configurable) - Sends emails + attachments to Gemini 3 Flash Preview for analysis - Stores structured results in SQLite - Displays real-time dashboard - Process historical emails (read or unread) by date range Built for China Railway 18th Bureau Group (CRCC) - DSC Hotel Project, Dubai. """ import streamlit as st import pandas as pd import os import sys import json import time import logging from datetime import datetime, timedelta, date from io import BytesIO from modules.database import ( init_database, get_all_emails, get_email_by_id, get_attachments_for_email, get_dashboard_stats, get_all_emails_for_export, email_exists ) from modules.pipeline import EmailPipeline from modules.email_fetcher import EmailFetcher from modules.gemini_processor import test_gemini_connection, GEMINI_MODEL # Configure logging logging.basicConfig( level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize database on startup init_database() # ============================================================ # Page Configuration # ============================================================ st.set_page_config( page_title="CRCC Construction Intelligence Hub", page_icon="πŸ—οΈ", layout="wide", initial_sidebar_state="expanded", ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) # ============================================================ # Session State & Auto-Start Polling # ============================================================ if "pipeline" not in st.session_state: st.session_state.pipeline = EmailPipeline() auto_start = os.getenv("AUTO_START_POLLING", "true").lower() == "true" has_imap = bool(os.getenv("IMAP_USER")) and bool(os.getenv("IMAP_PASSWORD")) has_gemini = bool(os.getenv("GOOGLE_API_KEY")) if auto_start and has_imap and has_gemini: st.session_state.pipeline.start_background_polling() logger.info("Auto-started background email polling") if "historical_emails" not in st.session_state: st.session_state.historical_emails = None if "processing_historical" not in st.session_state: st.session_state.processing_historical = False # ============================================================ # Sidebar # ============================================================ with st.sidebar: st.markdown("## πŸ—οΈ CRCC Intelligence Hub") st.markdown("**DSC Hotel Project**") st.markdown("Dubai Studio City β€’ G+8") st.markdown(f"*Model: `{GEMINI_MODEL}`*") st.markdown("---") # Navigation page = st.radio( "Navigation", ["πŸ“Š Dashboard", "πŸ“‹ Documents", "πŸ“₯ Historical", "πŸ” Search", "βš™οΈ Settings"], label_visibility="collapsed" ) st.markdown("---") st.markdown("### ⚑ Controls") # Pipeline status status = st.session_state.pipeline.get_status() if status["is_running"]: st.success("🟒 Auto-polling: ACTIVE") if st.button("⏸️ Stop Polling", use_container_width=True): st.session_state.pipeline.stop_background_polling() st.rerun() else: st.error("πŸ”΄ Auto-polling: STOPPED") if st.button("▢️ Start Polling", use_container_width=True, type="primary"): st.session_state.pipeline.start_background_polling() st.rerun() # Manual trigger if st.button("πŸ“¬ Check New", use_container_width=True): with st.spinner("Fetching & processing..."): try: count = st.session_state.pipeline.check_and_process_new_emails() if count > 0: st.success(f"βœ… {count} new email(s)!") else: st.info("No new unread emails") except Exception as e: st.error(f"Error: {e}") st.markdown("---") st.caption(f"πŸ“§ Processed: **{status['emails_processed']}**") st.caption(f"❌ Errors: **{status['errors']}**") if status["last_check_time"]: st.caption(f"πŸ• Last: {status['last_check_time'][:19]}") # ============================================================ # Dashboard Page # ============================================================ if page == "πŸ“Š Dashboard": st.markdown('

πŸ“Š Project Intelligence Dashboard

', unsafe_allow_html=True) st.markdown('

Real-time automated email processing β€’ DSC Hotel G+8 β€’ CRCC Γ— MIMAR

', unsafe_allow_html=True) stats = get_dashboard_stats() col1, col2, col3, col4, col5 = st.columns(5) with col1: st.metric("πŸ“„ Total Processed", stats["total_processed"]) with col2: st.metric("⏳ Pending", stats["pending"]) with col3: st.metric("❌ Failed", stats["failed"]) with col4: approved = stats["by_status"].get("Approved", 0) + stats["by_status"].get("Approved with Comments", 0) st.metric("βœ… Approved", approved) with col5: action_needed = stats["by_status"].get("Rejected", 0) + stats["by_status"].get("Resubmit", 0) st.metric("🚨 Action Needed", action_needed) st.markdown("---") col_left, col_right = st.columns(2) with col_left: st.markdown("#### πŸ“ By Document Type") if stats["by_document_type"]: st.bar_chart(pd.DataFrame(list(stats["by_document_type"].items()), columns=["Type", "Count"]).set_index("Type")) else: st.info("No data yet.") with col_right: st.markdown("#### πŸ“Š By Status") if stats["by_status"]: st.bar_chart(pd.DataFrame(list(stats["by_status"].items()), columns=["Status", "Count"]).set_index("Status")) else: st.info("Waiting for data...") if stats["by_discipline"]: st.markdown("#### 🏒 By Discipline") cols = st.columns(len(stats["by_discipline"])) for i, (disc, cnt) in enumerate(stats["by_discipline"].items()): with cols[i]: st.metric(disc or "Unassigned", cnt) st.markdown("---") ps = st.session_state.pipeline.get_status() if ps["is_running"]: st.success(f"πŸ€– **Automation Active** β€” Checking `{os.getenv('IMAP_USER', 'N/A')}` every {os.getenv('EMAIL_CHECK_INTERVAL', '60')}s") else: st.warning("⚠️ **Automation Stopped** β€” Click 'Start Polling' in sidebar") # ============================================================ # Documents Page # ============================================================ elif page == "πŸ“‹ Documents": st.markdown('

πŸ“‹ Document Registry

', unsafe_allow_html=True) f1, f2, f3, f4 = st.columns(4) with f1: doc_type_filter = st.selectbox("Type", ["All", "RFI", "WIR", "MIR", "NCR", "Shop Drawing", "IFC", "Payment/IPA", "General"]) with f2: status_filter = st.selectbox("Status", ["All", "Approved", "Approved with Comments", "Rejected", "Resubmit", "Pending/Info"]) with f3: discipline_filter = st.selectbox("Discipline", ["All", "Civil", "MEP", "Structural", "Architectural"]) with f4: search_query = st.text_input("πŸ” Search", placeholder="Reference, subject...") filters = { "document_type": doc_type_filter if doc_type_filter != "All" else None, "status": status_filter if status_filter != "All" else None, "discipline": discipline_filter if discipline_filter != "All" else None, "search_query": search_query if search_query else None, } emails = get_all_emails(**filters) if emails: df = pd.DataFrame(emails) display_cols = ["id", "date_received", "document_type", "document_reference_number", "status", "assigned_discipline", "subject", "action_required"] display_cols = [c for c in display_cols if c in df.columns] df_display = df[display_cols].rename(columns={ "id": "ID", "date_received": "Date", "document_type": "Type", "document_reference_number": "Reference", "status": "Status", "assigned_discipline": "Discipline", "subject": "Subject", "action_required": "Action Required" }) st.markdown(f"**{len(df_display)} document(s)**") st.dataframe(df_display, use_container_width=True, hide_index=True) # Export st.markdown("---") exp1, exp2, _ = st.columns([1, 1, 4]) export_data = get_all_emails_for_export(document_type=filters["document_type"], status=filters["status"], discipline=filters["discipline"]) if export_data: csv_df = pd.DataFrame(export_data) with exp1: st.download_button("⬇️ CSV", data=csv_df.to_csv(index=False).encode("utf-8"), file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.csv", mime="text/csv", type="primary") with exp2: buf = BytesIO() csv_df.to_excel(buf, index=False, sheet_name="Documents") st.download_button("⬇️ Excel", data=buf.getvalue(), file_name=f"CRCC_Report_{datetime.now():%Y%m%d}.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") # Detail view st.markdown("---") st.markdown("### πŸ“– Detail View") selected_id = st.selectbox("Select document", options=[e["id"] for e in emails], format_func=lambda x: next((f"#{e['id']} β€” {e.get('document_reference_number') or e.get('subject', 'N/A')}" for e in emails if e["id"] == x), str(x))) if selected_id: detail = get_email_by_id(selected_id) if detail: c1, c2 = st.columns([2, 1]) with c1: st.markdown(f"**Subject:** {detail.get('subject', 'N/A')}") st.markdown(f"**From:** {detail.get('sender', 'N/A')}") st.markdown(f"**Date:** {detail.get('date_received', 'N/A')}") st.markdown(f"**Reference:** `{detail.get('document_reference_number', 'N/A')}`") with c2: s = detail.get("status", "N/A") emoji = {"Approved": "βœ…", "Approved with Comments": "β˜‘οΈ", "Rejected": "❌", "Resubmit": "πŸ”„", "Pending/Info": "⏳"}.get(s, "❓") st.markdown(f"### {emoji} {s}") st.markdown(f"**Type:** {detail.get('document_type', 'N/A')}") st.markdown(f"**Discipline:** {detail.get('assigned_discipline', 'N/A')}") st.markdown("---") st.markdown("#### πŸ’¬ Consultant Comments") st.info(detail.get("consultant_comments", "No comments extracted")) st.markdown("#### ⚑ Action Required") st.warning(detail.get("action_required", "No action identified")) attachments = get_attachments_for_email(selected_id) if attachments: st.markdown("#### πŸ“Ž Attachments") for att in attachments: icon = "βœ…" if att.get("processed_by_ai") else "⏭️" size_mb = (att.get("file_size", 0) or 0) / (1024 * 1024) st.markdown(f"- {icon} **{att['filename']}** ({att.get('mime_type', '?')}, {size_mb:.1f} MB)") with st.expander("πŸ€– Raw AI JSON"): if detail.get("ai_raw_response"): try: st.json(json.loads(detail["ai_raw_response"])) except: st.code(detail["ai_raw_response"]) with st.expander("πŸ“§ Original Email Body"): st.text(detail.get("body_text", "No body")) else: st.info("No processed documents yet. Use 'πŸ“₯ Historical' to process old emails, or wait for new ones.") # ============================================================ # HISTORICAL EMAIL PROCESSING PAGE (NEW) # ============================================================ elif page == "πŸ“₯ Historical": st.markdown('

πŸ“₯ Process Historical Emails

', unsafe_allow_html=True) st.markdown('

Fetch ALL emails (read & unread) from your server by date range, preview them, then process with AI

', unsafe_allow_html=True) st.info(""" **Why this page exists:** The auto-polling only catches NEW unread emails. Your email client already marked older emails as "read", so they were skipped. Use this page to fetch and process ALL emails from any date β€” regardless of read status. """) # Date picker col_date, col_btn = st.columns([2, 1]) with col_date: since_date = st.date_input( "Fetch emails since:", value=date(2025, 1, 1), min_value=date(2024, 1, 1), max_value=date.today(), ) with col_btn: st.markdown("
", unsafe_allow_html=True) fetch_clicked = st.button("πŸ” Fetch Email List", type="primary", use_container_width=True) if fetch_clicked: # Format date for IMAP: DD-Mon-YYYY imap_date = since_date.strftime("%d-%b-%Y") with st.spinner(f"Connecting to server and fetching headers since {imap_date}..."): try: fetcher = EmailFetcher() if fetcher.connect(): headers = fetcher.fetch_email_headers_by_date(imap_date) fetcher.disconnect() st.session_state.historical_emails = headers st.success(f"βœ… Found **{len(headers)}** emails since {since_date}") else: st.error("❌ Failed to connect to email server") except Exception as e: st.error(f"Error: {e}") # Display fetched emails if st.session_state.historical_emails: headers = st.session_state.historical_emails # Show which are already in database already_processed = [] new_emails = [] for h in headers: if email_exists(h["message_id"]): already_processed.append(h) else: new_emails.append(h) st.markdown("---") st.markdown(f"### πŸ“Š Results: {len(headers)} total emails found") col_a, col_b = st.columns(2) with col_a: st.metric("βœ… Already Processed", len(already_processed)) with col_b: st.metric("πŸ†• Not Yet Processed", len(new_emails)) if new_emails: # Show preview table st.markdown("### πŸ†• Emails Ready to Process") preview_df = pd.DataFrame(new_emails) display_cols = ["date_received", "sender", "subject"] display_cols = [c for c in display_cols if c in preview_df.columns] preview_display = preview_df[display_cols].rename(columns={ "date_received": "Date", "sender": "From", "subject": "Subject" }) st.dataframe(preview_display, use_container_width=True, hide_index=True) # Process buttons st.markdown("---") st.markdown("### πŸš€ Process These Emails with AI") st.warning(f"⚠️ This will process **{len(new_emails)}** emails through Gemini AI. " f"Each email takes ~3-5 seconds. Estimated time: **{len(new_emails) * 4 // 60} min {len(new_emails) * 4 % 60} sec**") process_col1, process_col2 = st.columns(2) with process_col1: process_all = st.button( f"πŸ€– Summarise All {len(new_emails)} Emails", type="primary", use_container_width=True ) with process_col2: batch_size = st.selectbox("Or process in batches of:", [10, 25, 50, 100], index=0) process_batch = st.button( f"πŸ€– Process Next {min(batch_size, len(new_emails))} Emails", use_container_width=True ) # Execute processing emails_to_process = [] if process_all: emails_to_process = new_emails elif process_batch: emails_to_process = new_emails[:batch_size] if emails_to_process: st.markdown("---") st.markdown("### ⏳ Processing...") progress_bar = st.progress(0) status_text = st.empty() success_count = 0 error_count = 0 # Connect and fetch full emails fetcher = EmailFetcher() if fetcher.connect(): # Open in readonly mode fetcher.connection.select("INBOX", readonly=True) pipeline = st.session_state.pipeline for i, header in enumerate(emails_to_process): progress = (i + 1) / len(emails_to_process) progress_bar.progress(progress) status_text.markdown( f"**Processing {i+1}/{len(emails_to_process)}:** {header.get('subject', 'N/A')[:60]}..." ) try: # Fetch full email email_data = fetcher._fetch_single_email(header["msg_id"], mark_read=False) if email_data: result = pipeline.process_single_email(email_data) if result is not None: success_count += 1 # Rate limit for Gemini API time.sleep(2) except Exception as e: error_count += 1 logger.error(f"Error processing: {e}") time.sleep(1) fetcher.disconnect() progress_bar.progress(1.0) status_text.empty() st.success(f"βœ… **Done!** Processed: {success_count} | Errors: {error_count}") st.balloons() # Clear cache so the list updates st.session_state.historical_emails = None st.rerun() else: st.error("❌ Failed to connect to email server") elif already_processed and not new_emails: st.success("βœ… All emails in this date range are already processed! Check the πŸ“‹ Documents page.") # ============================================================ # Search Page # ============================================================ elif page == "πŸ” Search": st.markdown('

πŸ” Search

', unsafe_allow_html=True) q = st.text_input("Search all documents", placeholder="Reference number, subject, keywords...") if q: results = get_all_emails(search_query=q, limit=50) if results: st.success(f"{len(results)} result(s)") for r in results: with st.expander(f"πŸ“„ {r.get('document_reference_number', 'N/A')} β€” {r.get('subject', '')}"): c1, c2, c3 = st.columns(3) c1.markdown(f"**Type:** {r.get('document_type')}") c2.markdown(f"**Status:** {r.get('status')}") c3.markdown(f"**Date:** {(r.get('date_received') or '')[:10]}") st.markdown(f"**Comments:** {r.get('consultant_comments', 'N/A')}") st.markdown(f"**Action:** {r.get('action_required', 'N/A')}") else: st.warning("No results found.") # ============================================================ # Settings Page # ============================================================ elif page == "βš™οΈ Settings": st.markdown('

βš™οΈ Settings

', unsafe_allow_html=True) tab1, tab2, tab3 = st.tabs(["πŸ“§ Email", "πŸ€– AI", "πŸ“Š Status"]) with tab1: st.markdown("### IMAP Configuration") st.markdown(f""" | Setting | Value | |---------|-------| | Host | `{os.getenv('IMAP_HOST', 'NOT SET')}` | | Port | `{os.getenv('IMAP_PORT', '993')}` | | User | `{os.getenv('IMAP_USER', 'NOT SET')}` | | Password | `{'βœ… Set' if os.getenv('IMAP_PASSWORD') else '❌ NOT SET'}` | | Check Interval | `{os.getenv('EMAIL_CHECK_INTERVAL', '60')}s` | """) if st.button("πŸ”Œ Test IMAP Connection"): with st.spinner("Connecting..."): fetcher = EmailFetcher() ok, msg = fetcher.test_connection() if ok: st.success(f"βœ… {msg}") else: st.error(f"❌ {msg}") with tab2: st.markdown("### Gemini AI") st.markdown(f""" | Setting | Value | |---------|-------| | Model | `{GEMINI_MODEL}` | | API Key | `{'βœ… Set' if os.getenv('GOOGLE_API_KEY') else '❌ NOT SET'}` | | Max Attachment | `{os.getenv('MAX_ATTACHMENT_SIZE_MB', '50')} MB` | """) if st.button("πŸ§ͺ Test Gemini"): with st.spinner("Testing..."): ok, msg = test_gemini_connection() if ok: st.success(f"βœ… {msg}") else: st.error(f"❌ {msg}") with tab3: st.markdown("### System Status") ps = st.session_state.pipeline.get_status() stats = get_dashboard_stats() st.markdown(f""" | Metric | Value | |--------|-------| | Polling | {'🟒 Active' if ps['is_running'] else 'πŸ”΄ Stopped'} | | Last Check | {ps['last_check_time'] or 'Never'} | | Session Processed | {ps['emails_processed']} | | Session Errors | {ps['errors']} | | DB Total Processed | {stats['total_processed']} | | DB Pending | {stats['pending']} | | DB Failed | {stats['failed']} | """) # Footer st.markdown("---") st.markdown( f'
' f'πŸ—οΈ CRCC Construction Intelligence Hub v1.1 β€’ DSC Hotel, Dubai Studio City β€’ ' f'Powered by {GEMINI_MODEL}
', unsafe_allow_html=True )