# Source: llm-ready-data / app/services/extraction_service.py
# Viewer metadata: Soumik Bose · "ok" · 6c24b50 · 18.8 kB
from __future__ import annotations
import io
import re
import threading
import warnings
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import pandas as pd
from app.config import get_settings
from app.core.constants import (
MAX_CSV_ROWS,
MAX_EXCEL_ROWS,
MAX_MEMORY_CELLS,
TABULAR_EXTENSIONS,
)
from app.core.logger import get_logger
# Module-level logger and application settings shared by the helpers in this
# module (settings supply the spaCy model name and the upload size limit).
_logger = get_logger(__name__)
_settings = get_settings()
# spaCy named-entity labels that mapping rules may request, each with a short
# human-readable description. ExtractionService checks requested entity labels
# against these keys and returns a copy of this table from get_spacy_labels().
VALID_SPACY_LABELS: Dict[str, str] = {
    "ORG": "Companies, agencies, institutions",
    "PERSON": "People, including fictional",
    "DATE": "Absolute or relative dates or periods",
    "MONEY": "Monetary values, including unit",
    "GPE": "Countries, cities, states",
    "LOC": "Non-GPE locations, mountain ranges, bodies of water",
    "PRODUCT": "Objects, vehicles, foods, etc.",
    "EVENT": "Named hurricanes, battles, wars, sports events",
    "CARDINAL": "Numerals that do not fall under another type",
    "PERCENT": "Percentage, including '%'",
    "QUANTITY": "Measurements, as of weight or distance",
    "TIME": "Times smaller than a day",
    "NORP": "Nationalities or religious or political groups",
    "FAC": "Buildings, airports, highways, bridges",
    "WORK_OF_ART": "Titles of books, songs, etc.",
    "LAW": "Named documents made into laws",
    "LANGUAGE": "Any named language",
    "ORDINAL": "'first', 'second', etc.",
}
# One node of a caller-supplied mapping schema: a scalar rule (dispatched on
# "source_type"), an "object" node with "fields", or an "array" node with "items".
SchemaNode = Dict[str, Any]
# What resolving a SchemaNode can produce: a string, a list of results,
# a dict of nested field results, or None when nothing was found.
ResultNode = Union[str, List[Any], Dict[str, Any], None]
# Held while looking up entries in _NORMALIZERS.
_norm_lock = threading.Lock()

# Named str -> str transformations that a rule can request via its
# "normalize" key (a single name or a list of names applied in order).
_NORMALIZERS: Dict[str, Callable[[str], str]] = {
    "strip": str.strip,
    "upper": str.upper,
    "lower": str.lower,
    "remove_commas": lambda text: text.replace(",", ""),
    "remove_spaces": lambda text: text.replace(" ", ""),
    "remove_newlines": lambda text: text.replace("\n", " ").replace("\r", ""),
    "collapse_whitespace": lambda text: re.sub(r"\s+", " ", text).strip(),
    "remove_currency": lambda text: re.sub(r"[$\u20ac\u00a3\u00a5\u20b9]", "", text),
    "remove_non_numeric": lambda text: re.sub(r"[^\d.]", "", text),
    "normalize_date_sep": lambda text: re.sub(r"[/.]", "-", text),
}

# Registry mapping a rule's "source_type" to its resolver; every resolver is
# called as fn(rule, doc, text). Reads and writes happen under _resolver_lock.
_resolver_lock = threading.Lock()
_RESOLVERS: Dict[str, Callable[[Dict[str, Any], Any, str], Optional[str]]] = {}

# Lazily loaded spaCy pipeline (populated by _get_nlp) and its guard lock.
_nlp_lock = threading.Lock()
_nlp = None
def _get_nlp():
    """Return the shared spaCy pipeline, loading it on the first call.

    The model named by ``_settings.spacy_model`` is loaded once, with the
    tagger, parser, lemmatizer and attribute_ruler components excluded, and
    cached in the module-level ``_nlp``. The unlocked check keeps later calls
    lock-free; the re-check under ``_nlp_lock`` makes concurrent first callers
    load the model only once.
    """
    global _nlp
    if _nlp is None:
        with _nlp_lock:
            if _nlp is None:
                import spacy

                skipped_components = ["tagger", "parser", "lemmatizer", "attribute_ruler"]
                _nlp = spacy.load(_settings.spacy_model, exclude=skipped_components)
                _logger.info("spaCy %s loaded", _settings.spacy_model)
    return _nlp
def _validate_file_size(size: int) -> Optional[str]:
    """Check *size* (in bytes) against ``_settings.max_upload_bytes``.

    Returns None when the size is within the limit, otherwise a human-readable
    error message.
    """
    limit = _settings.max_upload_bytes
    if size <= limit:
        return None
    return f"File size {size} bytes exceeds limit of {limit} bytes"
def _check_memory_usage(rows: int, cols: int) -> Optional[str]:
    """Reject a table whose cell count exceeds ``MAX_MEMORY_CELLS``.

    Returns None when ``rows * cols`` is within the limit. Otherwise it returns
    an error message that includes a rough size estimate, assuming about 50
    bytes per cell.
    """
    cell_count = rows * cols
    if cell_count <= MAX_MEMORY_CELLS:
        return None
    approx_mb = cell_count * 50 / (1024 * 1024)
    return f"Data size too large (approx {approx_mb:.1f} MB). Too many cells: {rows}x{cols}"
def _build_flags(rule: Dict[str, Any]) -> re.RegexFlag:
    """Combine the ``re`` flag names listed under ``rule["flags"]`` into one value.

    Names are matched case-insensitively against attributes of the ``re``
    module (e.g. ``"IGNORECASE"``, ``"i"``, ``"multiline"``). A single string
    is treated as one name, and a missing or empty value yields no flags.
    Unknown names, non-string entries, and names that resolve to something
    other than a regex flag (e.g. ``"compile"``) are logged and skipped.
    """
    names = rule.get("flags") or []
    if isinstance(names, str):
        # A bare string would otherwise be iterated character by character,
        # silently turning e.g. "MULTILINE" into M|U|L|... (LOCALE included).
        names = [names]
    flags = re.RegexFlag(0)
    for name in names:
        obj = getattr(re, name.upper(), None) if isinstance(name, str) else None
        # Only accept real flags; getattr(re, "COMPILE") etc. return functions.
        if not isinstance(obj, re.RegexFlag):
            _logger.warning("Unknown re flag %s", name)
            continue
        flags |= obj
    return flags
def _apply_normalizers(value: Optional[str], normalize: Any) -> Optional[str]:
    """Run *value* through the named normalizers in *normalize*, in order.

    *normalize* may be one name or a list of names from ``_NORMALIZERS``.
    Unknown names are logged and skipped. A normalizer that raises is logged,
    and the value passes through unchanged to the next step.

    Returns None if *value* is not a str or the final result is empty.
    """
    if not isinstance(value, str):
        return None
    if not normalize:
        return value
    steps = [normalize] if isinstance(normalize, str) else normalize
    for step in steps:
        with _norm_lock:
            transform = _NORMALIZERS.get(step)
        if transform is None:
            _logger.warning("Unknown normalizer %s", step)
            continue
        try:
            value = transform(value)
        except Exception as exc:
            _logger.error("Normalizer %s raised on value %s: %s", step, value, exc)
    return value or None
def _try_group(match: re.Match, capture_group: Any) -> Tuple[bool, Optional[str]]:
    """Fetch group *capture_group* (index or name) from *match*.

    Returns ``(True, text)`` when the group exists; *text* is None if the group
    did not participate in the match. Returns ``(False, None)`` after logging a
    warning when the group does not exist.
    """
    try:
        group_text = match.group(capture_group)
    except (IndexError, re.error):
        _logger.warning("Group %s does not exist in pattern", capture_group)
        return False, None
    return True, group_text
def _resolve_regex(rule: Dict[str, Any], text: str) -> Optional[str]:
    """Return one captured string from *text* using ``rule["pattern"]`` and its fallbacks.

    Patterns are tried in order: ``pattern`` first, then each entry of
    ``fallback_patterns`` (a list, or a single string). For each pattern, the
    match at ``match_index`` is selected (default 0; an out-of-range index
    selects the last match). Its ``capture_group`` (default 0) is taken,
    passed through ``normalize``, and stripped of ``strip_chars`` (or
    whitespace when unset). The first non-empty result wins. If a pattern
    yields nothing usable (invalid regex, no match, missing group, or an empty
    result), the next pattern is tried. Returns None if none succeeds.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex rule missing pattern")
        return None
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    match_index = rule.get("match_index", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    fallbacks = rule.get("fallback_patterns") or []
    if isinstance(fallbacks, str):
        # A bare string would otherwise be unpacked into one pattern per character.
        fallbacks = [fallbacks]
    for pat in [primary, *fallbacks]:
        try:
            matches = list(re.finditer(pat, text, flags))
        except re.error as exc:
            _logger.error("Invalid regex %s: %s", pat, exc)
            continue
        if not matches:
            continue
        try:
            chosen = matches[match_index]
        except IndexError:
            chosen = matches[-1]
        exists, result = _try_group(chosen, capture_group)
        if not exists or result is None:
            continue
        result = _apply_normalizers(result, normalize)
        if result is None:
            continue
        result = result.strip(strip_chars) if strip_chars else result.strip()
        if result:
            return result
        # Empty after stripping: fall through to the next fallback pattern,
        # consistent with the other "nothing usable" cases above.
    return None
def _resolve_regex_all(rule: Dict[str, Any], text: str) -> List[Optional[str]]:
    """Return every usable capture of ``rule["pattern"]`` in *text*, in match order.

    Each match's ``capture_group`` (default 0) is passed through ``normalize``
    and stripped of ``strip_chars`` (or whitespace when unset). Matches with a
    missing group or an empty result are skipped. At most ``max_items``
    results are returned when it is set; zero or a negative value yields [].
    Returns [] for a missing or invalid pattern.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex-array rule missing pattern")
        return []
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    max_items = rule.get("max_items")
    try:
        # finditer compiles eagerly, so a bad pattern raises here, but it
        # matches lazily, letting max_items stop the scan early.
        matches = re.finditer(primary, text, flags)
    except re.error as exc:
        _logger.error("Invalid regex %s: %s", primary, exc)
        return []
    results: List[Optional[str]] = []
    for m in matches:
        # Check the cap before taking another item so max_items=0 yields [].
        if max_items is not None and len(results) >= max_items:
            break
        exists, result = _try_group(m, capture_group)
        if not exists or result is None:
            continue
        result = _apply_normalizers(result, normalize)
        if result is None:
            continue
        result = result.strip(strip_chars) if strip_chars else result.strip()
        if result:
            results.append(result)
    return results
def _resolve_entity(rule: Dict[str, Any], doc: Any) -> Optional[str]:
    """Return the text of one spaCy entity in *doc* selected by *rule*.

    Rule keys:
    - ``label``: one label or a list of labels.
    - ``min_length``: minimum entity text length (default 1).
    - ``exclude_pattern`` / ``exclude_flags``: a regex that disqualifies an
      entity when it is found in the entity text.
    - ``match_index``: which candidate to pick (default 0; an out-of-range
      index picks the last candidate).
    - ``normalize``: normalizers applied to the picked text.

    Returns None when *doc* is None or no entity qualifies.
    """
    if doc is None:
        _logger.warning("Entity resolver received None doc")
        return None
    raw_labels = rule.get("label")
    wanted = {raw_labels} if isinstance(raw_labels, str) else set(raw_labels or [])
    position = rule.get("match_index", 0)
    shortest = rule.get("min_length", 1)
    exclude_pat = rule.get("exclude_pattern", "")
    exclude_flags = _build_flags({"flags": rule.get("exclude_flags", [])})
    normalize = rule.get("normalize", "")

    candidates: List[str] = []
    for ent in doc.ents:
        if ent.label_ not in wanted or len(ent.text) < shortest:
            continue
        if exclude_pat and re.search(exclude_pat, ent.text, exclude_flags):
            continue
        candidates.append(ent.text)
    if not candidates:
        return None
    try:
        chosen = candidates[position]
    except IndexError:
        chosen = candidates[-1]
    return _apply_normalizers(chosen, normalize)
def _resolve_entity_all(rule: Dict[str, Any], doc: Any) -> List[Optional[str]]:
    """Return the texts of all spaCy entities in *doc* that satisfy *rule*, in document order.

    The ``label``, ``min_length``, ``exclude_pattern``/``exclude_flags`` and
    ``normalize`` keys filter and transform entities as in ``_resolve_entity``.
    Entities whose normalized text is empty are skipped. With ``unique`` set,
    repeated normalized values are dropped. At most ``max_items`` results are
    returned when it is set; zero or a negative value yields []. Returns []
    when *doc* is None.
    """
    if doc is None:
        _logger.warning("Entity-array resolver received None doc")
        return []
    labels = rule.get("label")
    if isinstance(labels, str):
        labels = [labels]
    labels = set(labels or [])
    min_length = rule.get("min_length", 1)
    exclude_pat = rule.get("exclude_pattern", "")
    exclude_flags = _build_flags({"flags": rule.get("exclude_flags", [])})
    normalize = rule.get("normalize", "")
    max_items = rule.get("max_items")
    unique = rule.get("unique", False)
    results: List[str] = []
    seen: set = set()
    for ent in doc.ents:
        # Check the cap before taking another item so max_items=0 yields [].
        if max_items is not None and len(results) >= max_items:
            break
        if ent.label_ not in labels:
            continue
        if len(ent.text) < min_length:
            continue
        if exclude_pat and re.search(exclude_pat, ent.text, exclude_flags):
            continue
        value = _apply_normalizers(ent.text, normalize)
        if not value:
            continue
        if unique:
            if value in seen:
                continue
            seen.add(value)
        results.append(value)
    return results
def _resolve_token_attr(rule: Dict[str, Any], doc: Any) -> Optional[str]:
    """Return the text of one token in *doc* whose attribute ``rule["attr"]`` is truthy.

    ``match_index`` selects among matching tokens (default 0; an out-of-range
    index picks the last one), and ``normalize`` is applied to the result.
    Returns None when *doc* is None or no token matches. Tokens lacking the
    attribute are treated as non-matching.
    """
    if doc is None:
        _logger.warning("Token-attr resolver received None doc")
        return None
    attr_name = rule.get("attr", "")
    index = rule.get("match_index", 0)
    normalize = rule.get("normalize", "")
    hits: List[str] = []
    for token in doc:
        if getattr(token, attr_name, False):
            hits.append(token.text)
    if not hits:
        return None
    try:
        chosen = hits[index]
    except IndexError:
        chosen = hits[-1]
    return _apply_normalizers(chosen, normalize)
def _register_builtin_resolvers() -> None:
    """Install the built-in ``source_type`` handlers into ``_RESOLVERS``.

    Every handler takes ``(rule, doc, text)``. Regex-based handlers use only
    *text*, and spaCy-based handlers use only *doc*.
    """
    handlers = {
        "regex": lambda rule, doc, text: _resolve_regex(rule, text),
        "entity": lambda rule, doc, text: _resolve_entity(rule, doc),
        "token_attr": lambda rule, doc, text: _resolve_token_attr(rule, doc),
        "regex_all": lambda rule, doc, text: _resolve_regex_all(rule, text),
        "entity_all": lambda rule, doc, text: _resolve_entity_all(rule, doc),
    }
    with _resolver_lock:
        _RESOLVERS.update(handlers)


_register_builtin_resolvers()
def _resolve_scalar_field(rule: Dict[str, Any], doc: Any, text: str) -> Optional[str]:
    """Dispatch a leaf rule to the resolver registered for its ``source_type``.

    Returns None, after logging a warning, when no resolver is registered for
    that type.
    """
    source_type = rule.get("source_type")
    with _resolver_lock:
        resolver = _RESOLVERS.get(source_type)
    if resolver is not None:
        return resolver(rule, doc, text)
    _logger.warning("Unknown source_type %s", source_type)
    return None
def _resolve_node(node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve one schema node according to its ``type``.

    ``"object"`` and ``"array"`` nodes recurse through their dedicated
    resolvers. Any other value (including a missing type) is treated as a
    scalar rule.
    """
    kind = node.get("type")
    if kind == "object":
        handler = _resolve_object_node
    elif kind == "array":
        handler = _resolve_array_node
    else:
        handler = _resolve_scalar_field
    return handler(node, doc, text)
def _resolve_object_node(node: SchemaNode, doc: Any, text: str) -> Dict[str, ResultNode]:
    """Resolve each child of an ``"object"`` node's ``fields`` into a dict.

    A child that raises is logged and recorded as None, so one bad field does
    not abort its siblings.
    """
    schema_fields: Dict[str, SchemaNode] = node.get("fields", {})
    resolved: Dict[str, ResultNode] = {}
    for name, child in schema_fields.items():
        try:
            value = _resolve_node(child, doc, text)
        except Exception as exc:
            _logger.error("Object field %s raised: %s", name, exc)
            value = None
        resolved[name] = value
    return resolved
def _resolve_array_node(node: SchemaNode, doc: Any, text: str) -> List[ResultNode]:
    """Resolve an ``"array"`` schema node into a list of item results.

    With ``split_pattern`` (and optional ``split_flags``), *text* is split with
    ``re.split``. Each non-blank segment is parsed by spaCy and resolved
    against the ``items`` schema, giving one result per segment (None for a
    segment whose resolution raised).

    Without ``split_pattern``, ``items`` is resolved once against the whole
    *doc*/*text*. A list result is used as-is, and any other non-None result
    becomes a one-element list.

    ``max_items`` caps the number of results; zero or a negative value yields
    []. Returns [] for an invalid split pattern.
    """
    item_schema: SchemaNode = node.get("items", {})
    split_pat: Optional[str] = node.get("split_pattern")
    max_items: Optional[int] = node.get("max_items")
    # One cap for both branches; non-positive means "no items".
    limit = None if max_items is None else max(max_items, 0)

    if split_pat:
        try:
            flags = _build_flags({"flags": node.get("split_flags", [])})
            segments = re.split(split_pat, text, flags=flags)
        except re.error as exc:
            _logger.error("Invalid split_pattern %s: %s", split_pat, exc)
            return []
        # Capturing groups that did not participate come back from re.split
        # as None; blank segments are never resolved. Drop both up front so
        # spaCy only processes segments that can produce a result.
        segments = [seg for seg in segments if seg is not None and seg.strip()]
        if limit is not None:
            # Each remaining segment yields exactly one result, so trimming
            # here equals stopping early and skips needless NLP work.
            segments = segments[:limit]
        if not segments:
            return []
        nlp = _get_nlp()
        try:
            segment_docs = list(nlp.pipe(segments))
        except Exception as exc:
            _logger.error("spaCy pipe failed on array segments: %s", exc)
            segment_docs = [None] * len(segments)
        split_results: List[ResultNode] = []
        for seg_doc, seg_text in zip(segment_docs, segments):
            try:
                item_result = _resolve_node(item_schema, seg_doc, seg_text)
            except Exception as exc:
                _logger.error("Array item resolve raised: %s", exc)
                item_result = None
            split_results.append(item_result)
        return split_results

    try:
        raw = _resolve_node(item_schema, doc, text)
    except Exception as exc:
        _logger.error("Array item resolve raised: %s", exc)
        return []
    if isinstance(raw, list):
        results: List[ResultNode] = raw
    elif raw is None:
        results = []
    else:
        results = [raw]
    return results if limit is None else results[:limit]
def _safe_resolve_node(path: str, node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve *node* and return None, logged under *path*, if resolution raises."""
    try:
        resolved = _resolve_node(node, doc, text)
    except Exception as exc:
        _logger.error("Schema path %s raised: %s", path, exc)
        resolved = None
    return resolved
def _extract_spacy_fields(text: str, fields: Dict[str, SchemaNode]) -> Dict[str, ResultNode]:
    """Resolve every top-level mapping in *fields* against *text*.

    *text* is parsed by spaCy once, and the resulting doc is shared by all
    fields. If parsing fails, the error is logged and resolution continues
    with ``doc=None``: doc-based resolvers return None or [], while regex rules
    still run on the raw text. Each field goes through
    ``_safe_resolve_node``, so a failing field becomes None without affecting
    the others.
    """
    pipeline = _get_nlp()
    try:
        parsed = next(iter(pipeline.pipe([text])))
    except Exception as exc:
        _logger.error("spaCy pipe failed: %s", exc)
        parsed = None
    resolved: Dict[str, ResultNode] = {}
    for name, schema in fields.items():
        resolved[name] = _safe_resolve_node(name, schema, parsed, text)
    return resolved
def _read_tabular_frame(
    ext: str,
    file_path: Union[str, Path],
    file_data: Optional[bytes],
    row_cap: int,
) -> pd.DataFrame:
    """Parse a CSV or Excel source into a DataFrame of at most *row_cap* rows.

    In-memory *file_data* takes precedence over *file_path* whenever it is not
    None, even when it is empty. An empty upload is therefore reported as empty
    instead of silently reading a same-named file from disk.
    """
    source: Any = io.BytesIO(file_data) if file_data is not None else file_path
    if ext == ".csv":
        return pd.read_csv(source, nrows=row_cap, low_memory=False)
    engine = "openpyxl" if ext in (".xlsx", ".xlsm") else "xlrd"
    return pd.read_excel(source, engine=engine, nrows=row_cap)


def _frame_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Return *df* as a list of row dicts with every missing cell set to None."""
    # Done in Python rather than with df.where(pd.notnull(df), None): on
    # float/datetime columns pandas coerces that None back to NaN/NaT, which
    # then leaks into the payload and is not valid JSON.
    is_scalar = pd.api.types.is_scalar
    return [
        {col: (None if is_scalar(val) and pd.isna(val) else val) for col, val in row.items()}
        for row in df.to_dict(orient="records")
    ]


def _extract_tabular(file_path: Union[str, Path], file_data: Optional[bytes] = None) -> Dict[str, Any]:
    """Extract a CSV/Excel file as JSON-ready data using pandas.

    *file_data* (raw bytes) is used when given; otherwise *file_path* is read
    from disk. The file extension selects the parser.

    Returns ``{"success": True, "file_type", "data": {"columns", "rows",
    "shape", "dtypes"}}`` on success. On failure it returns a dict with
    ``"error"`` and ``"file_type"``, plus ``"row_count"`` when the row limit is
    exceeded, or ``"exception_type"`` for unexpected errors.
    """
    ext = Path(file_path).suffix.lower()
    if file_data is not None:
        size: Optional[int] = len(file_data)
    elif Path(file_path).exists():
        size = Path(file_path).stat().st_size
    else:
        size = None  # Nothing to measure; the reader below reports the failure.
    if size is not None:
        size_error = _validate_file_size(size)
        if size_error:
            return {"error": size_error, "file_type": ext}

    max_rows = MAX_CSV_ROWS if ext == ".csv" else MAX_EXCEL_ROWS
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            # Read one row past the limit: enough to detect an oversized file
            # without loading all of it (applies to Excel as well as CSV).
            df = _read_tabular_frame(ext, file_path, file_data, max_rows + 1)
            row_count, col_count = df.shape
            if row_count > max_rows:
                return {
                    "error": f"File contains at least {row_count} rows, exceeds limit of {max_rows}",
                    "file_type": ext,
                    "row_count": row_count,
                }
            mem_error = _check_memory_usage(row_count, col_count)
            if mem_error:
                return {"error": mem_error, "file_type": ext}
            result = {
                "success": True,
                "file_type": ext,
                "data": {
                    "columns": list(df.columns),
                    "rows": _frame_to_records(df),
                    "shape": [row_count, col_count],
                    "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
                },
            }
            _logger.info("Extracted JSON from %s: %d rows, %d cols", ext, row_count, col_count)
            return result
    except pd.errors.EmptyDataError:
        return {"error": "File is empty or has no data", "file_type": ext}
    except MemoryError:
        return {"error": "Out of memory processing file", "file_type": ext}
    except Exception as exc:
        _logger.exception("JSON extraction failed for %s", ext)
        return {
            "error": f"Processing failed: {exc}",
            "file_type": ext,
            "exception_type": type(exc).__name__,
        }
class ExtractionService:
    """Turn an uploaded document into structured JSON.

    Tabular files (extensions in ``TABULAR_EXTENSIONS``) are parsed with
    pandas. Every other file type is resolved against caller-supplied field
    mappings, using spaCy entity/token rules and regex rules applied to the
    document's markdown text.
    """

    # Scalar rule types whose ``label`` must be a known spaCy entity label.
    _ENTITY_SOURCE_TYPES = ("entity", "entity_all")

    def __init__(self) -> None:
        # Label -> description table used to validate entity rules.
        self._spacy_labels = VALID_SPACY_LABELS

    def extract_structured(
        self,
        filename: Union[str, Path],
        markdown_text: str,
        mappings: Optional[Dict[str, Dict[str, Any]]] = None,
        file_data: Optional[bytes] = None,
    ) -> Dict[str, Any]:
        """Extract structured data from *filename*, choosing pandas or spaCy by extension.

        Tabular extensions are parsed from *file_data* (or the file on disk).
        A successful result is tagged ``"extractor": "pandas"``. Other
        extensions require *mappings*, whose entity labels are validated
        first; fields are then resolved against *markdown_text*, and the
        result is tagged ``"extractor": "spacy"``.

        Returns a result dict: ``"success"``/``"data"`` on success, or
        ``"error"`` (plus ``"label_mapper"`` or ``"exception_type"`` where
        relevant). Always includes ``"file_type"``.
        """
        ext = Path(filename).suffix.lower()
        if ext in TABULAR_EXTENSIONS:
            result = _extract_tabular(filename, file_data)
            if "error" not in result:
                result["extractor"] = "pandas"
            return result
        if not mappings:
            return {
                "error": (
                    f"Cannot extract JSON from '{ext}' files without field mappings. "
                    "Provide a 'mappings' object with field extraction rules."
                ),
                "file_type": ext,
            }
        valid, mapper_error = self.validate_mappings(mappings)
        if not valid:
            return {
                "error": "invalid_spacy_labels",
                "label_mapper": mapper_error,
                "file_type": ext,
            }
        try:
            data = _extract_spacy_fields(markdown_text, mappings)
            _logger.info("spaCy extraction completed for %s: %d fields", ext, len(data))
            return {
                "success": True,
                "extractor": "spacy",
                "file_type": ext,
                "data": data,
            }
        except Exception as exc:
            _logger.exception("spaCy extraction failed for %s", ext)
            return {
                "error": f"spaCy extraction failed: {exc}",
                "file_type": ext,
                "exception_type": type(exc).__name__,
            }

    def validate_mappings(self, mappings: Dict[str, Dict[str, Any]]) -> Tuple[bool, Optional[str]]:
        """Check that every entity label referenced by *mappings* is known.

        Nested ``"object"`` nodes (``fields``) and ``"array"`` nodes
        (``items``) are walked the same way the resolvers walk them. The
        ``label`` of every ``"entity"`` and ``"entity_all"`` rule is checked.
        Nested fields are reported as ``parent.child`` for object fields and
        ``parent[]`` for array items.

        Returns ``(True, None)`` when all labels are valid. Otherwise returns
        ``(False, message)`` naming the first invalid label and its field path.
        """
        for key, rule in mappings.items():
            problem = self._find_invalid_label(key, rule)
            if problem is not None:
                return False, problem
        return True, None

    def _find_invalid_label(self, path: Any, node: Any) -> Optional[str]:
        """Return an error message for the first unknown label under *node*, else None."""
        if not isinstance(node, dict):
            # Not a label problem; malformed nodes are left to the resolvers,
            # which catch and log their own failures.
            return None
        node_type = node.get("type")
        if node_type == "object":
            fields = node.get("fields")
            if not isinstance(fields, dict):
                return None
            for name, child in fields.items():
                problem = self._find_invalid_label(f"{path}.{name}", child)
                if problem is not None:
                    return problem
            return None
        if node_type == "array":
            return self._find_invalid_label(f"{path}[]", node.get("items"))
        if node.get("source_type") not in self._ENTITY_SOURCE_TYPES:
            return None
        label = node.get("label")
        if isinstance(label, str):
            labels = [label]
        elif isinstance(label, list):
            labels = label
        else:
            labels = []
        for lbl in labels:
            if not isinstance(lbl, str) or lbl not in self._spacy_labels:
                return f"Invalid spaCy label '{lbl}' in field '{path}'"
        return None

    def get_spacy_labels(self) -> Dict[str, str]:
        """Return a copy of the supported spaCy label -> description table."""
        return dict(self._spacy_labels)