"""Count risk-keyword hits in contract chunks and render summary reports.

The module reads chunk records from a JSON Lines file, counts
case-insensitive keyword matches per article and per section, and renders
the result as plain text, Markdown and CSV.
"""

import csv
import json
import re
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

# Keywords used when the caller does not supply any.
DEFAULT_RISK_KEYWORDS: List[str] = [
    "discipline",
    "discharge",
    "grievance",
    "warning letter",
    "suspension",
    "seniority",
    "overtime",
    "arbitration",
    "testing",
    "harassment",
    "safety",
    "termination",
]

# Label substituted for a missing or blank article/section value.
_UNKNOWN_LABEL = "Unknown"


@dataclass
class ArticleStat:
    """Per-article totals produced by :func:`analyze_contract_chunks`."""

    article: str  # normalized article label
    chunk_count: int  # number of chunks attributed to the article
    risk_hits: int  # keyword matches summed over those chunks


@dataclass
class SectionStat:
    """Per-section keyword hit total for one (article, section) pair."""

    article: str  # normalized article label
    section: str  # normalized section label
    risk_hits: int  # keyword matches summed over the section's chunks


def _normalize_label(value: Optional[str]) -> str:
    """Return ``value`` as a stripped string, or "Unknown" when blank/None."""
    if value is None:
        return _UNKNOWN_LABEL
    label = str(value).strip()
    return label if label else _UNKNOWN_LABEL


def _normalize_article(value: Optional[str]) -> str:
    """Normalize an article label; blank or missing values become "Unknown"."""
    return _normalize_label(value)


def _normalize_section(value: Optional[str]) -> str:
    """Normalize a section label; blank or missing values become "Unknown"."""
    return _normalize_label(value)


def _compile_keyword_patterns(keywords: Sequence[str]) -> List[Tuple[str, re.Pattern]]:
    """Compile each non-blank keyword into a case-insensitive pattern.

    Args:
        keywords: Raw keywords; ``None``/blank entries are skipped.

    Returns:
        ``(stripped_keyword, compiled_pattern)`` pairs in input order.
    """
    patterns: List[Tuple[str, re.Pattern]] = []
    for raw in keywords:
        keyword = (raw or "").strip()
        if not keyword:
            continue
        # Words of a phrase may be separated by any whitespace run in the
        # text (line wraps, double spaces), so join the escaped words with
        # \s+ instead of requiring the literal single space.
        body = r"\s+".join(re.escape(word) for word in keyword.split())
        # Lookarounds instead of \b: \b only matches next to a word
        # character, so "\bc\+\+\b" can never match "c++ ". For keywords
        # that start and end with word characters both forms are equivalent.
        pattern = re.compile(rf"(?<!\w){body}(?!\w)", re.IGNORECASE)
        patterns.append((keyword, pattern))
    return patterns


def _count_hits(text: str, patterns: Sequence[Tuple[str, re.Pattern]]) -> int:
    """Return the total number of matches of all ``patterns`` in ``text``."""
    return sum(len(pattern.findall(text)) for _, pattern in patterns)


def load_chunks(chunks_path: Path) -> List[Dict]:
    """Load chunk records from a JSON Lines file.

    Blank lines are skipped. Undecodable bytes are dropped
    (``errors="ignore"``) and a leading UTF-8 BOM is tolerated.

    Args:
        chunks_path: Path to the ``.jsonl`` file.

    Returns:
        The parsed JSON value of every non-blank line, in file order.

    Raises:
        FileNotFoundError: If ``chunks_path`` does not exist.
        ValueError: If a non-blank line is not valid JSON; the message
            carries the 1-based line number.
    """
    if not chunks_path.exists():
        raise FileNotFoundError(f"Chunks file not found: {chunks_path}")

    chunks: List[Dict] = []
    # Iterate the file handle rather than str.splitlines(): splitlines also
    # breaks on U+2028/U+2029/U+0085, which may legally appear unescaped
    # inside a JSON string and would cut a valid record in two.
    with chunks_path.open("r", encoding="utf-8-sig", errors="ignore") as handle:
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                chunks.append(json.loads(line))
            except json.JSONDecodeError as exc:
                raise ValueError(
                    f"Invalid JSON at line {line_no} in {chunks_path}: {exc}"
                ) from exc
    return chunks


def analyze_contract_chunks(
    chunks: Iterable[Dict],
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Count risk-keyword hits per article and per section.

    Args:
        chunks: Mappings read via ``.get`` for the keys ``"article"``,
            ``"section"`` and ``"text"``; missing/blank labels are reported
            as "Unknown" and a missing/empty text counts zero hits.
        keywords: Keywords to search for. ``None`` or an empty sequence
            falls back to ``DEFAULT_RISK_KEYWORDS``.
        top_sections: Maximum number of sections kept in ``section_stats``.

    Returns:
        A dict with the keys ``total_chunks``, ``total_hits``, ``keywords``
        (the stripped, non-blank keywords actually searched for),
        ``article_stats`` (sorted by label, "Unknown" last),
        ``section_stats`` (sections with at least one hit, highest first)
        and ``top_sections``.
    """
    patterns = _compile_keyword_patterns(keywords or DEFAULT_RISK_KEYWORDS)
    # Report exactly what was compiled so the listed keywords cannot drift
    # from the patterns (e.g. by keeping surrounding whitespace).
    risk_keywords = [keyword for keyword, _ in patterns]

    article_chunk_counts: Dict[str, int] = defaultdict(int)
    article_hit_counts: Dict[str, int] = defaultdict(int)
    section_hit_counts: Dict[Tuple[str, str], int] = defaultdict(int)
    total_chunks = 0
    total_hits = 0

    for chunk in chunks:
        total_chunks += 1
        article = _normalize_article(chunk.get("article"))
        section = _normalize_section(chunk.get("section"))
        text = chunk.get("text") or ""

        article_chunk_counts[article] += 1
        hits = _count_hits(text, patterns)
        article_hit_counts[article] += hits
        total_hits += hits
        # Only sections with hits are tracked, so the "top sections" table
        # never lists zero-hit rows.
        if hits > 0:
            section_hit_counts[(article, section)] += hits

    # The (is_unknown, label) key pushes the "Unknown" bucket to the end.
    article_stats = [
        ArticleStat(
            article=label,
            chunk_count=article_chunk_counts[label],
            risk_hits=article_hit_counts[label],
        )
        for label in sorted(article_chunk_counts, key=lambda v: (v == "Unknown", v))
    ]

    ranked_sections = sorted(
        section_hit_counts.items(), key=lambda item: item[1], reverse=True
    )
    section_stats = [
        SectionStat(article=article, section=section, risk_hits=hits)
        for (article, section), hits in ranked_sections[:top_sections]
    ]

    return {
        "total_chunks": total_chunks,
        "total_hits": total_hits,
        "keywords": risk_keywords,
        "article_stats": article_stats,
        "section_stats": section_stats,
        "top_sections": top_sections,
    }


def _article_rows(article_stats: Sequence[ArticleStat]) -> List[List[str]]:
    """Format article stats as string rows.

    Each row is ``[article, chunk_count, risk_hits, hits_per_chunk]`` with
    the density rendered to two decimals (``0.00`` when there are no chunks).
    """
    rows: List[List[str]] = []
    for stat in article_stats:
        density = (stat.risk_hits / stat.chunk_count) if stat.chunk_count else 0.0
        rows.append(
            [
                stat.article,
                str(stat.chunk_count),
                str(stat.risk_hits),
                f"{density:.2f}",
            ]
        )
    return rows


def render_stdout_summary(report: Dict) -> str:
    """Render ``report`` as a fixed-width plain-text summary.

    Args:
        report: A dict shaped like the return value of
            :func:`analyze_contract_chunks`.

    Returns:
        The summary as a single newline-joined string (no trailing newline).
    """
    lines = [
        "Contract Analysis",
        "=" * 72,
        f"Total chunks: {report['total_chunks']}",
        f"Total risk keyword hits: {report['total_hits']}",
        f"Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}",
        "",
        "Risk Hits by Article",
        "-" * 72,
        f"{'Article':<14} {'Chunks':>8} {'Risk Hits':>10} {'Hits/Chunk':>11}",
    ]
    for row in _article_rows(report["article_stats"]):
        lines.append(f"{row[0]:<14} {row[1]:>8} {row[2]:>10} {row[3]:>11}")

    lines.extend(
        [
            "",
            f"Top Sections by Risk Hits (Top {report['top_sections']})",
            "-" * 72,
        ]
    )
    if report["section_stats"]:
        lines.append(f"{'Article':<14} {'Section':<12} {'Risk Hits':>10}")
        for stat in report["section_stats"]:
            lines.append(f"{stat.article:<14} {stat.section:<12} {stat.risk_hits:>10}")
    else:
        lines.append("No risk keyword hits found in any section.")
    return "\n".join(lines)


def _md_cell(value: str) -> str:
    """Make ``value`` safe for a Markdown table cell.

    Line breaks become spaces and ``|`` is backslash-escaped, since either
    would otherwise end the cell or the row early.
    """
    flattened = str(value).replace("\r", " ").replace("\n", " ")
    return flattened.replace("|", "\\|")


def render_markdown_summary(report: Dict) -> str:
    """Render ``report`` as a Markdown document with two tables.

    Args:
        report: A dict shaped like the return value of
            :func:`analyze_contract_chunks`.

    Returns:
        The Markdown text, terminated by a single newline.
    """
    md = [
        "# Contract Analysis",
        "",
        f"- Total chunks: **{report['total_chunks']}**",
        f"- Total risk keyword hits: **{report['total_hits']}**",
        f"- Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}",
        "",
        "## Risk Hits by Article",
        "",
        "| Article | Chunks | Risk Hits | Hits/Chunk |",
        "|---|---:|---:|---:|",
    ]
    for row in _article_rows(report["article_stats"]):
        # Only the label is free text; the other columns are numbers.
        md.append(f"| {_md_cell(row[0])} | {row[1]} | {row[2]} | {row[3]} |")

    md.extend(
        [
            "",
            f"## Top Sections by Risk Hits (Top {report['top_sections']})",
            "",
        ]
    )
    if report["section_stats"]:
        md.extend(
            [
                "| Article | Section | Risk Hits |",
                "|---|---|---:|",
            ]
        )
        for stat in report["section_stats"]:
            md.append(
                f"| {_md_cell(stat.article)} | {_md_cell(stat.section)} | {stat.risk_hits} |"
            )
    else:
        md.append("No risk keyword hits found in any section.")
    return "\n".join(md) + "\n"


def write_article_csv(article_stats: Sequence[ArticleStat], csv_path: Path) -> None:
    """Write the per-article table to ``csv_path`` as UTF-8 CSV.

    Missing parent directories are created; an existing file is overwritten.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["article", "chunk_count", "risk_keyword_hits", "hits_per_chunk"])
        writer.writerows(_article_rows(article_stats))


def run_contract_analysis(
    chunks_path: Path = Path("kb/chunks.jsonl"),
    out_dir: Path = Path("outputs"),
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Load chunks, analyze them and write the Markdown and CSV reports.

    Args:
        chunks_path: JSON Lines file with the chunk records.
        out_dir: Directory for ``domain_analysis.md`` and
            ``article_risk_report.csv``; created if missing.
        keywords: Forwarded to :func:`analyze_contract_chunks`.
        top_sections: Forwarded to :func:`analyze_contract_chunks`.

    Returns:
        A dict with ``report``, ``stdout_summary``, ``markdown``,
        ``markdown_path`` and ``csv_path`` (both paths as strings).

    Raises:
        FileNotFoundError: If ``chunks_path`` does not exist.
        ValueError: If ``chunks_path`` contains an invalid JSON line.
    """
    chunks = load_chunks(chunks_path)
    report = analyze_contract_chunks(
        chunks=chunks, keywords=keywords, top_sections=top_sections
    )

    out_dir.mkdir(parents=True, exist_ok=True)
    markdown_path = out_dir / "domain_analysis.md"
    csv_path = out_dir / "article_risk_report.csv"

    markdown = render_markdown_summary(report)
    stdout_summary = render_stdout_summary(report)

    markdown_path.write_text(markdown, encoding="utf-8")
    write_article_csv(report["article_stats"], csv_path)

    return {
        "report": report,
        "stdout_summary": stdout_summary,
        "markdown": markdown,
        "markdown_path": str(markdown_path),
        "csv_path": str(csv_path),
    }