Spaces:
Running
Running
"""
Enhanced Judgment Segmenter (FIXED)
Segments judgments into: Facts, Issues, Arguments, Analysis, Decision
"""
| import re | |
| import os | |
| import logging | |
| from typing import List, Dict, Tuple | |
| from dataclasses import dataclass | |
# `transformers` is an optional dependency: record whether it could be
# imported so the segmenter can decide between the ML model and regex.
try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    # Keep the name bound so a stray reference fails with a clear
    # "NoneType is not callable" instead of a NameError.
    pipeline = None
    TRANSFORMERS_AVAILABLE = False

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class Section:
    """A contiguous run of paragraphs that share one section label.

    The class must be a dataclass: the segmenter constructs it with keyword
    arguments (``Section(type=..., text=..., ...)``), which requires the
    generated ``__init__``.
    """

    type: str  # facts/issues/arguments/analysis/decision/unknown
    text: str  # section text
    start_para_idx: int  # index of the first paragraph in the section
    end_para_idx: int  # index of the last paragraph in the section
    confidence: float  # confidence score attached to the label
class JudgmentSegmenter:
    """Label the paragraphs of a judgment and merge them into sections.

    When the ``transformers`` package is importable and a model directory
    exists at ``model_path``, paragraphs are labelled with a HuggingFace
    text-classification pipeline. Otherwise each paragraph is labelled by
    matching the regex cues in ``MARKERS`` and applying a positional bias.
    """

    # Regex cues per section type. They are matched against the lower-cased
    # paragraph text, so the patterns themselves are written in lower case.
    MARKERS = {
        'facts': [
            r'\bbrief\s+facts?\b',
            r'\bfactual\s+(matrix|background)\b',
            r'\bcircumstances\s+of\s+the\s+case\b',
            r'\bbackground\b',
        ],
        'issues': [
            r'\bissues?\s+(for|of)\s+(consideration|determination)\b',
            r'\bsubstantial\s+questions?\b',
            r'\bpoints?\s+for\s+consideration\b',
            r'\bquestions?\s+framed\b',
        ],
        'arguments': [
            r'\blearned\s+counsel\b',
            r'\bsubmissions?\b',
            r'\b(argued|submitted|contended)\b',
            r'\bon\s+behalf\s+of\b',
        ],
        'analysis': [
            r'\bwe\s+have\s+(considered|examined|analysed)\b',
            r'\bthe\s+court\s+(finds|observes|notes|holds)\b',
            r'\bin\s+our\s+(view|opinion)\b',
            r'\bit\s+is\s+clear\s+that\b',
        ],
        'decision': [
            r'\b(appeal|petition|writ)\s+is\s+(allowed|dismissed)\b',
            r'\baccordingly\b',
            r'\bwe\s+direct\b',
            # No trailing \b here: ':' followed by whitespace is not a word
            # boundary, so `held\s*:\b` could never match "Held: ...".
            r'\bheld\s*:',
            r'\border\b',
        ]
    }

    def __init__(self, model_path: str = "models/segmentation_model"):
        """Initialize segmenter, preferring ML model if available, else Regex fallback.

        Args:
            model_path: Path handed to the text-classification pipeline. The
                ML route is only attempted when this path exists on disk.
        """
        self.use_ml = False
        self.classifier = None

        if not (TRANSFORMERS_AVAILABLE and os.path.exists(model_path)):
            logger.info("ML model not found or transformers not installed. Using Regex fallback.")
            return

        try:
            logger.info("Loading ML Segmentation model from %s...", model_path)
            # device=-1 selects CPU in the transformers pipeline API.
            self.classifier = pipeline("text-classification", model=model_path, device=-1)
            self.use_ml = True
            logger.info("✓ ML Segmenter loaded successfully.")
        except Exception as e:
            # Best effort: any load failure degrades to the regex path
            # instead of aborting construction.
            logger.warning("Failed to load ML model, falling back to Regex: %s", e)

    def detect_section(self, para: str, position_ratio: float) -> Tuple[str, float]:
        """Detect the section type of a paragraph using the regex markers.

        Every matching pattern is scored: 0.6 base, +0.2 for a 'facts' match
        in the first 30% of the document, +0.3 for a 'decision' match in the
        last 30%, and +0.2 when the pattern also matches within the first
        120 characters. Scores are capped at 1.0 and the highest one wins;
        on a tie the first pattern encountered is kept.

        Args:
            para: Paragraph text.
            position_ratio: Position of the paragraph in the document, as a
                fraction (paragraph index divided by paragraph count).

        Returns:
            (section_type, confidence); ('unknown', 0.0) when nothing matches.
        """
        para_lower = para.lower()
        # Hoisted: the opening slice is the same for every pattern.
        head = para_lower[:120]

        best_type = 'unknown'
        best_conf = 0.0

        for sec_type, patterns in self.MARKERS.items():
            for pattern in patterns:
                if not re.search(pattern, para_lower):
                    continue

                conf = 0.6

                # Position-based bias
                if sec_type == 'facts' and position_ratio < 0.30:
                    conf += 0.2
                elif sec_type == 'decision' and position_ratio > 0.70:
                    conf += 0.3

                # Strong anchor near paragraph start
                if re.search(pattern, head):
                    conf += 0.2

                conf = min(conf, 1.0)

                # Strict '>' keeps the earliest pattern on equal scores.
                if conf > best_conf:
                    best_type = sec_type
                    best_conf = conf

        return best_type, best_conf

    def detect_section_ml(self, para: str) -> Tuple[str, float]:
        """Detect the section type of a paragraph using the HuggingFace classifier.

        Args:
            para: Paragraph text.

        Returns:
            (label, score) from the classifier's first result, with the label
            lower-cased and any 'label_' prefix removed. Returns
            ('unknown', 0.0) for blank input or when no classifier is loaded,
            and ('unknown', score) when the score is below 0.5.
        """
        if not para.strip() or not self.classifier:
            return "unknown", 0.0

        # Truncate to max length to avoid tokenization errors.
        # NOTE(review): this limits characters, not tokens - confirm that is
        # sufficient for the tokenizer of the deployed model.
        truncated = para[:512]
        result = self.classifier(truncated)[0]

        # Assume labels are like LABEL_FACTS, LABEL_ISSUES or directly facts, issues
        label = result['label'].lower().replace('label_', '')
        score = result['score']

        # Enforce confidence threshold
        if score < 0.5:
            return "unknown", score

        return label, score

    def segment(self, paragraph_texts: List[str]) -> "List[Section]":
        """Segment a judgment given its paragraphs (INDEX-ALIGNED).

        Paragraphs are labelled one by one (ML classifier when loaded, regex
        otherwise). A new section starts whenever a paragraph gets a label
        that differs from the current one with confidence above 0.4; all
        other paragraphs are appended to the current section.

        Args:
            paragraph_texts: Paragraph strings in document order.

        Returns:
            Sections in document order. Each section's start/end indices are
            inclusive indices into ``paragraph_texts``, its text is the
            member paragraphs joined by blank lines, and its confidence is
            the highest paragraph confidence seen in it, rounded to 2 places.
            An empty input yields an empty list.
        """
        if not paragraph_texts:
            return []

        sections: List[Section] = []
        current_type = 'unknown'
        current_paras = []
        current_conf = 0.0
        start_idx = 0
        total = len(paragraph_texts)

        for i, para in enumerate(paragraph_texts):
            position_ratio = i / max(total, 1)

            if self.use_ml:
                sec_type, conf = self.detect_section_ml(para)
            else:
                sec_type, conf = self.detect_section(para, position_ratio)

            # Fallback: early unknown paragraphs are likely facts.
            # NOTE(review): the fallback confidence (0.4) does not exceed the
            # boundary threshold below (> 0.4), so this never opens a 'facts'
            # section by itself; it only lifts the running confidence of the
            # current section to at least 0.4. Confirm this is intended.
            if sec_type == 'unknown' and position_ratio < 0.30 and i > 0:
                sec_type = 'facts'
                conf = 0.4

            # Section boundary
            if conf > 0.4 and sec_type != current_type:
                # Close the running section before opening the new one.
                if current_paras:
                    sections.append(
                        Section(
                            type=current_type,
                            text="\n\n".join(current_paras),
                            start_para_idx=start_idx,
                            end_para_idx=i - 1,
                            confidence=round(current_conf, 2)
                        )
                    )
                current_type = sec_type
                current_paras = [para]
                current_conf = conf
                start_idx = i
            else:
                current_paras.append(para)
                current_conf = max(current_conf, conf)

        # Final section
        if current_paras:
            sections.append(
                Section(
                    type=current_type,
                    text="\n\n".join(current_paras),
                    start_para_idx=start_idx,
                    end_para_idx=total - 1,
                    confidence=round(current_conf, 2)
                )
            )

        return sections