from __future__ import annotations import io import re import threading import warnings from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union import pandas as pd from app.config import get_settings from app.core.constants import ( MAX_CSV_ROWS, MAX_EXCEL_ROWS, MAX_MEMORY_CELLS, TABULAR_EXTENSIONS, ) from app.core.logger import get_logger _logger = get_logger(__name__) _settings = get_settings() VALID_SPACY_LABELS: Dict[str, str] = { "ORG": "Companies, agencies, institutions", "PERSON": "People, including fictional", "DATE": "Absolute or relative dates or periods", "MONEY": "Monetary values, including unit", "GPE": "Countries, cities, states", "LOC": "Non-GPE locations, mountain ranges, bodies of water", "PRODUCT": "Objects, vehicles, foods, etc.", "EVENT": "Named hurricanes, battles, wars, sports events", "CARDINAL": "Numerals that do not fall under another type", "PERCENT": "Percentage, including '%'", "QUANTITY": "Measurements, as of weight or distance", "TIME": "Times smaller than a day", "NORP": "Nationalities or religious or political groups", "FAC": "Buildings, airports, highways, bridges", "WORK_OF_ART": "Titles of books, songs, etc.", "LAW": "Named documents made into laws", "LANGUAGE": "Any named language", "ORDINAL": "'first', 'second', etc.", } SchemaNode = Dict[str, Any] ResultNode = Union[str, List[Any], Dict[str, Any], None] _norm_lock = threading.Lock() _NORMALIZERS: Dict[str, Callable[[str], str]] = { "strip": lambda s: s.strip(), "upper": lambda s: s.upper(), "lower": lambda s: s.lower(), "remove_commas": lambda s: s.replace(",", ""), "remove_spaces": lambda s: s.replace(" ", ""), "remove_newlines": lambda s: s.replace("\n", " ").replace("\r", ""), "collapse_whitespace": lambda s: re.sub(r"\s+", " ", s).strip(), "remove_currency": lambda s: re.sub(r"[$\u20ac\u00a3\u00a5\u20b9]", "", s), "remove_non_numeric": lambda s: re.sub(r"[^\d.]", "", s), "normalize_date_sep": lambda s: re.sub(r"[/.]", "-", s), } 
_resolver_lock = threading.Lock()
# Maps a rule's "source_type" to a resolver called as fn(rule, doc, text).
# Scalar resolvers return a string or None; the "*_all" resolvers return lists.
_RESOLVERS: Dict[str, Callable[[Dict[str, Any], Any, str], ResultNode]] = {}

_nlp_lock = threading.Lock()
_nlp = None  # spaCy pipeline, created lazily by _get_nlp()

# pandas ``read_excel`` engine per spreadsheet extension. Extensions not listed
# fall back to "xlrd", which is what every non-.xlsx file used previously.
_EXCEL_ENGINES: Dict[str, str] = {
    ".xlsx": "openpyxl",
    ".xlsm": "openpyxl",
    ".ods": "odf",
}


def _get_nlp():
    """Return the shared spaCy pipeline, loading it on first use.

    The model named by ``_settings.spacy_model`` is loaded with the tagger,
    parser, lemmatizer and attribute_ruler components excluded. The cached
    global is checked without the lock first, then re-checked under
    ``_nlp_lock`` so that concurrent first callers load the model only once.
    """
    global _nlp
    if _nlp is not None:
        return _nlp
    with _nlp_lock:
        if _nlp is None:
            import spacy

            _nlp = spacy.load(
                _settings.spacy_model,
                exclude=["tagger", "parser", "lemmatizer", "attribute_ruler"],
            )
            _logger.info("spaCy %s loaded", _settings.spacy_model)
    return _nlp


def _validate_file_size(size: int) -> Optional[str]:
    """Return an error message if ``size`` (bytes) exceeds the upload limit, else None."""
    max_size = _settings.max_upload_bytes
    if size > max_size:
        return f"File size {size} bytes exceeds limit of {max_size} bytes"
    return None


def _check_memory_usage(rows: int, cols: int) -> Optional[str]:
    """Return an error message if ``rows * cols`` exceeds ``MAX_MEMORY_CELLS``, else None.

    The MB figure in the message is a rough estimate that assumes about 50
    bytes per cell.
    """
    cells = rows * cols
    if cells <= MAX_MEMORY_CELLS:
        return None
    approx_mb = (cells * 50) / (1024 * 1024)
    return f"Data size too large (approx {approx_mb:.1f} MB). Too many cells: {rows}x{cols}"


def _build_flags(rule: Dict[str, Any]) -> re.RegexFlag:
    """Combine the ``re`` flag names in ``rule["flags"]`` into one RegexFlag.

    ``flags`` may be a single name or a list of names, matched
    case-insensitively against ``re`` attributes (e.g. "ignorecase",
    "MULTILINE", "s"). A bare string is treated as one name, not iterated
    character by character. Names that are not ``re`` flags are logged and
    skipped.
    """
    names = rule.get("flags", [])
    if isinstance(names, str):
        names = [names]
    flags = re.RegexFlag(0)
    for name in names or []:
        obj = getattr(re, str(name).upper(), None)
        if not isinstance(obj, re.RegexFlag):
            _logger.warning("Unknown re flag %s", name)
            continue
        flags |= obj
    return flags


def _apply_normalizers(value: Optional[str], normalize: Any) -> Optional[str]:
    """Run ``value`` through the named normalizers, in order.

    Args:
        value: String to transform. Any non-string value yields None.
        normalize: A normalizer name, or a list of names from ``_NORMALIZERS``.
            Falsy means "no normalization".

    Returns:
        The transformed string, or None if it ends up empty. Without
        normalizers, ``value`` is returned unchanged. Unknown names are
        logged and skipped. A normalizer that raises is logged, and
        processing continues with the value as it was before that step.
    """
    if not isinstance(value, str):
        return None
    if not normalize:
        return value
    keys = [normalize] if isinstance(normalize, str) else normalize
    for key in keys:
        with _norm_lock:
            fn = _NORMALIZERS.get(key)
        if fn is None:
            _logger.warning("Unknown normalizer %s", key)
            continue
        try:
            value = fn(value)
        except Exception as exc:
            _logger.error("Normalizer %s raised on value %s: %s", key, value, exc)
    return value if value else None


def _try_group(match: re.Match, capture_group: Any) -> Tuple[bool, Optional[str]]:
    """Return ``(True, group_text)``, or ``(False, None)`` if the group does not exist.

    ``group_text`` itself may be None when the group exists but did not
    participate in the match.
    """
    try:
        return True, match.group(capture_group)
    except (IndexError, re.error):
        _logger.warning("Group %s does not exist in pattern", capture_group)
        return False, None


def _pick_index(items: List[Any], index: Any) -> Any:
    """Return ``items[index]``, or the last item if ``index`` is out of range.

    ``items`` must be non-empty.
    """
    try:
        return items[index]
    except IndexError:
        return items[-1]


def _clean_match_text(result: Optional[str], normalize: Any, strip_chars: str) -> Optional[str]:
    """Normalize, then strip, a captured group; return None if nothing usable remains.

    With ``strip_chars`` set, only those characters are stripped; otherwise
    whitespace is stripped.
    """
    result = _apply_normalizers(result, normalize)
    if result is None:
        return None
    result = result.strip(strip_chars) if strip_chars else result.strip()
    return result or None


def _resolve_regex(rule: Dict[str, Any], text: str) -> Optional[str]:
    """Extract one value from ``text`` using ``rule["pattern"]``, then any fallback patterns.

    For each pattern in turn, the match at ``match_index`` is taken (the last
    match if the index is out of range). Its ``capture_group`` is then
    normalized and stripped. The first pattern that produces a non-empty
    value wins. An invalid pattern, no match, a missing or empty group, or
    an empty result all move on to the next fallback pattern.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex rule missing pattern")
        return None
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    match_index = rule.get("match_index", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    fallbacks = rule.get("fallback_patterns", []) or []
    if isinstance(fallbacks, str):
        fallbacks = [fallbacks]

    for pat in [primary, *fallbacks]:
        try:
            matches = list(re.finditer(pat, text, flags))
        except re.error as exc:
            _logger.error("Invalid regex %s: %s", pat, exc)
            continue
        if not matches:
            continue
        exists, raw = _try_group(_pick_index(matches, match_index), capture_group)
        if not exists or raw is None:
            continue
        result = _clean_match_text(raw, normalize, strip_chars)
        if result:
            return result
    return None


def _resolve_regex_all(rule: Dict[str, Any], text: str) -> List[Optional[str]]:
    """Return the cleaned ``capture_group`` of every match of ``rule["pattern"]``.

    Matches whose group is missing, or whose text ends up empty after
    normalizing and stripping, are skipped. At most ``max_items`` values
    are returned when it is set; 0 yields an empty list. An invalid pattern
    is logged and yields an empty list.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex-array rule missing pattern")
        return []
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    max_items = rule.get("max_items")

    try:
        # finditer compiles the pattern immediately, so a bad pattern raises
        # here; matches are then consumed lazily and iteration stops once
        # max_items is reached.
        found = re.finditer(primary, text, flags)
    except re.error as exc:
        _logger.error("Invalid regex %s: %s", primary, exc)
        return []

    results: List[Optional[str]] = []
    for m in found:
        if max_items is not None and len(results) >= max_items:
            break
        exists, raw = _try_group(m, capture_group)
        if not exists or raw is None:
            continue
        result = _clean_match_text(raw, normalize, strip_chars)
        if result:
            results.append(result)
    return results


def _matching_entity_texts(rule: Dict[str, Any], doc: Any) -> List[str]:
    """Return the text of each entity in ``doc.ents`` that passes the rule's filters.

    An entity is kept when its ``label_`` is in ``rule["label"]`` (a single
    label or a list of labels), its text is at least ``min_length`` long,
    and ``exclude_pattern`` (if set) does not match it. Document order is
    preserved.
    """
    labels = rule.get("label")
    if isinstance(labels, str):
        labels = [labels]
    label_set = set(labels or [])
    min_length = rule.get("min_length", 1)
    exclude_pat = rule.get("exclude_pattern", "")
    exclude_flags = _build_flags({"flags": rule.get("exclude_flags", [])})
    return [
        ent.text
        for ent in doc.ents
        if ent.label_ in label_set
        and len(ent.text) >= min_length
        and not (exclude_pat and re.search(exclude_pat, ent.text, exclude_flags))
    ]


def _resolve_entity(rule: Dict[str, Any], doc: Any) -> Optional[str]:
    """Return the normalized text of the ``match_index``-th matching entity.

    The last matching entity is used if the index is out of range. Returns
    None when ``doc`` is None or no entity matches.
    """
    if doc is None:
        _logger.warning("Entity resolver received None doc")
        return None
    candidates = _matching_entity_texts(rule, doc)
    if not candidates:
        return None
    result = _pick_index(candidates, rule.get("match_index", 0))
    return _apply_normalizers(result, rule.get("normalize", ""))


def _resolve_entity_all(rule: Dict[str, Any], doc: Any) -> List[Optional[str]]:
    """Return the normalized text of every matching entity, in document order.

    Values that normalize to nothing are dropped. With ``unique`` set,
    repeated values are kept only once. At most ``max_items`` values are
    returned when it is set; 0 yields an empty list.
    """
    if doc is None:
        _logger.warning("Entity-array resolver received None doc")
        return []
    normalize = rule.get("normalize", "")
    max_items = rule.get("max_items")
    unique = rule.get("unique", False)

    results: List[Optional[str]] = []
    seen: set = set()
    for text in _matching_entity_texts(rule, doc):
        if max_items is not None and len(results) >= max_items:
            break
        value = _apply_normalizers(text, normalize)
        if not value:
            continue
        if unique:
            if value in seen:
                continue
            seen.add(value)
        results.append(value)
    return results


def _resolve_token_attr(rule: Dict[str, Any], doc: Any) -> Optional[str]:
    """Return the normalized text of the ``match_index``-th token whose ``rule["attr"]`` is truthy.

    The last such token is used if the index is out of range. Returns None
    when ``doc`` is None or no token qualifies.
    """
    if doc is None:
        _logger.warning("Token-attr resolver received None doc")
        return None
    attr = rule.get("attr", "")
    candidates = [token.text for token in doc if getattr(token, attr, False)]
    if not candidates:
        return None
    result = _pick_index(candidates, rule.get("match_index", 0))
    return _apply_normalizers(result, rule.get("normalize", ""))


def _register_builtin_resolvers() -> None:
    """Install the built-in ``source_type`` resolvers into ``_RESOLVERS``."""
    with _resolver_lock:
        _RESOLVERS["regex"] = lambda rule, doc, text: _resolve_regex(rule, text)
        _RESOLVERS["entity"] = lambda rule, doc, text: _resolve_entity(rule, doc)
        _RESOLVERS["token_attr"] = lambda rule, doc, text: _resolve_token_attr(rule, doc)
        _RESOLVERS["regex_all"] = lambda rule, doc, text: _resolve_regex_all(rule, text)
        _RESOLVERS["entity_all"] = lambda rule, doc, text: _resolve_entity_all(rule, doc)


_register_builtin_resolvers()


def _resolve_scalar_field(rule: Dict[str, Any], doc: Any, text: str) -> ResultNode:
    """Dispatch a leaf rule to the resolver registered for its ``source_type``.

    An unknown ``source_type`` is logged and yields None.
    """
    src = rule.get("source_type")
    with _resolver_lock:
        fn = _RESOLVERS.get(src)
    if fn is None:
        _logger.warning("Unknown source_type %s", src)
        return None
    return fn(rule, doc, text)


def _resolve_node(node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve a schema node according to its ``type``.

    "object" and "array" nodes are handled by their own resolvers; any other
    node is treated as a leaf rule.
    """
    node_type = node.get("type")
    if node_type == "object":
        return _resolve_object_node(node, doc, text)
    if node_type == "array":
        return _resolve_array_node(node, doc, text)
    return _resolve_scalar_field(node, doc, text)


def _resolve_object_node(node: SchemaNode, doc: Any, text: str) -> Dict[str, ResultNode]:
    """Resolve each entry of ``node["fields"]`` into a dict keyed by field name.

    A field whose resolution raises is logged and set to None, so one bad
    field does not discard its siblings.
    """
    fields: Dict[str, SchemaNode] = node.get("fields", {})
    result: Dict[str, ResultNode] = {}
    for field_name, child_node in fields.items():
        try:
            result[field_name] = _resolve_node(child_node, doc, text)
        except Exception as exc:
            _logger.error("Object field %s raised: %s", field_name, exc)
            result[field_name] = None
    return result


def _resolve_array_node(node: SchemaNode, doc: Any, text: str) -> List[ResultNode]:
    """Resolve an "array" node into a list of item results.

    With ``split_pattern``, ``text`` is split into segments and
    ``node["items"]`` is resolved once per non-blank segment, each against
    its own spaCy doc. An item that raises becomes None. Without
    ``split_pattern``, ``items`` is resolved once against the whole text: a
    list result is used as-is, and any other non-None result is wrapped in a
    one-element list. In both modes at most ``max_items`` results are
    returned when it is set.
    """
    item_schema: SchemaNode = node.get("items", {})
    split_pat: Optional[str] = node.get("split_pattern")
    max_items: Optional[int] = node.get("max_items")

    if not split_pat:
        try:
            raw = _resolve_node(item_schema, doc, text)
        except Exception as exc:
            _logger.error("Array item resolve raised: %s", exc)
            return []
        if isinstance(raw, list):
            results: List[ResultNode] = raw
        elif raw is not None:
            results = [raw]
        else:
            results = []
        return results[:max_items] if max_items is not None else results

    try:
        flags = _build_flags({"flags": node.get("split_flags", [])})
        segments = re.split(split_pat, text, flags=flags)
    except re.error as exc:
        _logger.error("Invalid split_pattern %s: %s", split_pat, exc)
        return []

    # Capture groups in split_pattern add the separators to the list, and
    # groups that did not participate appear as None. Drop None and blank
    # segments before any spaCy work.
    segments = [seg for seg in segments if seg and seg.strip()]
    if max_items is not None:
        # Each kept segment produces exactly one result, so truncating here
        # matches truncating the results, and spares parsing unused segments.
        segments = segments[:max_items]
    if not segments:
        return []

    nlp = _get_nlp()
    try:
        segment_docs = list(nlp.pipe(segments))
    except Exception as exc:
        _logger.error("spaCy pipe failed on array segments: %s", exc)
        segment_docs = [None] * len(segments)

    results = []
    for seg_doc, seg_text in zip(segment_docs, segments):
        try:
            results.append(_resolve_node(item_schema, seg_doc, seg_text))
        except Exception as exc:
            _logger.error("Array item resolve raised: %s", exc)
            results.append(None)
    return results


def _safe_resolve_node(path: str, node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve ``node``, logging any exception under ``path`` and returning None."""
    try:
        return _resolve_node(node, doc, text)
    except Exception as exc:
        _logger.error("Schema path %s raised: %s", path, exc)
        return None


def _extract_spacy_fields(text: str, fields: Dict[str, SchemaNode]) -> Dict[str, ResultNode]:
    """Parse ``text`` once with spaCy and resolve every top-level mapping field.

    If parsing fails, resolvers receive ``doc=None``. Regex rules still work
    in that case; entity and token rules yield None or an empty list.
    """
    nlp = _get_nlp()
    try:
        doc = nlp(text)
    except Exception as exc:
        _logger.error("spaCy pipe failed: %s", exc)
        doc = None
    return {
        field: _safe_resolve_node(field, node, doc, text)
        for field, node in fields.items()
    }


def _extract_tabular(file_path: Union[str, Path], file_data: Optional[bytes] = None) -> Dict[str, Any]:
    """Load a CSV or spreadsheet into a JSON-friendly dict.

    Args:
        file_path: Original file name or path; its suffix selects the parser.
            It is read from disk only when ``file_data`` is None.
        file_data: Uploaded bytes. An empty ``b""`` counts as supplied data
            (it yields an "empty file" error); it is not a signal to read
            from disk.

    Returns:
        On success, ``{"success": True, "file_type", "data": {...}}``. Missing
        cells become None rather than NaN, because NaN is not valid JSON. On
        failure, a dict with an ``"error"`` message and ``"file_type"``. At
        most ``limit + 1`` rows are read, so the ``"row_count"`` reported with
        a row-limit error is a lower bound, not the true count.
    """
    ext = Path(file_path).suffix.lower()

    if file_data is not None:
        size: Optional[int] = len(file_data)
    elif Path(file_path).exists():
        size = Path(file_path).stat().st_size
    else:
        size = None
    if size is not None:
        size_error = _validate_file_size(size)
        if size_error:
            return {"error": size_error, "file_type": ext}

    is_csv = ext == ".csv"
    max_rows = MAX_CSV_ROWS if is_csv else MAX_EXCEL_ROWS
    source: Any = io.BytesIO(file_data) if file_data is not None else file_path

    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            # Read one row past the limit: enough to detect an oversized file
            # without loading all of it.
            if is_csv:
                df = pd.read_csv(source, nrows=max_rows + 1, low_memory=False)
            else:
                df = pd.read_excel(
                    source,
                    engine=_EXCEL_ENGINES.get(ext, "xlrd"),
                    nrows=max_rows + 1,
                )

            if len(df) > max_rows:
                return {
                    "error": f"File contains more than {max_rows} rows, exceeds limit of {max_rows}",
                    "file_type": ext,
                    "row_count": len(df),
                }

            mem_error = _check_memory_usage(len(df), len(df.columns))
            if mem_error:
                return {"error": mem_error, "file_type": ext}

            # Convert to object dtype first: on float/datetime columns,
            # where(..., None) would turn None back into NaN/NaT.
            rows = df.astype(object).where(df.notna(), None).to_dict(orient="records")

            result = {
                "success": True,
                "file_type": ext,
                "data": {
                    "columns": list(df.columns),
                    "rows": rows,
                    "shape": [len(df), len(df.columns)],
                    "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
                },
            }
        _logger.info("Extracted JSON from %s: %d rows, %d cols", ext, len(df), len(df.columns))
        return result
    except pd.errors.EmptyDataError:
        return {"error": "File is empty or has no data", "file_type": ext}
    except MemoryError:
        return {"error": "Out of memory processing file", "file_type": ext}
    except Exception as exc:
        _logger.exception("JSON extraction failed for %s", ext)
        return {
            "error": f"Processing failed: {exc}",
            "file_type": ext,
            "exception_type": type(exc).__name__,
        }


class ExtractionService:
    """Extract structured JSON from uploaded documents.

    Tabular files (extensions in ``TABULAR_EXTENSIONS``) are parsed with
    pandas. All other files need caller-supplied field mappings, which are
    resolved with regex and spaCy rules against the document's markdown
    text.
    """

    def __init__(self) -> None:
        self._spacy_labels = VALID_SPACY_LABELS

    def extract_structured(
        self,
        filename: Union[str, Path],
        markdown_text: str,
        mappings: Optional[Dict[str, Dict[str, Any]]] = None,
        file_data: Optional[bytes] = None,
    ) -> Dict[str, Any]:
        """Extract structured data from one document.

        Args:
            filename: Name or path of the file; its suffix picks the extractor.
            markdown_text: Document text that mapping rules run against
                (unused for tabular files).
            mappings: Field name -> schema node. Required for non-tabular files.
            file_data: Raw uploaded bytes, used for tabular files.

        Returns:
            ``{"success": True, "extractor", "file_type", "data"}`` on success,
            otherwise a dict containing an ``"error"`` key.
        """
        ext = Path(filename).suffix.lower()

        if ext in TABULAR_EXTENSIONS:
            result = _extract_tabular(filename, file_data)
            if "error" not in result:
                result["extractor"] = "pandas"
            return result

        if not mappings:
            return {
                "error": (
                    f"Cannot extract JSON from '{ext}' files without field mappings. "
                    "Provide a 'mappings' object with field extraction rules."
                ),
                "file_type": ext,
            }

        valid, mapper_error = self.validate_mappings(mappings)
        if not valid:
            return {
                "error": "invalid_spacy_labels",
                "label_mapper": mapper_error,
                "file_type": ext,
            }

        try:
            data = _extract_spacy_fields(markdown_text, mappings)
            _logger.info("spaCy extraction completed for %s: %d fields", ext, len(data))
            return {
                "success": True,
                "extractor": "spacy",
                "file_type": ext,
                "data": data,
            }
        except Exception as exc:
            _logger.exception("spaCy extraction failed for %s", ext)
            return {
                "error": f"spaCy extraction failed: {exc}",
                "file_type": ext,
                "exception_type": type(exc).__name__,
            }

    def validate_mappings(self, mappings: Dict[str, Dict[str, Any]]) -> Tuple[bool, Optional[str]]:
        """Check that every entity-based rule uses only known spaCy labels.

        Both "entity" and "entity_all" rules are checked, including rules
        nested inside "object" fields and "array" items. Nested paths are
        reported as ``parent.child`` or ``parent[]``. Nodes that are not
        dicts are ignored here; resolution later turns them into None.

        Returns:
            ``(True, None)`` when valid, else ``(False, message)`` for the
            first invalid label found.
        """
        for key, rule in mappings.items():
            error = self._find_invalid_label(key, rule)
            if error:
                return False, error
        return True, None

    def _find_invalid_label(self, path: Any, node: Any) -> Optional[str]:
        """Return an error message for the first unknown label under ``node``, else None."""
        if not isinstance(node, dict):
            return None
        node_type = node.get("type")
        if node_type == "object":
            fields = node.get("fields") or {}
            if isinstance(fields, dict):
                for name, child in fields.items():
                    error = self._find_invalid_label(f"{path}.{name}", child)
                    if error:
                        return error
            return None
        if node_type == "array":
            return self._find_invalid_label(f"{path}[]", node.get("items"))
        if node.get("source_type") not in ("entity", "entity_all"):
            return None
        label = node.get("label")
        if isinstance(label, str):
            labels: List[Any] = [label]
        elif isinstance(label, (list, tuple, set)):
            labels = list(label)
        else:
            labels = []
        for lbl in labels:
            if not isinstance(lbl, str) or lbl not in self._spacy_labels:
                return f"Invalid spaCy label '{lbl}' in field '{path}'"
        return None

    def get_spacy_labels(self) -> Dict[str, str]:
        """Return a copy of the supported label -> description mapping."""
        return dict(self._spacy_labels)