File size: 3,146 Bytes
7b14cb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df070dd
 
 
 
 
 
 
 
 
 
 
 
7b14cb0
df070dd
 
7b14cb0
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Background worker process for AI summarization."""

import os
import time
import logging
import signal
import sqlite3
from typing import List, Tuple

from utils.llm_summarizer import OpenAICompatSummarizer
from utils.ai_summary_store import (
    init_storage,
    fetch_ready_batches,
    store_summaries,
    BATCH_MAX_CHARS,
    BUFFER_SECONDS,
)

logger = logging.getLogger(__name__)

# Runtime knobs; each can be overridden through the environment.
# File in which the worker process records its PID.
PID_FILE = os.environ.get("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid")
# Seconds the worker idles between two polls of the summary store.
POLL_SECONDS = int(os.environ.get("AI_SUMMARY_POLL_SECONDS", "5"))
# Number of summarization attempts made for a single batch.
MAX_RETRIES = int(os.environ.get("LLM_SUMMARY_RETRIES", "3"))


class Worker:
    """Poll the summary store for ready batches and summarize them with an LLM.

    run() loops until stop() is called; run() installs stop() as the SIGTERM
    and SIGINT handler. Errors raised while polling or while processing a
    batch are logged and do not end the loop.
    """

    # Granularity of the interruptible sleeps. This bounds how long a stop
    # request can go unnoticed while idling or backing off.
    _SLEEP_SLICE_SECONDS = 0.5

    def __init__(self):
        self._stop = False
        self.summarizer = OpenAICompatSummarizer()

    def stop(self, *_args):
        """Request a graceful shutdown. Usable directly as a signal handler."""
        self._stop = True

    def run(self):
        """Initialise storage, install signal handlers and poll until stopped.

        Each poll fetches the ready batches and processes them one at a time.
        A batch that fails is logged and skipped, so it cannot prevent the
        batches behind it from being processed. The loop then sleeps
        POLL_SECONDS (or less, if a stop is requested meanwhile).
        """
        init_storage()
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

        while not self._stop:
            try:
                # list() so that errors raised while iterating a lazily
                # produced result are handled here as well.
                batches = list(fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS))
            except Exception as exc:
                self._log_failure(exc)
                batches = []

            for batch in batches:
                if self._stop:
                    break
                try:
                    self._process_batch(batch)
                except Exception as exc:
                    self._log_failure(exc)

            self._sleep(POLL_SECONDS)

    def _process_batch(self, batch: List[Tuple[str, str, str]]):
        """Summarize one batch of (item_key, title, source) rows and store them.

        Nothing happens when the batch is empty or the summarizer is disabled.
        Rows whose summary comes back empty are not stored.
        """
        if not batch or not self.summarizer.enabled:
            return

        texts = [
            f"Source: {source}\nTitle: {title}" if source else f"Title: {title}"
            for _, title, source in batch
        ]

        summaries = self._summarize_with_retries(texts, expected=len(batch))
        if summaries is None:
            return

        to_store = [
            (item_key, title, source, summary)
            for (item_key, title, source), summary in zip(batch, summaries)
            if summary
        ]
        if to_store:
            store_summaries(to_store)

    def _summarize_with_retries(self, texts: List[str], expected: int):
        """Return one summary per text, or None if every attempt failed.

        The summarizer is tried up to MAX_RETRIES times, with exponential
        back-off between attempts. An attempt fails when it raises, returns
        nothing, or returns a number of summaries other than *expected*.
        Returns None early if a stop is requested during back-off.
        """
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                summaries = self.summarizer._summarize_chunk(texts, source="dashboard")
            except Exception as exc:
                # LLM/network failures are usually transient: count the
                # failure as one attempt instead of abandoning the batch.
                logger.warning(
                    "AI worker summarize attempt %d/%d failed: %s",
                    attempt, MAX_RETRIES, exc,
                )
                summaries = None
            if summaries and len(summaries) == expected:
                return summaries
            if attempt < MAX_RETRIES:
                self._sleep(2 ** attempt)
                if self._stop:
                    return None
        logger.warning("AI worker failed to summarize batch after retries")
        return None

    def _sleep(self, seconds: float) -> None:
        """Sleep for up to *seconds*, returning early once stop() is called.

        A plain time.sleep() is resumed after a signal handler returns
        (PEP 475), so stop() alone would not shorten it. Sleeping in short
        slices lets the flag be noticed promptly. Negative values are treated
        as zero.
        """
        deadline = time.monotonic() + max(0.0, seconds)
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    @staticmethod
    def _log_failure(exc: Exception) -> None:
        """Log an error from the poll loop, with a traceback for non-DB errors."""
        if isinstance(exc, sqlite3.Error):
            logger.warning("AI worker DB error: %s", exc)
        else:
            logger.warning("AI worker error: %s", exc, exc_info=exc)


def _pid_running(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False


def is_worker_running() -> bool:
    """Return True if the worker process is currently running.

    Reads the PID recorded in PID_FILE and probes it with _pid_running().
    Any of the following count as "not running": a missing or unreadable
    file, empty or non-integer contents, or a non-positive PID.
    """
    try:
        with open(PID_FILE, "r", encoding="utf-8") as f:
            pid = int(f.read().strip() or 0)
    except (OSError, ValueError):
        # Opening directly (rather than checking os.path.exists first) avoids
        # a race with the file being removed. ValueError also covers
        # UnicodeDecodeError from garbled contents.
        return False
    # os.kill() treats 0 and negative PIDs as process-group targets, so they
    # must never be probed.
    return pid > 0 and _pid_running(pid)


def start_worker_if_needed():
    """Start the background summarization worker unless one is already running.

    Uses a double fork so the worker is fully detached:

    * The calling process forks an intermediate child and waits for it. The
      calling process is the only one that ever returns from this function.
    * The intermediate child starts a new session, forks the worker, records
      the worker's PID in PID_FILE and exits.
    * The worker (grandchild) runs Worker().run(). On the way out it removes
      PID_FILE, but only if the file still names it.

    Waiting on the intermediate child has two benefits. No zombie is left
    behind; signal 0 succeeds on a zombie, so is_worker_running() would
    otherwise report a dead worker forever. And PID_FILE is already written
    when this function returns.

    Every forked process leaves through os._exit(), so it can never fall
    through into the caller's code.

    NOTE(review): fork() duplicates only the calling thread. If the caller is
    multi-threaded (e.g. a web/dashboard server), consider launching the
    worker via subprocess instead.
    """
    if is_worker_running():
        return

    pid = os.fork()
    if pid != 0:
        try:
            os.waitpid(pid, 0)
        except ChildProcessError:
            # Already reaped (e.g. SIGCHLD is set to SIG_IGN).
            pass
        return

    # Forked process from here on: it must never return to the caller.
    status = 1
    try:
        status = _spawn_detached_worker()
    except BaseException:
        logger.exception("AI summary worker process failed")
    finally:
        os._exit(status)


def _spawn_detached_worker() -> int:
    """Detach, fork the worker and record its PID; return an exit status.

    Runs in the intermediate child. Both the intermediate child and the
    worker return from here, and the caller turns the result into os._exit().
    """
    os.setsid()
    worker_pid = os.fork()
    if worker_pid == 0:
        # Worker process: run until stopped, then drop our PID file.
        try:
            Worker().run()
        finally:
            _remove_pid_file_if_owned(os.getpid())
        return 0

    try:
        with open(PID_FILE, "w", encoding="utf-8") as f:
            f.write(str(worker_pid))
    except OSError:
        # Without a PID file the worker could never be detected or stopped,
        # so do not leave it running untracked.
        os.kill(worker_pid, signal.SIGTERM)
        raise
    return 0


def _remove_pid_file_if_owned(pid: int) -> None:
    """Delete PID_FILE if, and only if, it still records *pid*."""
    try:
        with open(PID_FILE, "r", encoding="utf-8") as f:
            recorded = f.read().strip()
        if recorded == str(pid):
            os.remove(PID_FILE)
    except OSError:
        # Best effort: a stale file is tolerable because is_worker_running()
        # re-checks whether the recorded PID is alive.
        pass