| """ | |
| RLM-based Data Extraction Engine for PetroMind - Enhanced Version. | |
| Uses Recursive Language Model approach to extract structured data from large O&G documents. | |
| Key improvements: | |
| 1. KEYWORD SEARCH STRATEGY: Systematically searches document for data sections instead of random sampling. | |
| 2. CONTINUITY CHECK: Forces reading subsequent pages when tables are detected to ensure no data is skipped. | |
| 3. Multi-phase extraction: Discovery (Search) → Extraction → Validation | |
| 4. 8KB output buffer for large tables. | |
| """ | |
import os
import re
import json
import time
from typing import List, Dict, Any, Optional, Tuple
from collections import deque
from e2b_code_interpreter import Sandbox
import google.generativeai as genai
from groq import Groq
from mistralai import Mistral
from openai import OpenAI


class ExtractionRateLimiter:
    """Rate limiter for extraction API calls."""

    LIMITS = {
        "gemini": {"rpm": 15, "min_interval": 4.0},
        "groq": {"rpm": 25, "min_interval": 2.5},
        "mistral": {"rpm": 50, "min_interval": 1.2},
        "openrouter": {"rpm": 12, "min_interval": 5.0},
    }

    def __init__(self, provider: str):
        self.provider = provider.lower()
        limits = self.LIMITS.get(self.provider, {"rpm": 15, "min_interval": 4.0})
        self.max_rpm = limits["rpm"]
        self.min_interval = limits["min_interval"]
        self.request_times: deque = deque(maxlen=self.max_rpm)
    def wait_if_needed(self):
        """Wait if we're approaching rate limits."""
        now = time.time()
        # Enforce the minimum interval between consecutive requests
        if self.request_times:
            time_since_last = now - self.request_times[-1]
            if time_since_last < self.min_interval:
                wait_time = self.min_interval - time_since_last
                print(f"[Rate Limiter] Waiting {wait_time:.2f}s...")
                time.sleep(wait_time)
                now = time.time()  # refresh so the RPM check below uses the post-sleep time
        # Enforce the rolling requests-per-minute cap
        if len(self.request_times) >= self.max_rpm:
            oldest = self.request_times[0]
            elapsed = now - oldest
            if elapsed < 60:
                wait_time = 60 - elapsed + 1.0
                print(f"[Rate Limiter] RPM limit. Waiting {wait_time:.2f}s...")
                time.sleep(wait_time)
        self.request_times.append(time.time())
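

# Minimal usage sketch (illustrative values only): pacing back-to-back Gemini calls.
# The limiter sleeps ~4s between requests and blocks once 15 calls land inside a
# rolling 60s window, matching the LIMITS table above. `prompts` and `send_request`
# are hypothetical stand-ins for the caller's own request loop.
#
#   limiter = ExtractionRateLimiter("gemini")
#   for prompt in prompts:
#       limiter.wait_if_needed()
#       send_request(prompt)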


class RLMExtractionEngine:
    """
    Enhanced RLM extraction engine with Search-First strategy.
    Prioritizes locating data via keyword search over random sampling.
    """

    def __init__(
        self,
        doc_content: str,
        total_pages: int,
        model_provider: str = "gemini",
        model_name: str = "gemini-2.0-flash"
    ):
        self.doc_content = doc_content
        self.total_pages = total_pages
        self.model_provider = model_provider.lower()
        self.model_name = model_name
        self.e2b_api_key = os.getenv("E2B_API_KEY")
        self.rate_limiter = ExtractionRateLimiter(model_provider)
        if not self.e2b_api_key:
            print("WARNING: E2B_API_KEY not found. RLM extraction will fail.")

    def assistant_role(self) -> str:
        return "model" if self.model_provider == "gemini" else "assistant"
    def run(self, hard_cap: int = 50, soft_warning_at: int = 40) -> Dict[str, Any]:
        """
        Run enhanced RLM extraction with the Search-First strategy.
        """
        print(f"[RLM Extraction] Starting enhanced extraction for {self.total_pages}-page document")

        # Enhanced system prompt with Search-First strategy and detailed schema
        system_prompt = f"""You are a Senior Petroleum Data Engineer and Geoscientist.
{ "NOTE: YOU ARE AN OPENROUTER MODEL via Nvidia. OUTPUT STRICT JSON ONLY. NO MARKDOWN. NO PREAMBLE." if self.model_provider == "openrouter" else "" }
Your task is to extract, standardize, and structure high-fidelity technical data from a large Well Completion Report (WCR) of {self.total_pages} pages into a hierarchical JSON format.
The document is in 'document.txt', pages separated by '--- Page N ---'.

=== OBJECTIVE & INSTRUCTIONS ===
Analyze the provided document and extract specific data points into a hierarchical JSON format. You must prioritize accuracy, unit normalization, and the preservation of relational context.

**General Instructions:**
1. **Unit Handling:** Preserve original units (e.g., ft vs m) but note them explicitly.
2. **Table Extraction:** Reconstruct table rows into structured data objects. Do not summarize; extract row-by-row.
3. **OCR/Scanned Text:** Use context clues to correct OCR errors (e.g., "1l" -> "11"). If uncertain, flag as null.
4. **Null Values:** If data is missing, return null or []. Do not hallucinate.
5. **Long Document Handling:** Focus on "Summary", "Geology", "Engineering", "Testing", and "Appendices".

=== CRITICAL EXECUTION RULES ===
1. **NO SKIPPING:** Use Python SEARCH to find data. Do not sample.
2. **READ CONTINUOUSLY:** If a table starts on page X, read X+1, X+2 until it ends.
3. **EXTRACT COMPLETELY:** If a table has 1000 rows, extract 1000 rows.

=== PYTHON SEARCH TOOLS ===
Write SIMPLE Python. Always `import re` and `content = open('document.txt').read()`.

**TOOL 1: FIND PAGES WITH KEYWORDS**
```python
# Find pages containing specific terms
keywords = ['Header', 'Casing', 'Survey', 'Formation', 'Mud', 'Production', 'Test']
pages = open('document.txt').read().split('--- Page ')
found = {{kw: [] for kw in keywords}}
for i, p in enumerate(pages):
    for kw in keywords:
        if kw.lower() in p.lower():
            found[kw].append(i)
print(found)
```

**TOOL 2: READ CONSECUTIVE PAGES**
```python
# Read pages 10-12
pages = open('document.txt').read().split('--- Page ')
print(pages[10] + "\\n" + pages[11] + "\\n" + pages[12])
```

=== DATA EXTRACTION SCHEMA ===
You must populate the following categories:
1. **Header & Location Data**
- Well Name, Operator, Basin/Block/Permit
- Coordinates (Lat/Long, Easting/Northing)
- Elevations (GL, KB, RT, Water Depth)
- Key Dates (Spud, TD, Rig Release)
- Status (e.g., P&A, Completed)
2. **Wellbore Architecture (Hardware)**
- Hole Sections (Sizes, Depths)
- Casing & Liner (Type, OD, Shoe Depth MD/TVD, Grade/Weight)
- Cementing (Slurry vol, TOC, Returns)
3. **Geology & Stratigraphy**
- Formation Tops (Name, Top MD/TVD, Thickness, Lithology)
- Biostratigraphy/Palynology (Age dating)
4. **Drilling Parameters & Fluids**
- Mud Systems (Type, Density/MW, Viscosity)
- Operational Events/NPT (Kicks, Lost Circ, Stuck Pipe, Equipment Failure)
5. **Formation Evaluation (Petrophysics & Logs)**
- Wireline/LWD Logs (Tools, Intervals)
- Petrophysical Analysis (Net Pay, Avg Porosity, Avg Sw, Cut-offs)
6. **Coring & Rock Mechanics**
- Core Intervals (No., Top/Bottom, Recovery %)
- Routine Core Analysis (Permeability, Porosity, Grain Density) - *Extract Max/Min/Avg if huge*
- Special Core Analysis (SCAL)
7. **Fluid Sampling & Testing (DST/RFT/MDT)**
- Tests (DST #, Interval, Formation)
- Flow Rates (Oil bopd, Gas MMscfd, Water bwpd, Choke)
- Pressures (Initial, FTHP)
- Fluid Properties (Gravity, Composition, H2S/CO2, Salinity)
8. **Directional Survey Data (Well Path)**
- Station Data: MD, Inclination, Azimuth, TVD, N/S, E/W
- Survey Method (MWD, Gyro)
- *Constraint: Extract the DEFINITIVE survey table.*
9. **Mud Data & Drilling Fluids**
- Daily/Section properties: Mud Type, MW, Vis, PV, YP, Gel, Fluid Loss, pH, Cl, Additives.

=== OUTPUT JSON FORMAT ===
Action: {{"action": "execute", "code": "...", "thought": "..."}}
Finish: {{"action": "finish", "data": {{
  "summary": "...",
  "tables": [
    {{
      "title": "Well Header",
      "rows": [
        {{"WellName": "Mutineer 3", "Operator": "Santos", "SpudDate": "2023-01-15", "__confidence": 1.0}}
      ],
      "page_number": 2
    }},
    {{
      "title": "Casing Data",
      "rows": [
        {{"Size_in": 13.375, "ShoeDepth_mMD": 500, "Grade": "K55", "__confidence": 0.95}}
      ],
      "page_number": 8
    }}
  ]
}}, "thought": "..."}}
"""
        history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"""Extract ALL structured data from this {self.total_pages}-page document.
STEP 1: Use Python to SEARCH for keywords (Well Header, Survey, Casing, Production, Mud) to find exact page numbers.
STEP 2: Visit those pages and extract data.
STEP 3: If you see a table likely continues to the next page, READ THE NEXT PAGE immediately.
STEP 4: Verify you haven't missed any sections from the Table of Contents."""}
        ]
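
        # For reference, the loop below expects each model reply to be a single bare
        # JSON action object, per the OUTPUT JSON FORMAT above, e.g. (illustrative):
        #   {"action": "execute", "thought": "locate survey pages",
        #    "code": "print(open('document.txt').read()[:200])"}
        # followed eventually by {"action": "finish", "data": {...}, "thought": "..."}.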
        reasoning_trace = []
        final_data = None
        step = 0
        sandbox = None

        try:
            sandbox = Sandbox.create(timeout=300)  # 5 min timeout
            sandbox.files.write("document.txt", self.doc_content)
            sandbox.run_code("import re\nimport json")

            while step < hard_cap:
                step += 1
                print(f"[RLM Extraction] --- Step {step}/{hard_cap} ---")

                # Progressive warnings
                if step == 15:
                    history.append({
                        "role": "user",
                        "content": "[CHECKPOINT: Have you performed a keyword verification? Ensure you looked at ALL pages containing 'Survey' or 'Table'.]"
                    })
                if step == soft_warning_at:
                    history.append({
                        "role": "user",
                        "content": f"[WARNING: Step {step}/{hard_cap}. Finalize extraction. Ensure NO rows were skipped in large tables.]"
                    })

                self.rate_limiter.wait_if_needed()
                response_text = self._call_model(history)
                try:
                    cleaned_json = re.sub(r'```json\n?|\n?```', '', response_text).strip()
                    action_data = json.loads(cleaned_json)
                except json.JSONDecodeError:
                    print(f"[RLM Extraction] Invalid JSON, raw output:\n{response_text[:500]}...\n----------------")
                    # Try to find a JSON object embedded in the text (greedy: first '{' to last '}')
                    try:
                        match = re.search(r'\{.*\}', response_text, re.DOTALL)
                        if match:
                            action_data = json.loads(match.group(0))
                        else:
                            raise ValueError("No JSON found")
                    except Exception:
                        # Signal the failure back to the model so the retry can self-correct,
                        # without echoing the full invalid output into the history
                        history.append({
                            "role": "user",
                            "content": "Your last reply was not valid JSON. Respond with a single JSON action object only."
                        })
                        continue
                action_type = action_data.get("action")
                thought = action_data.get("thought", "")
                current_trace = {"step": step, "thought": thought, "type": action_type}

                if action_type == "execute":
                    code = action_data.get("code", "")
                    current_trace["content"] = code
                    print(f"[RLM Extraction] Executing code length: {len(code)}")

                    execution = sandbox.run_code(code)
                    output = ""
                    if execution.error:
                        output = f"Error: {execution.error.name}: {execution.error.value}"
                    else:
                        # Normalize stdout (some SDK versions return a list of messages)
                        stdout = execution.logs.stdout
                        if isinstance(stdout, list):
                            stdout = "".join(stdout)
                        # Large buffer (12 KB) for complete table extraction
                        output = stdout[:12000]
                        if len(stdout) > 12000:
                            output += f"\n...[Output truncated. Full length: {len(stdout)} chars]..."

                    print(f"[RLM Extraction] Output length: {len(output)}")
                    current_trace["output"] = (output[:500] + "...") if len(output) > 500 else output
                    history.append({"role": self.assistant_role(), "content": response_text})
                    history.append({"role": "user", "content": f"Code Output:\n{output}"})
| elif action_type == "finish": | |
| final_data = action_data.get("data", {}) | |
| current_trace["content"] = f"Extraction complete with {len(final_data.get('tables', []))} tables" | |
| reasoning_trace.append(current_trace) | |
| print(f"[RLM Extraction] Finished after {step} steps") | |
| break | |
| reasoning_trace.append(current_trace) | |
| except Exception as e: | |
| print(f"[RLM Extraction] Error: {e}") | |
| reasoning_trace.append({"step": -1, "type": "error", "content": str(e)}) | |
| finally: | |
| if sandbox: | |
| try: | |
| sandbox.kill() | |
| except Exception: | |
| pass | |
        # Ensure we have a valid output format
        if not final_data or not final_data.get("tables"):
            final_data = {
                "summary": f"RLM extraction ran for {step} steps.",
                "tables": []
            }
        # Post-process: ensure all tables have required fields
        processed_tables = []
        for table in final_data.get("tables", []):
            if not table.get("rows"):
                continue
            processed_rows = []
            for row in table.get("rows", []):
                if not isinstance(row, dict):
                    continue  # skip malformed rows so key normalization below cannot fail
                # Ensure each row carries a confidence score
                if "__confidence" not in row:
                    row["__confidence"] = 0.90
                # Normalize keys (remove spaces, special chars) to prevent 'undefined' in frontend,
                # e.g. "Shoe Depth (mMD)" -> "ShoeDepthmMD"
                clean_row = {}
                for k, v in row.items():
                    clean_key = re.sub(r'[^a-zA-Z0-9_]', '', k)
                    clean_row[clean_key] = v
                processed_rows.append(clean_row)
            if processed_rows:
                processed_table = {
                    "title": table.get("title", "Extracted Data"),
                    "rows": processed_rows,
                    "page_number": table.get("page_number", 1),
                    "visualization_config": table.get("visualization_config")
                }
                processed_tables.append(processed_table)
        result = {
            "summary": final_data.get("summary", f"Data extracted using RLM ({step} steps)"),
            "metadata": {
                "fileName": "Extracted via RLM",
                "dateProcessed": time.strftime("%Y-%m-%d"),
                "pageCount": self.total_pages,
                "fileType": "PDF",
                "extraction_method": "rlm",
                "steps_used": step
            },
            "tables": processed_tables,
            "using_rlm": True,
            "reasoning_trace": reasoning_trace
        }
        return result
    # Removed _get_sample_pages since we now use the search strategy

    def _call_model(self, history: List[Dict[str, str]]) -> str:
        """Call the LLM with conversation history."""
        if self.model_provider == "gemini":
            genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
            model = genai.GenerativeModel(f"models/{self.model_name}")
            # Flatten the chat history into a single prompt for generate_content
            full_prompt = ""
            for msg in history:
                full_prompt += f"{msg['role'].upper()}: {msg['content']}\n\n"
            response = model.generate_content(
                full_prompt,
                generation_config={"response_mime_type": "application/json"}
            )
            return response.text

        elif self.model_provider == "groq":
            client = Groq(api_key=os.getenv("GROQ_API_KEY"))
            completion = client.chat.completions.create(
                model=self.model_name,
                messages=history,
                response_format={"type": "json_object"}
            )
            return completion.choices[0].message.content

        elif self.model_provider == "mistral":
            client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
            response = client.chat.complete(
                model=self.model_name,
                messages=history,
                response_format={"type": "json_object"}
            )
            return response.choices[0].message.content

        elif self.model_provider == "openrouter":
            api_key = os.getenv("OPENROUTER_API_KEY")
            client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=api_key,
            )
            # Many OpenRouter models don't support json_object mode; omit it to prevent empty responses
            completion = client.chat.completions.create(
                model=self.model_name,
                messages=history
            )
            return completion.choices[0].message.content

        return "{}"