# crcc-construction-hub-docker / modules / gemini_processor.py
# Commit: "Add Gemini processor with gemini-3-flash-preview" (8cce3d6, verified, by Ultronprime)
"""
Gemini AI Processor module for the Construction Intelligence Hub.
Handles multimodal document analysis using Google Gemini API.
MANDATORY MODEL: gemini-3-flash-preview
"""
import io
import os
import json
import time
import logging
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
_client = None
class DocumentAnalysis(BaseModel):
    """Schema for Gemini's structured output.

    Passed as ``response_schema`` to ``generate_content`` so the API returns
    validated JSON matching these fields. Every field is optional because any
    of them may be missing from a given email or attachment.

    NOTE: the ``description`` strings are part of the schema the model sees,
    so they double as extraction hints — keep them consistent with SYSTEM_PROMPT.
    """
    project_name: Optional[str] = Field(None, description="Project name, usually 'DSC Hotel' or 'Dubai Studio City'")
    document_type: Optional[str] = Field(None, description="One of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General")
    document_reference_number: Optional[str] = Field(None, description="Document reference number e.g. '621-DXB24-RFI-CVL-CRCC-MIM-0036'")
    status: Optional[str] = Field(None, description="One of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info")
    consultant_comments: Optional[str] = Field(None, description="Summary of MIMAR/Consultant comments, drawing changes, or discrepancies")
    action_required: Optional[str] = Field(None, description="What the contractor needs to do next")
    assigned_discipline: Optional[str] = Field(None, description="One of: Civil, MEP, Structural, Architectural")
    # Only field with a non-null default: unassessed emails are treated as routine.
    priority: Optional[str] = Field("Normal", description="Priority level: High, Normal, Low")
    key_dates: Optional[str] = Field(None, description="Any deadline or date mentioned")
    summary: Optional[str] = Field(None, description="Brief 2-3 sentence summary of the entire communication")
# System instruction sent with every Gemini request: pins the model's persona,
# restricts each extracted field to the enum values DocumentAnalysis expects,
# and supplies fixed project context (DSC Hotel / CRCC / MIMAR).
SYSTEM_PROMPT = """You are a Senior Civil Engineer and Project Manager working on the DSC Hotel project
(G+8 Hotel) in Dubai Studio City for China Railway 18th Bureau Group (CRCC). The Consultant on this
project is MIMAR.
Your task is to analyze incoming project emails and their attachments, then extract structured data.
RULES:
1. Be precise with document reference numbers - copy them exactly as written.
2. For document_type, use ONLY one of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General
3. For status, use ONLY one of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info
4. For assigned_discipline, use ONLY one of: Civil, MEP, Structural, Architectural
5. For priority, assess based on urgency words and deadlines: High, Normal, Low
6. consultant_comments should be a concise summary (max 200 words) of the key technical points
7. action_required should be specific and actionable
8. If information is not available in the email/attachments, use null
CONTEXT:
- Project: DSC Hotel (Dubai Studio City), G+8 Hotel
- Contractor: CRCC (China Railway 18th Bureau Group)
- Consultant: MIMAR
- Common document types: RFIs, WIRs (Work Inspection Reports), NCRs (Non-Conformance Reports),
Shop Drawings, IFC (Issued for Construction), Payment Applications (IPA)
"""
# MANDATORY: gemini-3-flash-preview
GEMINI_MODEL = "gemini-3-flash-preview"
def get_gemini_client():
    """Get or create the Gemini client.

    Lazily constructs a module-level singleton ``genai.Client`` from the
    ``GOOGLE_API_KEY`` environment variable; later calls reuse the cached
    instance.

    Raises:
        ValueError: if GOOGLE_API_KEY is not set.
    """
    global _client
    if _client is not None:
        return _client
    from google import genai  # local import: keeps module importable without the SDK
    key = os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("GOOGLE_API_KEY not set in environment.")
    _client = genai.Client(api_key=key)
    return _client
def process_email_with_gemini(
    email_body: str,
    subject: str = "",
    sender: str = "",
    attachments: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Process an email and its attachments using Gemini gemini-3-flash-preview.

    Args:
        email_body: Plain-text body of the email (truncated to 10,000 chars).
        subject: Email subject line, passed to the model as metadata.
        sender: Email "From" address, passed to the model as metadata.
        attachments: Optional list of dicts with keys ``file_data`` (bytes),
            ``mime_type``, ``filename`` and ``can_process``. Entries without
            data or not flagged processable are silently skipped.

    Returns:
        The extracted DocumentAnalysis fields as a dict, plus a
        ``_processing_metadata`` entry recording the model used, per-attachment
        status, and a UTC timestamp.

    Raises:
        Re-raises the Gemini client's exception when the request fails and no
        attachments were supplied (with attachments, a body-only retry is
        attempted first via ``_process_body_only``).
    """
    from google.genai import types  # local import: keeps module importable without the SDK
    client = get_gemini_client()
    model = GEMINI_MODEL
    max_size_mb = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", "50"))
    content_parts = []
    email_context = f"""
--- EMAIL METADATA ---
Subject: {subject}
From: {sender}
--- EMAIL BODY ---
{email_body[:10000]}
--- END OF EMAIL ---
Please analyze this email and any attached documents below, then extract the structured project data.
"""
    content_parts.append(types.Part.from_text(text=email_context))
    processed_attachments = []
    if attachments:
        for att in attachments:
            # Skip entries with no payload or explicitly marked unprocessable.
            if not att.get("file_data") or not att.get("can_process", False):
                continue
            file_data = att["file_data"]
            file_size_mb = len(file_data) / (1024 * 1024)
            mime_type = att.get("mime_type", "application/octet-stream")
            filename = att.get("filename", "unknown")
            if file_size_mb > max_size_mb:
                # BUG FIX: log the actual filename (was a literal "(unknown)" placeholder).
                logger.warning(f"Skipping {filename} ({file_size_mb:.1f}MB) - exceeds {max_size_mb}MB limit")
                processed_attachments.append({
                    "filename": filename,
                    "status": "skipped",
                    "reason": f"File too large ({file_size_mb:.1f}MB)"
                })
                continue
            try:
                if file_size_mb < 20:
                    # Small files are sent inline as raw bytes.
                    part = types.Part.from_bytes(data=file_data, mime_type=mime_type)
                else:
                    # Large files go through the Files API and are referenced by URI.
                    uploaded_file = client.files.upload(
                        file=io.BytesIO(file_data),
                        config=types.UploadFileConfig(
                            mime_type=mime_type,
                            display_name=filename
                        )
                    )
                    _wait_for_file_active(client, uploaded_file)
                    part = types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )
                # BUG FIX: label each attachment with its filename (was "(unknown)").
                content_parts.append(types.Part.from_text(text=f"\n--- ATTACHMENT: {filename} ---"))
                content_parts.append(part)
                processed_attachments.append({"filename": filename, "status": "processed"})
                logger.info(f"Added attachment to Gemini request: {filename}")
            except Exception as e:
                logger.error(f"Error processing attachment {filename}: {e}")
                processed_attachments.append({"filename": filename, "status": "error", "reason": str(e)})
    # Call Gemini with structured output enforced by the response schema.
    try:
        response = client.models.generate_content(
            model=model,
            contents=content_parts,
            config=types.GenerateContentConfig(
                system_instruction=SYSTEM_PROMPT,
                response_mime_type="application/json",
                response_schema=DocumentAnalysis,
                temperature=0.1,
            )
        )
        if response.parsed:
            result = response.parsed.model_dump()
        else:
            # SDK could not parse into the schema; fall back to raw JSON text.
            result = json.loads(response.text)
        result["_processing_metadata"] = {
            "model_used": model,
            "attachments_processed": processed_attachments,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        logger.info(f"Successfully processed email: {subject}")
        return result
    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        if attachments:
            # Attachments are the likeliest failure cause; retry body-only.
            logger.info("Retrying with email body only (no attachments)...")
            return _process_body_only(client, model, email_context)
        raise
def _process_body_only(client, model: str, email_context: str) -> Dict[str, Any]:
    """Fallback: Process just the email body without attachments.

    Called when the full request (body + attachments) fails. On a second
    failure, returns a hard-coded "manual review" record instead of raising,
    so the pipeline always gets a well-formed result.
    """
    from google.genai import types
    try:
        generation_config = types.GenerateContentConfig(
            system_instruction=SYSTEM_PROMPT,
            response_mime_type="application/json",
            response_schema=DocumentAnalysis,
            temperature=0.1,
        )
        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text=email_context)],
            config=generation_config,
        )
        result = response.parsed.model_dump() if response.parsed else json.loads(response.text)
        result["_processing_metadata"] = {
            "model_used": model,
            "fallback": True,
            "reason": "Attachment processing failed, body-only analysis",
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        return result
    except Exception as e:
        logger.error(f"Fallback processing also failed: {e}")
        # Last resort: synthetic record flagging the email for manual review.
        return {
            "project_name": "DSC Hotel",
            "document_type": "General",
            "document_reference_number": None,
            "status": "Pending/Info",
            "consultant_comments": f"AI processing failed: {str(e)}",
            "action_required": "Manual review required",
            "assigned_discipline": None,
            "priority": "Normal",
            "key_dates": None,
            "summary": "Automated processing failed. Please review email manually.",
            "_processing_metadata": {"error": str(e), "fallback": True}
        }
def _wait_for_file_active(client, file_obj, timeout: int = 120):
    """Wait for a Gemini file upload to become active.

    Polls the Files API every 3 seconds while the file is PROCESSING.

    Raises:
        TimeoutError: if the file is still processing after *timeout* seconds.
        RuntimeError: if the service reports the file as FAILED.
    """
    from google.genai import types
    deadline = time.time() + timeout
    current = file_obj
    while current.state == types.FileState.PROCESSING:
        if time.time() - (deadline - timeout) > timeout:
            raise TimeoutError(f"File processing timed out after {timeout}s")
        time.sleep(3)
        current = client.files.get(name=current.name)
    if current.state == types.FileState.FAILED:
        raise RuntimeError(f"File processing failed: {current.name}")
def test_gemini_connection() -> tuple:
    """Test the Gemini API connection.

    Returns:
        ``(ok, message)`` — ``ok`` is True whenever the API answered at all
        (even with unexpected text); False only when the call itself failed.
    """
    try:
        from google import genai
        from google.genai import types
        client = get_gemini_client()
        model = GEMINI_MODEL
        probe = types.Part.from_text(text="Reply with exactly: CONNECTION_OK")
        response = client.models.generate_content(
            model=model,
            contents=[probe],
            config=types.GenerateContentConfig(temperature=0.0)
        )
        if "CONNECTION_OK" in response.text:
            return True, f"Gemini API connected (model: {model})"
        return True, f"Gemini API responded (model: {model}): {response.text[:100]}"
    except Exception as e:
        return False, f"Gemini API error: {str(e)}"