"""
Auto-Update Agent Service

Monitors web sources for changes and generates notifications with LLM summaries.
"""

import asyncio
import hashlib
import inspect
from datetime import datetime
from difflib import unified_diff
from itertools import islice
from typing import Any, List, Optional, Tuple

import httpx
from ai_prompter import Prompter
from langchain_core.runnables import RunnableConfig
from loguru import logger

from open_notebook.domain.auto_update import (
    MonitoringStats,
    MonitorJobRun,
    SourceMonitor,
    UpdateNotification,
)
from open_notebook.domain.notebook import Source
from open_notebook.graphs.utils import provision_langchain_model

# Fallbacks used whenever the LLM is unavailable or its answer cannot be parsed.
_DEFAULT_SUMMARY = "Content has been updated."
_DEFAULT_SEVERITY = "info"
_VALID_SEVERITIES = ("info", "warning", "critical")

# Limits applied to the diff highlights stored on a notification.
_MAX_HIGHLIGHTS = 10
_HIGHLIGHT_CHARS = 200

# Number of characters of old/new content sent to the LLM.
_LLM_PREVIEW_CHARS = 2000


async def _resolve(value: Any) -> Any:
    """Return ``value``, awaiting it first when it is awaitable.

    This module awaits ``save()`` and ``get_by_source()`` but originally called
    ``get()``, ``delete()`` and ``provision_langchain_model()`` without
    ``await``. Their definitions are not visible from this file, so this helper
    makes the call sites correct whether those callables are sync or async.
    """
    if inspect.isawaitable(value):
        return await value
    return value


class AutoUpdateService:
    """Service for monitoring sources and detecting updates."""

    def __init__(self):
        """Create the shared HTTP client used to fetch monitored URLs."""
        self.http_client = httpx.AsyncClient(
            timeout=30.0,
            follow_redirects=True,
            headers={
                "User-Agent": "OpenNotebook/1.0 (Source Monitor)"
            },
        )

    async def close(self):
        """Close HTTP client."""
        await self.http_client.aclose()

    def _compute_hash(self, content: str) -> str:
        """Return the SHA-256 hex digest of ``content`` for change detection."""
        return hashlib.sha256(content.encode()).hexdigest()

    async def _fetch_url_content(self, url: str) -> Optional[str]:
        """Fetch the response body of ``url`` as text.

        Returns:
            The response text, or ``None`` if the request failed or returned an
            error status (the failure is logged, not raised).
        """
        try:
            response = await self.http_client.get(url)
            response.raise_for_status()
            return response.text
        except Exception as e:
            # Best-effort boundary: any fetch problem is reported as "no content".
            logger.error(f"Failed to fetch {url}: {e}")
            return None

    def _generate_diff(self, old_content: str, new_content: str) -> List[str]:
        """Generate diff highlights between old and new content.

        Returns:
            Up to ``_MAX_HIGHLIGHTS`` strings of the form ``"Added: ..."`` or
            ``"Removed: ..."``, each truncated to ``_HIGHLIGHT_CHARS``
            characters of the changed line.
        """
        diff = unified_diff(
            old_content.splitlines(),
            new_content.splitlines(),
            lineterm="",
            n=3,
        )

        highlights: List[str] = []
        # unified_diff emits exactly two file-header lines ("--- " and "+++ ")
        # before the first hunk. Skip them by position instead of by prefix, so
        # real content lines that begin with "++" or "--" are not discarded.
        # The generator is consumed lazily so the cap below stops the work early.
        for line in islice(diff, 2, None):
            if line.startswith("+"):
                label = "Added"
            elif line.startswith("-"):
                label = "Removed"
            else:
                # Hunk headers ("@@ ... @@") and unchanged context lines.
                continue
            highlights.append(f"{label}: {line[1:].strip()[:_HIGHLIGHT_CHARS]}")
            if len(highlights) >= _MAX_HIGHLIGHTS:
                break

        return highlights

    @staticmethod
    def _parse_summary_response(response_text: str) -> Tuple[str, str]:
        """Extract ``(summary, severity)`` from the LLM's formatted answer.

        Lines are matched case-insensitively after stripping whitespace. Text
        following ``SUMMARY:`` is collected until a ``SEVERITY:`` line, so a
        summary that wraps over several lines is kept whole. Missing or invalid
        fields fall back to the defaults.
        """
        summary_parts: List[str] = []
        severity = _DEFAULT_SEVERITY
        in_summary = False

        for raw_line in response_text.splitlines():
            line = raw_line.strip()
            upper = line.upper()
            if upper.startswith("SUMMARY:"):
                in_summary = True
                first = line[len("SUMMARY:"):].strip()
                summary_parts = [first] if first else []
            elif upper.startswith("SEVERITY:"):
                in_summary = False
                # Tolerate quoting/markdown decoration around the level.
                candidate = line[len("SEVERITY:"):].strip(" \"'`*.").lower()
                if candidate in _VALID_SEVERITIES:
                    severity = candidate
            elif in_summary and line:
                summary_parts.append(line)

        summary = " ".join(summary_parts) or _DEFAULT_SUMMARY
        return summary, severity

    async def _generate_change_summary(
        self,
        source_title: str,
        old_content: str,
        new_content: str,
        diff_highlights: List[str]
    ) -> Tuple[str, str]:
        """Generate LLM summary of changes and determine severity.

        Returns:
            ``(summary, severity)`` where severity is one of ``"info"``,
            ``"warning"`` or ``"critical"``. On any failure the generic
            ``("Content has been updated.", "info")`` pair is returned.
        """
        # Truncate content for LLM
        old_preview = (old_content or "")[:_LLM_PREVIEW_CHARS]
        new_preview = (new_content or "")[:_LLM_PREVIEW_CHARS]
        key_changes = "\n".join(diff_highlights[:5])

        # The response format puts each field on its own line because
        # _parse_summary_response is line-based.
        prompt_text = f"""Analyze the changes detected in this web source.

Source Title: {source_title}

Key Changes Detected:
{key_changes}

Old Content Preview:
{old_preview}

New Content Preview:
{new_preview}

Please provide:
1. A brief summary (2-3 sentences) of what changed
2. The severity level: "info" (minor updates), "warning" (significant changes), or "critical" (major content changes or breaking changes)

Format your response as:
SUMMARY: <your summary>
SEVERITY: <info, warning, or critical>
"""

        try:
            # NOTE(review): provision_langchain_model is called without
            # arguments, as in the original; confirm against its signature.
            model = await _resolve(provision_langchain_model())

            response = await model.ainvoke(prompt_text)
            raw = getattr(response, "content", response)
            # Content may not be a plain string; coerce before parsing.
            response_text = raw if isinstance(raw, str) else str(raw)

            if not response_text:
                return _DEFAULT_SUMMARY, _DEFAULT_SEVERITY
            return self._parse_summary_response(response_text)

        except Exception as e:
            logger.error(f"Failed to generate change summary: {e}")
            return _DEFAULT_SUMMARY, _DEFAULT_SEVERITY

    async def _record_failure(self, monitor: SourceMonitor) -> None:
        """Increment the monitor's consecutive failure counter and persist it."""
        monitor.consecutive_failures += 1
        monitor.updated_at = datetime.now()
        await monitor.save()

    async def check_source(
        self,
        monitor: SourceMonitor
    ) -> Optional[UpdateNotification]:
        """Check a single source for updates.

        Returns:
            The saved ``UpdateNotification`` when the fetched content's hash
            differs from the previously stored hash; ``None`` when nothing
            changed, on the first successful check (no stored hash yet), when
            the source has no URL, or when the check failed.
        """
        try:
            # Get the source
            source = await _resolve(Source.get(monitor.source_id))
            if not source:
                logger.warning(f"Source {monitor.source_id} not found")
                return None

            # Get URL from source asset
            url = getattr(source.asset, 'url', None) if source.asset else None
            if not url:
                logger.debug(f"Source {monitor.source_id} has no URL to monitor")
                return None

            # Fetch current content; an empty body is treated like a failed fetch.
            new_content = await self._fetch_url_content(url)
            if not new_content:
                await self._record_failure(monitor)
                return None

            # Reset failure count on success
            monitor.consecutive_failures = 0
            monitor.last_checked_at = datetime.now()

            new_hash = self._compute_hash(new_content)
            previous_hash = monitor.last_content_hash

            notification = None
            if previous_hash and previous_hash != new_hash:
                logger.info(f"Content change detected for source {source.title}")

                # The stored source text is the baseline for the diff.
                old_content = source.full_text or ""
                title = source.title or "Untitled"

                diff_highlights = self._generate_diff(old_content, new_content)
                summary, severity = await self._generate_change_summary(
                    title,
                    old_content,
                    new_content,
                    diff_highlights
                )

                notification = UpdateNotification(
                    source_id=monitor.source_id,
                    source_title=title,
                    change_summary=summary,
                    diff_highlights=diff_highlights,
                    old_content_preview=old_content[:500] if old_content else None,
                    new_content_preview=new_content[:500] if new_content else None,
                    severity=severity,
                )
                await notification.save()

            # Changed or not, remember the latest hash and timestamp.
            monitor.last_content_hash = new_hash
            monitor.updated_at = datetime.now()
            await monitor.save()

            return notification

        except Exception as e:
            logger.error(f"Error checking source {monitor.source_id}: {e}")
            await self._record_failure(monitor)
            return None

    async def run_check_job(
        self,
        frequency: Optional[str] = None
    ) -> MonitorJobRun:
        """Run a monitoring job for all due sources.

        Args:
            frequency: Restrict the run to monitors of this frequency. When
                omitted, "hourly", "daily" and "weekly" monitors are all
                queried.

        Returns:
            The job record: the already-running job if one exists, otherwise
            the new job with status "completed" or "failed".
        """
        # Check if job already running
        running = await MonitorJobRun.get_running()
        if running:
            logger.warning("Monitor job already running")
            return running

        # Create job record
        job = MonitorJobRun(status="running")
        await job.save()

        try:
            # Get monitors due for check
            if frequency:
                monitors = await SourceMonitor.get_due_for_check(frequency)
            else:
                # Check all frequencies
                monitors = []
                for freq in ["hourly", "daily", "weekly"]:
                    monitors.extend(await SourceMonitor.get_due_for_check(freq))

            logger.info(f"Checking {len(monitors)} sources for updates")

            updates_found = 0
            errors = []

            for monitor in monitors:
                # One failing source must not abort the whole job.
                try:
                    notification = await self.check_source(monitor)
                    if notification:
                        updates_found += 1
                except Exception as e:
                    errors.append(f"Source {monitor.source_id}: {str(e)}")

            # Update job record
            job.status = "completed"
            job.completed_at = datetime.now()
            job.sources_checked = len(monitors)
            job.updates_found = updates_found
            job.errors = errors
            await job.save()

            logger.info(
                f"Monitor job completed: {len(monitors)} checked, "
                f"{updates_found} updates, {len(errors)} errors"
            )

            return job

        except Exception as e:
            logger.error(f"Monitor job failed: {e}")
            job.status = "failed"
            job.completed_at = datetime.now()
            job.errors = [str(e)]
            await job.save()
            return job

    # Source monitor management

    async def create_monitor(
        self,
        source_id: str,
        check_frequency: str = "daily",
        enabled: bool = True
    ) -> SourceMonitor:
        """Create a monitor for a source, or update the existing one.

        If a monitor already exists for ``source_id`` its frequency and enabled
        flag are overwritten and it is returned instead of creating a new one.
        """
        existing = await SourceMonitor.get_by_source(source_id)
        if existing:
            existing.check_frequency = check_frequency
            existing.enabled = enabled
            existing.updated_at = datetime.now()
            await existing.save()
            return existing

        monitor = SourceMonitor(
            source_id=source_id,
            check_frequency=check_frequency,
            enabled=enabled,
        )
        await monitor.save()
        return monitor

    async def get_monitor(self, source_id: str) -> Optional[SourceMonitor]:
        """Get monitor for a source."""
        return await SourceMonitor.get_by_source(source_id)

    async def update_monitor(
        self,
        source_id: str,
        check_frequency: Optional[str] = None,
        enabled: Optional[bool] = None
    ) -> Optional[SourceMonitor]:
        """Update a source monitor.

        Only arguments that are not ``None`` are applied. Returns the updated
        monitor, or ``None`` when the source has no monitor.
        """
        monitor = await SourceMonitor.get_by_source(source_id)
        if not monitor:
            return None

        if check_frequency is not None:
            monitor.check_frequency = check_frequency
        if enabled is not None:
            monitor.enabled = enabled

        monitor.updated_at = datetime.now()
        await monitor.save()
        return monitor

    async def delete_monitor(self, source_id: str) -> bool:
        """Delete a source monitor.

        Returns:
            ``True`` if a monitor existed and was deleted, ``False`` otherwise.
        """
        monitor = await SourceMonitor.get_by_source(source_id)
        if not monitor:
            return False
        # Await the deletion when delete() is a coroutine; otherwise it would
        # never run and this method would still report success.
        await _resolve(monitor.delete())
        return True

    async def get_all_monitors(self) -> List[SourceMonitor]:
        """Return the monitors reported by ``SourceMonitor.get_enabled_monitors()``.

        NOTE(review): despite the method name, this delegates to the
        enabled-monitors query; confirm whether disabled monitors should be
        included.
        """
        return await SourceMonitor.get_enabled_monitors()

    # Notifications

    async def get_notifications(
        self,
        include_dismissed: bool = False,
        limit: int = 100
    ) -> List[UpdateNotification]:
        """Get notifications."""
        return await UpdateNotification.get_all(include_dismissed, limit)

    async def get_unread_notifications(self, limit: int = 50) -> List[UpdateNotification]:
        """Get unread notifications."""
        return await UpdateNotification.get_unread(limit)

    async def mark_notification_read(self, notification_id: str) -> bool:
        """Mark notification as read.

        Returns:
            ``True`` if the notification was found and saved, else ``False``.
        """
        notification = await _resolve(UpdateNotification.get(notification_id))
        if not notification:
            return False
        notification.is_read = True
        await notification.save()
        return True

    async def dismiss_notification(self, notification_id: str) -> bool:
        """Dismiss a notification.

        Returns:
            ``True`` if the notification was found and saved, else ``False``.
        """
        notification = await _resolve(UpdateNotification.get(notification_id))
        if not notification:
            return False
        notification.is_dismissed = True
        await notification.save()
        return True

    async def mark_all_read(self) -> int:
        """Mark all notifications as read."""
        return await UpdateNotification.mark_all_read()

    async def get_unread_count(self) -> int:
        """Get count of unread notifications."""
        return await UpdateNotification.get_unread_count()

    async def get_stats(self) -> MonitoringStats:
        """Get monitoring statistics.

        Collects the total and enabled monitor counts, the unread notification
        count, and the start time and status of the most recent job run.
        """
        # Imported here, as in the original, rather than at module level.
        from open_notebook.database.repository import repo

        # Get total monitors
        total_result = await repo.query("SELECT count() FROM source_monitor GROUP ALL")
        total = total_result[0].get("count", 0) if total_result else 0

        # Get enabled monitors
        enabled_result = await repo.query(
            "SELECT count() FROM source_monitor WHERE enabled = true GROUP ALL"
        )
        enabled = enabled_result[0].get("count", 0) if enabled_result else 0

        # Get unread count
        unread = await self.get_unread_count()

        # Get last job
        jobs = await MonitorJobRun.get_latest(1)
        last_job = jobs[0] if jobs else None

        return MonitoringStats(
            total_monitors=total,
            enabled_monitors=enabled,
            unread_notifications=unread,
            last_job_run=last_job.started_at if last_job else None,
            last_job_status=last_job.status if last_job else None,
        )


# Create singleton instance
auto_update_service = AutoUpdateService()