contractpulse / scheduler_api.py
GitHub Actions
sync: bug fixes-8 (127d34b99d54db6691aa8dcebf7db87ffdc0073c)
ec1ec6e
"""
ContractPulse Scheduler β€” Flask REST API
=========================================
Wraps the TaskScheduler + MeetingRoomScheduler into a clean HTTP API.
Endpoints:
POST /api/process_breach β€” process a single breach
POST /api/process_batch β€” process multiple breaches
GET /api/tasks β€” list all tasks (optional ?severity=CRITICAL)
GET /api/meetings β€” list all booked meetings
GET /api/departments β€” department workload summary
POST /api/reset β€” reset the scheduler state
GET /api/health β€” health check
"""
from __future__ import annotations
import os
import smtplib
from email.message import EmailMessage
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
from flask import Blueprint, jsonify, request
scheduler_bp = Blueprint("scheduler", __name__)
# ─────────────────────────────────────────────────────────────
# SCHEDULER CORE
# ─────────────────────────────────────────────────────────────
class Department(Enum):
FINANCE = "Finance"
LEGAL = "Legal"
TECH = "Tech"
OPERATIONS = "Operations"
COMPLIANCE = "Compliance"
EXECUTIVE = "Executive"
class Severity(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class ObligationType(Enum):
REVENUE = "revenue"
DEBT_TO_EQUITY = "debt_to_equity_ratio"
CURRENT_RATIO = "current_ratio"
REPORT_SUBMISSION = "report_submission"
WORKFORCE_SIZE = "workforce_size"
CAPEX_LIMIT = "capex_limit"
LIQUIDITY_RATIO = "liquidity_ratio"
INSURANCE_COVERAGE = "insurance_coverage"
UNKNOWN = "unknown"
OBLIGATION_OWNERS: dict[ObligationType, list[Department]] = {
ObligationType.REVENUE: [Department.FINANCE, Department.EXECUTIVE],
ObligationType.DEBT_TO_EQUITY: [Department.FINANCE, Department.LEGAL],
ObligationType.CURRENT_RATIO: [Department.FINANCE],
ObligationType.REPORT_SUBMISSION: [Department.COMPLIANCE, Department.LEGAL],
ObligationType.WORKFORCE_SIZE: [Department.OPERATIONS, Department.LEGAL],
ObligationType.CAPEX_LIMIT: [Department.FINANCE, Department.OPERATIONS],
ObligationType.LIQUIDITY_RATIO: [Department.FINANCE, Department.EXECUTIVE],
ObligationType.INSURANCE_COVERAGE: [Department.COMPLIANCE, Department.LEGAL],
ObligationType.UNKNOWN: [Department.LEGAL],
}
SEVERITY_ESCALATION: dict[Severity, list[Department]] = {
Severity.LOW: [],
Severity.MEDIUM: [],
Severity.HIGH: [Department.LEGAL],
Severity.CRITICAL: [Department.LEGAL, Department.EXECUTIVE],
}
SEVERITY_DUE_HOURS: dict[Severity, int] = {
Severity.LOW: 72,
Severity.MEDIUM: 48,
Severity.HIGH: 24,
Severity.CRITICAL: 4,
}
@dataclass
class Room:
room_id: str
name: str
capacity: int
has_av: bool = True
@dataclass(order=True)
class MeetingSlot:
start: datetime
end: datetime
meeting_id: str = field(compare=False)
room_id: str = field(compare=False)
title: str = field(compare=False)
attendees: int = field(compare=False, default=4)
def overlaps(self, start: datetime, end: datetime) -> bool:
return self.start < end and self.end > start
def to_dict(self) -> dict:
return {
"meeting_id": self.meeting_id,
"title": self.title,
"room": self.room_id,
"start": self.start.isoformat(),
"end": self.end.isoformat(),
"attendees": self.attendees,
}
@dataclass
class Meeting:
title: str
start: datetime
end: datetime
attendees: int
needs_av: bool = False
meeting_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
class MeetingRoomScheduler:
def __init__(self, rooms: list[Room]) -> None:
self.rooms = sorted(rooms, key=lambda r: r.capacity)
self._bookings: dict[str, list[MeetingSlot]] = {r.room_id: [] for r in rooms}
def reset(self) -> None:
self._bookings = {r.room_id: [] for r in self.rooms}
def schedule(self, meeting: Meeting) -> Optional[MeetingSlot]:
if meeting.end <= meeting.start:
return None
for room in self._eligible_rooms(meeting):
if self._is_free(room, meeting.start, meeting.end):
slot = MeetingSlot(
start=meeting.start, end=meeting.end,
meeting_id=meeting.meeting_id, room_id=room.room_id,
title=meeting.title, attendees=meeting.attendees,
)
self._bookings[room.room_id].append(slot)
self._bookings[room.room_id].sort()
return slot
return None
def free_slots(self, room_id: str, window_start: datetime, window_end: datetime, min_duration_minutes: int = 30) -> list[tuple]:
slots = self._bookings.get(room_id, [])
free, cursor = [], window_start
for slot in slots:
if slot.start > cursor:
if (slot.start - cursor).total_seconds() / 60 >= min_duration_minutes:
free.append((cursor, slot.start))
cursor = max(cursor, slot.end)
if cursor < window_end and (window_end - cursor).total_seconds() / 60 >= min_duration_minutes:
free.append((cursor, window_end))
return free
def all_bookings(self) -> list[MeetingSlot]:
result = []
for slots in self._bookings.values():
result.extend(slots)
return sorted(result)
def _eligible_rooms(self, meeting: Meeting) -> list[Room]:
return [r for r in self.rooms if r.capacity >= meeting.attendees and (not meeting.needs_av or r.has_av)]
def _is_free(self, room: Room, start: datetime, end: datetime) -> bool:
return not any(s.overlaps(start, end) for s in self._bookings[room.room_id])
@dataclass
class BreachedObligation:
contract_id: str
obligation_type: ObligationType
metric_name: str
threshold_value: float
current_value: float
predicted_value: Optional[float]
deadline: str
consequence: str
conflict_with: Optional[str] = None
@property
def breach_gap(self) -> float:
return self.current_value - self.threshold_value
def auto_severity(self) -> Severity:
critical_kw = {"termination", "default", "liquidation", "penalty"}
high_kw = {"notice", "cure period", "acceleration"}
cl = self.consequence.lower()
if any(k in cl for k in critical_kw) or self.conflict_with:
return Severity.CRITICAL
if any(k in cl for k in high_kw):
return Severity.HIGH
gap_pct = abs(self.breach_gap) / (abs(self.threshold_value) + 1e-9)
if gap_pct > 0.30:
return Severity.HIGH
if gap_pct > 0.10:
return Severity.MEDIUM
return Severity.LOW
@dataclass
class Task:
task_id: str
title: str
description: str
assigned_to: list[Department]
severity: Severity
due_by: datetime
contract_id: str
obligation_type: ObligationType
created_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
return {
"task_id": self.task_id,
"title": self.title,
"description": self.description,
"assigned_to": [d.value for d in self.assigned_to],
"severity": self.severity.name,
"due_by": self.due_by.isoformat(),
"contract_id": self.contract_id,
"obligation_type": self.obligation_type.value,
"created_at": self.created_at.isoformat(),
}
class TaskScheduler:
def __init__(self, room_scheduler: Optional[MeetingRoomScheduler] = None) -> None:
self._tasks: list[Task] = []
self._room_scheduler = room_scheduler
self._task_counter = 0
self._auto_meetings: list[MeetingSlot] = []
def reset(self) -> None:
self._tasks = []
self._task_counter = 0
self._auto_meetings = []
if self._room_scheduler:
self._room_scheduler.reset()
def process_breach(self, breach: BreachedObligation) -> tuple[Task, Optional[MeetingSlot]]:
severity = breach.auto_severity()
departments = self._resolve_departments(breach.obligation_type, severity)
due_by = datetime.now() + timedelta(hours=SEVERITY_DUE_HOURS[severity])
self._task_counter += 1
task = Task(
task_id=f"TASK-{self._task_counter:04d}",
title=f"[{severity.name}] Breach: {breach.metric_name} in {breach.contract_id}",
description=self._build_description(breach, severity),
assigned_to=departments,
severity=severity,
due_by=due_by,
contract_id=breach.contract_id,
obligation_type=breach.obligation_type,
)
self._tasks.append(task)
meeting_slot = None
if breach.conflict_with and self._room_scheduler:
meeting_slot = self._book_conflict_meeting(task, breach)
self._alert_team_via_email(task)
return task, meeting_slot
def _alert_team_via_email(self, task: Task) -> None:
if task.severity not in [Severity.CRITICAL, Severity.HIGH]:
return
email_body = f"""
[URGENT] Covenant Breach Alert: {task.title}
A {task.severity.name} severity breach has been processed by the ContractPulse Response Engine.
Task ID: {task.task_id}
Assigned To: {[d.value for d in task.assigned_to]}
Due By: {task.due_by.strftime('%Y-%m-%d %H:%M:%S')}
Details:
{task.description}
Please take immediate action.
"""
# Try to send a real email if credentials are set
sender_email = os.getenv("SMTP_EMAIL")
sender_password = os.getenv("SMTP_PASSWORD")
receiver_email = os.getenv("ALERT_RECEIVER_EMAIL", sender_email) # Send to self if no specific receiver
if sender_email and sender_password and receiver_email:
try:
msg = EmailMessage()
msg.set_content(email_body)
msg['Subject'] = f"[URGENT] Covenant Breach Alert: {task.title}"
msg['From'] = sender_email
msg['To'] = receiver_email
# Assuming Gmail SMTP for this example
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
print(f"πŸ“§ REAL EMAIL SUCCESSFULLY SENT TO {receiver_email}")
return
except Exception as e:
print(f"Failed to send real email: {e}. Falling back to terminal log.")
# Fallback to terminal if no credentials or sending fails
print("\n" + "="*60)
print("πŸ“§ AUTOMATED EMAIL ALERT DISPATCHED (Terminal Mock)")
print("="*60)
print(email_body)
print("="*60 + "\n")
def process_batch(self, breaches: list[BreachedObligation]) -> tuple[list[Task], list[MeetingSlot]]:
results = [self.process_breach(b) for b in breaches]
tasks = [r[0] for r in results]
meetings = [r[1] for r in results if r[1]]
return sorted(tasks, key=lambda t: t.severity.value, reverse=True), meetings
def all_tasks(self) -> list[Task]:
return sorted(self._tasks, key=lambda t: t.severity.value, reverse=True)
def department_summary(self) -> dict:
summary = {}
for task in self._tasks:
for dept in task.assigned_to:
key = dept.value
if key not in summary:
summary[key] = {s.name: 0 for s in Severity}
summary[key][task.severity.name] += 1
return summary
def _resolve_departments(self, ob_type: ObligationType, severity: Severity) -> list[Department]:
primary = OBLIGATION_OWNERS.get(ob_type, [Department.LEGAL])
escalated = SEVERITY_ESCALATION.get(severity, [])
return list(dict.fromkeys(primary + escalated))
def _build_description(self, breach: BreachedObligation, severity: Severity) -> str:
lines = [
f"Contract : {breach.contract_id}",
f"Metric : {breach.metric_name}",
f"Threshold : {breach.threshold_value}",
f"Current : {breach.current_value} (gap={breach.breach_gap:+.2f})",
]
if breach.predicted_value is not None:
lines.append(f"Predicted : {breach.predicted_value} (next period)")
lines += [f"Deadline : {breach.deadline}", f"Consequence: {breach.consequence}"]
if breach.conflict_with:
lines.append(f"⚠ Conflicts with: {breach.conflict_with}")
return "\n ".join(lines)
def _book_conflict_meeting(self, task: Task, breach: BreachedObligation) -> Optional[MeetingSlot]:
now = datetime.now().replace(second=0, microsecond=0)
window_end = now + timedelta(hours=8)
for room in self._room_scheduler.rooms:
free = self._room_scheduler.free_slots(room.room_id, now, window_end, min_duration_minutes=60)
if free:
start, _ = free[0]
meeting = Meeting(
title=f"Conflict Review: {breach.contract_id} vs {breach.conflict_with}",
start=start, end=start + timedelta(minutes=60),
attendees=max(3, len(task.assigned_to) * 2),
needs_av=True,
)
slot = self._room_scheduler.schedule(meeting)
if slot:
self._auto_meetings.append(slot)
return slot
return None
# ─────────────────────────────────────────────────────────────
# GLOBAL SCHEDULER INSTANCE
# ─────────────────────────────────────────────────────────────
def _build_scheduler() -> TaskScheduler:
rooms = [
Room("R1", "Boardroom", capacity=20, has_av=True),
Room("R2", "Conference A", capacity=10, has_av=True),
Room("R3", "Conference B", capacity=8, has_av=False),
Room("R4", "Huddle Room 1", capacity=4, has_av=False),
Room("R5", "Huddle Room 2", capacity=4, has_av=True),
]
return TaskScheduler(room_scheduler=MeetingRoomScheduler(rooms))
# Use a mutable container so blueprint routes can rebind it reliably
_state = {"scheduler": _build_scheduler()}
def get_scheduler() -> TaskScheduler:
return _state["scheduler"]
# ─────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────
def _parse_obligation_type(raw: str) -> ObligationType:
for ot in ObligationType:
if ot.value == raw:
return ot
return ObligationType.UNKNOWN
def _breach_from_dict(d: dict) -> BreachedObligation:
return BreachedObligation(
contract_id=d["contract_id"],
obligation_type=_parse_obligation_type(d.get("obligation_type", "unknown")),
metric_name=d.get("metric_name", d.get("obligation_type", "unknown")),
threshold_value=float(d["threshold_value"]),
current_value=float(d["current_value"]),
predicted_value=float(d["predicted_value"]) if d.get("predicted_value") is not None else None,
deadline=d.get("deadline", "unknown"),
consequence=d.get("consequence", "notice"),
# BUG FIX: treat empty string as None so conflict meetings are not
# accidentally triggered when the frontend sends conflict_with: ""
conflict_with=d.get("conflict_with") or None,
)
# ─────────────────────────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────────────────────────
@scheduler_bp.get("/api/health")
def health():
return jsonify({"status": "ok", "timestamp": datetime.now().isoformat()})
@scheduler_bp.post("/api/process_breach")
def process_breach():
"""
Body (JSON):
{
"contract_id": "CTR-001",
"obligation_type": "revenue",
"metric_name": "revenue",
"threshold_value": 5000000,
"current_value": 3800000,
"predicted_value": 3500000, // optional
"deadline": "annually",
"consequence": "termination",
"conflict_with": null // optional contract_id
}
"""
body = request.get_json(force=True)
try:
breach = _breach_from_dict(body)
except (KeyError, ValueError) as e:
return jsonify({"error": f"Invalid payload: {e}"}), 400
task, meeting_slot = get_scheduler().process_breach(breach)
return jsonify({
"task": task.to_dict(),
"meeting": meeting_slot.to_dict() if meeting_slot else None,
}), 201
@scheduler_bp.post("/api/process_batch")
def process_batch():
"""
Body: { "breaches": [ <breach>, ... ] }
Returns tasks sorted by severity desc + any auto-booked meetings.
"""
body = request.get_json(force=True)
raw_breaches = body.get("breaches", [])
if not raw_breaches:
return jsonify({"error": "breaches list is empty"}), 400
try:
breaches = [_breach_from_dict(b) for b in raw_breaches]
except (KeyError, ValueError) as e:
return jsonify({"error": f"Invalid breach in batch: {e}"}), 400
tasks, meetings = get_scheduler().process_batch(breaches)
return jsonify({
"tasks": [t.to_dict() for t in tasks],
"meetings": [m.to_dict() for m in meetings],
"count": len(tasks),
}), 201
@scheduler_bp.get("/api/tasks")
def get_tasks():
"""
Query params:
?severity=CRITICAL (optional filter)
?department=Finance (optional filter)
"""
sev_filter = request.args.get("severity", "").upper()
dept_filter = request.args.get("department", "")
tasks = get_scheduler().all_tasks()
if sev_filter and sev_filter in Severity.__members__:
target = Severity[sev_filter]
tasks = [t for t in tasks if t.severity == target]
if dept_filter:
dept_vals = [d.value for d in Department]
if dept_filter in dept_vals:
target_dept = Department(dept_filter)
tasks = [t for t in tasks if target_dept in t.assigned_to]
return jsonify({"tasks": [t.to_dict() for t in tasks], "count": len(tasks)})
# BUG FIX: There were TWO @scheduler_bp.get("/api/meetings") routes in the
# original file. Flask registers only the FIRST one it sees, which called a
# non-existent method `_task_scheduler_meetings()` and crashed on every
# request. The second (correct) route was silently ignored.
# Fixed: one route that correctly reads _auto_meetings from the scheduler.
@scheduler_bp.get("/api/meetings")
def get_meetings():
"""Return all auto-booked conflict-resolution meeting slots."""
auto = get_scheduler()._auto_meetings or []
return jsonify({"meetings": [m.to_dict() for m in auto], "count": len(auto)})
@scheduler_bp.get("/api/departments")
def get_departments():
return jsonify({"summary": get_scheduler().department_summary()})
@scheduler_bp.post("/api/reset")
def reset():
# BUG FIX: rebinding a module-level `scheduler` variable inside a
# blueprint function is unreliable β€” the local name rebinds but callers
# that already imported the old object keep the stale reference.
# Fixed: mutate the shared _state dict so all routes see the new instance.
_state["scheduler"] = _build_scheduler()
return jsonify({"status": "reset", "timestamp": datetime.now().isoformat()})
# Add this at the very bottom of scheduler_api.py
scheduler = _state["scheduler"]