File size: 3,007 Bytes
e189a31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Background worker process for AI summarization."""

import os
import time
import logging
import signal
import sqlite3
from typing import List, Tuple

from utils.llm_summarizer import OpenAICompatSummarizer
from utils.ai_summary_store import (
    init_storage,
    fetch_ready_batches,
    store_summaries,
    BATCH_MAX_CHARS,
    BUFFER_SECONDS,
)

logger = logging.getLogger(__name__)


def _int_from_env(name: str, default: str) -> int:
    """Read environment variable *name* as an int, falling back to *default*."""
    return int(os.environ.get(name, default))


# File in which the detached worker records its process id.
PID_FILE = os.environ.get("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid")
# Seconds the worker waits between polls for ready batches.
POLL_SECONDS = _int_from_env("AI_SUMMARY_POLL_SECONDS", "5")
# Number of summarization attempts made per batch before giving up on it.
MAX_RETRIES = _int_from_env("LLM_SUMMARY_RETRIES", "3")


class Worker:
    """Poll for ready batches and store AI-generated summaries for them.

    ``run`` loops until ``stop`` is called; ``run`` installs ``stop`` as the
    SIGTERM and SIGINT handler.
    """

    # Longest single uninterrupted sleep, so a stop request is noticed within
    # roughly this many seconds even during long poll or backoff waits
    # (time.sleep resumes after a signal handler returns, so it must be sliced).
    _SLEEP_SLICE_SECONDS = 0.5

    def __init__(self):
        self._stop = False
        self.summarizer = OpenAICompatSummarizer()

    def stop(self, *_args):
        """Request shutdown; accepts and ignores a signal handler's (signum, frame)."""
        self._stop = True

    def run(self):
        """Initialise storage, install signal handlers and poll until stopped.

        Database errors and unexpected exceptions are logged and the loop
        continues with the next poll cycle.
        """
        init_storage()
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

        while not self._stop:
            try:
                batches = fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS)
                for batch in batches:
                    if self._stop:
                        # Do not start new summarization calls after a stop request.
                        break
                    self._process_batch(batch)
            except sqlite3.Error as exc:
                logger.warning("AI worker DB error: %s", exc)
            except Exception as exc:
                # Same message as before, plus the traceback for diagnosis.
                logger.warning("AI worker error: %s", exc, exc_info=True)

            self._sleep(POLL_SECONDS)

    def _sleep(self, seconds: float) -> None:
        """Sleep for up to *seconds*, returning early once stop() has been called."""
        deadline = time.monotonic() + seconds
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    def _process_batch(self, batch: List[Tuple[str, str, str]]):
        """Summarize one batch and store the non-empty summaries.

        Each entry is an ``(item_key, title, source)`` tuple. Nothing is stored
        unless the summarizer returns exactly one summary per entry; entries
        whose summary is empty/falsy are skipped.
        """
        if not batch or not self.summarizer.enabled:
            return

        texts = [
            f"Source: {source}\nTitle: {title}" if source else f"Title: {title}"
            for _, title, source in batch
        ]

        summaries = self._summarize_with_retries(texts, len(batch))
        if summaries is None:
            if not self._stop:
                logger.warning("AI worker failed to summarize batch after retries")
            return

        to_store = [
            (item_key, title, source, summary)
            for (item_key, title, source), summary in zip(batch, summaries)
            if summary
        ]
        if to_store:
            store_summaries(to_store)

    def _summarize_with_retries(self, texts: List[str], expected: int):
        """Call the summarizer up to MAX_RETRIES times; return its result or None.

        An attempt succeeds when it returns a non-empty result with *expected*
        entries. An exception raised by the summarizer counts as a failed
        attempt and is retried, instead of aborting the whole poll cycle.
        Returns None early if stop() is called between attempts.
        """
        for attempt in range(1, MAX_RETRIES + 1):
            if self._stop:
                return None
            try:
                summaries = self.summarizer._summarize_chunk(texts, source="dashboard")
            except Exception as exc:
                logger.warning(
                    "AI worker summarize attempt %d/%d failed: %s",
                    attempt,
                    MAX_RETRIES,
                    exc,
                )
                summaries = None
            if summaries and len(summaries) == expected:
                return summaries
            if attempt < MAX_RETRIES:
                # Exponential backoff (2, 4, 8, ... s), cut short by stop().
                self._sleep(2 ** attempt)
        return None


def _pid_running(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False


def start_worker_if_needed():
    """Start the background summarization worker unless one is already running.

    If PID_FILE names a live process, nothing happens. Otherwise the worker is
    daemonized with a double fork: the calling process only waits for a
    short-lived intermediate child and then returns normally. The grandchild,
    whose parent exits immediately, writes PID_FILE and runs
    ``Worker().run()``. This way a dead worker is not left behind as an
    unreaped zombie of the caller, which ``os.kill(pid, 0)`` would still report
    as alive and which would block restarts.

    POSIX only (uses ``os.fork`` / ``os.setsid``).
    NOTE(review): forking a multi-threaded host process is fragile; confirm
    the callers invoke this before starting threads.
    """
    existing = _read_worker_pid()
    if existing > 0 and _pid_running(existing):
        return

    child = os.fork()
    if child != 0:
        # Caller: reap the intermediate child, which exits right after forking.
        try:
            os.waitpid(child, 0)
        except ChildProcessError:
            # Already reaped (e.g. the host process ignores SIGCHLD).
            pass
        return

    # Forked child: must never return into the caller's stack (that would run
    # the caller's remaining code a second time), so every path ends in
    # os._exit(), which also skips the parent's inherited atexit handlers.
    exit_code = 1
    try:
        os.setsid()
        if os.fork() == 0:
            _run_detached_worker()
        exit_code = 0
    except Exception:
        logger.exception("AI summary worker process failed")
    finally:
        os._exit(exit_code)


def _read_worker_pid() -> int:
    """Return the PID stored in PID_FILE, or 0 if it is missing or unparsable."""
    try:
        with open(PID_FILE, "r", encoding="utf-8") as f:
            return int(f.read().strip() or 0)
    except FileNotFoundError:
        return 0
    except (OSError, ValueError) as exc:
        # A stale or corrupt file must not prevent starting a new worker.
        logger.warning("Ignoring unreadable AI worker PID file %s: %s", PID_FILE, exc)
        return 0


def _run_detached_worker() -> None:
    """Record this process's PID, run the worker loop, then clean up PID_FILE."""
    pid = os.getpid()
    with open(PID_FILE, "w", encoding="utf-8") as f:
        f.write(str(pid))
    try:
        Worker().run()
    finally:
        # Only delete the file if it still names us (a newer worker may own it).
        if _read_worker_pid() == pid:
            try:
                os.remove(PID_FILE)
            except OSError:
                # Best-effort cleanup; a leftover file is re-validated on next start.
                pass