baveshraam's picture
FIX: SurrealDB 2.0 migration syntax and Frontend/CORS link
f871fed
"""
Auto-Update Agent Domain Models
Handles source monitoring, content change detection, and notifications.
"""
from datetime import datetime
from typing import Optional, List, Literal
from pydantic import BaseModel, Field
from open_notebook.domain.base import ObjectModel
class SourceMonitor(ObjectModel):
"""Configuration for monitoring a source for updates."""
table_name = "source_monitor"
source_id: str = Field(..., description="ID of the source to monitor")
enabled: bool = Field(default=True, description="Whether monitoring is enabled")
check_frequency: Literal["hourly", "daily", "weekly"] = Field(
default="daily",
description="How often to check for updates"
)
last_checked_at: Optional[datetime] = Field(
default=None,
description="When the source was last checked"
)
last_content_hash: Optional[str] = Field(
default=None,
description="Hash of content at last check"
)
consecutive_failures: int = Field(
default=0,
description="Number of consecutive check failures"
)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
@classmethod
async def get_by_source(cls, source_id: str) -> Optional["SourceMonitor"]:
"""Get monitor config for a source."""
from open_notebook.database.repository import repo
result = await repo.query(
f"SELECT * FROM {cls.table_name} WHERE source_id = $source_id LIMIT 1",
{"source_id": source_id}
)
if result and len(result) > 0:
return cls(**result[0])
return None
@classmethod
async def get_enabled_monitors(cls) -> List["SourceMonitor"]:
"""Get all enabled monitors."""
from open_notebook.database.repository import repo
result = await repo.query(
f"SELECT * FROM {cls.table_name} WHERE enabled = true"
)
return [cls(**r) for r in result] if result else []
@classmethod
async def get_due_for_check(cls, frequency: str) -> List["SourceMonitor"]:
"""Get monitors due for checking based on frequency."""
from open_notebook.database.repository import repo
# Calculate cutoff time based on frequency
frequency_hours = {"hourly": 1, "daily": 24, "weekly": 168}
hours = frequency_hours.get(frequency, 24)
result = await repo.query(
f"""SELECT * FROM {cls.table_name}
WHERE enabled = true
AND check_frequency = $frequency
AND (last_checked_at IS NONE OR last_checked_at < time::now() - {hours}h)
ORDER BY last_checked_at ASC"""
,
{"frequency": frequency}
)
return [cls(**r) for r in result] if result else []
class UpdateNotification(ObjectModel):
"""Notification about a source content update."""
table_name = "update_notification"
source_id: str = Field(..., description="ID of the updated source")
source_title: str = Field(..., description="Title of the source")
change_summary: str = Field(..., description="LLM-generated summary of changes")
diff_highlights: List[str] = Field(
default_factory=list,
description="Key differences found"
)
old_content_preview: Optional[str] = Field(
default=None,
description="Preview of old content"
)
new_content_preview: Optional[str] = Field(
default=None,
description="Preview of new content"
)
severity: Literal["info", "warning", "critical"] = Field(
default="info",
description="Severity level of the update"
)
is_read: bool = Field(default=False, description="Whether notification was read")
is_dismissed: bool = Field(default=False, description="Whether notification was dismissed")
created_at: datetime = Field(default_factory=datetime.now)
@classmethod
async def get_unread(cls, limit: int = 50) -> List["UpdateNotification"]:
"""Get unread notifications."""
from open_notebook.database.repository import repo
result = await repo.query(
f"""SELECT * FROM {cls.table_name}
WHERE is_read = false AND is_dismissed = false
ORDER BY created_at DESC
LIMIT $limit""",
{"limit": limit}
)
return [cls(**r) for r in result] if result else []
@classmethod
async def get_all(cls, include_dismissed: bool = False, limit: int = 100) -> List["UpdateNotification"]:
"""Get all notifications."""
from open_notebook.database.repository import repo
where_clause = "" if include_dismissed else "WHERE is_dismissed = false"
result = await repo.query(
f"""SELECT * FROM {cls.table_name}
{where_clause}
ORDER BY created_at DESC
LIMIT $limit""",
{"limit": limit}
)
return [cls(**r) for r in result] if result else []
@classmethod
async def get_for_source(cls, source_id: str, limit: int = 20) -> List["UpdateNotification"]:
"""Get notifications for a specific source."""
from open_notebook.database.repository import repo
result = await repo.query(
f"""SELECT * FROM {cls.table_name}
WHERE source_id = $source_id
ORDER BY created_at DESC
LIMIT $limit""",
{"source_id": source_id, "limit": limit}
)
return [cls(**r) for r in result] if result else []
@classmethod
async def mark_all_read(cls) -> int:
"""Mark all notifications as read."""
from open_notebook.database.repository import repo
result = await repo.query(
f"UPDATE {cls.table_name} SET is_read = true WHERE is_read = false"
)
return len(result) if result else 0
@classmethod
async def get_unread_count(cls) -> int:
"""Get count of unread notifications."""
from open_notebook.database.repository import repo
result = await repo.query(
f"SELECT count() FROM {cls.table_name} WHERE is_read = false AND is_dismissed = false GROUP ALL"
)
if result and len(result) > 0:
return result[0].get("count", 0)
return 0
class MonitorJobRun(ObjectModel):
"""Record of a monitoring job execution."""
table_name = "monitor_job_run"
started_at: datetime = Field(default_factory=datetime.now)
completed_at: Optional[datetime] = Field(default=None)
status: Literal["running", "completed", "failed"] = Field(default="running")
sources_checked: int = Field(default=0)
updates_found: int = Field(default=0)
errors: List[str] = Field(default_factory=list)
@classmethod
async def get_latest(cls, limit: int = 10) -> List["MonitorJobRun"]:
"""Get latest job runs."""
from open_notebook.database.repository import repo
result = await repo.query(
f"""SELECT * FROM {cls.table_name}
ORDER BY started_at DESC
LIMIT $limit""",
{"limit": limit}
)
return [cls(**r) for r in result] if result else []
@classmethod
async def get_running(cls) -> Optional["MonitorJobRun"]:
"""Get currently running job if any."""
from open_notebook.database.repository import repo
result = await repo.query(
f"SELECT * FROM {cls.table_name} WHERE status = 'running' LIMIT 1"
)
if result and len(result) > 0:
return cls(**result[0])
return None
# Request/Response models for API
class SourceMonitorCreate(BaseModel):
"""Request to create source monitoring."""
source_id: str
check_frequency: Literal["hourly", "daily", "weekly"] = "daily"
enabled: bool = True
class SourceMonitorUpdate(BaseModel):
"""Request to update source monitoring."""
check_frequency: Optional[Literal["hourly", "daily", "weekly"]] = None
enabled: Optional[bool] = None
class NotificationAction(BaseModel):
"""Action to perform on notifications."""
action: Literal["mark_read", "dismiss", "mark_all_read", "dismiss_all"]
notification_ids: Optional[List[str]] = None
class MonitoringStats(BaseModel):
"""Statistics about monitoring."""
total_monitors: int
enabled_monitors: int
unread_notifications: int
last_job_run: Optional[datetime]
last_job_status: Optional[str]