| """ |
| Gemini AI Processor module for the Construction Intelligence Hub. |
| Handles multimodal document analysis using Google Gemini API. |
| MANDATORY MODEL: gemini-3-flash-preview |
| """ |
|
|
| import io |
| import os |
| import json |
| import time |
| import logging |
| from typing import Dict, Any, List, Optional |
| from pydantic import BaseModel, Field |
|
|
# Module logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(name=__name__)

# Shared google-genai client. It stays None until get_gemini_client() builds
# it on first use; later calls reuse the same instance.
_client: Optional[Any] = None
|
|
|
|
class DocumentAnalysis(BaseModel):
    """Structured-output schema for one analysed project email.

    Passed as ``response_schema`` to Gemini so its JSON reply is parsed into
    this model. Every field is optional so a partially informative email
    still validates; ``priority`` defaults to ``"Normal"`` rather than None.
    The field descriptions presumably travel with the generated schema to the
    model, so edit their wording with care.
    """

    project_name: Optional[str] = Field(
        default=None,
        description="Project name, usually 'DSC Hotel' or 'Dubai Studio City'",
    )
    document_type: Optional[str] = Field(
        default=None,
        description="One of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General",
    )
    document_reference_number: Optional[str] = Field(
        default=None,
        description="Document reference number e.g. '621-DXB24-RFI-CVL-CRCC-MIM-0036'",
    )
    status: Optional[str] = Field(
        default=None,
        description="One of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info",
    )
    consultant_comments: Optional[str] = Field(
        default=None,
        description="Summary of MIMAR/Consultant comments, drawing changes, or discrepancies",
    )
    action_required: Optional[str] = Field(
        default=None,
        description="What the contractor needs to do next",
    )
    assigned_discipline: Optional[str] = Field(
        default=None,
        description="One of: Civil, MEP, Structural, Architectural",
    )
    # Only field with a non-None default.
    priority: Optional[str] = Field(
        default="Normal",
        description="Priority level: High, Normal, Low",
    )
    key_dates: Optional[str] = Field(
        default=None,
        description="Any deadline or date mentioned",
    )
    summary: Optional[str] = Field(
        default=None,
        description="Brief 2-3 sentence summary of the entire communication",
    )
|
|
|
|
# System instruction for the email-analysis requests. It fixes the model's
# persona and project context and restates the allowed values for the
# enumerated DocumentAnalysis fields. Keep the RULES lists and the CONTEXT
# document-type list in sync with the DocumentAnalysis field descriptions.
SYSTEM_PROMPT = """You are a Senior Civil Engineer and Project Manager working on the DSC Hotel project
(G+8 Hotel) in Dubai Studio City for China Railway 18th Bureau Group (CRCC). The Consultant on this
project is MIMAR.

Your task is to analyze incoming project emails and their attachments, then extract structured data.

RULES:
1. Be precise with document reference numbers - copy them exactly as written.
2. For document_type, use ONLY one of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General
3. For status, use ONLY one of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info
4. For assigned_discipline, use ONLY one of: Civil, MEP, Structural, Architectural
5. For priority, assess based on urgency words and deadlines: High, Normal, Low
6. consultant_comments should be a concise summary (max 200 words) of the key technical points
7. action_required should be specific and actionable
8. If information is not available in the email/attachments, use null

CONTEXT:
- Project: DSC Hotel (Dubai Studio City), G+8 Hotel
- Contractor: CRCC (China Railway 18th Bureau Group)
- Consultant: MIMAR
- Common document types: RFIs, WIRs (Work Inspection Reports), MIRs (Material Inspection Requests),
  NCRs (Non-Conformance Reports), Shop Drawings, IFC (Issued for Construction),
  Payment Applications (IPA)
"""
|
|
| |
# Model used for every Gemini request in this module. It is hard-coded
# because the module docstring declares this model mandatory.
GEMINI_MODEL: str = "gemini-3-flash-preview"
|
|
|
|
def get_gemini_client():
    """Return the module-wide Gemini client, building it on the first call.

    The client is cached in the module global ``_client``, so every later
    call returns the same instance without re-reading the environment.

    Raises:
        ValueError: if the client must be created and ``GOOGLE_API_KEY`` is
            unset or empty.
    """
    global _client
    if _client is not None:
        return _client

    from google import genai

    key = os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("GOOGLE_API_KEY not set in environment.")
    _client = genai.Client(api_key=key)
    return _client
|
|
|
|
# Gemini rejects requests whose total inline payload exceeds ~20 MB, so
# attachments are inlined only while their *combined* size stays under this
# budget; anything beyond it is sent through the Files API instead.
_INLINE_REQUEST_LIMIT_BYTES = 20 * 1024 * 1024


def process_email_with_gemini(
    email_body: str,
    subject: str = "",
    sender: str = "",
    attachments: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Process an email and its attachments using Gemini gemini-3-flash-preview.

    Args:
        email_body: Plain-text body; only the first 10,000 characters are
            sent. ``None`` is treated as an empty body.
        subject: Email subject, included in the prompt metadata.
        sender: Sender, included in the prompt metadata.
        attachments: Optional list of dicts. An entry is used only when it has
            truthy ``file_data`` (bytes) and a truthy ``can_process``;
            ``mime_type`` and ``filename`` are optional.

    Returns:
        The DocumentAnalysis fields as a dict plus ``_processing_metadata``
        (model, per-attachment outcome, UTC timestamp). If the Gemini request
        fails and attachments were supplied, the body-only fallback result is
        returned instead, with the per-attachment outcome added to its
        metadata.

    Raises:
        ValueError: from ``get_gemini_client`` when the API key is missing.
        Exception: whatever the Gemini call raised, when no attachments were
            supplied.
    """
    from google.genai import types

    client = get_gemini_client()
    model = GEMINI_MODEL
    max_size_mb = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", "50"))

    body_excerpt = (email_body or "")[:10000]
    email_context = f"""
--- EMAIL METADATA ---
Subject: {subject}
From: {sender}
--- EMAIL BODY ---
{body_excerpt}
--- END OF EMAIL ---

Please analyze this email and any attached documents below, then extract the structured project data.
"""
    content_parts: List[Any] = [types.Part.from_text(text=email_context)]
    processed_attachments: List[Dict[str, Any]] = []
    # Files API uploads made for this request; removed in the finally below so
    # they do not linger in the project's file storage.
    uploaded_files: List[Any] = []

    try:
        if attachments:
            _append_attachment_parts(
                client,
                attachments,
                max_size_mb,
                content_parts,
                processed_attachments,
                uploaded_files,
            )

        try:
            response = client.models.generate_content(
                model=model,
                contents=content_parts,
                config=types.GenerateContentConfig(
                    system_instruction=SYSTEM_PROMPT,
                    response_mime_type="application/json",
                    response_schema=DocumentAnalysis,
                    temperature=0.1,
                ),
            )
            result = _response_to_dict(response)
            result["_processing_metadata"] = {
                "model_used": model,
                "attachments_processed": processed_attachments,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            }
            logger.info(f"Successfully processed email: {subject}")
            return result
        except Exception as e:
            logger.error(f"Gemini API error: {e}")
            if not attachments:
                raise
            logger.info("Retrying with email body only (no attachments)...")
            fallback = _process_body_only(client, model, email_context)
            # Preserve which attachments were skipped/failed for the caller.
            fallback.setdefault("_processing_metadata", {})[
                "attachments_processed"
            ] = processed_attachments
            return fallback
    finally:
        _delete_uploaded_files(client, uploaded_files)


def _append_attachment_parts(
    client,
    attachments: List[Dict[str, Any]],
    max_size_mb: int,
    content_parts: List[Any],
    processed_attachments: List[Dict[str, Any]],
    uploaded_files: List[Any],
) -> None:
    """Append a label part plus a content part for every usable attachment.

    Attachments are inlined while the running inline total stays under
    ``_INLINE_REQUEST_LIMIT_BYTES``; larger ones are uploaded via the Files
    API (each upload is recorded in ``uploaded_files`` for later cleanup).
    The outcome for each considered attachment is appended to
    ``processed_attachments``; per-attachment errors are logged, not raised.
    """
    from google.genai import types

    inline_bytes = 0
    for att in attachments:
        file_data = att.get("file_data")
        if not file_data or not att.get("can_process", False):
            continue

        size = len(file_data)
        file_size_mb = size / (1024 * 1024)
        mime_type = att.get("mime_type") or "application/octet-stream"
        filename = att.get("filename") or "unknown"

        if file_size_mb > max_size_mb:
            logger.warning(f"Skipping {filename} ({file_size_mb:.1f}MB) - exceeds {max_size_mb}MB limit")
            processed_attachments.append({
                "filename": filename,
                "status": "skipped",
                "reason": f"File too large ({file_size_mb:.1f}MB)",
            })
            continue

        try:
            use_inline = inline_bytes + size < _INLINE_REQUEST_LIMIT_BYTES
            if use_inline:
                part = types.Part.from_bytes(data=file_data, mime_type=mime_type)
            else:
                uploaded_file = client.files.upload(
                    file=io.BytesIO(file_data),
                    config=types.UploadFileConfig(
                        mime_type=mime_type,
                        display_name=filename,
                    ),
                )
                # Record before waiting so a timed-out/failed upload is still deleted.
                uploaded_files.append(uploaded_file)
                _wait_for_file_active(client, uploaded_file)
                part = types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type,
                )

            content_parts.append(types.Part.from_text(text=f"\n--- ATTACHMENT: {filename} ---"))
            content_parts.append(part)
            if use_inline:
                inline_bytes += size

            processed_attachments.append({"filename": filename, "status": "processed"})
            logger.info(f"Added attachment to Gemini request: {filename}")
        except Exception as e:
            logger.error(f"Error processing attachment {filename}: {e}")
            processed_attachments.append({"filename": filename, "status": "error", "reason": str(e)})


def _response_to_dict(response) -> Dict[str, Any]:
    """Convert a structured-output response to a plain dict (parsed model or raw JSON)."""
    if response.parsed:
        return response.parsed.model_dump()
    return json.loads(response.text)


def _delete_uploaded_files(client, uploaded_files: List[Any]) -> None:
    """Best-effort deletion of Files API uploads; failures are only logged."""
    for uploaded_file in uploaded_files:
        try:
            client.files.delete(name=uploaded_file.name)
        except Exception as e:
            logger.warning(f"Could not delete uploaded file {uploaded_file.name}: {e}")
|
|
|
|
def _process_body_only(client, model: str, email_context: str) -> Dict[str, Any]:
    """Fallback: Process just the email body without attachments.

    Args:
        client: google-genai client used for the request.
        model: Model name to call.
        email_context: Prompt text (metadata + body) built by the caller.

    Returns:
        The parsed DocumentAnalysis fields plus ``_processing_metadata`` with
        ``fallback=True``. If this request also fails, a placeholder result
        asking for manual review is returned instead of raising. Its metadata
        carries the error text plus the same ``model_used`` and ``timestamp``
        keys as the success path, so consumers can rely on those keys.
    """
    from google.genai import types

    try:
        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text=email_context)],
            config=types.GenerateContentConfig(
                system_instruction=SYSTEM_PROMPT,
                response_mime_type="application/json",
                response_schema=DocumentAnalysis,
                temperature=0.1,
            ),
        )

        if response.parsed:
            result = response.parsed.model_dump()
        else:
            result = json.loads(response.text)

        result["_processing_metadata"] = {
            "model_used": model,
            "fallback": True,
            "reason": "Attachment processing failed, body-only analysis",
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        return result

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Fallback processing also failed: {e}")
        return {
            "project_name": "DSC Hotel",
            "document_type": "General",
            "document_reference_number": None,
            "status": "Pending/Info",
            "consultant_comments": f"AI processing failed: {str(e)}",
            "action_required": "Manual review required",
            "assigned_discipline": None,
            "priority": "Normal",
            "key_dates": None,
            "summary": "Automated processing failed. Please review email manually.",
            "_processing_metadata": {
                "model_used": model,
                "error": str(e),
                "fallback": True,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            },
        }
|
|
|
|
def _wait_for_file_active(client, file_obj, timeout: int = 120, poll_interval: float = 3):
    """Wait for a Gemini file upload to leave the PROCESSING state.

    Args:
        client: google-genai client used to re-fetch the file's state.
        file_obj: File object returned by ``client.files.upload``.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between polls (default 3, as before).

    Returns:
        The most recently fetched file object; callers may ignore it.

    Raises:
        TimeoutError: if the file is still PROCESSING after ``timeout`` seconds.
        RuntimeError: if the file ends in the FAILED state.
    """
    from google.genai import types

    # monotonic() is unaffected by wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + timeout
    while file_obj.state == types.FileState.PROCESSING:
        if time.monotonic() > deadline:
            raise TimeoutError(f"File processing timed out after {timeout}s")
        time.sleep(poll_interval)
        file_obj = client.files.get(name=file_obj.name)

    if file_obj.state == types.FileState.FAILED:
        detail = getattr(file_obj, "error", None)
        message = f"File processing failed: {file_obj.name}"
        if detail:
            message += f" ({detail})"
        raise RuntimeError(message)
    return file_obj
|
|
|
|
def test_gemini_connection() -> tuple:
    """Send a trivial prompt to check that the Gemini API is reachable.

    Returns:
        ``(True, message)`` whenever the model answers. The message says
        "connected" if the reply contains ``CONNECTION_OK``; otherwise it
        quotes the first 100 characters of the reply. ``(False, message)`` is
        returned if any exception occurs (import failure, missing API key,
        network/API error).
    """
    try:
        from google.genai import types

        client = get_gemini_client()
        model = GEMINI_MODEL

        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text="Reply with exactly: CONNECTION_OK")],
            config=types.GenerateContentConfig(temperature=0.0),
        )

        # response.text may be None (e.g. no text part in the candidate);
        # treat it as an empty reply instead of failing with a TypeError.
        reply = response.text or ""
        if "CONNECTION_OK" in reply:
            return True, f"Gemini API connected (model: {model})"
        return True, f"Gemini API responded (model: {model}): {reply[:100]}"

    except Exception as e:
        return False, f"Gemini API error: {str(e)}"
|
|