| """ |
| Database module for the Construction Intelligence Hub. |
| Handles SQLite database creation, CRUD operations, and queries. |
| """ |
|
|
| import sqlite3 |
| import json |
| import os |
| from datetime import datetime |
| from typing import Optional, List, Dict, Any |
| from contextlib import contextmanager |
|
|
# Location of the SQLite database file; overridable via the DATABASE_PATH
# environment variable. The /data default presumably targets a mounted
# volume in a container deployment — confirm against the deploy config.
DATABASE_PATH = os.getenv("DATABASE_PATH", "/data/construction_hub.db")
|
|
|
|
def get_db_path() -> str:
    """Return the filesystem path of the SQLite database file."""
    return DATABASE_PATH
|
|
|
|
@contextmanager
def get_connection():
    """Context manager yielding a configured SQLite connection.

    Ensures the database's parent directory exists, enables WAL journaling
    and foreign-key enforcement, and uses ``sqlite3.Row`` so rows support
    name-based access. Commits on normal exit, rolls back and re-raises on
    any exception, and always closes the connection.

    Yields:
        sqlite3.Connection: an open connection to the database file.
    """
    db_path = get_db_path()
    # Bug fix: os.path.dirname() returns "" for a bare filename (e.g.
    # DATABASE_PATH="construction_hub.db"), and os.makedirs("") raises
    # FileNotFoundError — only create the directory when one is present.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")   # allow concurrent readers during writes
    conn.execute("PRAGMA foreign_keys=ON")    # enforce attachments.email_id FK
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
|
|
|
|
def init_database():
    """Initialize the database schema.

    Creates the ``emails`` and ``attachments`` tables and their lookup
    indexes. Every statement uses IF NOT EXISTS, so the call is idempotent
    and safe to run on every startup. ``attachments.email_id`` cascades on
    delete (enforced because get_connection enables PRAGMA foreign_keys).
    """
    with get_connection() as conn:
        # executescript runs the whole DDL batch in a single call.
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS emails (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_id TEXT UNIQUE NOT NULL,
                subject TEXT,
                sender TEXT,
                recipients TEXT,
                date_received TIMESTAMP,
                date_processed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                body_text TEXT,
                body_html TEXT,
                project_name TEXT,
                document_type TEXT,
                document_reference_number TEXT,
                status TEXT,
                consultant_comments TEXT,
                action_required TEXT,
                assigned_discipline TEXT,
                priority TEXT DEFAULT 'Normal',
                ai_raw_response TEXT,
                processing_status TEXT DEFAULT 'pending',
                processing_error TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS attachments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email_id INTEGER NOT NULL,
                filename TEXT NOT NULL,
                mime_type TEXT,
                file_size INTEGER,
                file_path TEXT,
                processed_by_ai BOOLEAN DEFAULT 0,
                processing_error TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (email_id) REFERENCES emails(id) ON DELETE CASCADE
            );

            CREATE INDEX IF NOT EXISTS idx_emails_message_id ON emails(message_id);
            CREATE INDEX IF NOT EXISTS idx_emails_document_type ON emails(document_type);
            CREATE INDEX IF NOT EXISTS idx_emails_status ON emails(status);
            CREATE INDEX IF NOT EXISTS idx_emails_date_received ON emails(date_received);
            CREATE INDEX IF NOT EXISTS idx_emails_processing_status ON emails(processing_status);
            CREATE INDEX IF NOT EXISTS idx_attachments_email_id ON attachments(email_id);
        """)
|
|
|
|
def email_exists(message_id: str) -> bool:
    """Return True when an email with this message_id is already stored."""
    with get_connection() as conn:
        found = conn.execute(
            "SELECT 1 FROM emails WHERE message_id = ?", (message_id,)
        ).fetchone()
    return found is not None
|
|
|
|
def insert_email(email_data: Dict[str, Any]) -> int:
    """Persist a newly fetched email and return its generated row id.

    Only ``message_id`` is required; the remaining fields default to NULL.
    The row starts in the 'pending' processing state.
    """
    values = (
        email_data["message_id"],
        email_data.get("subject"),
        email_data.get("sender"),
        email_data.get("recipients"),
        email_data.get("date_received"),
        email_data.get("body_text"),
        email_data.get("body_html"),
        "pending",
    )
    sql = """
        INSERT INTO emails (
            message_id, subject, sender, recipients, date_received,
            body_text, body_html, processing_status
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    with get_connection() as conn:
        return conn.execute(sql, values).lastrowid
|
|
|
|
def update_email_ai_results(email_id: int, ai_results: Dict[str, Any]):
    """Write AI extraction results onto an email row and mark it processed.

    The full ``ai_results`` dict is also kept verbatim (as JSON) in
    ``ai_raw_response`` for auditing.
    """
    values = (
        ai_results.get("project_name"),
        ai_results.get("document_type"),
        ai_results.get("document_reference_number"),
        ai_results.get("status"),
        ai_results.get("consultant_comments"),
        ai_results.get("action_required"),
        ai_results.get("assigned_discipline"),
        ai_results.get("priority", "Normal"),
        json.dumps(ai_results),
        email_id,
    )
    sql = """
        UPDATE emails SET
            project_name = ?,
            document_type = ?,
            document_reference_number = ?,
            status = ?,
            consultant_comments = ?,
            action_required = ?,
            assigned_discipline = ?,
            priority = ?,
            ai_raw_response = ?,
            processing_status = 'processed',
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    with get_connection() as conn:
        conn.execute(sql, values)
|
|
|
def mark_email_failed(email_id: int, error_message: str):
    """Flag an email row as failed and store the error description."""
    sql = (
        "UPDATE emails SET "
        "processing_status = 'failed', "
        "processing_error = ?, "
        "updated_at = CURRENT_TIMESTAMP "
        "WHERE id = ?"
    )
    with get_connection() as conn:
        conn.execute(sql, (error_message, email_id))
|
|
|
|
def insert_attachment(email_id: int, attachment_data: Dict[str, Any]) -> int:
    """Store an attachment record for an email and return its row id.

    ``filename`` is required; mime type, size, and path default to NULL.
    """
    values = (
        email_id,
        attachment_data["filename"],
        attachment_data.get("mime_type"),
        attachment_data.get("file_size"),
        attachment_data.get("file_path"),
    )
    sql = """
        INSERT INTO attachments (
            email_id, filename, mime_type, file_size, file_path
        ) VALUES (?, ?, ?, ?, ?)
    """
    with get_connection() as conn:
        return conn.execute(sql, values).lastrowid
|
|
|
|
def mark_attachment_processed(attachment_id: int, success: bool, error: Optional[str] = None):
    """Record the AI-processing outcome for an attachment.

    Fix: the parameter was annotated ``error: str = None`` — an implicit
    Optional, invalid under PEP 484 and rejected by modern type checkers;
    it is now ``Optional[str]``. Behavior is unchanged.

    Args:
        attachment_id: Row id in the attachments table.
        success: True when AI processing completed; stored as 1/0.
        error: Error description on failure; leaving it None clears the
            column (sets it to NULL) on success.
    """
    with get_connection() as conn:
        conn.execute(
            """
            UPDATE attachments SET
                processed_by_ai = ?,
                processing_error = ?
            WHERE id = ?
            """,
            (1 if success else 0, error, attachment_id),
        )
|
|
|
|
def get_all_emails(
    document_type: Optional[str] = None,
    status: Optional[str] = None,
    discipline: Optional[str] = None,
    search_query: Optional[str] = None,
    limit: int = 500,
    offset: int = 0
) -> List[Dict[str, Any]]:
    """Fetch processed emails, newest first, with optional filters.

    Exact-match filters apply to document type, status, and discipline;
    ``search_query`` does a substring match across subject, reference
    number, and consultant comments. Results are paginated via
    limit/offset.
    """
    clauses = ["processing_status = 'processed'"]
    params: List[Any] = []

    # Data-driven exact-match filters: each active value contributes one
    # "column = ?" clause and one bound parameter.
    for column, value in (
        ("document_type", document_type),
        ("status", status),
        ("assigned_discipline", discipline),
    ):
        if value:
            clauses.append(f"{column} = ?")
            params.append(value)

    if search_query:
        clauses.append(
            "(subject LIKE ? OR document_reference_number LIKE ?"
            " OR consultant_comments LIKE ?)"
        )
        pattern = f"%{search_query}%"
        params.extend([pattern, pattern, pattern])

    sql = (
        "SELECT * FROM emails WHERE "
        + " AND ".join(clauses)
        + " ORDER BY date_received DESC LIMIT ? OFFSET ?"
    )
    params.extend([limit, offset])

    with get_connection() as conn:
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
|
|
|
|
def get_email_by_id(email_id: int) -> Optional[Dict[str, Any]]:
    """Look up a single email by primary key; returns None when absent."""
    with get_connection() as conn:
        found = conn.execute(
            "SELECT * FROM emails WHERE id = ?", (email_id,)
        ).fetchone()
    if found is None:
        return None
    return dict(found)
|
|
|
|
def get_attachments_for_email(email_id: int) -> List[Dict[str, Any]]:
    """Return every attachment row belonging to the given email."""
    with get_connection() as conn:
        cursor = conn.execute(
            "SELECT * FROM attachments WHERE email_id = ?", (email_id,)
        )
        return [dict(record) for record in cursor.fetchall()]
|
|
|
|
def get_dashboard_stats() -> Dict[str, Any]:
    """Get statistics for the dashboard.

    Returns a dict with:
        total_processed: count of successfully processed emails.
        by_document_type / by_status / by_discipline: counts of processed
            emails grouped by the respective column (a NULL column value
            appears as a ``None`` key).
        pending / failed: counts of emails in those processing states.
        recent_activity: day (YYYY-MM-DD) -> processed-email count for
            rows received in the last 7 days.
    """
    with get_connection() as conn:
        stats = {}

        # Overall count of processed emails.
        row = conn.execute(
            "SELECT COUNT(*) as cnt FROM emails WHERE processing_status = 'processed'"
        ).fetchone()
        stats["total_processed"] = row["cnt"]

        # Breakdown by AI-assigned document type.
        rows = conn.execute("""
            SELECT document_type, COUNT(*) as cnt
            FROM emails WHERE processing_status = 'processed'
            GROUP BY document_type
        """).fetchall()
        stats["by_document_type"] = {r["document_type"]: r["cnt"] for r in rows}

        # Breakdown by document status.
        rows = conn.execute("""
            SELECT status, COUNT(*) as cnt
            FROM emails WHERE processing_status = 'processed'
            GROUP BY status
        """).fetchall()
        stats["by_status"] = {r["status"]: r["cnt"] for r in rows}

        # Breakdown by assigned discipline.
        rows = conn.execute("""
            SELECT assigned_discipline, COUNT(*) as cnt
            FROM emails WHERE processing_status = 'processed'
            GROUP BY assigned_discipline
        """).fetchall()
        stats["by_discipline"] = {r["assigned_discipline"]: r["cnt"] for r in rows}

        # Emails awaiting processing.
        row = conn.execute(
            "SELECT COUNT(*) as cnt FROM emails WHERE processing_status = 'pending'"
        ).fetchone()
        stats["pending"] = row["cnt"]

        # Emails whose processing failed.
        row = conn.execute(
            "SELECT COUNT(*) as cnt FROM emails WHERE processing_status = 'failed'"
        ).fetchone()
        stats["failed"] = row["cnt"]

        # Per-day activity over the trailing week.
        # NOTE(review): the DATE('now', '-7 days') comparison assumes
        # date_received is stored as ISO-8601 text so SQLite's string
        # comparison is chronological — confirm against the insert path.
        rows = conn.execute("""
            SELECT DATE(date_received) as day, COUNT(*) as cnt
            FROM emails WHERE processing_status = 'processed'
            AND date_received >= DATE('now', '-7 days')
            GROUP BY DATE(date_received)
            ORDER BY day
        """).fetchall()
        stats["recent_activity"] = {r["day"]: r["cnt"] for r in rows}

        return stats
|
|
|
|
def get_all_emails_for_export(
    document_type: Optional[str] = None,
    status: Optional[str] = None,
    discipline: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Get all emails for CSV/Excel export (no limit).

    Returns only the export-relevant columns for processed emails, newest
    first, optionally narrowed by exact-match filters.
    """
    sql = """
        SELECT
            date_received, subject, sender, project_name, document_type,
            document_reference_number, status, assigned_discipline,
            consultant_comments, action_required, priority
        FROM emails
        WHERE processing_status = 'processed'
    """
    params: List[Any] = []

    # Append one parameterized clause per active filter.
    for column, value in (
        ("document_type", document_type),
        ("status", status),
        ("assigned_discipline", discipline),
    ):
        if value:
            sql += f" AND {column} = ?"
            params.append(value)

    sql += " ORDER BY date_received DESC"

    with get_connection() as conn:
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
|
|