File size: 11,762 Bytes
c891504
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Gemini AI Processor module for the Construction Intelligence Hub.
Handles multimodal document analysis using Google Gemini API.
Uses gemini-3-flash-preview model for best multimodal performance.
"""

import io
import os
import json
import time
import logging
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

# Lazy import - will be imported when needed
_client = None


class DocumentAnalysis(BaseModel):
    """Schema for Gemini's structured output."""
    project_name: Optional[str] = Field(None, description="Project name, usually 'DSC Hotel' or 'Dubai Studio City'")
    document_type: Optional[str] = Field(None, description="One of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General")
    document_reference_number: Optional[str] = Field(None, description="Document reference number e.g. '621-DXB24-RFI-CVL-CRCC-MIM-0036'")
    status: Optional[str] = Field(None, description="One of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info")
    consultant_comments: Optional[str] = Field(None, description="Summary of MIMAR/Consultant comments, drawing changes, or discrepancies")
    action_required: Optional[str] = Field(None, description="What the contractor needs to do next")
    assigned_discipline: Optional[str] = Field(None, description="One of: Civil, MEP, Structural, Architectural")
    priority: Optional[str] = Field("Normal", description="Priority level: High, Normal, Low")
    key_dates: Optional[str] = Field(None, description="Any deadline or date mentioned")
    summary: Optional[str] = Field(None, description="Brief 2-3 sentence summary of the entire communication")


# The system prompt for Gemini
SYSTEM_PROMPT = """You are a Senior Civil Engineer and Project Manager working on the DSC Hotel project 
(G+8 Hotel) in Dubai Studio City for China Railway 18th Bureau Group (CRCC). The Consultant on this 
project is MIMAR.

Your task is to analyze incoming project emails and their attachments, then extract structured data.

RULES:
1. Be precise with document reference numbers - copy them exactly as written.
2. For document_type, use ONLY one of: RFI, WIR, MIR, NCR, Shop Drawing, IFC, Payment/IPA, General
3. For status, use ONLY one of: Approved, Approved with Comments, Rejected, Resubmit, Pending/Info
4. For assigned_discipline, use ONLY one of: Civil, MEP, Structural, Architectural
5. For priority, assess based on urgency words and deadlines: High, Normal, Low
6. consultant_comments should be a concise summary (max 200 words) of the key technical points
7. action_required should be specific and actionable
8. If information is not available in the email/attachments, use null

CONTEXT:
- Project: DSC Hotel (Dubai Studio City), G+8 Hotel
- Contractor: CRCC (China Railway 18th Bureau Group)
- Consultant: MIMAR
- Common document types: RFIs, WIRs (Work Inspection Reports), NCRs (Non-Conformance Reports), 
  Shop Drawings, IFC (Issued for Construction), Payment Applications (IPA)
"""

# MANDATORY: Use gemini-3-flash-preview model
GEMINI_MODEL = "gemini-3-flash-preview"


def get_gemini_client():
    """Get or create the Gemini client."""
    global _client
    if _client is None:
        from google import genai
        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError("GOOGLE_API_KEY not set. Please add it in Space Settings → Variables & Secrets.")
        _client = genai.Client(api_key=api_key)
    return _client


def process_email_with_gemini(
    email_body: str,
    subject: str = "",
    sender: str = "",
    attachments: List[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Process an email and its attachments using Gemini.

    Args:
        email_body: The email body text
        subject: Email subject line
        sender: Email sender
        attachments: List of attachment dicts with 'file_data', 'mime_type', 'filename'

    Returns:
        Dict with extracted document analysis fields
    """
    from google import genai
    from google.genai import types

    client = get_gemini_client()
    model = GEMINI_MODEL
    max_size_mb = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", "50"))

    # Build the content parts
    content_parts = []

    # Add the email context as text
    email_context = f"""
--- EMAIL METADATA ---
Subject: {subject}
From: {sender}
--- EMAIL BODY ---
{email_body[:10000]}
--- END OF EMAIL ---

Please analyze this email and any attached documents below, then extract the structured project data.
"""
    content_parts.append(types.Part.from_text(text=email_context))

    # Process attachments
    processed_attachments = []
    if attachments:
        for att in attachments:
            if not att.get("file_data") or not att.get("can_process", False):
                continue

            file_data = att["file_data"]
            file_size_mb = len(file_data) / (1024 * 1024)
            mime_type = att.get("mime_type", "application/octet-stream")
            filename = att.get("filename", "unknown")

            # Skip files that are too large
            if file_size_mb > max_size_mb:
                logger.warning(
                    f"Skipping {filename} ({file_size_mb:.1f}MB) - exceeds {max_size_mb}MB limit"
                )
                processed_attachments.append({
                    "filename": filename,
                    "status": "skipped",
                    "reason": f"File too large ({file_size_mb:.1f}MB)"
                })
                continue

            try:
                # For files under 20MB, use inline bytes
                if file_size_mb < 20:
                    part = types.Part.from_bytes(data=file_data, mime_type=mime_type)
                    content_parts.append(types.Part.from_text(
                        text=f"\n--- ATTACHMENT: {filename} ---"
                    ))
                    content_parts.append(part)
                else:
                    # For larger files, use the File API
                    uploaded_file = client.files.upload(
                        file=io.BytesIO(file_data),
                        config=types.UploadFileConfig(
                            mime_type=mime_type,
                            display_name=filename
                        )
                    )
                    # Wait for processing
                    _wait_for_file_active(client, uploaded_file)
                    part = types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )
                    content_parts.append(types.Part.from_text(
                        text=f"\n--- ATTACHMENT: {filename} ---"
                    ))
                    content_parts.append(part)

                processed_attachments.append({
                    "filename": filename,
                    "status": "processed",
                })
                logger.info(f"Added attachment to Gemini request: {filename}")

            except Exception as e:
                logger.error(f"Error processing attachment {filename}: {e}")
                processed_attachments.append({
                    "filename": filename,
                    "status": "error",
                    "reason": str(e)
                })

    # Call Gemini API
    try:
        response = client.models.generate_content(
            model=model,
            contents=content_parts,
            config=types.GenerateContentConfig(
                system_instruction=SYSTEM_PROMPT,
                response_mime_type="application/json",
                response_schema=DocumentAnalysis,
                temperature=0.1,  # Low temperature for extraction accuracy
            )
        )

        # Parse the response
        if response.parsed:
            result = response.parsed.model_dump()
        else:
            # Fallback to parsing raw text
            result = json.loads(response.text)

        result["_processing_metadata"] = {
            "model_used": model,
            "attachments_processed": processed_attachments,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }

        logger.info(f"Successfully processed email: {subject}")
        return result

    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        # Attempt fallback: process just the email body without attachments
        if attachments:
            logger.info("Retrying with email body only (no attachments)...")
            return _process_body_only(client, model, email_context)
        raise


def _process_body_only(client, model: str, email_context: str) -> Dict[str, Any]:
    """Fallback: Process just the email body without attachments."""
    from google.genai import types

    try:
        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text=email_context)],
            config=types.GenerateContentConfig(
                system_instruction=SYSTEM_PROMPT,
                response_mime_type="application/json",
                response_schema=DocumentAnalysis,
                temperature=0.1,
            )
        )

        if response.parsed:
            result = response.parsed.model_dump()
        else:
            result = json.loads(response.text)

        result["_processing_metadata"] = {
            "model_used": model,
            "fallback": True,
            "reason": "Attachment processing failed, body-only analysis",
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        return result

    except Exception as e:
        logger.error(f"Fallback processing also failed: {e}")
        # Return minimal result
        return {
            "project_name": "DSC Hotel",
            "document_type": "General",
            "document_reference_number": None,
            "status": "Pending/Info",
            "consultant_comments": f"AI processing failed: {str(e)}",
            "action_required": "Manual review required",
            "assigned_discipline": None,
            "priority": "Normal",
            "key_dates": None,
            "summary": "Automated processing failed. Please review email manually.",
            "_processing_metadata": {
                "error": str(e),
                "fallback": True,
            }
        }


def _wait_for_file_active(client, file_obj, timeout: int = 120):
    """Wait for a Gemini file upload to become active."""
    from google.genai import types

    start = time.time()
    while file_obj.state == types.FileState.PROCESSING:
        if time.time() - start > timeout:
            raise TimeoutError(f"File processing timed out after {timeout}s")
        time.sleep(3)
        file_obj = client.files.get(name=file_obj.name)

    if file_obj.state == types.FileState.FAILED:
        raise RuntimeError(f"File processing failed: {file_obj.name}")


def test_gemini_connection() -> tuple:
    """Test the Gemini API connection."""
    try:
        from google import genai
        from google.genai import types

        client = get_gemini_client()
        model = GEMINI_MODEL

        response = client.models.generate_content(
            model=model,
            contents=[types.Part.from_text(text="Reply with exactly: CONNECTION_OK")],
            config=types.GenerateContentConfig(temperature=0.0)
        )

        if "CONNECTION_OK" in response.text:
            return True, f"Gemini API connected successfully (model: {model})"
        else:
            return True, f"Gemini API responded (model: {model}): {response.text[:100]}"

    except Exception as e:
        return False, f"Gemini API error: {str(e)}"