ups-contract-faq2 / contract_analysis.py
Justin Tippins
Add contract analysis CLI + UI report
695ad19
import csv
import json
import re
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
# Keywords counted as "risk" hits when the caller does not supply its own list
# (used as the fallback in analyze_contract_chunks). Entries may be multi-word
# phrases; matching is case-insensitive.
DEFAULT_RISK_KEYWORDS: List[str] = [
    "discipline",
    "discharge",
    "grievance",
    "warning letter",
    "suspension",
    "seniority",
    "overtime",
    "arbitration",
    "testing",
    "harassment",
    "safety",
    "termination",
]
@dataclass
class ArticleStat:
    """Per-article totals produced by the contract analysis."""

    article: str  # normalized article label ("Unknown" when missing/blank)
    chunk_count: int  # number of chunks attributed to this article
    risk_hits: int  # total risk-keyword matches across those chunks
@dataclass
class SectionStat:
    """Risk-keyword total for one (article, section) pair."""

    article: str  # normalized article label ("Unknown" when missing/blank)
    section: str  # normalized section label ("Unknown" when missing/blank)
    risk_hits: int  # total risk-keyword matches recorded for this pair
def _normalize_article(value: Optional[str]) -> str:
if value is None or str(value).strip() == "":
return "Unknown"
return str(value).strip()
def _normalize_section(value: Optional[str]) -> str:
if value is None or str(value).strip() == "":
return "Unknown"
return str(value).strip()
def _compile_keyword_patterns(keywords: Sequence[str]) -> List[Tuple[str, re.Pattern]]:
patterns: List[Tuple[str, re.Pattern]] = []
for raw in keywords:
keyword = (raw or "").strip()
if not keyword:
continue
pattern = re.compile(rf"\b{re.escape(keyword)}\b", re.IGNORECASE)
patterns.append((keyword, pattern))
return patterns
def _count_hits(text: str, patterns: Sequence[Tuple[str, re.Pattern]]) -> int:
return sum(len(pattern.findall(text)) for _, pattern in patterns)
def load_chunks(chunks_path: Path) -> List[Dict]:
    """Load chunk records from a JSON Lines file.

    Blank lines are skipped. Undecodable bytes are dropped (best effort) and a
    leading UTF-8 byte-order mark is tolerated.

    Args:
        chunks_path: Path to the ``.jsonl`` file, one JSON value per line.

    Returns:
        The parsed records in file order.

    Raises:
        FileNotFoundError: If ``chunks_path`` does not exist.
        ValueError: If a non-blank line is not valid JSON; the message carries
            the 1-based line number.
    """
    if not chunks_path.exists():
        raise FileNotFoundError(f"Chunks file not found: {chunks_path}")
    chunks: List[Dict] = []
    # Iterate the file handle rather than str.splitlines(): splitlines() also
    # breaks on U+2028/U+2029/U+0085, which may legally appear unescaped inside
    # a JSON string and would cut a valid record in two.
    # "utf-8-sig" strips a leading BOM that json.loads would otherwise reject.
    with chunks_path.open("r", encoding="utf-8-sig", errors="ignore") as handle:
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                chunks.append(json.loads(line))
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON at line {line_no} in {chunks_path}: {exc}") from exc
    return chunks
def analyze_contract_chunks(
    chunks: Iterable[Dict],
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Count risk-keyword hits per article and per (article, section).

    Args:
        chunks: Chunk records; the "article", "section" and "text" keys are
            read, each of which may be missing.
        keywords: Keywords/phrases to count. ``None`` or an empty sequence
            falls back to ``DEFAULT_RISK_KEYWORDS``; blank entries are dropped.
        top_sections: Maximum number of sections to report, ranked by hits.
            Values below zero are treated as zero.

    Returns:
        A dict with keys "total_chunks", "total_hits", "keywords",
        "article_stats" (list of ``ArticleStat``), "section_stats" (list of
        ``SectionStat``, highest hit count first) and "top_sections".
    """
    # Report the keywords in the same trimmed form that is actually matched.
    risk_keywords = [k.strip() for k in (keywords or DEFAULT_RISK_KEYWORDS) if (k or "").strip()]
    patterns = _compile_keyword_patterns(risk_keywords)
    # A negative limit would make the slice below drop entries from the END of
    # the ranking instead of returning nothing; clamp it.
    section_limit = max(0, top_sections)

    def _article_sort_key(label: str) -> Tuple:
        # re.split with a capturing group alternates text/digit runs, so odd
        # positions are always digit runs; comparing them as ints puts "2"
        # before "10". "Unknown" is forced last and the raw label breaks ties
        # such as "01" vs "1".
        pieces = re.split(r"(\d+)", label)
        natural = [int(p) if i % 2 else p for i, p in enumerate(pieces)]
        return (label == "Unknown", natural, label)

    article_chunk_counts: Dict[str, int] = defaultdict(int)
    article_hit_counts: Dict[str, int] = defaultdict(int)
    section_hit_counts: Dict[Tuple[str, str], int] = defaultdict(int)
    total_chunks = 0
    total_hits = 0
    for chunk in chunks:
        total_chunks += 1
        article = _normalize_article(chunk.get("article"))
        section = _normalize_section(chunk.get("section"))
        text = chunk.get("text") or ""
        if not isinstance(text, str):
            # Tolerate numeric or otherwise non-string text instead of
            # crashing inside the regex scan.
            text = str(text)
        article_chunk_counts[article] += 1
        hits = _count_hits(text, patterns)
        article_hit_counts[article] += hits
        total_hits += hits
        if hits > 0:
            # Only sections with at least one hit are tracked.
            section_hit_counts[(article, section)] += hits
    article_stats = [
        ArticleStat(article=a, chunk_count=article_chunk_counts[a], risk_hits=article_hit_counts[a])
        for a in sorted(article_chunk_counts, key=_article_sort_key)
    ]
    # sorted() is stable, so sections with equal hits keep first-seen order.
    ranked_sections = sorted(section_hit_counts.items(), key=lambda item: item[1], reverse=True)
    section_stats = [
        SectionStat(article=a, section=s, risk_hits=h)
        for (a, s), h in ranked_sections[:section_limit]
    ]
    return {
        "total_chunks": total_chunks,
        "total_hits": total_hits,
        "keywords": risk_keywords,
        "article_stats": article_stats,
        "section_stats": section_stats,
        "top_sections": section_limit,
    }
def _article_rows(article_stats: Sequence[ArticleStat]) -> List[List[str]]:
rows: List[List[str]] = []
for stat in article_stats:
density = (stat.risk_hits / stat.chunk_count) if stat.chunk_count else 0.0
rows.append([
stat.article,
str(stat.chunk_count),
str(stat.risk_hits),
f"{density:.2f}",
])
return rows
def render_stdout_summary(report: Dict) -> str:
    """Render the analysis report as a fixed-width plain-text summary.

    Args:
        report: The dict returned by ``analyze_contract_chunks``.

    Returns:
        The summary as a single newline-joined string (no trailing newline).
    """
    heavy_rule = "=" * 72
    light_rule = "-" * 72

    out = ["Contract Analysis", heavy_rule]
    out.append(f"Total chunks: {report['total_chunks']}")
    out.append(f"Total risk keyword hits: {report['total_hits']}")
    out.append(f"Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}")
    out.append("")
    out.append("Risk Hits by Article")
    out.append(light_rule)
    out.append(f"{'Article':<14} {'Chunks':>8} {'Risk Hits':>10} {'Hits/Chunk':>11}")

    # One aligned line per article.
    out.extend(
        f"{article:<14} {chunks:>8} {hits:>10} {density:>11}"
        for article, chunks, hits, density in _article_rows(report["article_stats"])
    )

    out.append("")
    out.append(f"Top Sections by Risk Hits (Top {report['top_sections']})")
    out.append(light_rule)

    sections = report["section_stats"]
    if not sections:
        out.append("No risk keyword hits found in any section.")
    else:
        out.append(f"{'Article':<14} {'Section':<12} {'Risk Hits':>10}")
        out.extend(
            f"{entry.article:<14} {entry.section:<12} {entry.risk_hits:>10}"
            for entry in sections
        )
    return "\n".join(out)
def render_markdown_summary(report: Dict) -> str:
    """Render the analysis report as a Markdown document.

    Table cells are escaped so that a ``|`` or a line break inside an article
    or section label cannot split or terminate the table row.

    Args:
        report: The dict returned by ``analyze_contract_chunks``.

    Returns:
        The Markdown text, terminated by a single newline.
    """

    def _cell(value: object) -> str:
        # Collapse line breaks to spaces and escape pipes; labels without
        # either character pass through unchanged.
        return " ".join(str(value).splitlines()).replace("|", "\\|")

    md = [
        "# Contract Analysis",
        "",
        f"- Total chunks: **{report['total_chunks']}**",
        f"- Total risk keyword hits: **{report['total_hits']}**",
        f"- Risk keywords ({len(report['keywords'])}): {', '.join(report['keywords'])}",
        "",
        "## Risk Hits by Article",
        "",
        "| Article | Chunks | Risk Hits | Hits/Chunk |",
        "|---|---:|---:|---:|",
    ]
    for row in _article_rows(report["article_stats"]):
        md.append(f"| {_cell(row[0])} | {row[1]} | {row[2]} | {row[3]} |")
    md.extend([
        "",
        f"## Top Sections by Risk Hits (Top {report['top_sections']})",
        "",
    ])
    if report["section_stats"]:
        md.extend([
            "| Article | Section | Risk Hits |",
            "|---|---|---:|",
        ])
        for stat in report["section_stats"]:
            md.append(f"| {_cell(stat.article)} | {_cell(stat.section)} | {stat.risk_hits} |")
    else:
        md.append("No risk keyword hits found in any section.")
    return "\n".join(md) + "\n"
def write_article_csv(article_stats: Sequence[ArticleStat], csv_path: Path) -> None:
    """Write the per-article table to ``csv_path`` as UTF-8 CSV.

    Parent directories are created as needed and an existing file is
    overwritten. The first row is the column header.
    """
    header = ["article", "chunk_count", "risk_keyword_hits", "hits_per_chunk"]
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with csv_path.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle)
        out.writerow(header)
        out.writerows(_article_rows(article_stats))
def run_contract_analysis(
    chunks_path: Path = Path("kb/chunks.jsonl"),
    out_dir: Path = Path("outputs"),
    keywords: Optional[Sequence[str]] = None,
    top_sections: int = 15,
) -> Dict:
    """Run the full analysis and write the Markdown and CSV reports.

    Loads the chunks, analyzes them, creates ``out_dir`` if needed, then writes
    "domain_analysis.md" and "article_risk_report.csv" inside it.

    Args:
        chunks_path: JSON Lines file with the chunk records.
        out_dir: Directory that receives the two report files.
        keywords: Optional keyword override, passed to the analysis.
        top_sections: Section limit, passed to the analysis.

    Returns:
        A dict with keys "report", "stdout_summary", "markdown",
        "markdown_path" and "csv_path" (both paths as strings).
    """
    report = analyze_contract_chunks(
        chunks=load_chunks(chunks_path),
        keywords=keywords,
        top_sections=top_sections,
    )

    out_dir.mkdir(parents=True, exist_ok=True)
    md_file = out_dir / "domain_analysis.md"
    csv_file = out_dir / "article_risk_report.csv"

    # Render both views before touching the filesystem.
    md_text = render_markdown_summary(report)
    console_text = render_stdout_summary(report)

    md_file.write_text(md_text, encoding="utf-8")
    write_article_csv(report["article_stats"], csv_file)

    result: Dict = {}
    result["report"] = report
    result["stdout_summary"] = console_text
    result["markdown"] = md_text
    result["markdown_path"] = str(md_file)
    result["csv_path"] = str(csv_file)
    return result