File size: 10,513 Bytes
8cce3d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""
Gemini AI Processor module for the Construction Intelligence Hub.
Handles multimodal document analysis using Google Gemini API.
MANDATORY MODEL: gemini-3-flash-preview
"""

import io
import os
import json
import time
import logging
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

_client = None


class DocumentAnalysis(BaseModel):
    """Schema for Gemini's structured output.

    Passed as ``response_schema`` in the GenerateContentConfig so the model
    returns JSON matching these fields; the Field descriptions double as
    extraction guidance for the model. Every field is Optional because a
    given email/attachment may not contain the information.
    """
    project_name: Optional[str] = Field(None, description="Project name, usually 'DSC Hotel' or 'Dubai Studio City'")
    document_type: Optional[str] = Field(None, description="One of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General")
    document_reference_number: Optional[str] = Field(None, description="Document reference number e.g. '621-DXB24-RFI-CVL-CRCC-MIM-0036'")
    status: Optional[str] = Field(None, description="One of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info")
    consultant_comments: Optional[str] = Field(None, description="Summary of MIMAR/Consultant comments, drawing changes, or discrepancies")
    action_required: Optional[str] = Field(None, description="What the contractor needs to do next")
    assigned_discipline: Optional[str] = Field(None, description="One of: Civil, MEP, Structural, Architectural")
    priority: Optional[str] = Field("Normal", description="Priority level: High, Normal, Low")
    key_dates: Optional[str] = Field(None, description="Any deadline or date mentioned")
    summary: Optional[str] = Field(None, description="Brief 2-3 sentence summary of the entire communication")


# System instruction sent with every Gemini request. Encodes the fixed
# project context (DSC Hotel / CRCC / MIMAR) and the closed vocabularies
# that must align with the DocumentAnalysis field descriptions above.
SYSTEM_PROMPT = """You are a Senior Civil Engineer and Project Manager working on the DSC Hotel project 
(G+8 Hotel) in Dubai Studio City for China Railway 18th Bureau Group (CRCC). The Consultant on this 
project is MIMAR.

Your task is to analyze incoming project emails and their attachments, then extract structured data.

RULES:
1. Be precise with document reference numbers - copy them exactly as written.
2. For document_type, use ONLY one of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General
3. For status, use ONLY one of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info
4. For assigned_discipline, use ONLY one of: Civil, MEP, Structural, Architectural
5. For priority, assess based on urgency words and deadlines: High, Normal, Low
6. consultant_comments should be a concise summary (max 200 words) of the key technical points
7. action_required should be specific and actionable
8. If information is not available in the email/attachments, use null

CONTEXT:
- Project: DSC Hotel (Dubai Studio City), G+8 Hotel
- Contractor: CRCC (China Railway 18th Bureau Group)
- Consultant: MIMAR
- Common document types: RFIs, WIRs (Work Inspection Reports), NCRs (Non-Conformance Reports), 
  Shop Drawings, IFC (Issued for Construction), Payment Applications (IPA)
"""

# MANDATORY: gemini-3-flash-preview
GEMINI_MODEL = "gemini-3-flash-preview"


def get_gemini_client():
    """Return the shared Gemini client, creating it lazily on first use.

    Raises:
        ValueError: If GOOGLE_API_KEY is not present in the environment.
    """
    global _client
    if _client is not None:
        return _client

    # Import deferred so the module can be loaded without the SDK installed.
    from google import genai

    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY not set in environment.")
    _client = genai.Client(api_key=api_key)
    return _client


def process_email_with_gemini(
    email_body: str,
    subject: str = "",
    sender: str = "",
    attachments: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Process an email and its attachments using Gemini gemini-3-flash-preview.

    Builds a multimodal request from the email text plus any processable
    attachments, calls the model with the DocumentAnalysis response schema,
    and returns the extracted fields.

    Args:
        email_body: Plain-text email body (truncated to 10,000 chars).
        subject: Email subject line, passed to the model as metadata.
        sender: Sender address, passed to the model as metadata.
        attachments: Optional list of dicts with keys 'file_data' (bytes),
            'mime_type', 'filename', and 'can_process'. Entries without
            file_data or with can_process falsy are skipped.

    Returns:
        The DocumentAnalysis fields as a dict, plus a '_processing_metadata'
        key (model used, per-attachment status list, UTC timestamp).

    Raises:
        Exception: Re-raised from the Gemini call when no attachments were
            supplied (otherwise a body-only fallback is attempted first).
    """
    from google.genai import types

    client = get_gemini_client()
    model = GEMINI_MODEL
    max_size_mb = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", "50"))

    content_parts = []

    email_context = f"""
--- EMAIL METADATA ---
Subject: {subject}
From: {sender}
--- EMAIL BODY ---
{email_body[:10000]}
--- END OF EMAIL ---

Please analyze this email and any attached documents below, then extract the structured project data.
"""
    content_parts.append(types.Part.from_text(text=email_context))

    processed_attachments = []
    if attachments:
        for att in attachments:
            if not att.get("file_data") or not att.get("can_process", False):
                continue

            file_data = att["file_data"]
            file_size_mb = len(file_data) / (1024 * 1024)
            mime_type = att.get("mime_type", "application/octet-stream")
            filename = att.get("filename", "unknown")

            if file_size_mb > max_size_mb:
                # BUG FIX: interpolate the real filename (was a literal
                # "(unknown)" placeholder in the log message).
                logger.warning(f"Skipping {filename} ({file_size_mb:.1f}MB) - exceeds {max_size_mb}MB limit")
                processed_attachments.append({
                    "filename": filename,
                    "status": "skipped",
                    "reason": f"File too large ({file_size_mb:.1f}MB)"
                })
                continue

            try:
                if file_size_mb < 20:
                    # Small files: inline the raw bytes in the request.
                    part = types.Part.from_bytes(data=file_data, mime_type=mime_type)
                else:
                    # Large files: upload via the Files API, then wait until
                    # the file leaves the PROCESSING state before referencing it.
                    uploaded_file = client.files.upload(
                        file=io.BytesIO(file_data),
                        config=types.UploadFileConfig(
                            mime_type=mime_type,
                            display_name=filename
                        )
                    )
                    _wait_for_file_active(client, uploaded_file)
                    part = types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )

                # BUG FIX: label each attachment with its actual filename
                # (was a literal "(unknown)" placeholder) so the model can
                # associate the bytes with the references in the email body.
                content_parts.append(types.Part.from_text(text=f"\n--- ATTACHMENT: {filename} ---"))
                content_parts.append(part)

                processed_attachments.append({"filename": filename, "status": "processed"})
                logger.info(f"Added attachment to Gemini request: {filename}")

            except Exception as e:
                logger.error(f"Error processing attachment {filename}: {e}")
                processed_attachments.append({"filename": filename, "status": "error", "reason": str(e)})

    # Call Gemini with structured-output config keyed to DocumentAnalysis.
    try:
        response = client.models.generate_content(
            model=model,
            contents=content_parts,
            config=types.GenerateContentConfig(
                system_instruction=SYSTEM_PROMPT,
                response_mime_type="application/json",
                response_schema=DocumentAnalysis,
                temperature=0.1,  # low temperature for deterministic extraction
            )
        )

        if response.parsed:
            result = response.parsed.model_dump()
        else:
            # Schema parsing unavailable; fall back to the raw JSON text.
            result = json.loads(response.text)

        result["_processing_metadata"] = {
            "model_used": model,
            "attachments_processed": processed_attachments,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }

        logger.info(f"Successfully processed email: {subject}")
        return result

    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        if attachments:
            # Attachments are the most likely failure source; retry body-only.
            logger.info("Retrying with email body only (no attachments)...")
            return _process_body_only(client, model, email_context)
        raise


def _process_body_only(client, model: str, email_context: str) -> Dict[str, Any]:
    """Fallback path: analyze only the email text, with no attachments.

    Returns the DocumentAnalysis fields as a dict tagged with fallback
    metadata; if the API call itself fails, returns a safe placeholder
    record flagging the email for manual review.
    """
    from google.genai import types

    try:
        gen_config = types.GenerateContentConfig(
            system_instruction=SYSTEM_PROMPT,
            response_mime_type="application/json",
            response_schema=DocumentAnalysis,
            temperature=0.1,
        )
        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text=email_context)],
            config=gen_config,
        )

        parsed = response.parsed
        result = parsed.model_dump() if parsed else json.loads(response.text)

        result["_processing_metadata"] = {
            "model_used": model,
            "fallback": True,
            "reason": "Attachment processing failed, body-only analysis",
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        return result

    except Exception as e:
        logger.error(f"Fallback processing also failed: {e}")
        # Last resort: a hand-built record so downstream consumers still
        # receive the expected shape and can route the email to a human.
        return {
            "project_name": "DSC Hotel",
            "document_type": "General",
            "document_reference_number": None,
            "status": "Pending/Info",
            "consultant_comments": f"AI processing failed: {str(e)}",
            "action_required": "Manual review required",
            "assigned_discipline": None,
            "priority": "Normal",
            "key_dates": None,
            "summary": "Automated processing failed. Please review email manually.",
            "_processing_metadata": {"error": str(e), "fallback": True}
        }


def _wait_for_file_active(client, file_obj, timeout: int = 120):
    """Poll the Files API until an uploaded file leaves the PROCESSING state.

    Raises:
        TimeoutError: If the file is still processing after `timeout` seconds.
        RuntimeError: If the service reports the file as FAILED.
    """
    from google.genai import types

    deadline = time.time() + timeout
    while file_obj.state == types.FileState.PROCESSING:
        if time.time() > deadline:
            raise TimeoutError(f"File processing timed out after {timeout}s")
        time.sleep(3)  # poll interval; uploads usually activate within seconds
        file_obj = client.files.get(name=file_obj.name)

    if file_obj.state == types.FileState.FAILED:
        raise RuntimeError(f"File processing failed: {file_obj.name}")


def test_gemini_connection() -> tuple:
    """Smoke-test the Gemini API with a trivial prompt.

    Returns:
        (ok, message) tuple; ok is False only when the API call raised.
    """
    try:
        from google.genai import types

        client = get_gemini_client()
        model = GEMINI_MODEL

        reply = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text="Reply with exactly: CONNECTION_OK")],
            config=types.GenerateContentConfig(temperature=0.0)
        )

        text = reply.text
        if "CONNECTION_OK" in text:
            return True, f"Gemini API connected (model: {model})"
        # Model answered, just not with the exact sentinel — still reachable.
        return True, f"Gemini API responded (model: {model}): {text[:100]}"

    except Exception as e:
        return False, f"Gemini API error: {str(e)}"