Spaces:
Sleeping
Sleeping
| import spacy | |
| from huggingface_hub import snapshot_download | |
| from typing import Dict, Any | |
def extract_legal_entities(text, model_id=None, hf_token=None):
    """Extract named entities from legal text.

    Args:
        text: Input text to process.
        model_id: Optional Hugging Face model ID (defaults to en_core_web_sm).
        hf_token: Optional Hugging Face token.

    Returns:
        Dict with "entities" (list of {text, label, start, end} mentions),
        "entity_counts" (per-label unique entity texts and their count),
        "total_entities", and "unique_labels". On failure the dict instead
        carries an "error" message alongside empty results, so callers
        always receive the same shape.
    """
    if not text or not text.strip():
        return {
            "error": "Empty text provided",
            "entities": [],
            "entity_counts": {},
            "total_entities": 0,
        }

    # Load model (custom HF model or the bundled English pipeline).
    nlp = _load_ner_model(model_id, hf_token)
    if not nlp:
        return {
            "error": "Failed to load NER model",
            "entities": [],
            "entity_counts": {},
            "total_entities": 0,
        }

    try:
        # Very large inputs are processed chunk-by-chunk to bound memory use.
        if len(text) > 4000000:
            return _process_large_text(text, nlp)

        doc = nlp(text)
        entities = []
        entity_counts = {}
        for ent in doc.ents:
            # A single span like "A and B" may expand to several (text, label) pairs.
            for entity_text, entity_label in _process_entity(ent):
                entities.append({
                    "text": entity_text,
                    "label": entity_label,
                    # NOTE: split parts all keep the span of the original entity.
                    "start": ent.start_char,
                    "end": ent.end_char,
                })
                entity_counts.setdefault(entity_label, []).append(entity_text)

        # Replace raw mention lists with de-duplicated summaries.
        # dict.fromkeys preserves first-seen order, so the output is
        # deterministic (set() iteration order varies run-to-run under
        # string hash randomization).
        for label, mentions in entity_counts.items():
            unique_entities = list(dict.fromkeys(mentions))
            entity_counts[label] = {
                "entities": unique_entities,
                "count": len(unique_entities),
            }

        return {
            "entities": entities,
            "entity_counts": entity_counts,
            "total_entities": len(entities),
            "unique_labels": list(entity_counts.keys()),
        }
    except Exception as e:
        # Report the failure in-band rather than raising, preserving the
        # function's uniform return shape.
        return {
            "error": str(e),
            "entities": [],
            "entity_counts": {},
            "total_entities": 0,
        }
def _load_ner_model(model_id, hf_token):
    """Load a spaCy NER pipeline, falling back to en_core_web_sm.

    A custom ``model_id`` is fetched from the Hugging Face Hub first;
    any failure falls back to the bundled small English model. Returns
    the loaded pipeline, or None if nothing could be loaded.
    """
    model_id = model_id or "en_core_web_sm"
    try:
        if model_id == "en_core_web_sm":
            return spacy.load("en_core_web_sm")
        # Custom model: download a local snapshot from the Hub, then load it.
        local_dir = snapshot_download(
            repo_id=model_id,
            token=hf_token if hf_token else None,
        )
        return spacy.load(local_dir)
    except Exception:
        pass
    # Last resort: the standard English model; give up with None on failure.
    try:
        return spacy.load("en_core_web_sm")
    except Exception:
        return None
| def _process_large_text(text, nlp, chunk_size=3000000): | |
| """Process large text by chunking""" | |
| chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)] | |
| all_entities = [] | |
| all_entity_counts = {} | |
| for i, chunk in enumerate(chunks): | |
| try: | |
| doc = nlp(chunk) | |
| for ent in doc.ents: | |
| processed_entities = _process_entity(ent) | |
| for entity_text, entity_label in processed_entities: | |
| entity_info = { | |
| "text": entity_text, | |
| "label": entity_label, | |
| "start": ent.start_char + (i * chunk_size), | |
| "end": ent.end_char + (i * chunk_size) | |
| } | |
| all_entities.append(entity_info) | |
| if entity_label not in all_entity_counts: | |
| all_entity_counts[entity_label] = [] | |
| all_entity_counts[entity_label].append(entity_text) | |
| except Exception: | |
| continue | |
| # Process counts | |
| for label in all_entity_counts: | |
| unique_entities = list(set(all_entity_counts[label])) | |
| all_entity_counts[label] = { | |
| "entities": unique_entities, | |
| "count": len(unique_entities) | |
| } | |
| return { | |
| "entities": all_entities, | |
| "entity_counts": all_entity_counts, | |
| "total_entities": len(all_entities), | |
| "unique_labels": list(all_entity_counts.keys()), | |
| "processed_in_chunks": True, | |
| "num_chunks": len(chunks) | |
| } | |
| def _process_entity(ent): | |
| """Process individual entity (handle special cases like 'X and Y')""" | |
| if ent.label_ in ["PRECEDENT", "ORG"] and " and " in ent.text: | |
| parts = ent.text.split(" and ") | |
| return [(p.strip(), "ORG") for p in parts] | |
| return [(ent.text, ent.label_)] |