File size: 14,583 Bytes
f871fed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
"""
Auto-Update Agent Service

Monitors web sources for changes and generates notifications with LLM summaries.
"""

import asyncio
import hashlib
import inspect
from datetime import datetime
from difflib import unified_diff
from typing import List, Optional, Tuple

import httpx
from ai_prompter import Prompter
from langchain_core.runnables import RunnableConfig
from loguru import logger

from open_notebook.domain.auto_update import (
    MonitorJobRun,
    MonitoringStats,
    SourceMonitor,
    UpdateNotification,
)
from open_notebook.domain.notebook import Source
from open_notebook.graphs.utils import provision_langchain_model


class AutoUpdateService:
    """Service for monitoring sources and detecting updates.

    For each monitored ``Source`` with a URL, the service fetches the URL,
    fingerprints the body with SHA-256 and, when the fingerprint differs from
    the one stored on the ``SourceMonitor``, saves an ``UpdateNotification``
    containing a diff excerpt plus an LLM-generated summary and severity.
    It also exposes CRUD helpers for monitors and notifications.
    """

    # Severity labels accepted from the LLM; anything else keeps the default.
    _SEVERITIES = ("info", "warning", "critical")
    # Used when the LLM call fails or a field cannot be parsed from its reply.
    _DEFAULT_SUMMARY = "Content has been updated."
    _DEFAULT_SEVERITY = "info"
    # Frequencies scanned by run_check_job() when no frequency is given.
    _FREQUENCIES = ("hourly", "daily", "weekly")

    def __init__(self):
        """Create the shared HTTP client (30 s timeout, redirects followed)."""
        self.http_client = httpx.AsyncClient(
            timeout=30.0,
            follow_redirects=True,
            headers={
                "User-Agent": "OpenNotebook/1.0 (Source Monitor)"
            }
        )

    async def close(self):
        """Close the shared HTTP client; later fetches will fail."""
        await self.http_client.aclose()

    @staticmethod
    async def _maybe_await(value):
        """Return ``value``, awaiting it first if it is awaitable.

        ``save()`` on the domain models is awaited throughout this class, so
        their ``get()``/``delete()`` are most likely coroutines too. Calling
        them without ``await`` yields a coroutine instead of a model (get) or
        silently does nothing (delete). This works for sync or async methods.
        """
        if inspect.isawaitable(value):
            return await value
        return value

    def _compute_hash(self, content: str) -> str:
        """Return the hex SHA-256 digest of ``content`` encoded as UTF-8.

        Any difference in the fetched body (including markup) changes it.
        """
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    async def _fetch_url_content(self, url: str) -> Optional[str]:
        """Fetch ``url`` and return the decoded response body.

        Returns ``None`` (after logging) on any error, including non-2xx
        status codes raised by ``raise_for_status``.
        """
        try:
            response = await self.http_client.get(url)
            response.raise_for_status()
            return response.text
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")
            return None

    def _generate_diff(
        self,
        old_content: str,
        new_content: str,
        max_highlights: int = 10,
        max_line_length: int = 200,
    ) -> List[str]:
        """Summarise line-level changes as "Added: ..." / "Removed: ..." strings.

        Args:
            old_content: Previous text.
            new_content: Current text.
            max_highlights: Maximum number of entries returned.
            max_line_length: Each changed line is stripped, then cut to this
                many characters.

        Returns:
            Up to ``max_highlights`` strings in unified-diff order.
        """
        highlights: List[str] = []
        if max_highlights <= 0:
            return highlights

        in_hunk = False
        for line in unified_diff(
            old_content.splitlines(), new_content.splitlines(), lineterm="", n=3
        ):
            if line.startswith("@@"):
                in_hunk = True
                continue
            if not in_hunk:
                # The '---' / '+++' file header precedes the first hunk. Skipping
                # it by position (not by prefix) keeps changed content lines that
                # themselves start with '--' or '++'.
                continue
            if line.startswith("+"):
                highlights.append(f"Added: {line[1:].strip()[:max_line_length]}")
            elif line.startswith("-"):
                highlights.append(f"Removed: {line[1:].strip()[:max_line_length]}")
            if len(highlights) >= max_highlights:
                break

        return highlights

    @classmethod
    def _parse_summary_response(cls, response_text: str) -> Tuple[str, str]:
        """Extract the SUMMARY and SEVERITY fields from the model's reply.

        The label before the first ':' is matched case-insensitively and may be
        wrapped in whitespace, bullets or Markdown emphasis (``**SUMMARY:** x``).
        Empty or unrecognised values keep the defaults; if a label repeats, the
        last occurrence wins.
        """
        summary = cls._DEFAULT_SUMMARY
        severity = cls._DEFAULT_SEVERITY
        for raw_line in (response_text or "").splitlines():
            label, sep, value = raw_line.strip().partition(":")
            if not sep:
                continue
            label = label.strip(" \t*#-").upper()
            value = value.strip(" \t*")
            if label == "SUMMARY" and value:
                summary = value
            elif label == "SEVERITY":
                candidate = value.lower()
                if candidate in cls._SEVERITIES:
                    severity = candidate
        return summary, severity

    async def _generate_change_summary(
        self,
        source_title: str,
        old_content: str,
        new_content: str,
        diff_highlights: List[str]
    ) -> Tuple[str, str]:
        """Ask the LLM to summarise a change and classify its severity.

        Args:
            source_title: Title included in the prompt.
            old_content: Previous text; only the first 2000 chars are sent.
            new_content: Current text; only the first 2000 chars are sent.
            diff_highlights: Output of ``_generate_diff``; the first 5 are sent.

        Returns:
            ``(summary, severity)`` with severity in "info"/"warning"/"critical".
            Falls back to ("Content has been updated.", "info") if the call
            fails; individual fields fall back when they cannot be parsed.
        """
        # Truncate content to keep the prompt small.
        old_preview = old_content[:2000] if old_content else ""
        new_preview = new_content[:2000] if new_content else ""
        changes_text = "\n".join(diff_highlights[:5])

        prompt_text = f"""Analyze the changes detected in this web source.

Source Title: {source_title}

Key Changes Detected:
{changes_text}

Old Content Preview:
{old_preview}

New Content Preview:
{new_preview}

Please provide:
1. A brief summary (2-3 sentences) of what changed
2. The severity level: "info" (minor updates), "warning" (significant changes), or "critical" (major content changes or breaking changes)

Format your response as:
SUMMARY: <your summary>
SEVERITY: <info|warning|critical>"""

        try:
            # NOTE(review): provision_langchain_model() is called with no
            # arguments. Confirm this matches its signature in
            # open_notebook.graphs.utils; if it requires arguments the call
            # raises and every summary silently falls back to the default.
            model = await self._maybe_await(provision_langchain_model())
            response = await model.ainvoke(prompt_text)
            content = response.content if hasattr(response, "content") else str(response)
            if isinstance(content, list):
                # Some chat models return a list of content blocks rather than
                # a plain string; keep only their text.
                content = "".join(
                    block if isinstance(block, str) else str(block.get("text", ""))
                    for block in content
                    if isinstance(block, (str, dict))
                )
            return self._parse_summary_response(content)
        except Exception as e:
            logger.error(f"Failed to generate change summary: {e}")
            return self._DEFAULT_SUMMARY, self._DEFAULT_SEVERITY

    async def _create_notification(
        self,
        monitor: SourceMonitor,
        source,
        new_content: str,
    ) -> UpdateNotification:
        """Build, summarise and save a notification for a detected change."""
        title = source.title or "Untitled"
        # NOTE(review): the baseline is source.full_text (stored text) while
        # new_content is the raw response body (likely HTML for web pages), so
        # diffs/previews may be noisy. full_text is never refreshed here, so
        # later diffs keep comparing against the same baseline — confirm.
        old_content = source.full_text or ""

        diff_highlights = self._generate_diff(old_content, new_content)
        summary, severity = await self._generate_change_summary(
            title, old_content, new_content, diff_highlights
        )

        notification = UpdateNotification(
            source_id=monitor.source_id,
            source_title=title,
            change_summary=summary,
            diff_highlights=diff_highlights,
            old_content_preview=old_content[:500] if old_content else None,
            new_content_preview=new_content[:500] if new_content else None,
            severity=severity,
        )
        await notification.save()
        return notification

    async def check_source(
        self,
        monitor: SourceMonitor
    ) -> Optional[UpdateNotification]:
        """Check one monitored source for content changes.

        Fetches the source URL and compares the body's hash with
        ``monitor.last_content_hash``. When a previous hash exists and differs,
        a notification is created and saved. The first successful check only
        records a baseline hash. The monitor's failure counter, timestamps
        and hash are updated and saved as a side effect.

        Returns:
            The saved notification when a change was detected, else ``None``
            (also when the source is missing, has no URL, or the check failed).
            Errors are logged and counted in ``monitor.consecutive_failures``
            rather than raised.
        """
        try:
            source = await self._maybe_await(Source.get(monitor.source_id))
            if not source:
                logger.warning(f"Source {monitor.source_id} not found")
                return None

            url = getattr(source.asset, 'url', None) if source.asset else None
            if not url:
                logger.debug(f"Source {monitor.source_id} has no URL to monitor")
                return None

            new_content = await self._fetch_url_content(url)
            if not new_content:
                # Fetch failed or returned an empty body: count it as a failure.
                # last_checked_at is not updated on this path.
                monitor.consecutive_failures += 1
                monitor.updated_at = datetime.now()
                await monitor.save()
                return None

            monitor.consecutive_failures = 0
            monitor.last_checked_at = datetime.now()
            new_hash = self._compute_hash(new_content)

            notification = None
            if monitor.last_content_hash and monitor.last_content_hash != new_hash:
                logger.info(f"Content change detected for source {source.title}")
                notification = await self._create_notification(
                    monitor, source, new_content
                )

            # Store the new hash only after any notification was saved, so a
            # failure while notifying keeps the old hash and the change is
            # detected again on a later check.
            monitor.last_content_hash = new_hash
            monitor.updated_at = datetime.now()
            await monitor.save()
            return notification

        except Exception as e:
            logger.error(f"Error checking source {monitor.source_id}: {e}")
            monitor.consecutive_failures += 1
            monitor.updated_at = datetime.now()
            await monitor.save()
            return None

    async def run_check_job(
        self,
        frequency: Optional[str] = None
    ) -> MonitorJobRun:
        """Check every due monitor and record the run as a ``MonitorJobRun``.

        Args:
            frequency: Only check monitors due for this frequency. When falsy,
                monitors due for "hourly", "daily" and "weekly" are checked.

        Returns:
            The already-running job if one exists (nothing is checked then);
            otherwise the new job with status "completed" or "failed".
        """
        running = await MonitorJobRun.get_running()
        if running:
            logger.warning("Monitor job already running")
            return running

        job = MonitorJobRun(status="running")
        await job.save()

        try:
            frequencies = [frequency] if frequency else list(self._FREQUENCIES)
            monitors = []
            for freq in frequencies:
                monitors.extend(await SourceMonitor.get_due_for_check(freq))

            logger.info(f"Checking {len(monitors)} sources for updates")

            updates_found = 0
            errors = []

            for monitor in monitors:
                try:
                    failures_before = monitor.consecutive_failures
                    notification = await self.check_source(monitor)
                    if notification:
                        updates_found += 1
                    elif monitor.consecutive_failures > failures_before:
                        # check_source logs and swallows its own errors, only
                        # bumping consecutive_failures; surface them here too.
                        errors.append(
                            f"Source {monitor.source_id}: check failed (see logs)"
                        )
                except Exception as e:
                    errors.append(f"Source {monitor.source_id}: {str(e)}")

            job.status = "completed"
            job.completed_at = datetime.now()
            job.sources_checked = len(monitors)
            job.updates_found = updates_found
            job.errors = errors
            await job.save()

            logger.info(
                f"Monitor job completed: {len(monitors)} checked, "
                f"{updates_found} updates, {len(errors)} errors"
            )

            return job

        except Exception as e:
            logger.error(f"Monitor job failed: {e}")
            job.status = "failed"
            job.completed_at = datetime.now()
            job.errors = [str(e)]
            await job.save()
            return job

    # Source monitor management
    async def create_monitor(
        self,
        source_id: str,
        check_frequency: str = "daily",
        enabled: bool = True
    ) -> SourceMonitor:
        """Create a monitor for ``source_id``, or update the existing one.

        If a monitor already exists for the source, its frequency and enabled
        flag are overwritten. Returns the saved monitor.
        """
        existing = await SourceMonitor.get_by_source(source_id)
        if existing:
            existing.check_frequency = check_frequency
            existing.enabled = enabled
            existing.updated_at = datetime.now()
            await existing.save()
            return existing

        monitor = SourceMonitor(
            source_id=source_id,
            check_frequency=check_frequency,
            enabled=enabled,
        )
        await monitor.save()
        return monitor

    async def get_monitor(self, source_id: str) -> Optional[SourceMonitor]:
        """Return the monitor for ``source_id``, or ``None`` if there is none."""
        return await SourceMonitor.get_by_source(source_id)

    async def update_monitor(
        self,
        source_id: str,
        check_frequency: Optional[str] = None,
        enabled: Optional[bool] = None
    ) -> Optional[SourceMonitor]:
        """Partially update the monitor for ``source_id``.

        Only arguments that are not ``None`` are applied. Returns the saved
        monitor, or ``None`` if no monitor exists for the source.
        """
        monitor = await SourceMonitor.get_by_source(source_id)
        if not monitor:
            return None

        if check_frequency is not None:
            monitor.check_frequency = check_frequency
        if enabled is not None:
            monitor.enabled = enabled
        monitor.updated_at = datetime.now()
        await monitor.save()
        return monitor

    async def delete_monitor(self, source_id: str) -> bool:
        """Delete the monitor for ``source_id``; return whether one existed."""
        monitor = await SourceMonitor.get_by_source(source_id)
        if not monitor:
            return False
        # Must be awaited when delete() is a coroutine, otherwise nothing happens.
        await self._maybe_await(monitor.delete())
        return True

    async def get_all_monitors(self) -> List[SourceMonitor]:
        """Return the enabled monitors.

        Despite the name, this delegates to
        ``SourceMonitor.get_enabled_monitors`` and so excludes disabled ones.
        """
        return await SourceMonitor.get_enabled_monitors()

    # Notifications
    async def get_notifications(
        self,
        include_dismissed: bool = False,
        limit: int = 100
    ) -> List[UpdateNotification]:
        """Return up to ``limit`` notifications, optionally including dismissed."""
        return await UpdateNotification.get_all(include_dismissed, limit)

    async def get_unread_notifications(self, limit: int = 50) -> List[UpdateNotification]:
        """Return up to ``limit`` unread notifications."""
        return await UpdateNotification.get_unread(limit)

    async def _get_notification(self, notification_id: str) -> Optional[UpdateNotification]:
        """Load a notification by id, awaiting ``get()`` if it is async."""
        return await self._maybe_await(UpdateNotification.get(notification_id))

    async def mark_notification_read(self, notification_id: str) -> bool:
        """Mark a notification as read; return ``False`` if it was not found."""
        notification = await self._get_notification(notification_id)
        if not notification:
            return False
        notification.is_read = True
        await notification.save()
        return True

    async def dismiss_notification(self, notification_id: str) -> bool:
        """Dismiss a notification; return ``False`` if it was not found."""
        notification = await self._get_notification(notification_id)
        if not notification:
            return False
        notification.is_dismissed = True
        await notification.save()
        return True

    async def mark_all_read(self) -> int:
        """Mark all notifications read; returns the count from the model layer."""
        return await UpdateNotification.mark_all_read()

    async def get_unread_count(self) -> int:
        """Return the number of unread notifications."""
        return await UpdateNotification.get_unread_count()

    async def get_stats(self) -> MonitoringStats:
        """Return monitor counts, unread count and the latest job's status."""
        # Imported lazily, presumably to avoid an import cycle at module load.
        from open_notebook.database.repository import repo

        async def count(query: str) -> int:
            # A `SELECT count() ... GROUP ALL` query returns [] when no rows match.
            result = await repo.query(query)
            return result[0].get("count", 0) if result else 0

        total = await count("SELECT count() FROM source_monitor GROUP ALL")
        enabled = await count(
            "SELECT count() FROM source_monitor WHERE enabled = true GROUP ALL"
        )
        unread = await self.get_unread_count()

        jobs = await MonitorJobRun.get_latest(1)
        last_job = jobs[0] if jobs else None

        return MonitoringStats(
            total_monitors=total,
            enabled_monitors=enabled,
            unread_notifications=unread,
            last_job_run=last_job.started_at if last_job else None,
            last_job_status=last_job.status if last_job else None,
        )


# Shared module-level instance, built at import time. Its HTTP client stays
# open until `await auto_update_service.close()` is called.
auto_update_service: AutoUpdateService = AutoUpdateService()