"""
orchestrator.py
---------------
Hybrid Orchestrator for the Drilling Intelligence System (Phase 6).
Supports streaming "Thinking" logs and real-time responses.
"""
import os
import re
import time
import logging
from pathlib import Path
from typing import Generator, Dict, Any
from dotenv import load_dotenv
from google import genai
# Tools
from src.agents.tools import get_iadc_db, get_volve_db
# The deep reasoning loop
from src.agents.crew import run_aggregation_loop
# Pull variables from a local .env file into the process environment first,
# so the os.getenv lookups below can see them.
load_dotenv()

log = logging.getLogger(__name__)

# Three directory levels above this file; presumably the project root that
# contains `src/` and `outputs/` — confirm if the file is ever moved.
BASE_DIR = Path(__file__).resolve().parents[2]

# Gemini model id, overridable through the GEMINI_MODEL environment variable.
MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-3.1-flash-lite-preview")

# Module-wide Gemini client, constructed once at import time.
_genai_client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
# ── Router Tags ──────────────────────────────────────────────────────────────
ROUTING_IADC = "IADC_Definition"
ROUTING_VOLVE_HISTORY = "Volve_History"
ROUTING_DEEP_ANALYST = "Data_Analysis"
ROUTING_AGGREGATE = "Extrapolation"
ROUTING_DUAL = "Dual_Search"  # New in Phase 6: Multi-source for ambiguous terms

# ── 1. Classification Engine ──────────────────────────────────────────────────
# Volve-style well identifiers such as "15/9-f-12" (applied to lower-cased text).
_WELL_PATTERN = re.compile(r"(\d{1,2}/\d+-[A-Za-z]+-?\d+(?:\s*[A-Z])?)")


def _matches_whole_word(text: str, keywords) -> bool:
    """Return True if any keyword appears in *text* as a whole word or phrase.

    Uses regex word boundaries, so adjacent punctuation ("What is NPT?")
    still matches while hits inside longer words do not.
    """
    return any(re.search(rf"\b{re.escape(kw)}\b", text) for kw in keywords)


def _matches_word_prefix(text: str, keywords) -> bool:
    """Return True if any keyword appears in *text* at the start of a word.

    Prefix matching keeps inflections ("calc" -> "calculate", "formation" ->
    "formations") but rejects mid-word hits such as "rop" in "drop" or
    "top" in "stop".
    """
    return any(re.search(rf"\b{re.escape(kw)}", text) for kw in keywords)


def classify_question(question: str) -> str:
    """Route a question to a processing path using keyword heuristics.

    Checks run in priority order; the first match wins:

    1. Aggregation / lessons-learned phrases   -> ``ROUTING_AGGREGATE``
    2. Ambiguous operational terms (WOW, NPT…) -> ``ROUTING_DUAL``
    3. Geology / formation-top vocabulary      -> ``ROUTING_VOLVE_HISTORY``
    4. Numerical / analytics vocabulary        -> ``ROUTING_DEEP_ANALYST``
    5. Historical-record phrases or a well id  -> ``ROUTING_VOLVE_HISTORY``
    Otherwise                                  -> ``ROUTING_IADC``

    Args:
        question: Free-text user question (matching is case-insensitive).

    Returns:
        One of the ``ROUTING_*`` tag strings.
    """
    q_lower = question.lower()

    # 1. Macro / lessons learned (mostly multi-word phrases; substring match).
    agg_kw = ["lessons learned", "extrapolate", "summarize", "overall", "compare across"]
    if any(kw in q_lower for kw in agg_kw):
        return ROUTING_AGGREGATE

    # 2. Tech terms that need Dual Search (Theory + Volve Context).
    #    Give 65% weight to Volve later in prompt.
    dual_kw = ["wow", "waiting on weather", "npt", "stuck pipe", "milling", "kicks", "losses"]
    if _matches_whole_word(q_lower, dual_kw):
        return ROUTING_DUAL

    # 3. Geophysics (Formation Tops).
    geo_kw = ["formation", "top", "stratigraphy", "geology", "lithology", "hugin", "shetland", "skagerrak"]
    if _matches_word_prefix(q_lower, geo_kw):
        return ROUTING_VOLVE_HISTORY

    # 4. Numerical / Analytics.
    math_kw = ["average", "mean", "max", "min", "trend", "calc", "rop", "rpm", "chart", "table", "plot", "compare"]
    if _matches_word_prefix(q_lower, math_kw):
        return ROUTING_DEEP_ANALYST

    # 5. Volve historical records: explicit phrases ("record" also covers
    #    "records") or a referenced well identifier.
    history_kw = ["what happened", "record", "incident", "daily log", "instance"]
    if any(kw in q_lower for kw in history_kw) or _WELL_PATTERN.search(q_lower):
        return ROUTING_VOLVE_HISTORY

    return ROUTING_IADC
# ── 2. Unified RAG Execution ──────────────────────────────────────────────────
def _retrieve_context_blocks(question: str, source: str) -> list:
    """Query one vector store and return formatted context blocks.

    ``source == ROUTING_IADC`` queries the IADC glossary store (k=4); any other
    value queries the Volve store (k=25). Each block has the form
    ``"[<label> - Source: <file name>]: <page content>"``.
    """
    if source == ROUTING_IADC:
        db = get_iadc_db()
        label = "IADC Drilling Glossary (Theory)"
        k, namespace = 4, "resources/iadc/"
    else:
        db = get_volve_db()
        label = "Volve Field records (Operational History & Formation Picks)"
        k, namespace = 25, "resources/volve/"

    results = db.similarity_search(question, k=k, filter={"viking_namespace": namespace})
    if not results:
        # Fallback: unfiltered search if the namespace filter yields nothing.
        results = db.similarity_search(question, k=k)

    blocks = []
    for doc in results:
        src = doc.metadata.get("source", "Unknown source")
        if isinstance(src, str) and "/" in src:
            src = src.split("/")[-1]  # keep only the file name
        blocks.append(f"[{label} - Source: {src}]: {doc.page_content}")
    return blocks


def run_fast_rag(question: str, routes: list, persona="Technical Assistant") -> str:
    """Answer *question* with single- or multi-source retrieval-augmented generation.

    Args:
        question: User question; used as the similarity-search query and
            embedded in the prompt.
        routes: Route tags selecting the sources. ``ROUTING_IADC`` searches
            the IADC glossary, ``ROUTING_DUAL`` searches IADC and Volve, and
            any other tag searches Volve. A single tag string is accepted.
            Each source is searched at most once, in first-requested order.
        persona: Currently unused; kept for interface compatibility.

    Returns:
        ``response.text`` from the Gemini call; a fixed "couldn't find"
        message when nothing was retrieved; or ``"LLM Error: ..."`` if the
        generation call raises.
    """
    # A bare tag string would otherwise be iterated character by character.
    if isinstance(routes, str):
        routes = [routes]

    # Expand route tags into concrete sources, de-duplicated, order preserved.
    sources = []
    for route in routes:
        if route == ROUTING_DUAL:
            wanted = (ROUTING_IADC, ROUTING_VOLVE_HISTORY)
        elif route == ROUTING_IADC:
            wanted = (ROUTING_IADC,)
        else:
            wanted = (ROUTING_VOLVE_HISTORY,)
        for source in wanted:
            if source not in sources:
                sources.append(source)

    context_blocks = []
    for source in sources:
        context_blocks.extend(_retrieve_context_blocks(question, source))

    if not context_blocks:
        return "I couldn't find relevant technical or historical records for this query."

    context_str = "\n\n".join(context_blocks)
    # User Request: Technical Chat tone, weight Volve (65%).
    # Align with SPE Challenge grading requirements.
    system_prompt = f"""You are Odin, a strictly professional, highly technical, and analytical engineering AI system.
TONE: Maintain a serious, formal, and precise engineering tone. Provide logically structured, evidence-based answers.
DO NOT use casual language.
PRIORITY: When answering about operational concepts (like WOW or NPT),
give 65% more weight and detail to the Volve Field historical examples provided
over general definitions.
LANGUAGE: The Volve source documents may contain Norwegian text (from the Volve PUD and field reports).
If retrieved context contains Norwegian, translate it to English and present ONLY the English translation.
Never output Norwegian text to the user. Key translations: foringsrør=casing, borevæske=drilling fluid,
boreslam=drilling mud, brønn=well, hullseksjon=hole section, borekaks=drill cuttings.
EVIDENCE & ASSUMPTIONS: Always clearly state your evidence (e.g., "According to Volve DDR...") and declare any assumptions or confidence levels.
ONLY IF the user explicitly asks for a formal report, analysis, or structured breakdown, should you use rigorous sections like ## Evidence, ## Assumptions, etc. Otherwise, maintain a concise but highly professional technical summary.
CONTEXT:
{context_str}
QUESTION: {question}"""
    try:
        response = _genai_client.models.generate_content(
            model=MODEL_NAME,
            contents=system_prompt,
        )
        return response.text
    except Exception as e:
        # Best-effort: report the failure to the caller as text, but keep the traceback.
        log.exception("Gemini generate_content failed")
        return f"LLM Error: {e}"
# ── 3. Streaming Orchestrator ─────────────────────────────────────────────────
def _collect_recent_charts(fig_dir: Path, since: float) -> list:
    """Return absolute paths of ``*.png`` then ``*.html`` files in *fig_dir*
    whose modification time is at or after *since* (a ``time.time()`` value).

    Best-effort: a missing directory yields ``[]``, and files that disappear
    or become unreadable between listing and ``stat()`` are skipped.
    """
    charts = []
    if not fig_dir.exists():
        return charts
    for pattern in ("*.png", "*.html"):
        for path in fig_dir.glob(pattern):
            try:
                modified = path.stat().st_mtime
            except OSError:
                continue  # removed/unreadable between glob() and stat()
            resolved = str(path.absolute())
            if modified >= since and resolved not in charts:
                charts.append(resolved)
    return charts


def run_pipeline(question: str, chat_history=None) -> Generator[Dict[str, Any], None, None]:
    """Run the agent pipeline for *question*, streaming progress events.

    Yields dicts keyed by ``"event"``:

    * ``"log"``: status line with ``icon``, ``name``, ``status``, ``detail``, ``time``.
    * ``"verbose_log"``: agent output forwarded as ``content``.
    * ``"final_answer"``: yielded last, with ``answer`` (str), ``route`` (the
      heuristic classification; informational only), and ``charts``
      (absolute paths of figures written to ``outputs/figures`` during this run).

    Every question is delegated to ``run_aggregation_loop``; the computed
    route does not change the execution path.

    Args:
        question: User question.
        chat_history: Prior messages. Only their count is reported; they are
            not forwarded to the agents.
    """
    t0 = time.time()

    def log_evt(icon, name, status, detail=""):
        return {"event": "log", "icon": icon, "name": name, "status": status, "detail": detail, "time": time.time()}

    # 1. Memory Analysis (informational log only).
    if chat_history:
        yield log_evt("🧠", "Memory", f"Analyzing {len(chat_history)} messages...", "Restoring context.")

    # 2. Routing: the classification is reported but every path goes to the crew.
    yield log_evt("🔍", "Classifier", "Analyzing intent...", f"'{question[:50]}...'")
    route = classify_question(question)
    yield log_evt("🔀", "Router", "Path: Agentic Loop", "Delegating to Multi-Agent Crew.")

    # 3. Execution — CrewAI path (100% routing to allow dynamic tool discovery).
    answer = ""
    yield log_evt("🤖", "Rig Crew", "Waking up Agents...", "Initializing reasoning loop.")
    try:
        # run_aggregation_loop is a generator yielding log/answer events.
        for event in run_aggregation_loop(question):
            kind = event["event"]
            if kind == "log":
                yield log_evt(event["icon"], event["name"], event["status"], event["detail"])
            elif kind == "final_answer":
                answer = event["answer"]
            elif kind == "verbose_log":
                yield {"event": "verbose_log", "content": event.get("content", "")}
            elif kind == "error":
                answer = f"CrewAI Error: {event.get('message', 'unknown error')}"
    except Exception as e:
        log.exception("Agent pipeline failed")
        answer = f"Agent Error: {e}"

    # Chart discovery runs outside the try so a filesystem hiccup cannot
    # overwrite a valid answer. Only figures written since this run started
    # are attached; the 1 s slack tolerates coarse filesystem mtime resolution.
    charts = _collect_recent_charts(BASE_DIR / "outputs" / "figures", since=t0 - 1.0)

    elapsed = time.time() - t0
    yield log_evt("✅", "Complete", f"Done in {elapsed:.1f}s", "Finalizing response.")
    yield {"event": "final_answer", "answer": str(answer), "route": route, "charts": charts}