"""
Auto-Update Agent Service
Monitors web sources for changes and generates notifications with LLM summaries.
"""
import hashlib
import asyncio
from datetime import datetime
from typing import Optional, List, Tuple
from difflib import unified_diff
import httpx
from loguru import logger
from ai_prompter import Prompter
from langchain_core.runnables import RunnableConfig
from open_notebook.domain.auto_update import (
SourceMonitor,
UpdateNotification,
MonitorJobRun,
MonitoringStats,
)
from open_notebook.domain.notebook import Source
from open_notebook.graphs.utils import provision_langchain_model
class AutoUpdateService:
    """Service for monitoring sources and detecting updates.

    Workflow per source: fetch the monitored URL, hash the body with
    SHA-256, compare against the hash stored on the ``SourceMonitor``,
    and — when the content changed — persist an ``UpdateNotification``
    carrying diff highlights plus an LLM-generated summary/severity.
    Also exposes CRUD helpers for monitors, notification bookkeeping,
    and aggregate monitoring statistics.
    """

    def __init__(self):
        # One shared async client for all checks; callers must await
        # close() on shutdown to release connections.
        self.http_client = httpx.AsyncClient(
            timeout=30.0,
            follow_redirects=True,
            headers={
                "User-Agent": "OpenNotebook/1.0 (Source Monitor)"
            },
        )

    async def close(self):
        """Close HTTP client."""
        await self.http_client.aclose()

    def _compute_hash(self, content: str) -> str:
        """Return the SHA-256 hex digest of *content* (change fingerprint)."""
        return hashlib.sha256(content.encode()).hexdigest()

    async def _fetch_url_content(self, url: str) -> Optional[str]:
        """Fetch *url* and return its body text, or ``None`` on any failure.

        Network errors and non-2xx statuses are logged and swallowed so a
        single flaky source cannot abort a whole monitoring run.
        """
        try:
            response = await self.http_client.get(url)
            response.raise_for_status()
            return response.text
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")
            return None

    def _generate_diff(self, old_content: str, new_content: str) -> List[str]:
        """Return up to 10 "Added:"/"Removed:" highlight lines.

        Runs a line-based unified diff; each highlight is truncated to 200
        characters to keep notifications compact.
        """
        diff = unified_diff(
            old_content.splitlines(),
            new_content.splitlines(),
            lineterm='',
            n=3,
        )
        highlights: List[str] = []
        for line in diff:
            # Keep real +/- change lines; skip the '+++'/'---' file headers.
            if line.startswith('+') and not line.startswith('+++'):
                highlights.append(f"Added: {line[1:].strip()[:200]}")
            elif line.startswith('-') and not line.startswith('---'):
                highlights.append(f"Removed: {line[1:].strip()[:200]}")
            if len(highlights) >= 10:  # cap notification size
                break
        return highlights

    async def _generate_change_summary(
        self,
        source_title: str,
        old_content: str,
        new_content: str,
        diff_highlights: List[str]
    ) -> Tuple[str, str]:
        """Generate an LLM summary of the change and a severity rating.

        Returns ``(summary, severity)`` where severity is one of "info",
        "warning" or "critical". Falls back to a generic summary with
        severity "info" if the model call or response parsing fails.
        """
        # Truncate both versions so the prompt stays small.
        old_preview = old_content[:2000] if old_content else ""
        new_preview = new_content[:2000] if new_content else ""
        prompt_text = f"""Analyze the changes detected in this web source.
Source Title: {source_title}
Key Changes Detected:
{chr(10).join(diff_highlights[:5])}
Old Content Preview:
{old_preview}
New Content Preview:
{new_preview}
Please provide:
1. A brief summary (2-3 sentences) of what changed
2. The severity level: "info" (minor updates), "warning" (significant changes), or "critical" (major content changes or breaking changes)
Format your response as:
SUMMARY: <your summary>
SEVERITY: <info|warning|critical>"""
        try:
            # NOTE(review): confirm this bare call matches the current
            # provision_langchain_model signature (other graphs pass
            # arguments, and it may itself be a coroutine). On mismatch
            # the except below degrades to the generic fallback.
            model = provision_langchain_model()
            response = await model.ainvoke(prompt_text)
            response_text = response.content if hasattr(response, 'content') else str(response)

            # Defaults used when the model omits either field.
            summary = "Content has been updated."
            severity = "info"
            if response_text:
                for line in response_text.strip().split("\n"):
                    if line.startswith("SUMMARY:"):
                        summary = line.replace("SUMMARY:", "").strip()
                    elif line.startswith("SEVERITY:"):
                        sev = line.replace("SEVERITY:", "").strip().lower()
                        # Only accept the three known severity levels.
                        if sev in ["info", "warning", "critical"]:
                            severity = sev
            return summary, severity
        except Exception as e:
            logger.error(f"Failed to generate change summary: {e}")
            return "Content has been updated.", "info"

    async def check_source(
        self,
        monitor: SourceMonitor
    ) -> Optional[UpdateNotification]:
        """Check a single source for updates.

        Side effects: updates the monitor's hash/timestamps/failure count
        and persists it; persists any created notification. Returns
        ``None`` when the source is missing, has no URL, the fetch fails,
        or the content is unchanged.
        """
        try:
            # Fix: Source.get is awaited for consistency with every other
            # domain accessor in this service; the original discarded it.
            source = await Source.get(monitor.source_id)
            if not source:
                logger.warning(f"Source {monitor.source_id} not found")
                return None

            # Only sources backed by a URL asset can be monitored.
            url = getattr(source.asset, 'url', None) if source.asset else None
            if not url:
                logger.debug(f"Source {monitor.source_id} has no URL to monitor")
                return None

            new_content = await self._fetch_url_content(url)
            if not new_content:
                # Record the failure; last_checked_at is deliberately not
                # advanced, so the monitor remains due and will be retried.
                monitor.consecutive_failures += 1
                monitor.updated_at = datetime.now()
                await monitor.save()
                return None

            # Successful fetch: reset the failure streak.
            monitor.consecutive_failures = 0
            monitor.last_checked_at = datetime.now()

            new_hash = self._compute_hash(new_content)

            # A change requires a *previous* hash that differs; the very
            # first check only records the baseline (the branch below).
            if monitor.last_content_hash and monitor.last_content_hash != new_hash:
                logger.info(f"Content change detected for source {source.title}")

                # Old content comes from the stored source text, not a refetch.
                old_content = source.full_text or ""
                diff_highlights = self._generate_diff(old_content, new_content)
                summary, severity = await self._generate_change_summary(
                    source.title or "Untitled",
                    old_content,
                    new_content,
                    diff_highlights
                )

                notification = UpdateNotification(
                    source_id=monitor.source_id,
                    source_title=source.title or "Untitled",
                    change_summary=summary,
                    diff_highlights=diff_highlights,
                    old_content_preview=old_content[:500] if old_content else None,
                    new_content_preview=new_content[:500] if new_content else None,
                    severity=severity,
                )
                await notification.save()

                monitor.last_content_hash = new_hash
                monitor.updated_at = datetime.now()
                await monitor.save()
                return notification

            # No change (or first baseline): store hash and timestamps.
            monitor.last_content_hash = new_hash
            monitor.updated_at = datetime.now()
            await monitor.save()
            return None
        except Exception as e:
            logger.error(f"Error checking source {monitor.source_id}: {e}")
            monitor.consecutive_failures += 1
            monitor.updated_at = datetime.now()
            await monitor.save()
            return None

    async def run_check_job(
        self,
        frequency: Optional[str] = None
    ) -> MonitorJobRun:
        """Run a monitoring job for all due sources.

        If *frequency* is given, only monitors of that frequency are
        checked; otherwise hourly, daily and weekly monitors are all
        collected. Returns the persisted ``MonitorJobRun`` record, or the
        already-running job if one exists (no concurrent runs).
        """
        # Refuse to start a second concurrent job.
        running = await MonitorJobRun.get_running()
        if running:
            logger.warning("Monitor job already running")
            return running

        job = MonitorJobRun(status="running")
        await job.save()
        try:
            if frequency:
                monitors = await SourceMonitor.get_due_for_check(frequency)
            else:
                # No frequency given: gather everything that is due.
                monitors = []
                for freq in ["hourly", "daily", "weekly"]:
                    monitors.extend(await SourceMonitor.get_due_for_check(freq))

            logger.info(f"Checking {len(monitors)} sources for updates")
            updates_found = 0
            errors = []
            for monitor in monitors:
                # check_source already swallows per-source errors; this
                # guard catches anything that still escapes so one bad
                # monitor cannot kill the whole job.
                try:
                    notification = await self.check_source(monitor)
                    if notification:
                        updates_found += 1
                except Exception as e:
                    errors.append(f"Source {monitor.source_id}: {str(e)}")

            job.status = "completed"
            job.completed_at = datetime.now()
            job.sources_checked = len(monitors)
            job.updates_found = updates_found
            job.errors = errors
            await job.save()
            logger.info(
                f"Monitor job completed: {len(monitors)} checked, "
                f"{updates_found} updates, {len(errors)} errors"
            )
            return job
        except Exception as e:
            logger.error(f"Monitor job failed: {e}")
            job.status = "failed"
            job.completed_at = datetime.now()
            job.errors = [str(e)]
            await job.save()
            return job

    # Source monitor management

    async def create_monitor(
        self,
        source_id: str,
        check_frequency: str = "daily",
        enabled: bool = True
    ) -> SourceMonitor:
        """Create a monitor for a source (upsert: updates an existing one)."""
        existing = await SourceMonitor.get_by_source(source_id)
        if existing:
            # Idempotent create: refresh the existing monitor instead of
            # inserting a duplicate.
            existing.check_frequency = check_frequency
            existing.enabled = enabled
            existing.updated_at = datetime.now()
            await existing.save()
            return existing

        monitor = SourceMonitor(
            source_id=source_id,
            check_frequency=check_frequency,
            enabled=enabled,
        )
        await monitor.save()
        return monitor

    async def get_monitor(self, source_id: str) -> Optional[SourceMonitor]:
        """Get monitor for a source, or ``None`` if none exists."""
        return await SourceMonitor.get_by_source(source_id)

    async def update_monitor(
        self,
        source_id: str,
        check_frequency: Optional[str] = None,
        enabled: Optional[bool] = None
    ) -> Optional[SourceMonitor]:
        """Update a source monitor; only non-``None`` fields are changed.

        Returns the saved monitor, or ``None`` if no monitor exists for
        *source_id*.
        """
        monitor = await SourceMonitor.get_by_source(source_id)
        if not monitor:
            return None
        if check_frequency is not None:
            monitor.check_frequency = check_frequency
        if enabled is not None:
            monitor.enabled = enabled
        monitor.updated_at = datetime.now()
        await monitor.save()
        return monitor

    async def delete_monitor(self, source_id: str) -> bool:
        """Delete a source monitor; return True if one was deleted."""
        monitor = await SourceMonitor.get_by_source(source_id)
        if monitor:
            # Fix: delete() is a coroutine like save() — the original did
            # not await it, so the record was never actually removed.
            await monitor.delete()
            return True
        return False

    async def get_all_monitors(self) -> List[SourceMonitor]:
        """Get all enabled monitors."""
        return await SourceMonitor.get_enabled_monitors()

    # Notifications

    async def get_notifications(
        self,
        include_dismissed: bool = False,
        limit: int = 100
    ) -> List[UpdateNotification]:
        """Get notifications, optionally including dismissed ones."""
        return await UpdateNotification.get_all(include_dismissed, limit)

    async def get_unread_notifications(self, limit: int = 50) -> List[UpdateNotification]:
        """Get unread notifications."""
        return await UpdateNotification.get_unread(limit)

    async def mark_notification_read(self, notification_id: str) -> bool:
        """Mark notification as read; return True if it was found."""
        # Fix: awaited for consistency with the other async domain calls.
        notification = await UpdateNotification.get(notification_id)
        if notification:
            notification.is_read = True
            await notification.save()
            return True
        return False

    async def dismiss_notification(self, notification_id: str) -> bool:
        """Dismiss a notification; return True if it was found."""
        # Fix: awaited for consistency with the other async domain calls.
        notification = await UpdateNotification.get(notification_id)
        if notification:
            notification.is_dismissed = True
            await notification.save()
            return True
        return False

    async def mark_all_read(self) -> int:
        """Mark all notifications as read; return the count affected."""
        return await UpdateNotification.mark_all_read()

    async def get_unread_count(self) -> int:
        """Get count of unread notifications."""
        return await UpdateNotification.get_unread_count()

    async def get_stats(self) -> MonitoringStats:
        """Get aggregate monitoring statistics for the dashboard."""
        # Imported here to avoid a module-level import cycle with the
        # repository layer.
        from open_notebook.database.repository import repo

        # Total monitors (GROUP ALL collapses to a single count row).
        total_result = await repo.query("SELECT count() FROM source_monitor GROUP ALL")
        total = total_result[0].get("count", 0) if total_result else 0

        # Enabled monitors only.
        enabled_result = await repo.query(
            "SELECT count() FROM source_monitor WHERE enabled = true GROUP ALL"
        )
        enabled = enabled_result[0].get("count", 0) if enabled_result else 0

        unread = await self.get_unread_count()

        # Most recent job run, if any.
        jobs = await MonitorJobRun.get_latest(1)
        last_job = jobs[0] if jobs else None

        return MonitoringStats(
            total_monitors=total,
            enabled_monitors=enabled,
            unread_notifications=unread,
            last_job_run=last_job.started_at if last_job else None,
            last_job_status=last_job.status if last_job else None,
        )
# Module-level singleton: import this instance instead of constructing
# AutoUpdateService directly so all callers share one httpx.AsyncClient.
auto_update_service = AutoUpdateService()