# Odin/src/data_pipeline/parse_witsml_logs.py
# ODIN
# Initial commit: ODIN multi-agent drilling intelligence system
# 67e93c9
"""
parse_witsml_logs.py
--------------------
Parses WITSML realtime drilling log data from data/raw/WITSML Realtime drilling data/
into clean CSV files in data/processed/witsml/
WITSML tree structure:
    <well_dir>/
        1/                          <- wellbore
            log/
                MetaFileInfo.txt    <- "1 Depth\n2 DateTime"
                1/                  <- Depth-indexed logs
                    MetaFileInfo.txt    <- log run names (e.g. "26in section MD Log")
                    1/              <- log run 1
                        1/          <- sequence chunk number
                            00001.xml   <- actual data XML
                            00002.xml
                            ...
                2/                  <- Time-indexed logs
                    ...
            trajectory/
            _wellboreInfo/
"""
import os
import sys
import xml.etree.ElementTree as ET
import pandas as pd
from pathlib import Path
import logging
from utils import normalize_well_name, safe_filename
# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

# ── Paths ────────────────────────────────────────────────────────────────────
# Project root: two directory levels above the folder that contains this file.
BASE_DIR = Path(__file__).resolve().parents[2]
# Input tree of raw WITSML exports, one sub-directory per well.
RAW_WITSML_DIR = BASE_DIR.joinpath("data", "raw", "WITSML Realtime drilling data")
# Destination for the generated CSV files.
OUT_DIR = BASE_DIR.joinpath("data", "processed", "witsml")
# Created at import time so the writers below can assume it exists.
OUT_DIR.mkdir(parents=True, exist_ok=True)
# WITSML namespace (varies; we strip to handle any)
def _strip_ns(tag: str) -> str:
return tag.split("}")[-1] if "}" in tag else tag
def read_meta(meta_path: Path) -> dict[str, str]:
    """Parse a ``MetaFileInfo.txt`` file into a ``{key: label}`` mapping.

    Each useful line has the form ``"<key> <label ...>"`` (for example
    ``"1 Log Name Here"``): the first whitespace-delimited token becomes the
    key and the stripped remainder becomes the value. Lines with fewer than
    two tokens are skipped.

    Args:
        meta_path: Location of the metadata file.

    Returns:
        Mapping of key to label, in file order. Empty when *meta_path* is not
        an existing regular file.
    """
    # is_file() (rather than exists()) so a directory of the same name is
    # treated as "no metadata" instead of raising from read_text().
    if not meta_path.is_file():
        return {}

    # "utf-8-sig" drops a leading byte-order mark if present and otherwise
    # decodes exactly like "utf-8". Without it, a BOM would be glued onto the
    # first key ("\ufeff1"), which then never matches a directory name.
    text = meta_path.read_text(encoding="utf-8-sig", errors="ignore")

    result: dict[str, str] = {}
    for line in text.splitlines():
        parts = line.split(None, 1)
        if len(parts) == 2:
            result[parts[0]] = parts[1].strip()
    return result
def _curve_headers(log_elem: ET.Element) -> list[dict]:
    """Return ``{"mnemonic", "unit"}`` dicts for each named logCurveInfo child."""
    curves = []
    for child in log_elem:
        if _strip_ns(child.tag) != "logCurveInfo":
            continue
        mnemonic = None
        unit = None
        for sub in child:
            tag = _strip_ns(sub.tag)
            if tag == "mnemonic":
                mnemonic = sub.text.strip() if sub.text else None
            elif tag == "unit":
                unit = sub.text.strip() if sub.text else ""
        # A curve without a mnemonic cannot become a column, so it is skipped.
        if mnemonic:
            curves.append({"mnemonic": mnemonic, "unit": unit})
    return curves


def _data_rows(log_elem: ET.Element, width: int) -> list[list[str]]:
    """Return the comma-split ``<data>`` rows of *log_elem*, each exactly *width* long."""
    rows = []
    for child in log_elem:
        if _strip_ns(child.tag) != "logData":
            continue
        for data_elem in child:
            if _strip_ns(data_elem.tag) == "data" and data_elem.text:
                values = [v.strip() for v in data_elem.text.split(",")]
                # Short rows are right-padded with "" and long rows are
                # truncated so every row lines up with the curve headers.
                values.extend([""] * (width - len(values)))
                rows.append(values[:width])
    return rows


def _to_numeric_or_keep(series: pd.Series) -> pd.Series:
    """Return *series* converted to a numeric dtype, or unchanged if that fails."""
    # Replaces pd.to_numeric(..., errors="ignore"), which pandas deprecated in
    # 2.2; catching the conversion error gives the same "leave as-is" result.
    try:
        return pd.to_numeric(series)
    except (ValueError, TypeError):
        return series


def parse_log_xml(xml_path: Path) -> pd.DataFrame | None:
    """Parse a single WITSML log XML chunk file.

    Every ``log`` element in the document (any namespace) contributes one
    table: columns are its ``logCurveInfo`` mnemonics and rows are its
    ``logData/data`` entries split on commas. Columns whose values all parse
    as numbers are converted; other columns stay as strings.

    Args:
        xml_path: Path of the XML chunk to read.

    Returns:
        The tables of all ``log`` elements concatenated row-wise, with the
        mnemonic -> unit mapping stored in ``DataFrame.attrs["units"]``.
        ``None`` if the XML is malformed or yields no data rows.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except ET.ParseError as e:
        log.warning(f"XML parse error in {xml_path}: {e}")
        return None

    all_frames = []
    units_map: dict = {}
    for log_elem in root.iter():
        if _strip_ns(log_elem.tag) != "log":
            continue

        curves = _curve_headers(log_elem)
        if not curves:
            continue
        col_names = [c["mnemonic"] for c in curves]

        rows = _data_rows(log_elem, len(col_names))
        if not rows:
            continue

        df = pd.DataFrame(rows, columns=col_names)
        for col in df.columns:
            df[col] = _to_numeric_or_keep(df[col])

        units_map.update({c["mnemonic"]: c["unit"] for c in curves})
        all_frames.append(df)

    if not all_frames:
        return None

    result = pd.concat(all_frames, ignore_index=True)
    # Unit metadata rides along as an attribute (it is not written to CSV).
    # It is set on the final frame because concat does not reliably carry
    # per-input attrs through.
    result.attrs["units"] = units_map
    return result
def _numeric_first_key(name: str) -> tuple[int, int, str]:
    """Sort key: all-digit names in numeric order first, then the rest by name."""
    if name.isdecimal():
        return (0, int(name), name)
    return (1, 0, name)


def collect_well_log_data(well_dir: Path) -> dict[str, pd.DataFrame]:
    """Walk a single well directory and collect all of its log data.

    Reads ``<well_dir>/1/log/MetaFileInfo.txt`` for the index types, then each
    index type's own ``MetaFileInfo.txt`` for its sections, and parses every
    ``*.xml`` file found in the sub-directories of each section.

    Args:
        well_dir: Root directory of one well.

    Returns:
        Mapping of ``"<index type name>|<section name>"`` to the concatenated
        DataFrame of that section. If two sections produce the same label, the
        later one gets `` (<section number>)`` appended so neither is lost.
        Empty when the well has no ``log/`` directory or no parsable data.
    """
    log_dir = well_dir / "1" / "log"
    if not log_dir.exists():
        log.warning(f"No log/ dir in {well_dir}")
        return {}

    # Top-level meta: "1 Depth", "2 DateTime"
    top_meta = read_meta(log_dir / "MetaFileInfo.txt")
    all_section_frames: dict[str, pd.DataFrame] = {}

    for index_type_num, index_type_name in top_meta.items():
        index_subdir = log_dir / index_type_num
        if not index_subdir.is_dir():
            continue

        section_meta = read_meta(index_subdir / "MetaFileInfo.txt")
        for section_num, section_name in section_meta.items():
            section_dir = index_subdir / section_num
            if not section_dir.is_dir():
                continue

            # Chunk directories are numbered; a plain lexicographic sort would
            # put "10" before "2" and scramble the row order of the section.
            chunk_dirs = sorted(
                (p for p in section_dir.iterdir() if p.is_dir()),
                key=lambda p: _numeric_first_key(p.name),
            )

            frames = []
            for chunk_dir in chunk_dirs:
                xml_files = sorted(
                    chunk_dir.glob("*.xml"),
                    key=lambda p: _numeric_first_key(p.stem),
                )
                for xml_file in xml_files:
                    df = parse_log_xml(xml_file)
                    if df is not None and not df.empty:
                        frames.append(df)

            if not frames:
                continue

            combined = pd.concat(frames, ignore_index=True)
            label = f"{index_type_name}|{section_name}"
            if label in all_section_frames:
                # Same section name used twice: keep both instead of letting
                # the later section silently replace the earlier one.
                label = f"{label} ({section_num})"
            all_section_frames[label] = combined
            log.info(f" [{label}] → {len(combined)} rows, {combined.shape[1]} cols")

    return all_section_frames
def get_well_name_from_dir(well_dir: Path, meta_map: dict[str, str]) -> str:
    """Resolve a well folder such as 'Norway-Statoil-15_$47$_9-F-12' to a well name.

    The folder name is first looked up in *meta_map* (folder -> well name),
    comparing with surrounding whitespace ignored. When no entry matches, the
    name is derived from the folder itself by turning each ``$47$`` marker
    (with or without its surrounding underscores) into ``/``.
    """
    folder_name = well_dir.name
    wanted = folder_name.strip()

    for key in meta_map:
        if key.strip() == wanted:
            return meta_map[key]

    # Fallback: the underscore-wrapped form is handled first so the
    # underscores disappear together with the marker.
    decoded = folder_name.replace("_$47$_", "/")
    return decoded.replace("$47$", "/")
def _save_section_frames(frames_dict: dict, well_name_safe: str) -> None:
    """Write each section DataFrame to OUT_DIR as ``<well>__<label>.csv``."""
    for label, df in frames_dict.items():
        # Sanitize label for filename (and cap its length at 80 characters).
        label_safe = label.replace("|", "_").replace("/", "-").replace(" ", "_")[:80]
        out_path = OUT_DIR / f"{well_name_safe}__{label_safe}.csv"
        df.to_csv(out_path, index=False)
        log.info(f" Saved: {out_path.name} ({len(df)} rows)")


def parse_all_wells() -> None:
    """Convert every well under RAW_WITSML_DIR to per-section CSV files.

    For each well directory the log sections are collected, the sections whose
    label starts with ``Depth`` or ``DateTime`` are written to OUT_DIR, and a
    one-row-per-well overview is saved as ``_witsml_extraction_summary.csv``
    and printed. Wells without data are skipped with a warning.
    """
    # Without this guard iterdir() below raises FileNotFoundError.
    if not RAW_WITSML_DIR.is_dir():
        log.error(f"Raw WITSML directory not found: {RAW_WITSML_DIR}")
        return

    # Global folder -> well name mapping; same "<key> <value>" layout as the
    # other MetaFileInfo.txt files, so the shared parser is reused.
    folder_to_well = read_meta(RAW_WITSML_DIR / "MetaFileInfo.txt")

    well_dirs = sorted(
        d for d in RAW_WITSML_DIR.iterdir()
        if d.is_dir() and d.name not in ("__pycache__",)
    )

    all_wells_summary = []
    for well_dir in well_dirs:
        well_name_raw = get_well_name_from_dir(well_dir, folder_to_well)
        well_name_canonical = normalize_well_name(well_name_raw)
        # Sanitize for filename
        well_name_safe = safe_filename(well_name_canonical)
        log.info(f"\n=== Processing well: {well_name_canonical} ({well_dir.name}) ===")

        section_frames = collect_well_log_data(well_dir)
        if not section_frames:
            log.warning(f" No data found for {well_name_canonical}")
            continue

        # Split the sections by index type; only these two groups are saved.
        depth_frames = {k: v for k, v in section_frames.items()
                        if k.startswith("Depth")}
        time_frames = {k: v for k, v in section_frames.items()
                       if k.startswith("DateTime")}

        # Make it visible when a section is left out instead of dropping it
        # without a trace.
        skipped = [k for k in section_frames
                   if k not in depth_frames and k not in time_frames]
        if skipped:
            log.warning(f" Sections with unrecognized index type not saved: {skipped}")

        _save_section_frames(depth_frames, well_name_safe)
        _save_section_frames(time_frames, well_name_safe)

        all_wells_summary.append({
            "well_name": well_name_canonical,
            "well_folder": well_dir.name,
            "n_depth_sections": len(depth_frames),
            "n_time_sections": len(time_frames),
            "total_sections": len(section_frames),
        })

    # Save summary
    if all_wells_summary:
        summary_df = pd.DataFrame(all_wells_summary)
        summary_path = OUT_DIR / "_witsml_extraction_summary.csv"
        summary_df.to_csv(summary_path, index=False)
        log.info(f"\nSummary saved to {summary_path}")
        print(summary_df.to_string(index=False))
    else:
        log.warning("No data was extracted from any well.")


# Run the full extraction when executed as a script.
if __name__ == "__main__":
    parse_all_wells()