# PetroMind_AI/backend/rlm_extraction.py
"""
RLM-based Data Extraction Engine for PetroMind - Enhanced Version.
Uses Recursive Language Model approach to extract structured data from large O&G documents.
Key improvements:
1. KEYWORD SEARCH STRATEGY: Systematically searches document for data sections instead of random sampling.
2. CONTINUITY CHECK: Forces reading subsequent pages when tables are detected to ensure no data is skipped.
3. Multi-phase extraction: Discovery (Search) → Extraction → Validation
4. Large output buffer (12,000 characters) so big tables are captured in full.
"""
import os
import re
import json
import time
from typing import List, Dict, Any, Optional, Tuple
from collections import deque
from e2b_code_interpreter import Sandbox
import google.generativeai as genai
from groq import Groq
from mistralai import Mistral
from openai import OpenAI
class ExtractionRateLimiter:
"""Rate limiter for extraction API calls."""
LIMITS = {
"gemini": {"rpm": 15, "min_interval": 4.0},
"groq": {"rpm": 25, "min_interval": 2.5},
"mistral": {"rpm": 50, "min_interval": 1.2},
"openrouter": {"rpm": 12, "min_interval": 5.0},
}
def __init__(self, provider: str):
self.provider = provider.lower()
limits = self.LIMITS.get(self.provider, {"rpm": 15, "min_interval": 4.0})
self.max_rpm = limits["rpm"]
self.min_interval = limits["min_interval"]
self.request_times: deque = deque(maxlen=self.max_rpm)
def wait_if_needed(self):
"""Wait if we're approaching rate limits."""
now = time.time()
if self.request_times:
time_since_last = now - self.request_times[-1]
if time_since_last < self.min_interval:
wait_time = self.min_interval - time_since_last
print(f"[Rate Limiter] Waiting {wait_time:.2f}s...")
time.sleep(wait_time)
if len(self.request_times) >= self.max_rpm:
oldest = self.request_times[0]
elapsed = now - oldest
if elapsed < 60:
wait_time = 60 - elapsed + 1.0
print(f"[Rate Limiter] RPM limit. Waiting {wait_time:.2f}s...")
time.sleep(wait_time)
self.request_times.append(time.time())
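
# Illustrative sketch of using the limiter on its own (example values only; in this
# module it is driven internally by RLMExtractionEngine below):
#
#     limiter = ExtractionRateLimiter("gemini")  # unknown providers fall back to default limits
#     for _ in range(3):
#         limiter.wait_if_needed()               # sleeps when calls are too close or the RPM cap is hit
#         ...                                    # make the provider API call here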
class RLMExtractionEngine:
"""
Enhanced RLM extraction engine with Search-First strategy.
Prioritizes locating data via keyword search over random sampling.
"""
def __init__(
self,
doc_content: str,
total_pages: int,
model_provider: str = "gemini",
model_name: str = "gemini-2.0-flash"
):
self.doc_content = doc_content
self.total_pages = total_pages
self.model_provider = model_provider.lower()
self.model_name = model_name
self.e2b_api_key = os.getenv("E2B_API_KEY")
self.rate_limiter = ExtractionRateLimiter(model_provider)
if not self.e2b_api_key:
print("WARNING: E2B_API_KEY not found. RLM extraction will fail.")
@property
def assistant_role(self) -> str:
return "model" if self.model_provider == "gemini" else "assistant"
def run(self, hard_cap: int = 50, soft_warning_at: int = 40) -> Dict[str, Any]:
"""
Run enhanced RLM extraction with Search-First strategy.
"""
print(f"[RLM Extraction] Starting enhanced extraction for {self.total_pages} page document")
# Enhanced system prompt with Search-First strategy and Detailed Schema
base_system_prompt = f"""You are a Senior Petroleum Data Engineer and Geoscientist.
{ "NOTE: YOU ARE AN OPENROUTER MODEL via Nvidia. OUTPUT STRICT JSON ONLY. NO MARKDOWN. NO PREAMBLE." if self.model_provider == "openrouter" else "" }
Your task is to extract, standardize, and structure high-fidelity technical data from a large Well Completion Report (WCR) of {self.total_pages} pages into a hierarchical JSON format.
The document is in 'document.txt', pages separated by '--- Page N ---'.
=== OBJECTIVE & INSTRUCTIONS ===
Analyze the provided document and extract specific data points into a hierarchical JSON format. You must prioritize accuracy, unit normalization, and the preservation of relational context.
**General Instructions:**
1. **Unit Handling:** Preserve original units (e.g., ft vs m) but note them explicitly.
2. **Table Extraction:** Reconstruct table rows into structured data objects. Do not summarize; extract row-by-row.
3. **OCR/Scanned Text:** Use context clues to correct OCR errors (e.g., "1l" -> "11"). If uncertain, flag as null.
4. **Null Values:** If data is missing, return null or []. Do not hallucinate.
5. **Long Document Handling:** Focus on "Summary", "Geology", "Engineering", "Testing", and "Appendices".
=== CRITICAL EXECUTION RULES ===
1. **NO SKIPPING:** Use Python SEARCH to find data. Do not sample.
2. **READ CONTINUOUSLY:** If a table starts on page X, read X+1, X+2 until it ends.
3. **EXTRACT COMPLETELY:** If a table has 1000 rows, extract 1000 rows.
=== PYTHON SEARCH TOOLS ===
Write SIMPLE Python. Always `import re` and `content = open('document.txt').read()`.
**TOOL 1: FIND PAGES WITH KEYWORDS**
```python
# Find pages containing specific terms
keywords = ['Header', 'Casing', 'Survey', 'Formation', 'Mud', 'Production', 'Test']
pages = open('document.txt').read().split('--- Page ')
found = {{kw: [] for kw in keywords}}
for i, p in enumerate(pages):
for kw in keywords:
if kw.lower() in p.lower():
found[kw].append(i)
print(found)
```
**TOOL 2: READ CONSECUTIVE PAGES**
```python
# Read pages 10-12 (adjust the indices as needed)
pages = open('document.txt').read().split('--- Page ')
print(pages[10] + "\\n" + pages[11] + "\\n" + pages[12])
```
=== DATA EXTRACTION SCHEMA ===
You must populate the following categories:
1. **Header & Location Data**
- Well Name, Operator, Basin/Block/Permit
- Coordinates (Lat/Long, Easting/Northing)
- Elevations (GL, KB, RT, Water Depth)
- Key Dates (Spud, TD, Rig Release)
- Status (e.g., P&A, Completed)
2. **Wellbore Architecture (Hardware)**
- Hole Sections (Sizes, Depths)
- Casing & Liner (Type, OD, Shoe Depth MD/TVD, Grade/Weight)
- Cementing (Slurry vol, TOC, Returns)
3. **Geology & Stratigraphy**
- Formation Tops (Name, Top MD/TVD, Thickness, Lithology)
- Biostratigraphy/Palynology (Age dating)
4. **Drilling Parameters & Fluids**
- Mud Systems (Type, Density/MW, Viscosity)
- Operational Events/NPT (Kicks, Lost Circ, Stuck Pipe, Equipment Failure)
5. **Formation Evaluation (Petrophysics & Logs)**
- Wireline/LWD Logs (Tools, Intervals)
- Petrophysical Analysis (Net Pay, Avg Porosity, Avg Sw, Cut-offs)
6. **Coring & Rock Mechanics**
- Core Intervals (No., Top/Bottom, Recovery %)
- Routine Core Analysis (Permeability, Porosity, Grain Density) - *Extract Max/Min/Avg if huge*
- Special Core Analysis (SCAL)
7. **Fluid Sampling & Testing (DST/RFT/MDT)**
- Tests (DST #, Interval, Formation)
- Flow Rates (Oil bopd, Gas MMscfd, Water bwpd, Choke)
- Pressures (Initial, FTHP)
- Fluid Properties (Gravity, Composition, H2S/CO2, Salinity)
8. **Directional Survey Data (Well Path)**
- Station Data: MD, Inclination, Azimuth, TVD, N/S, E/W
- Survey Method (MWD, Gyro)
- *Constraint: Extract the DEFINITIVE survey table.*
9. **Mud Data & Drilling Fluids**
- Daily/Section properties: Mud Type, MW, Vis, PV, YP, Gel, Fluid Loss, pH, Cl, Additives.
=== OUTPUT JSON FORMAT ===
Action: {{"action": "execute", "code": "...", "thought": "..."}}
Finish: {{"action": "finish", "data": {{
"summary": "...",
"tables": [
{{
"title": "Well Header",
"rows": [
{{"WellName": "Mutineer 3", "Operator": "Santos", "SpudDate": "2023-01-15", "__confidence": 1.0}}
],
"page_number": 2
}},
{{
"title": "Casing Data",
"rows": [
{{"Size_in": 13.375, "ShoeDepth_mMD": 500, "Grade": "K55", "__confidence": 0.95}}
],
"page_number": 8
}}
]
}}, "thought": "..."}}
"""
history = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"""Extract ALL structured data from this {self.total_pages}-page document.
STEP 1: Use Python to SEARCH for keywords (Well Header, Survey, Casing, Production, Mud) to find exact page numbers.
STEP 2: Visit those pages and extract data.
STEP 3: If you see a table likely continues to the next page, READ THE NEXT PAGE immediately.
STEP 4: Verify you haven't missed any sections from the Table of Contents."""}
]
reasoning_trace = []
final_data = None
step = 0
sandbox = None
try:
sandbox = Sandbox.create(timeout=300) # 5 min timeout
sandbox.files.write("document.txt", self.doc_content)
sandbox.run_code("import re\nimport json")
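            # Agent loop: each step the model either executes Python against document.txt in the
            # sandbox or returns the final structured data via a "finish" action.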
while step < hard_cap:
step += 1
print(f"[RLM Extraction] --- Step {step}/{hard_cap} ---")
# Progressive warnings
if step == 15:
history.append({
"role": "user",
"content": "[CHECKPOINT: Have you performed a keyword verification? Ensure you looked at ALL pages containing 'Survey' or 'Table'.]"
})
if step == soft_warning_at:
history.append({
"role": "user",
"content": f"[WARNING: Step {step}/{hard_cap}. Finalize extraction. Ensure NO rows were skipped in large tables.]"
})
self.rate_limiter.wait_if_needed()
response_text = self._call_model(history)
try:
cleaned_json = re.sub(r'```json\n?|\n?```', '', response_text).strip()
action_data = json.loads(cleaned_json)
except json.JSONDecodeError:
print(f"[RLM Extraction] Invalid JSON, raw output:\n{response_text[:500]}...\n----------------")
# Try to find JSON within the text
try:
match = re.search(r'\{.*\}', response_text, re.DOTALL)
if match:
action_data = json.loads(match.group(0))
else:
raise ValueError("No JSON found")
                    except Exception:
                        # Signal the parse failure back to the model so the next step returns valid JSON.
                        history.append({"role": "user", "content": "[ERROR: Your last reply was not valid JSON. Respond with a single JSON object only.]"})
                        continue
action_type = action_data.get("action")
thought = action_data.get("thought", "")
current_trace = {"step": step, "thought": thought, "type": action_type}
if action_type == "execute":
code = action_data.get("code", "")
current_trace["content"] = code
print(f"[RLM Extraction] Executing code length: {len(code)}")
execution = sandbox.run_code(code)
output = ""
if execution.error:
output = f"Error: {execution.error.name}: {execution.error.value}"
                else:
                    # logs.stdout may be a list of chunks depending on the e2b SDK version; normalize to one string.
                    stdout_text = execution.logs.stdout
                    if isinstance(stdout_text, list):
                        stdout_text = "".join(stdout_text)
                    # Large buffer for complete table extraction
                    output = stdout_text[:12000]
                    if len(stdout_text) > 12000:
                        output += f"\n...[Output truncated. Full length: {len(stdout_text)} chars]..."
print(f"[RLM Extraction] Output length: {len(output)}")
current_trace["output"] = output[:500] + "..." if len(output) > 500 else output
history.append({"role": self.assistant_role, "content": response_text})
history.append({"role": "user", "content": f"Code Output:\n{output}"})
elif action_type == "finish":
final_data = action_data.get("data", {})
current_trace["content"] = f"Extraction complete with {len(final_data.get('tables', []))} tables"
reasoning_trace.append(current_trace)
print(f"[RLM Extraction] Finished after {step} steps")
break
reasoning_trace.append(current_trace)
except Exception as e:
print(f"[RLM Extraction] Error: {e}")
reasoning_trace.append({"step": -1, "type": "error", "content": str(e)})
finally:
if sandbox:
try:
sandbox.kill()
except Exception:
pass
# Ensure we have valid output format
if not final_data or not final_data.get("tables"):
final_data = {
"summary": f"RLM extraction ran for {step} steps.",
"tables": []
}
# Post-process: ensure all tables have required fields
processed_tables = []
for table in final_data.get("tables", []):
if not table.get("rows"):
continue
# Ensure each row has confidence
processed_rows = []
for row in table.get("rows", []):
if isinstance(row, dict):
if "__confidence" not in row:
row["__confidence"] = 0.90
if "__confidence" not in row:
row["__confidence"] = 0.90
# Normalize keys (remove spaces, special chars) to prevent 'undefined' in frontend
clean_row = {}
for k, v in row.items():
clean_key = re.sub(r'[^a-zA-Z0-9_]', '', k.replace(' ', ''))
clean_row[clean_key] = v
processed_rows.append(clean_row)
if processed_rows:
processed_table = {
"title": table.get("title", "Extracted Data"),
"rows": processed_rows,
"page_number": table.get("page_number", 1),
"visualization_config": table.get("visualization_config")
}
processed_tables.append(processed_table)
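        # Assemble the payload in the shape the frontend expects: summary, metadata,
        # cleaned tables, and the step-by-step reasoning trace.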
result = {
"summary": final_data.get("summary", f"Data extracted using RLM ({step} steps)"),
"metadata": {
"fileName": "Extracted via RLM",
"dateProcessed": time.strftime("%Y-%m-%d"),
"pageCount": self.total_pages,
"fileType": "PDF",
"extraction_method": "rlm",
"steps_used": step
},
"tables": processed_tables,
"using_rlm": True,
"reasoning_trace": reasoning_trace
}
return result
# Removed _get_sample_pages as we now use search strategy
def _call_model(self, history: List[Dict[str, str]]) -> str:
"""Call the LLM with conversation history."""
if self.model_provider == "gemini":
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
model = genai.GenerativeModel(f"models/{self.model_name}")
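            # Gemini is called with a single flattened prompt; chat roles are inlined as
            # plain-text markers rather than structured turns.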
full_prompt = ""
for msg in history:
full_prompt += f"{msg['role'].upper()}: {msg['content']}\n\n"
response = model.generate_content(
full_prompt,
generation_config={"response_mime_type": "application/json"}
)
return response.text
elif self.model_provider == "groq":
client = Groq(api_key=os.getenv("GROQ_API_KEY"))
completion = client.chat.completions.create(
model=self.model_name,
messages=history,
response_format={"type": "json_object"}
)
return completion.choices[0].message.content
elif self.model_provider == "mistral":
client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
response = client.chat.complete(
model=self.model_name,
messages=history,
response_format={"type": "json_object"}
)
return response.choices[0].message.content
elif self.model_provider == "openrouter":
api_key = os.getenv("OPENROUTER_API_KEY")
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
# Many OpenRouter models don't support json_object mode, removing to prevent empty responses
completion = client.chat.completions.create(
model=self.model_name,
messages=history
)
return completion.choices[0].message.content
return "{}"