"""Model management and text generation service."""

import hashlib
import time
from functools import lru_cache
from typing import Any, Dict, Optional

from transformers import pipeline

from writing_studio.core.config import settings
from writing_studio.core.exceptions import ModelLoadError, TextGenerationError
from writing_studio.utils.logging import logger
from writing_studio.utils.validation import validate_generation_params, validate_model_name


class ModelService:
    """Service for managing language models and text generation."""

    def __init__(self):
        """Initialize the model service."""
        self._current_model: Optional[Any] = None
        self._current_model_name: Optional[str] = None
        self._task_type: str = "text2text-generation"  # Default for FLAN-T5
        self._cache: Dict[str, Any] = {}
        self._load_default_model()

    def _load_default_model(self) -> None:
        """Load the default model at initialization."""
        try:
            logger.info(f"Loading default model: {settings.default_model}")
            self.load_model(settings.default_model)
        except Exception as e:
            logger.error(f"Failed to load default model: {e}")
            raise ModelLoadError(
                f"Failed to load default model: {settings.default_model}",
                {"error": str(e)},
            )

    def load_model(self, model_name: str) -> None:
        """
        Load a language model from HuggingFace.

        Args:
            model_name: HuggingFace model identifier

        Raises:
            ModelLoadError: If model loading fails
        """
        try:
            # Validate model name
            model_name = validate_model_name(model_name)

            # Check if already loaded
            if self._current_model_name == model_name:
                logger.debug(f"Model {model_name} already loaded")
                return

            logger.info(f"Loading model: {model_name}")
            start_time = time.time()

            # Pick the pipeline task based on the model family:
            #   T5 / FLAN-T5 (encoder-decoder) -> text2text-generation
            #   GPT-2 / GPT (decoder-only)     -> text-generation
            # Note: this substring heuristic is crude and may misclassify
            # unusual model names.
            if any(x in model_name.lower() for x in ["t5", "flan"]):
                task = "text2text-generation"
                logger.info(f"Detected seq2seq model, using {task} pipeline")
            else:
                task = "text-generation"
                logger.info(f"Detected causal language model, using {task} pipeline")

            # Load model with error handling
            self._current_model = pipeline(
                task,
                model=model_name,
                max_length=settings.max_model_length,
            )
            self._current_model_name = model_name
            self._task_type = task

            load_time = time.time() - start_time
            logger.info(f"Model loaded successfully in {load_time:.2f}s: {model_name}")

        except Exception as e:
            logger.error(f"Failed to load model {model_name}: {e}")
            raise ModelLoadError(
                f"Failed to load model: {model_name}", {"error": str(e)}
            )

    def generate_text(
        self,
        prompt: str,
        max_length: Optional[int] = None,
        num_sequences: Optional[int] = None,
        temperature: float = 1.0,
        use_cache: bool = True,
    ) -> str:
        """
        Generate text using the loaded model.

        Args:
            prompt: Input prompt for generation
            max_length: Maximum generation length
            num_sequences: Number of sequences to generate
            temperature: Sampling temperature
            use_cache: Whether to use caching

        Returns:
            Generated text (the first sequence when multiple are requested)

        Raises:
            TextGenerationError: If generation fails
        """
        if self._current_model is None:
            raise TextGenerationError("No model loaded")

        # Use defaults if not provided
        max_length = max_length or settings.default_max_length
        num_sequences = num_sequences or settings.default_num_sequences

        # Validate parameters
        params = validate_generation_params(max_length, num_sequences, temperature)

        # Check cache if enabled; remember the key so the result can be
        # stored after generation
        cache_key: Optional[str] = None
        if use_cache and settings.enable_cache:
            cache_key = self._get_cache_key(prompt, params)
            if cache_key in self._cache:
                logger.debug("Returning cached result")
                return self._cache[cache_key]

        try:
            logger.info(f"Generating text with model: {self._current_model_name}")
            start_time = time.time()

            # Generate text with parameters appropriate for model type
            if self._task_type == "text2text-generation":
                # T5/FLAN-T5 models
                result = self._current_model(
                    prompt,
                    max_new_tokens=params["max_length"],
                    num_return_sequences=params["num_sequences"],
                    do_sample=True,
                    temperature=params["temperature"],
                    truncation=True,
                )
                # text2text pipelines return only the newly generated text
                generated_text = result[0]["generated_text"]
            else:
                # GPT-2 style models; the text-generation pipeline includes
                # the prompt as a prefix of generated_text
                result = self._current_model(
                    prompt,
                    max_length=params["max_length"],
                    num_return_sequences=params["num_sequences"],
                    do_sample=True,
                    temperature=params["temperature"],
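                    # GPT-2 has no pad token; reuse EOS to avoid warnings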
                    pad_token_id=self._current_model.tokenizer.eos_token_id,
                )
                generated_text = result[0]["generated_text"]

            generation_time = time.time() - start_time
            logger.info(f"Text generated in {generation_time:.2f}s")

            # Cache result if enabled (cache_key is set only when caching
            # was requested and enabled above)
            if cache_key is not None:
                self._cache_result(cache_key, generated_text)

            return generated_text

        except Exception as e:
            logger.error(f"Text generation failed: {e}")
            raise TextGenerationError("Text generation failed", {"error": str(e)})

    def _get_cache_key(self, prompt: str, params: dict) -> str:
        """
        Generate cache key for prompt and parameters.

        Args:
            prompt: Input prompt
            params: Generation parameters

        Returns:
            Cache key hash
        """
        key_str = f"{prompt}:{params['max_length']}:{params['num_sequences']}:{params['temperature']}"
        return hashlib.sha256(key_str.encode()).hexdigest()

    def _cache_result(self, key: str, result: str) -> None:
        """
        Cache generation result with size limit.

        Args:
            key: Cache key
            result: Result to cache
        """
        if len(self._cache) >= settings.cache_max_size:
            # Remove oldest entry (simple FIFO)
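            # (dicts preserve insertion order in Python 3.7+, so
            # next(iter(...)) yields the oldest key)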
            self._cache.pop(next(iter(self._cache)))
        self._cache[key] = result

    def clear_cache(self) -> None:
        """Clear the generation cache."""
        self._cache.clear()
        logger.info("Generation cache cleared")

    def get_model_info(self) -> Dict[str, Any]:
        """
        Get information about the currently loaded model.

        Returns:
            Model information dictionary
        """
        return {
            "model_name": self._current_model_name,
            "cache_size": len(self._cache),
            "cache_enabled": settings.enable_cache,
        }


# Global model service instance
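# (lru_cache(maxsize=1) acts as a lazy singleton: the service, and with it
# the default model, is created on first call and reused on every later call)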
@lru_cache(maxsize=1)
def get_model_service() -> ModelService:
    """Get the global model service instance."""
    return ModelService()
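

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the service API). Assumes
# settings.default_model points at a small instruction-tuned model such as
# "google/flan-t5-small"; the prompt and parameters below are arbitrary.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    service = get_model_service()

    # First call generates and caches; an identical second call would be
    # served from the in-memory cache when settings.enable_cache is true.
    text = service.generate_text(
        "Summarize: The quick brown fox jumps over the lazy dog.",
        max_length=64,
        temperature=0.7,
    )
    print(text)
    print(service.get_model_info())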