Spaces:
Running
Running
| from __future__ import annotations | |
| import io | |
| import re | |
| import threading | |
| import warnings | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, List, Optional, Tuple, Union | |
| import pandas as pd | |
| from app.config import get_settings | |
| from app.core.constants import ( | |
| MAX_CSV_ROWS, | |
| MAX_EXCEL_ROWS, | |
| MAX_MEMORY_CELLS, | |
| TABULAR_EXTENSIONS, | |
| ) | |
| from app.core.logger import get_logger | |
# Module-level logger and application settings; get_settings() is called once
# at import time and the result is reused by every function below.
_logger = get_logger(__name__)
_settings = get_settings()
# spaCy named-entity labels accepted in "entity" mapping rules, mapped to a
# human-readable description. ExtractionService validates rule labels against
# the keys and exposes a copy through get_spacy_labels().
VALID_SPACY_LABELS: Dict[str, str] = {
    "ORG": "Companies, agencies, institutions",
    "PERSON": "People, including fictional",
    "DATE": "Absolute or relative dates or periods",
    "MONEY": "Monetary values, including unit",
    "GPE": "Countries, cities, states",
    "LOC": "Non-GPE locations, mountain ranges, bodies of water",
    "PRODUCT": "Objects, vehicles, foods, etc.",
    "EVENT": "Named hurricanes, battles, wars, sports events",
    "CARDINAL": "Numerals that do not fall under another type",
    "PERCENT": "Percentage, including '%'",
    "QUANTITY": "Measurements, as of weight or distance",
    "TIME": "Times smaller than a day",
    "NORP": "Nationalities or religious or political groups",
    "FAC": "Buildings, airports, highways, bridges",
    "WORK_OF_ART": "Titles of books, songs, etc.",
    "LAW": "Named documents made into laws",
    "LANGUAGE": "Any named language",
    "ORDINAL": "'first', 'second', etc.",
}
# A schema node is one (possibly nested) mapping rule as a plain dict; its
# "type" key selects object / array / scalar-rule handling.
SchemaNode = Dict[str, Any]
# What resolving a node yields: a scalar string, a list (array nodes and
# *_all resolvers), a dict (object nodes), or None when nothing was found.
ResultNode = Union[str, List[Any], Dict[str, Any], None]
# Guards lookups into _NORMALIZERS made by _apply_normalizers.
_norm_lock = threading.Lock()
# Named string transforms that rules reference through their "normalize" key
# (one name or a list of names, applied in order).
_NORMALIZERS: Dict[str, Callable[[str], str]] = {
    "strip": lambda s: s.strip(),
    "upper": lambda s: s.upper(),
    "lower": lambda s: s.lower(),
    "remove_commas": lambda s: s.replace(",", ""),
    "remove_spaces": lambda s: s.replace(" ", ""),
    # "\n" becomes a space and "\r" is dropped, so "\r\n" collapses to one space.
    "remove_newlines": lambda s: s.replace("\n", " ").replace("\r", ""),
    "collapse_whitespace": lambda s: re.sub(r"\s+", " ", s).strip(),
    # Removes only $, euro, pound, yen/yuan and rupee signs.
    "remove_currency": lambda s: re.sub(r"[$\u20ac\u00a3\u00a5\u20b9]", "", s),
    # Keeps digits and "." only; "-" is removed too, so negative numbers lose
    # their sign.
    "remove_non_numeric": lambda s: re.sub(r"[^\d.]", "", s),
    # Rewrites "/" and "." separators to "-" (e.g. "01/02/2024" -> "01-02-2024").
    "normalize_date_sep": lambda s: re.sub(r"[/.]", "-", s),
}
# Guards _RESOLVERS, which maps a rule's "source_type" to a resolver invoked
# as fn(rule, doc, text). The "*_all" resolvers return lists, hence ResultNode.
_resolver_lock = threading.Lock()
_RESOLVERS: Dict[str, Callable[[Dict[str, Any], Any, str], ResultNode]] = {}
# spaCy pipeline, loaded on first use by _get_nlp(); the lock serialises that load.
_nlp_lock = threading.Lock()
_nlp = None
def _get_nlp():
    """Return the process-wide spaCy pipeline, loading it on first call.

    The model named by ``_settings.spacy_model`` is loaded without the
    tagger, parser, lemmatizer and attribute_ruler components. The unlocked
    fast path skips the lock once the pipeline exists; the second check under
    the lock stops two threads from loading it concurrently.
    """
    global _nlp
    if _nlp is None:
        with _nlp_lock:
            # Another thread may have completed the load while we waited.
            if _nlp is None:
                import spacy

                _nlp = spacy.load(
                    _settings.spacy_model,
                    exclude=["tagger", "parser", "lemmatizer", "attribute_ruler"],
                )
                _logger.info("spaCy %s loaded", _settings.spacy_model)
    return _nlp
def _validate_file_size(size: int) -> Optional[str]:
    """Return an error message if *size* (bytes) exceeds the upload limit, else None."""
    limit = _settings.max_upload_bytes
    if size <= limit:
        return None
    return f"File size {size} bytes exceeds limit of {limit} bytes"
def _check_memory_usage(rows: int, cols: int) -> Optional[str]:
    """Reject frames with more than ``MAX_MEMORY_CELLS`` cells.

    The MB figure in the message is a rough estimate assuming ~50 bytes per cell.
    Returns None when the frame is within the limit.
    """
    cells = rows * cols
    if cells <= MAX_MEMORY_CELLS:
        return None
    approx_mb = cells * 50 / (1024 * 1024)
    return f"Data size too large (approx {approx_mb:.1f} MB). Too many cells: {rows}x{cols}"
| def _build_flags(rule: Dict[str, Any]) -> re.RegexFlag: | |
| flags = re.RegexFlag(0) | |
| for name in rule.get("flags", []): | |
| obj = getattr(re, name.upper(), None) | |
| if obj is None: | |
| _logger.warning("Unknown re flag %s", name) | |
| continue | |
| flags |= obj | |
| return flags | |
| def _apply_normalizers(value: Optional[str], normalize: Any) -> Optional[str]: | |
| if not isinstance(value, str): | |
| return None | |
| if not normalize: | |
| return value | |
| if isinstance(normalize, str): | |
| normalize = [normalize] | |
| for key in normalize: | |
| with _norm_lock: | |
| fn = _NORMALIZERS.get(key) | |
| if fn is None: | |
| _logger.warning("Unknown normalizer %s", key) | |
| continue | |
| try: | |
| value = fn(value) | |
| except Exception as exc: | |
| _logger.error("Normalizer %s raised on value %s: %s", key, value, exc) | |
| return value if value else None | |
| def _try_group(match: re.Match, capture_group: Any) -> Tuple[bool, Optional[str]]: | |
| try: | |
| return True, match.group(capture_group) | |
| except (IndexError, re.error): | |
| _logger.warning("Group %s does not exist in pattern", capture_group) | |
| return False, None | |
def _resolve_regex(rule: Dict[str, Any], text: str) -> Optional[str]:
    """Extract a single value from *text* with a regex rule.

    Rule keys read:
        pattern: primary regex (required).
        fallback_patterns: regexes tried in order after ``pattern``; a single
            string is accepted.
        flags: see ``_build_flags``.
        capture_group: group index or name to return (default 0).
        match_index: which match to use (default 0); an out-of-range index
            selects the last match.
        normalize: normalizer name(s), see ``_apply_normalizers``.
        strip_chars: characters stripped from both ends (default: whitespace).

    Returns the first non-empty cleaned value produced by any pattern, or
    None. Invalid patterns are logged and skipped.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex rule missing pattern")
        return None
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    match_index = rule.get("match_index", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    fallbacks = rule.get("fallback_patterns") or []
    if isinstance(fallbacks, str):
        # A lone string would otherwise be unpacked into one pattern per character.
        fallbacks = [fallbacks]

    for pat in [primary, *fallbacks]:
        try:
            matches = list(re.finditer(pat, text, flags))
        except re.error as exc:
            _logger.error("Invalid regex %s: %s", pat, exc)
            continue
        if not matches:
            continue
        try:
            match = matches[match_index]
        except IndexError:
            match = matches[-1]
        exists, result = _try_group(match, capture_group)
        if not exists or result is None:
            continue
        result = _apply_normalizers(result, normalize)
        if result is None:
            continue
        result = result.strip(strip_chars) if strip_chars else result.strip()
        if result:
            return result
        # Nothing left after stripping: give the next fallback pattern a chance
        # instead of returning None outright.
    return None
def _resolve_regex_all(rule: Dict[str, Any], text: str) -> List[Optional[str]]:
    """Extract every match of the rule's regex from *text*.

    Rule keys read: ``pattern`` (required), ``flags``, ``capture_group``
    (default 0), ``normalize``, ``strip_chars`` (default: whitespace) and
    ``max_items``. ``fallback_patterns`` is not consulted here.

    Returns the cleaned, non-empty values in match order, at most
    ``max_items`` of them; a ``max_items`` of 0 or less yields ``[]``. An
    invalid pattern is logged and yields ``[]``.
    """
    primary = rule.get("pattern", "")
    if not primary:
        _logger.warning("Regex-array rule missing pattern")
        return []
    max_items = rule.get("max_items")
    if max_items is not None and max_items <= 0:
        # Previously the cap was only checked after appending, so 0 gave one item.
        return []
    flags = _build_flags(rule)
    capture_group = rule.get("capture_group", 0)
    normalize = rule.get("normalize", "")
    strip_chars = rule.get("strip_chars", "")
    try:
        # finditer compiles eagerly (so re.error surfaces here) but matches
        # lazily, letting the max_items cap stop the scan early.
        matches = re.finditer(primary, text, flags)
    except re.error as exc:
        _logger.error("Invalid regex %s: %s", primary, exc)
        return []

    results: List[Optional[str]] = []
    for m in matches:
        exists, result = _try_group(m, capture_group)
        if not exists or result is None:
            continue
        result = _apply_normalizers(result, normalize)
        if result is None:
            continue
        result = result.strip(strip_chars) if strip_chars else result.strip()
        if not result:
            continue
        results.append(result)
        if max_items is not None and len(results) >= max_items:
            break
    return results
def _resolve_entity(rule: Dict[str, Any], doc: Any) -> Optional[str]:
    """Pick one named entity of the requested label(s) from a spaCy *doc*.

    Rule keys read:
        label: a label string or a list of labels.
        match_index: which candidate to use (default 0); an out-of-range
            index selects the last candidate.
        min_length: minimum raw entity text length (default 1).
        exclude_pattern / exclude_flags: entities whose text matches this
            regex are skipped.
        normalize: normalizer name(s) applied to the chosen text.

    Returns the normalized entity text. Returns None when *doc* is None,
    nothing qualifies, or ``exclude_pattern`` is not a valid regex (logged).
    """
    if doc is None:
        _logger.warning("Entity resolver received None doc")
        return None
    labels = rule.get("label")
    if isinstance(labels, str):
        labels = [labels]
    labels = set(labels or [])
    match_index = rule.get("match_index", 0)
    min_length = rule.get("min_length", 1)
    exclude_pat = rule.get("exclude_pattern", "")
    exclude_flags = _build_flags({"flags": rule.get("exclude_flags", [])})
    normalize = rule.get("normalize", "")

    # Compile the exclusion once instead of per entity. An invalid pattern is
    # logged like in the regex resolvers rather than escaping as re.error.
    exclude_re = None
    if exclude_pat:
        try:
            exclude_re = re.compile(exclude_pat, exclude_flags)
        except re.error as exc:
            _logger.error("Invalid exclude_pattern %s: %s", exclude_pat, exc)
            return None

    candidates = [
        ent.text
        for ent in doc.ents
        if ent.label_ in labels
        and len(ent.text) >= min_length
        and not (exclude_re is not None and exclude_re.search(ent.text))
    ]
    if not candidates:
        return None
    try:
        result = candidates[match_index]
    except IndexError:
        result = candidates[-1]
    return _apply_normalizers(result, normalize)
def _resolve_entity_all(rule: Dict[str, Any], doc: Any) -> List[Optional[str]]:
    """Collect all named entities of the requested label(s) from a spaCy *doc*.

    Rule keys read: ``label``, ``min_length`` (default 1), ``exclude_pattern``
    / ``exclude_flags``, and ``normalize``, all as in ``_resolve_entity``.
    Also reads ``unique``, which drops repeated normalized values (default
    False), and ``max_items``.

    Returns normalized, non-empty entity texts in document order, at most
    ``max_items`` of them; a ``max_items`` of 0 or less yields ``[]``. Returns
    ``[]`` when *doc* is None or ``exclude_pattern`` is invalid (logged).
    """
    if doc is None:
        _logger.warning("Entity-array resolver received None doc")
        return []
    labels = rule.get("label")
    if isinstance(labels, str):
        labels = [labels]
    labels = set(labels or [])
    min_length = rule.get("min_length", 1)
    exclude_pat = rule.get("exclude_pattern", "")
    exclude_flags = _build_flags({"flags": rule.get("exclude_flags", [])})
    normalize = rule.get("normalize", "")
    max_items = rule.get("max_items")
    unique = rule.get("unique", False)
    if max_items is not None and max_items <= 0:
        # The cap used to be checked only after appending, so 0 gave one item.
        return []

    exclude_re = None
    if exclude_pat:
        try:
            exclude_re = re.compile(exclude_pat, exclude_flags)
        except re.error as exc:
            _logger.error("Invalid exclude_pattern %s: %s", exclude_pat, exc)
            return []

    results: List[str] = []
    seen: set = set()
    for ent in doc.ents:
        if ent.label_ not in labels:
            continue
        if len(ent.text) < min_length:
            continue
        if exclude_re is not None and exclude_re.search(ent.text):
            continue
        value = _apply_normalizers(ent.text, normalize)
        if not value:
            continue
        if unique:
            if value in seen:
                continue
            seen.add(value)
        results.append(value)
        if max_items is not None and len(results) >= max_items:
            break
    return results
| def _resolve_token_attr(rule: Dict[str, Any], doc: Any) -> Optional[str]: | |
| if doc is None: | |
| _logger.warning("Token-attr resolver received None doc") | |
| return None | |
| attr = rule.get("attr", "") | |
| match_index = rule.get("match_index", 0) | |
| normalize = rule.get("normalize", "") | |
| candidates = [t.text for t in doc if getattr(t, attr, False)] | |
| if not candidates: | |
| return None | |
| try: | |
| result = candidates[match_index] | |
| except IndexError: | |
| result = candidates[-1] | |
| return _apply_normalizers(result, normalize) | |
def _register_builtin_resolvers() -> None:
    """Install the built-in ``source_type`` resolvers into ``_RESOLVERS``.

    Each resolver is adapted to the shared ``(rule, doc, text)`` call shape.
    The regex resolvers ignore ``doc``; the spaCy-based ones ignore ``text``.
    """
    table = {
        "regex": lambda rule, doc, text: _resolve_regex(rule, text),
        "entity": lambda rule, doc, text: _resolve_entity(rule, doc),
        "token_attr": lambda rule, doc, text: _resolve_token_attr(rule, doc),
        "regex_all": lambda rule, doc, text: _resolve_regex_all(rule, text),
        "entity_all": lambda rule, doc, text: _resolve_entity_all(rule, doc),
    }
    with _resolver_lock:
        _RESOLVERS.update(table)
# Populate the resolver registry with the built-in source types at import time.
_register_builtin_resolvers()
def _resolve_scalar_field(rule: Dict[str, Any], doc: Any, text: str) -> Optional[str]:
    """Dispatch a leaf rule to the resolver registered for its ``source_type``.

    Returns the resolver's result. Returns None, with a warning, when no
    resolver is registered for that source type.
    """
    source_type = rule.get("source_type")
    with _resolver_lock:
        resolver = _RESOLVERS.get(source_type)
    if resolver is not None:
        return resolver(rule, doc, text)
    _logger.warning("Unknown source_type %s", source_type)
    return None
def _resolve_node(node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve one schema node according to its ``type`` key.

    ``"object"`` and ``"array"`` nodes are handled structurally. Any other or
    missing type is treated as a leaf rule.
    """
    kind = node.get("type")
    if kind == "object":
        handler = _resolve_object_node
    elif kind == "array":
        handler = _resolve_array_node
    else:
        handler = _resolve_scalar_field
    return handler(node, doc, text)
def _resolve_object_node(node: SchemaNode, doc: Any, text: str) -> Dict[str, ResultNode]:
    """Resolve each entry of ``node["fields"]`` into a dict of the same keys.

    A field whose resolution raises is logged and set to None, so one bad
    field does not discard its siblings.
    """
    resolved: Dict[str, ResultNode] = {}
    for name, child in node.get("fields", {}).items():
        try:
            value = _resolve_node(child, doc, text)
        except Exception as exc:
            _logger.error("Object field %s raised: %s", name, exc)
            value = None
        resolved[name] = value
    return resolved
def _resolve_array_node(node: SchemaNode, doc: Any, text: str) -> List[ResultNode]:
    """Resolve an ``"array"`` schema node into a list of item results.

    Two modes:

    * ``split_pattern`` set: *text* is split with ``re.split`` (flags from
      ``split_flags``). Each non-blank segment is parsed with spaCy and
      resolved against ``items`` on its own. There is one result per segment,
      and a segment whose resolution raises becomes None. Text captured by
      groups in the split pattern is kept as segments, as ``re.split`` returns it.
    * otherwise: ``items`` is resolved once against the whole *doc* / *text*.
      A list result is used as-is, a scalar is wrapped in a one-element list,
      and None gives ``[]``.

    ``max_items`` caps the returned length in both modes; 0 or less yields ``[]``.
    An invalid ``split_pattern`` is logged and yields ``[]``.
    """
    item_schema: SchemaNode = node.get("items", {})
    split_pat: Optional[str] = node.get("split_pattern")
    max_items: Optional[int] = node.get("max_items")
    if max_items is not None and max_items <= 0:
        # Split mode used to append before checking the cap and return one item.
        return []

    if not split_pat:
        try:
            raw = _resolve_node(item_schema, doc, text)
        except Exception as exc:
            _logger.error("Array item resolve raised: %s", exc)
            return []
        if isinstance(raw, list):
            whole: List[ResultNode] = raw
        elif raw is not None:
            whole = [raw]
        else:
            whole = []
        return whole[:max_items] if max_items is not None else whole

    try:
        flags = _build_flags({"flags": node.get("split_flags", [])})
        pieces = re.split(split_pat, text, flags=flags)
    except re.error as exc:
        _logger.error("Invalid split_pattern %s: %s", split_pat, exc)
        return []

    # re.split yields None for capture groups that did not take part in a
    # match; calling .strip() on those used to crash. Blank segments never
    # produced results, so drop them before spending NLP time on them.
    segments = [seg for seg in pieces if seg is not None and seg.strip()]
    if max_items is not None:
        # Every remaining segment yields exactly one result, so cap up front.
        segments = segments[:max_items]
    if not segments:
        return []

    nlp = _get_nlp()
    try:
        segment_docs = list(nlp.pipe(segments))
    except Exception as exc:
        _logger.error("spaCy pipe failed on array segments: %s", exc)
        segment_docs = [None] * len(segments)

    results: List[ResultNode] = []
    for seg_doc, seg_text in zip(segment_docs, segments):
        try:
            item_result = _resolve_node(item_schema, seg_doc, seg_text)
        except Exception as exc:
            _logger.error("Array item resolve raised: %s", exc)
            item_result = None
        results.append(item_result)
    return results
def _safe_resolve_node(path: str, node: SchemaNode, doc: Any, text: str) -> ResultNode:
    """Resolve *node*, turning any exception into a logged None.

    *path* is used only to identify the failing field in the log message.
    """
    try:
        resolved = _resolve_node(node, doc, text)
    except Exception as exc:
        _logger.error("Schema path %s raised: %s", path, exc)
        resolved = None
    return resolved
def _extract_spacy_fields(text: str, fields: Dict[str, SchemaNode]) -> Dict[str, ResultNode]:
    """Parse *text* with spaCy once and resolve every top-level field against it.

    If the pipeline raises, the error is logged and fields are resolved with
    ``doc=None``, so the regex resolvers (which only use *text*) still
    work. Each field goes through ``_safe_resolve_node``, so a failing field
    yields None instead of aborting the rest.
    """
    pipeline = _get_nlp()
    try:
        parsed = next(iter(pipeline.pipe([text])))
    except Exception as exc:
        _logger.error("spaCy pipe failed: %s", exc)
        parsed = None
    extracted: Dict[str, ResultNode] = {}
    for name, schema in fields.items():
        extracted[name] = _safe_resolve_node(name, schema, parsed, text)
    return extracted
def _extract_tabular(file_path: Union[str, Path], file_data: Optional[bytes] = None) -> Dict[str, Any]:
    """Parse a CSV or Excel file into a JSON-friendly dict with pandas.

    Args:
        file_path: Filename or path. The suffix picks the parser: ".csv" uses
            ``read_csv``; anything else uses ``read_excel`` (openpyxl for
            ".xlsx", xlrd otherwise). The file is read from disk only when
            *file_data* is None.
        file_data: Optional in-memory contents. When given, even as ``b""``,
            it takes precedence over *file_path*.

    Returns:
        On success, ``{"success": True, "file_type", "data": {"columns",
        "rows", "shape", "dtypes"}}``, where missing cells in ``rows`` are
        ``None`` (never NaN/NaT).
        On a handled failure, a dict with ``"error"`` and ``"file_type"``,
        plus ``"row_count"`` for row-limit errors and ``"exception_type"``
        for unexpected parser exceptions.
    """
    path = Path(file_path)
    ext = path.suffix.lower()

    # Size gate: prefer the in-memory bytes; otherwise stat the file if it
    # exists. A missing file is reported by the reader below.
    if file_data is not None:
        size_error = _validate_file_size(len(file_data))
    elif path.exists():
        size_error = _validate_file_size(path.stat().st_size)
    else:
        size_error = None
    if size_error:
        return {"error": size_error, "file_type": ext}

    is_csv = ext == ".csv"
    max_rows = MAX_CSV_ROWS if is_csv else MAX_EXCEL_ROWS
    # Use the supplied bytes whenever they were given -- including b"" -- so
    # an empty upload is reported as empty instead of silently re-reading a
    # (possibly nonexistent) file from disk, which `if file_data:` used to do.
    source: Any = io.BytesIO(file_data) if file_data is not None else file_path

    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            if is_csv:
                # One row past the limit is enough to detect an oversize file.
                df = pd.read_csv(source, nrows=max_rows + 1, low_memory=False)
            else:
                df = pd.read_excel(
                    source,
                    engine="openpyxl" if ext == ".xlsx" else "xlrd",
                )
            if len(df) > max_rows:
                return {
                    "error": f"File contains {len(df)} rows, exceeds limit of {max_rows}",
                    "file_type": ext,
                    "row_count": len(df),
                }
            mem_error = _check_memory_usage(len(df), len(df.columns))
            if mem_error:
                return {"error": mem_error, "file_type": ext}
            # Cast to object first: on numeric columns where(..., None) coerces
            # None back to NaN, which leaks NaN (invalid JSON) into the rows.
            rows = df.astype(object).where(df.notna(), None).to_dict(orient="records")
            result = {
                "success": True,
                "file_type": ext,
                "data": {
                    "columns": list(df.columns),
                    "rows": rows,
                    "shape": [len(df), len(df.columns)],
                    "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
                },
            }
        _logger.info("Extracted JSON from %s: %d rows, %d cols", ext, len(df), len(df.columns))
        return result
    except pd.errors.EmptyDataError:
        return {"error": "File is empty or has no data", "file_type": ext}
    except MemoryError:
        return {"error": "Out of memory processing file", "file_type": ext}
    except Exception as exc:
        _logger.exception("JSON extraction failed for %s", ext)
        return {
            "error": f"Processing failed: {exc}",
            "file_type": ext,
            "exception_type": type(exc).__name__,
        }
class ExtractionService:
    """Turn uploaded documents into structured JSON.

    Files whose suffix is in ``TABULAR_EXTENSIONS`` are parsed with pandas.
    For all other files, caller-supplied mapping rules (regex / spaCy entity
    / token attribute) are run over the document's markdown text.
    """

    def __init__(self) -> None:
        # Labels accepted by "entity" / "entity_all" rules, with descriptions.
        self._spacy_labels = VALID_SPACY_LABELS

    def extract_structured(
        self,
        filename: Union[str, Path],
        markdown_text: str,
        mappings: Optional[Dict[str, Dict[str, Any]]] = None,
        file_data: Optional[bytes] = None,
    ) -> Dict[str, Any]:
        """Extract structured data for *filename*.

        Args:
            filename: Name or path of the upload; its suffix picks the strategy.
            markdown_text: Document text that *mappings* are applied to
                (unused for tabular files).
            mappings: Field name -> schema node. Required for non-tabular files.
            file_data: Raw upload bytes, forwarded to the tabular reader.

        Returns:
            ``{"success": True, "extractor", "file_type", "data"}`` on success;
            otherwise a dict carrying an ``"error"`` key and ``"file_type"``.
        """
        ext = Path(filename).suffix.lower()
        if ext in TABULAR_EXTENSIONS:
            result = _extract_tabular(filename, file_data)
            if "error" not in result:
                result["extractor"] = "pandas"
            return result
        if not mappings:
            return {
                "error": (
                    f"Cannot extract JSON from '{ext}' files without field mappings. "
                    "Provide a 'mappings' object with field extraction rules."
                ),
                "file_type": ext,
            }
        if not isinstance(mappings, dict):
            # A non-dict (e.g. a JSON list) used to crash in validate_mappings.
            return {
                "error": "'mappings' must be an object mapping field names to extraction rules.",
                "file_type": ext,
            }
        valid, mapper_error = self.validate_mappings(mappings)
        if not valid:
            return {
                "error": "invalid_spacy_labels",
                "label_mapper": mapper_error,
                "file_type": ext,
            }
        try:
            data = _extract_spacy_fields(markdown_text, mappings)
            _logger.info("spaCy extraction completed for %s: %d fields", ext, len(data))
            return {
                "success": True,
                "extractor": "spacy",
                "file_type": ext,
                "data": data,
            }
        except Exception as exc:
            _logger.exception("spaCy extraction failed for %s", ext)
            return {
                "error": f"spaCy extraction failed: {exc}",
                "file_type": ext,
                "exception_type": type(exc).__name__,
            }

    def validate_mappings(self, mappings: Dict[str, Dict[str, Any]]) -> Tuple[bool, Optional[str]]:
        """Check every spaCy label referenced anywhere in *mappings*.

        The whole schema is walked: top-level fields, ``fields`` of object
        nodes and ``items`` of array nodes. Both ``"entity"`` and
        ``"entity_all"`` rules are checked; previously ``entity_all`` rules
        and nested rules were never validated.

        Returns:
            ``(True, None)`` if all labels are known. Otherwise ``(False,
            message)`` for the first invalid label. Nested fields are
            reported with a dotted path (``"parent.child"``), and array items
            with a ``[]`` suffix.
        """
        for key, rule in mappings.items():
            error = self._find_invalid_label(str(key), rule)
            if error:
                return False, error
        return True, None

    def _find_invalid_label(self, path: str, node: Any) -> Optional[str]:
        """Return an error for the first invalid label under *node*, else None.

        Mirrors the dispatch in ``_resolve_node``: object nodes recurse into
        ``fields``, array nodes into ``items``, and any other node is a leaf
        rule whose labels are checked when its ``source_type`` is an entity
        type. Non-dict nodes are skipped here; they fail (and are logged)
        during extraction instead.
        """
        if not isinstance(node, dict):
            return None
        node_type = node.get("type")
        if node_type == "object":
            fields = node.get("fields") or {}
            if not isinstance(fields, dict):
                return None
            for name, child in fields.items():
                error = self._find_invalid_label(f"{path}.{name}", child)
                if error:
                    return error
            return None
        if node_type == "array":
            return self._find_invalid_label(f"{path}[]", node.get("items"))
        if node.get("source_type") not in ("entity", "entity_all"):
            return None
        label = node.get("label")
        labels = [label] if isinstance(label, str) else label
        if not isinstance(labels, (list, tuple)):
            return None
        for lbl in labels:
            # Non-string entries (e.g. nested lists) are invalid; checking them
            # with `in` against the label dict would raise on unhashable values.
            if not isinstance(lbl, str) or lbl not in self._spacy_labels:
                return f"Invalid spaCy label '{lbl}' in field '{path}'"
        return None

    def get_spacy_labels(self) -> Dict[str, str]:
        """Return a copy of the supported spaCy labels and their descriptions."""
        return dict(self._spacy_labels)