Spaces:
Sleeping
Sleeping
| import csv | |
| import json | |
| import re | |
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional, Sequence, Tuple | |
# Fallback risk vocabulary: analyze_contract_chunks() uses this list whenever
# the caller passes no (or an empty) ``keywords`` argument.
DEFAULT_RISK_KEYWORDS: List[str] = [
    # Disciplinary process
    "discipline", "discharge", "grievance", "warning letter", "suspension",
    # Terms of employment and dispute resolution
    "seniority", "overtime", "arbitration",
    # Conduct and workplace conditions
    "testing", "harassment", "safety", "termination",
]
@dataclass
class ArticleStat:
    """Aggregated risk statistics for one contract article.

    Attributes:
        article: Normalized article label ("Unknown" when the chunk had none).
        chunk_count: Number of chunks attributed to this article.
        risk_hits: Total risk-keyword matches across those chunks.
    """

    # The @dataclass decorator is required: instances are built with keyword
    # arguments, which a bare annotated class does not accept.
    article: str
    chunk_count: int
    risk_hits: int
@dataclass
class SectionStat:
    """Risk-keyword total for one (article, section) pair.

    Attributes:
        article: Normalized article label the section belongs to.
        section: Normalized section label ("Unknown" when the chunk had none).
        risk_hits: Total risk-keyword matches recorded for this section.
    """

    # The @dataclass decorator is required: instances are built with keyword
    # arguments, which a bare annotated class does not accept.
    article: str
    section: str
    risk_hits: int
| def _normalize_article(value: Optional[str]) -> str: | |
| if value is None or str(value).strip() == "": | |
| return "Unknown" | |
| return str(value).strip() | |
| def _normalize_section(value: Optional[str]) -> str: | |
| if value is None or str(value).strip() == "": | |
| return "Unknown" | |
| return str(value).strip() | |
| def _compile_keyword_patterns(keywords: Sequence[str]) -> List[Tuple[str, re.Pattern]]: | |
| patterns: List[Tuple[str, re.Pattern]] = [] | |
| for raw in keywords: | |
| keyword = (raw or "").strip() | |
| if not keyword: | |
| continue | |
| pattern = re.compile(rf"\b{re.escape(keyword)}\b", re.IGNORECASE) | |
| patterns.append((keyword, pattern)) | |
| return patterns | |
| def _count_hits(text: str, patterns: Sequence[Tuple[str, re.Pattern]]) -> int: | |
| return sum(len(pattern.findall(text)) for _, pattern in patterns) | |
def load_chunks(chunks_path: Path) -> List[Dict]:
    """Load chunk records from a JSON Lines file.

    Args:
        chunks_path: Path to a file holding one JSON document per line.
            Blank lines are skipped; undecodable bytes are ignored.

    Returns:
        The parsed records, in file order.

    Raises:
        FileNotFoundError: If ``chunks_path`` does not exist.
        ValueError: If a non-blank line is not valid JSON; the message
            includes the 1-based line number.
    """
    if not chunks_path.exists():
        raise FileNotFoundError(f"Chunks file not found: {chunks_path}")

    chunks: List[Dict] = []
    # Iterate the file handle instead of read_text().splitlines():
    # str.splitlines() also breaks on U+2028, U+2029 and U+0085, which may
    # appear unescaped inside JSON strings, so a valid record would be cut in
    # half. File iteration splits on real newlines only and avoids holding
    # the whole file in memory. "utf-8-sig" drops a leading BOM, which
    # json.loads would otherwise reject on the first record.
    with chunks_path.open("r", encoding="utf-8-sig", errors="ignore") as handle:
        for line_no, line in enumerate(handle, start=1):
            record = line.strip()
            if not record:
                continue
            try:
                chunks.append(json.loads(record))
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON at line {line_no} in {chunks_path}: {exc}") from exc
    return chunks
def analyze_contract_chunks(
    chunks: Iterable[Dict],
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Count risk-keyword matches per article and per section.

    Each chunk is read through its "article", "section" and "text" keys;
    missing or blank article/section labels are grouped under "Unknown".

    Args:
        chunks: Chunk records to analyze.
        keywords: Keywords or phrases to count. When None or empty,
            DEFAULT_RISK_KEYWORDS is used.
        top_sections: Maximum number of sections to return in the ranking.
            Negative values are treated as 0.

    Returns:
        A dict with keys "total_chunks", "total_hits", "keywords" (the
        keywords as actually matched), "article_stats" (ArticleStat list
        sorted by article label, "Unknown" last), "section_stats"
        (SectionStat list, highest hit count first, sections without hits
        omitted) and "top_sections" (the limit that was applied).
    """
    patterns = _compile_keyword_patterns(keywords or DEFAULT_RISK_KEYWORDS)
    # Report the keywords exactly as they are matched (stripped, blanks
    # dropped) so the summary cannot disagree with the patterns.
    risk_keywords = [keyword for keyword, _ in patterns]
    # A negative limit would make the slice below drop sections from the END
    # of the ranking rather than return none, so clamp it.
    section_limit = max(top_sections, 0)

    article_chunk_counts: Dict[str, int] = defaultdict(int)
    article_hit_counts: Dict[str, int] = defaultdict(int)
    section_hit_counts: Dict[Tuple[str, str], int] = defaultdict(int)
    total_chunks = 0
    total_hits = 0

    for chunk in chunks:
        total_chunks += 1
        article = _normalize_article(chunk.get("article"))
        section = _normalize_section(chunk.get("section"))
        text = chunk.get("text") or ""

        article_chunk_counts[article] += 1
        hits = _count_hits(text, patterns)
        article_hit_counts[article] += hits
        total_hits += hits
        # Only sections with at least one hit take part in the ranking.
        if hits > 0:
            section_hit_counts[(article, section)] += hits

    article_stats = [
        ArticleStat(article=a, chunk_count=article_chunk_counts[a], risk_hits=article_hit_counts[a])
        # The boolean first element of the key pushes "Unknown" to the end.
        for a in sorted(article_chunk_counts, key=lambda v: (v == "Unknown", v))
    ]

    # sorted() is stable, so sections with equal hit counts keep first-seen order.
    ranked_sections = sorted(section_hit_counts.items(), key=lambda item: item[1], reverse=True)
    section_stats = [
        SectionStat(article=a, section=s, risk_hits=h)
        for (a, s), h in ranked_sections[:section_limit]
    ]

    return {
        "total_chunks": total_chunks,
        "total_hits": total_hits,
        "keywords": risk_keywords,
        "article_stats": article_stats,
        "section_stats": section_stats,
        "top_sections": section_limit,
    }
| def _article_rows(article_stats: Sequence[ArticleStat]) -> List[List[str]]: | |
| rows: List[List[str]] = [] | |
| for stat in article_stats: | |
| density = (stat.risk_hits / stat.chunk_count) if stat.chunk_count else 0.0 | |
| rows.append([ | |
| stat.article, | |
| str(stat.chunk_count), | |
| str(stat.risk_hits), | |
| f"{density:.2f}", | |
| ]) | |
| return rows | |
def render_stdout_summary(report: Dict) -> str:
    """Render the analysis report as fixed-width plain text.

    Args:
        report: Report dict carrying the "total_chunks", "total_hits",
            "keywords", "article_stats", "section_stats" and "top_sections"
            keys.

    Returns:
        The summary as one newline-joined string (no trailing newline).
    """
    thin_rule = "-" * 72

    out = ["Contract Analysis", "=" * 72]
    out.append(f"Total chunks: {report['total_chunks']}")
    out.append(f"Total risk keyword hits: {report['total_hits']}")
    out.append(f"Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}")
    out += ["", "Risk Hits by Article", thin_rule]
    out.append(f"{'Article':<14} {'Chunks':>8} {'Risk Hits':>10} {'Hits/Chunk':>11}")

    # One aligned line per article, using the same column widths as the header.
    for article, chunks, hits, density in _article_rows(report["article_stats"]):
        out.append(f"{article:<14} {chunks:>8} {hits:>10} {density:>11}")

    out += ["", f"Top Sections by Risk Hits (Top {report['top_sections']})", thin_rule]

    section_stats = report["section_stats"]
    if not section_stats:
        out.append("No risk keyword hits found in any section.")
    else:
        out.append(f"{'Article':<14} {'Section':<12} {'Risk Hits':>10}")
        out.extend(
            f"{stat.article:<14} {stat.section:<12} {stat.risk_hits:>10}"
            for stat in section_stats
        )

    return "\n".join(out)
def _escape_md_cell(value: object) -> str:
    """Return ``value`` as text that is safe inside one Markdown table cell."""
    text = str(value)
    # A line break would end the table row early, so flatten it to a space.
    text = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
    # An unescaped pipe would start a new column.
    return text.replace("|", "\\|")


def render_markdown_summary(report: Dict) -> str:
    """Render the analysis report as a Markdown document.

    Article and section labels are escaped before being placed in table
    cells, so a label containing ``|`` or a line break cannot corrupt the
    table layout. Labels without those characters are emitted unchanged.

    Args:
        report: Report dict carrying the "total_chunks", "total_hits",
            "keywords", "article_stats", "section_stats" and "top_sections"
            keys.

    Returns:
        The Markdown text, terminated by a single newline.
    """
    md = [
        "# Contract Analysis",
        "",
        f"- Total chunks: **{report['total_chunks']}**",
        f"- Total risk keyword hits: **{report['total_hits']}**",
        f"- Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}",
        "",
        "## Risk Hits by Article",
        "",
        "| Article | Chunks | Risk Hits | Hits/Chunk |",
        "|---|---:|---:|---:|",
    ]

    for row in _article_rows(report["article_stats"]):
        # Only the article label is free text; the other cells are numeric strings.
        md.append(f"| {_escape_md_cell(row[0])} | {row[1]} | {row[2]} | {row[3]} |")

    md.extend([
        "",
        f"## Top Sections by Risk Hits (Top {report['top_sections']})",
        "",
    ])

    if report["section_stats"]:
        md.extend([
            "| Article | Section | Risk Hits |",
            "|---|---|---:|",
        ])
        for stat in report["section_stats"]:
            md.append(
                f"| {_escape_md_cell(stat.article)} | {_escape_md_cell(stat.section)} | {stat.risk_hits} |"
            )
    else:
        md.append("No risk keyword hits found in any section.")

    return "\n".join(md) + "\n"
def write_article_csv(article_stats: Sequence[ArticleStat], csv_path: Path) -> None:
    """Write the per-article statistics to ``csv_path`` as UTF-8 CSV.

    Missing parent directories are created. The file starts with a header
    row, followed by one row per article.
    """
    header = ["article", "chunk_count", "risk_keyword_hits", "hits_per_chunk"]

    csv_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" hands line-ending control to the csv module.
    with csv_path.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle)
        out.writerow(header)
        out.writerows(_article_rows(article_stats))
def run_contract_analysis(
    chunks_path: Path = Path("kb/chunks.jsonl"),
    out_dir: Path = Path("outputs"),
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Load chunks, analyze them, and write the Markdown and CSV reports.

    Args:
        chunks_path: JSON Lines file holding the chunk records.
        out_dir: Directory that receives "domain_analysis.md" and
            "article_risk_report.csv"; it is created if missing.
        keywords: Optional keyword override forwarded to the analysis.
        top_sections: Section ranking limit forwarded to the analysis.

    Returns:
        A dict with the raw "report", the rendered "stdout_summary" and
        "markdown" texts, and the output locations "markdown_path" and
        "csv_path" as strings.
    """
    report = analyze_contract_chunks(
        chunks=load_chunks(chunks_path),
        keywords=keywords,
        top_sections=top_sections,
    )

    out_dir.mkdir(parents=True, exist_ok=True)

    # Render both text views before anything is written to disk.
    markdown = render_markdown_summary(report)
    stdout_summary = render_stdout_summary(report)

    markdown_path = out_dir / "domain_analysis.md"
    markdown_path.write_text(markdown, encoding="utf-8")

    csv_path = out_dir / "article_risk_report.csv"
    write_article_csv(report["article_stats"], csv_path)

    result = {
        "report": report,
        "stdout_summary": stdout_summary,
        "markdown": markdown,
        "markdown_path": str(markdown_path),
        "csv_path": str(csv_path),
    }
    return result