# Spaces: Running on CPU Upgrade
# File size: 13,555 Bytes
# 61d29fc
"""
Temporal Analysis Agent for election cycle and timing analysis.
Analyzes:
- Do high-visibility projects get approved before elections?
- Contention scores over time
- Deferral patterns (political sensitivity)
"""
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

from loguru import logger
@dataclass
class ElectionCyclePattern:
    """Summary of decision activity around one election date.

    As populated by ``TemporalAnalyzer.analyze_election_cycle``, the
    look-back windows are nested: every decision counted in the 3-month
    window is also counted in the 6- and 12-month windows.
    """

    jurisdiction: str
    election_date: datetime

    # --- Decision counts per window ---
    decisions_12mo_before: int    # within 365 days before the election
    decisions_6mo_before: int     # within 180 days before the election
    decisions_3mo_before: int     # within 90 days before the election
    decisions_post_election: int  # election day up to 180 days afterwards

    # --- High-visibility projects decided in the 6-month window ---
    # Each entry is the decision's summary text.
    stadium_or_parks_pre_election: List[str]    # summaries not mentioning school/education
    school_renovations_pre_election: List[str]  # summaries mentioning school/education

    # --- Analysis ---
    pre_election_spike: bool               # 6-month count exceeded 1.5x the post-election count
    avg_project_cost_pre_election: float   # 0 when no cost could be parsed
    avg_project_cost_post_election: float  # 0 when no cost could be parsed
    inference: str  # "Incumbency protection", "Normal variance", etc.
@dataclass
class ContentionMetrics:
    """Per-decision record of how contested a vote was.

    Instances are produced by ``TemporalAnalyzer.calculate_contention_scores``,
    one per decision.
    """

    decision_id: str
    decision_summary: str
    meeting_date: datetime

    # --- Vote analysis ---
    ayes: int
    nays: int
    abstentions: int
    contention_score: float  # nays / total votes (0 when no votes were parsed)
    unanimous: bool          # no nays and no abstentions recorded

    # --- Discussion analysis ---
    public_comments: int                      # currently the number of listed supporters + opponents
    discussion_length_minutes: Optional[int]  # None until transcript timing is available
    deferred_count: int  # How many times tabled before decision

    # --- Inference ---
    contention_level: str  # "High", "Medium", "Low"
    likely_rationale: str  # short explanation matching contention_level
class TemporalAnalyzer:
    """
    Analyze timing patterns in government decision-making.

    Implements:
    1. Election cycle correlation (``analyze_election_cycle``)
    2. Contention score tracking (``calculate_contention_scores``)
    3. Keyword density analysis (``analyze_keyword_density``)
    4. Synthesis into a governance narrative (``infer_governance_logic``)

    Deferral tracking is not implemented yet: ``deferred_count`` is always
    reported as 0 because it would require history across meetings.

    Decision objects are duck-typed. The methods read the attributes
    ``meeting_date``, ``decision_summary``, ``cost_estimate``,
    ``vote_result``, ``supporters``, ``opponents`` and ``decision_id``.
    """

    # Substrings that mark a decision summary as a high-visibility project.
    _HIGH_VISIBILITY_KEYWORDS = ("stadium", "park", "renovation", "construction", "building")

    # Keyword groups tallied by analyze_keyword_density; result keys are
    # "<category>:<keyword>".
    _KEYWORD_CATEGORIES = {
        "funding_source": ["grant", "federal", "state funding", "matching", "taxpayer", "local"],
        "urgency": ["emergency", "immediate", "urgent", "asap", "critical"],
        "process": ["work session", "public hearing", "committee", "postpone", "table"],
        "values": ["equity", "efficiency", "safety", "growth", "innovation"],
    }

    # "ayes-nays" with an optional "-abstentions", using a hyphen or en dash.
    # Numbers are capped at three digits and may not touch another digit or
    # dash, so identifiers such as "2023-14" or dates are not read as tallies.
    _VOTE_TALLY = re.compile(
        r"(?<![\d\-\u2013])(\d{1,3})\s*[-\u2013]\s*(\d{1,3})"
        r"(?:\s*[-\u2013]\s*(\d{1,3}))?(?![\d\-\u2013])"
    )

    def __init__(self, assumed_board_size: int = 7):
        """Initialize temporal analyzer.

        Args:
            assumed_board_size: Number of ayes to report when a vote is
                described only as "unanimous" with no tally. Defaults to 7,
                the previously hard-coded value.
        """
        self.assumed_board_size = assumed_board_size

    @staticmethod
    def _parse_cost(cost_estimate: Any) -> Optional[float]:
        """Return the leading numeric amount of a cost estimate, or None.

        Accepts strings such as "$250,000" or "$250,000 (est.)" as well as
        plain numbers. Only the first whitespace-separated token is read, so
        magnitude words ("$1.5 million") are NOT scaled.
        """
        if not cost_estimate:
            return None
        if isinstance(cost_estimate, (int, float)) and not isinstance(cost_estimate, bool):
            return float(cost_estimate)
        try:
            return float(cost_estimate.replace("$", "").replace(",", "").split()[0])
        except (AttributeError, IndexError, ValueError):
            # Best effort: unparseable estimates are simply left out of averages.
            return None

    def _collect_costs(self, decisions: List[Any]) -> List[float]:
        """Return every parseable cost estimate among *decisions*."""
        costs = []
        for decision in decisions:
            cost = self._parse_cost(decision.cost_estimate)
            if cost is not None:
                costs.append(cost)
        return costs

    @staticmethod
    def _average(values: List[float]) -> float:
        """Return the arithmetic mean of *values*, or 0.0 when empty."""
        return sum(values) / len(values) if values else 0.0

    def analyze_election_cycle(
        self,
        decisions: List[Any],  # PolicyDecision objects
        jurisdiction: str,
        election_dates: List[datetime]
    ) -> List[ElectionCyclePattern]:
        """
        Analyze decision patterns around election cycles.

        Tests hypothesis: Do "legacy" projects get approved before elections?

        Args:
            decisions: Decision objects; those without a ``meeting_date``
                are ignored.
            jurisdiction: Label copied onto every returned pattern.
            election_dates: Election days to analyze, one pattern each.

        Returns:
            One ``ElectionCyclePattern`` per election date, in input order.
        """
        patterns = []
        for election_date in election_dates:
            # Window boundaries: 365/180/90 days before, 180 days after.
            window_12mo = election_date - timedelta(days=365)
            window_6mo = election_date - timedelta(days=180)
            window_3mo = election_date - timedelta(days=90)
            post_election = election_date + timedelta(days=180)

            decisions_12mo = []
            decisions_6mo = []
            decisions_3mo = []
            decisions_post = []
            for decision in decisions:
                met = decision.meeting_date
                if met is None:
                    continue
                if window_12mo <= met < election_date:
                    # Pre-election windows are nested, so a decision close
                    # to the election is counted in every enclosing window.
                    decisions_12mo.append(decision)
                    if met >= window_6mo:
                        decisions_6mo.append(decision)
                        if met >= window_3mo:
                            decisions_3mo.append(decision)
                elif election_date <= met < post_election:
                    decisions_post.append(decision)

            # High-visibility projects decided in the 6 months before the
            # election, split into school-related and everything else.
            stadiums_parks = []
            school_renovations = []
            for decision in decisions_6mo:
                summary_lower = (decision.decision_summary or "").lower()
                if not any(kw in summary_lower for kw in self._HIGH_VISIBILITY_KEYWORDS):
                    continue
                if "school" in summary_lower or "education" in summary_lower:
                    school_renovations.append(decision.decision_summary)
                else:
                    stadiums_parks.append(decision.decision_summary)

            pre_costs = self._collect_costs(decisions_6mo)
            post_costs = self._collect_costs(decisions_post)

            # Spike = more than 1.5x the post-election volume; a baseline of
            # 1 is used when nothing was decided after the election.
            baseline = len(decisions_post) if decisions_post else 1
            pre_election_spike = len(decisions_6mo) > baseline * 1.5

            if pre_election_spike and (stadiums_parks or school_renovations):
                inference = "Possible incumbency protection or legacy building"
            elif pre_election_spike:
                inference = "Increased activity before election (cause unclear)"
            else:
                inference = "No significant pre-election pattern detected"

            patterns.append(
                ElectionCyclePattern(
                    jurisdiction=jurisdiction,
                    election_date=election_date,
                    decisions_12mo_before=len(decisions_12mo),
                    decisions_6mo_before=len(decisions_6mo),
                    decisions_3mo_before=len(decisions_3mo),
                    decisions_post_election=len(decisions_post),
                    stadium_or_parks_pre_election=stadiums_parks,
                    school_renovations_pre_election=school_renovations,
                    pre_election_spike=pre_election_spike,
                    avg_project_cost_pre_election=self._average(pre_costs),
                    avg_project_cost_post_election=self._average(post_costs),
                    inference=inference,
                )
            )
        return patterns

    def calculate_contention_scores(
        self,
        decisions: List[Any]
    ) -> List[ContentionMetrics]:
        """
        Calculate contention scores for all decisions.

        High contention = conflicting trade-offs or politically sensitive

        Args:
            decisions: Decision objects to score.

        Returns:
            One ``ContentionMetrics`` per decision, in input order.
        """
        contention_metrics = []
        for decision in decisions:
            ayes, nays, abstentions = self._parse_vote_result(decision.vote_result)
            total_votes = ayes + nays + abstentions
            contention_score = nays / total_votes if total_votes > 0 else 0.0
            # A decision with no recorded votes is not evidence of unanimity.
            unanimous = total_votes > 0 and nays == 0 and abstentions == 0

            # Proxy for public engagement: listed supporters plus opponents.
            # Either list may be missing (None).
            public_comments = len(decision.supporters or []) + len(decision.opponents or [])

            # Deferrals would need tracking across meetings; not available here.
            deferred_count = 0

            if contention_score > 0.3 or public_comments > 10:
                contention_level = "High"
                likely_rationale = "Politically sensitive or conflicting stakeholder interests"
            elif contention_score > 0.1 or public_comments > 3:
                contention_level = "Medium"
                likely_rationale = "Some disagreement but manageable"
            else:
                contention_level = "Low"
                likely_rationale = "Consensus or administrative matter"

            contention_metrics.append(
                ContentionMetrics(
                    decision_id=decision.decision_id,
                    decision_summary=decision.decision_summary,
                    meeting_date=decision.meeting_date,
                    ayes=ayes,
                    nays=nays,
                    abstentions=abstentions,
                    contention_score=contention_score,
                    unanimous=unanimous,
                    public_comments=public_comments,
                    discussion_length_minutes=None,  # Would need transcript timing
                    deferred_count=deferred_count,
                    contention_level=contention_level,
                    likely_rationale=likely_rationale,
                )
            )
        return contention_metrics

    def _parse_vote_result(self, vote_result: Optional[str]) -> tuple[int, int, int]:
        """Parse vote result string into ayes, nays, abstentions.

        Recognized forms: "5-2", "5-2-1" (third figure is abstentions),
        tallies embedded in text ("Approved 5-2"), and "Unanimous" with or
        without a tally. Anything else yields ``(0, 0, 0)``.
        """
        if not vote_result:
            return (0, 0, 0)

        tally = self._VOTE_TALLY.search(vote_result)
        if tally:
            ayes = int(tally.group(1))
            nays = int(tally.group(2))
            abstentions = int(tally.group(3) or 0)

        if "unanimous" in vote_result.lower():
            # Prefer an explicit tally such as "Unanimous (5-0)"; otherwise
            # fall back to the assumed board size.
            if tally and nays == 0:
                return (ayes, 0, abstentions)
            return (self.assumed_board_size, 0, 0)

        if tally:
            return (ayes, nays, abstentions)
        return (0, 0, 0)

    def analyze_keyword_density(
        self,
        documents: List[Dict[str, Any]]
    ) -> Dict[str, float]:
        """
        Analyze keyword density to understand governance drivers.

        "Grant" > "Taxpayer" = Outside funding driven
        "Emergency" frequent = Administrative convenience

        A keyword is counted where it begins a word, so inflections such as
        "grants" or "tabled" count while "immigrant" or "stable" do not.

        Args:
            documents: Dicts whose ``"content"`` value is the document text;
                a missing or None value is treated as empty.

        Returns:
            Mapping of ``"<category>:<keyword>"`` to occurrences per 1000
            words, sorted by density in descending order. Empty when
            *documents* is empty.
        """
        # Compile once, outside the per-document loop. Multi-word keywords
        # tolerate any whitespace run between their words.
        patterns = [
            (
                f"{category}:{keyword}",
                re.compile(
                    r"(?<!\w)" + r"\s+".join(re.escape(part) for part in keyword.lower().split())
                ),
            )
            for category, keywords in self._KEYWORD_CATEGORIES.items()
            for keyword in keywords
        ]

        total_words = 0
        keyword_counts = defaultdict(int)
        for doc in documents:
            content = (doc.get("content") or "").lower()
            total_words += len(content.split())
            for key, pattern in patterns:
                keyword_counts[key] += len(pattern.findall(content))

        # Density is expressed per 1000 words.
        densities = {
            key: (count / total_words * 1000) if total_words > 0 else 0
            for key, count in keyword_counts.items()
        }
        return dict(sorted(densities.items(), key=lambda item: item[1], reverse=True))

    def infer_governance_logic(
        self,
        keyword_densities: Dict[str, float],
        contention_scores: List[ContentionMetrics],
        election_patterns: List[ElectionCyclePattern]
    ) -> Dict[str, Any]:
        """
        Synthesize all temporal analyses into governance logic inference.

        Args:
            keyword_densities: Output of ``analyze_keyword_density``.
            contention_scores: Output of ``calculate_contention_scores``.
            election_patterns: Output of ``analyze_election_cycle``.

        Returns:
            Dict with the keys ``primary_driver``, ``urgency_pattern``,
            ``decision_style``, ``election_influence``, ``keyword_densities``,
            ``avg_contention_score`` and a one-paragraph ``summary``.
        """
        # Funding: grants must appear more than twice as often as "taxpayer".
        grant_density = keyword_densities.get("funding_source:grant", 0)
        taxpayer_density = keyword_densities.get("funding_source:taxpayer", 0)
        if grant_density > taxpayer_density * 2:
            funding_driver = "Outside funding (grants/state) drives decisions more than local tax base"
        else:
            funding_driver = "Local taxpayer concerns are prominent in decision-making"

        # Urgency: more than 2 "emergency" mentions per 1000 words.
        emergency_density = keyword_densities.get("urgency:emergency", 0)
        if emergency_density > 2.0:
            urgency_pattern = "Frequent 'emergency' framing - may indicate reactive governance"
        else:
            urgency_pattern = "Standard deliberative process"

        # Contention: mean share of nay votes across all scored decisions.
        if contention_scores:
            avg_contention = sum(c.contention_score for c in contention_scores) / len(contention_scores)
        else:
            avg_contention = 0
        if avg_contention > 0.2:
            decision_style = "Frequently contentious - diverse stakeholder interests or partisan divide"
        else:
            decision_style = "Consensus-oriented - low conflict or dominant coalition"

        # Election influence: any analyzed election showing a spike.
        if any(p.pre_election_spike for p in election_patterns):
            election_influence = "Visible increase in high-profile projects before elections"
        else:
            election_influence = "No clear election cycle pattern"

        return {
            "primary_driver": funding_driver,
            "urgency_pattern": urgency_pattern,
            "decision_style": decision_style,
            "election_influence": election_influence,
            "keyword_densities": keyword_densities,
            "avg_contention_score": avg_contention,
            "summary": f"This jurisdiction appears to be driven by {funding_driver.lower()}, with {decision_style.lower()}. {election_influence}."
        }
|