File size: 5,342 Bytes
02a56a9
 
 
2ed2bd7
02a56a9
 
29ed661
 
02a56a9
 
 
 
 
d42ed51
 
 
2ed2bd7
d42ed51
 
 
2ed2bd7
 
 
d42ed51
02a56a9
 
 
 
 
 
29ed661
2ed2bd7
d42ed51
2ed2bd7
 
 
d42ed51
2ed2bd7
02a56a9
2ed2bd7
02a56a9
 
29ed661
 
 
02a56a9
 
 
 
d42ed51
02a56a9
 
 
 
 
 
d42ed51
 
 
2ed2bd7
02a56a9
2ed2bd7
02a56a9
 
 
 
 
 
 
 
 
 
 
 
 
d42ed51
02a56a9
 
 
 
 
 
29ed661
02a56a9
 
2ed2bd7
02a56a9
 
 
 
2ed2bd7
02a56a9
 
 
d42ed51
 
 
 
 
 
 
 
 
2ed2bd7
02a56a9
 
2ed2bd7
 
 
 
 
02a56a9
 
 
 
 
 
 
 
 
 
 
2ed2bd7
02a56a9
2ed2bd7
02a56a9
2ed2bd7
 
02a56a9
 
 
 
 
2ed2bd7
02a56a9
 
 
 
 
2ed2bd7
02a56a9
 
2ed2bd7
02a56a9
 
 
 
 
 
 
 
2ed2bd7
 
 
 
 
02a56a9
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Transformers service for fast text summarization using Hugging Face models.
"""

import asyncio
import time
from collections.abc import AsyncGenerator
from typing import Any

from app.core.logging import get_logger

logger = get_logger(__name__)

# Try to import transformers, but make it optional.
# TRANSFORMERS_AVAILABLE is a module-level feature flag read by
# TransformersSummarizer below; when False the service degrades gracefully
# (endpoints report an error instead of the import crashing app startup).
try:
    from transformers import pipeline

    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    logger.warning(
        "Transformers library not available. Pipeline endpoint will be disabled."
    )


class TransformersSummarizer:
    """Service for fast text summarization using Hugging Face Transformers.

    Wraps a CPU `summarization` pipeline (distilbart-cnn-6-6) and exposes an
    async, word-by-word streaming interface. If the transformers library is
    unavailable or model initialization fails, the service stays usable and
    reports the problem through the stream's error chunk instead of raising.
    """

    def __init__(self) -> None:
        """Initialize the Transformers pipeline with the distilbart model."""
        # Pipeline callable, or None when transformers is missing or init failed.
        self.summarizer: Any | None = None

        if not TRANSFORMERS_AVAILABLE:
            logger.warning(
                "⚠️ Transformers not available - pipeline endpoint will not work"
            )
            return

        logger.info("Initializing Transformers pipeline...")

        try:
            self.summarizer = pipeline(
                "summarization",
                model="sshleifer/distilbart-cnn-6-6",
                device=-1,  # CPU
            )
            logger.info("✅ Transformers pipeline initialized successfully")
        except Exception as e:
            # Model download/load can fail (network, disk, torch issues);
            # degrade gracefully rather than crashing app startup.
            logger.error(f"❌ Failed to initialize Transformers pipeline: {e}")
            self.summarizer = None

    async def warm_up_model(self) -> None:
        """
        Warm up the model with a test input to load weights into memory.
        This speeds up subsequent requests.

        Never raises: warmup failure is logged so the app can still start.
        """
        if not self.summarizer:
            logger.warning("⚠️ Transformers pipeline not initialized, skipping warmup")
            return

        test_text = "This is a test text to warm up the model."

        try:
            # Run in a thread executor so the synchronous pipeline call does
            # not block the event loop. get_running_loop() replaces the
            # deprecated get_event_loop() inside coroutines.
            loop = asyncio.get_running_loop()
            # FIX: the pipeline only accepts generation limits as keyword
            # arguments; the previous positional `30, 10` were silently
            # ignored by transformers ("Ignoring args" warning).
            await loop.run_in_executor(
                None,
                lambda: self.summarizer(test_text, max_length=30, min_length=10),
            )
            logger.info("✅ Transformers model warmup successful")
        except Exception as e:
            logger.error(f"❌ Transformers model warmup failed: {e}")
            # Don't raise - allow app to start even if warmup fails

    async def summarize_text_stream(
        self,
        text: str,
        max_length: int = 130,
        min_length: int = 30,
    ) -> AsyncGenerator[dict[str, Any], None]:
        """
        Stream text summarization results word-by-word.

        Args:
            text: Input text to summarize
            max_length: Maximum length of summary
            min_length: Minimum length of summary

        Yields:
            Dict containing 'content' (word chunk) and 'done' (completion
            flag); the final chunk also carries 'tokens_used'/'latency_ms',
            and an 'error' key is present when something went wrong.
        """
        if not self.summarizer:
            error_msg = "Transformers pipeline not available. Please install transformers and torch."
            logger.error(f"❌ {error_msg}")
            yield {
                "content": "",
                "done": True,
                "error": error_msg,
            }
            return

        # perf_counter is monotonic — immune to wall-clock adjustments,
        # so the reported latency can never go negative.
        start_time = time.perf_counter()
        text_length = len(text)

        logger.info(
            f"Processing text of {text_length} chars with Transformers pipeline"
        )

        try:
            # Run summarization in executor to avoid blocking the event loop.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: self.summarizer(
                    text,
                    max_length=max_length,
                    min_length=min_length,
                    do_sample=False,  # Deterministic output for consistency
                    truncation=True,
                ),
            )

            # Extract summary text
            summary_text = result[0]["summary_text"] if result else ""

            # Stream the summary word by word for real-time feel
            words = summary_text.split()
            for i, word in enumerate(words):
                # Add space except for first word
                content = word if i == 0 else f" {word}"

                yield {
                    "content": content,
                    "done": False,
                    "tokens_used": 0,  # Transformers doesn't provide token count easily
                }

                # Small delay for streaming effect (optional)
                await asyncio.sleep(0.02)

            # Send final "done" chunk
            latency_ms = (time.perf_counter() - start_time) * 1000.0
            yield {
                "content": "",
                "done": True,
                "tokens_used": len(words),
                "latency_ms": round(latency_ms, 2),
            }

            logger.info(
                f"✅ Transformers summarization completed in {latency_ms:.2f}ms"
            )

        except Exception as e:
            logger.error(f"❌ Transformers summarization failed: {e}")
            # Yield error chunk so the client stream terminates cleanly.
            yield {
                "content": "",
                "done": True,
                "error": str(e),
            }


# Global service instance, created at import time for reuse across requests.
# NOTE(review): instantiating here triggers model load/download on import —
# presumably intended to happen once at app startup; confirm with the caller.
transformers_service = TransformersSummarizer()