import logging
from typing import Optional, Dict, Any, List, AsyncIterator

from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import json

from config import config
from utils.json_extractor import extract_json_from_content

logger = logging.getLogger("text-service")


class TextService:
    """Service for text-based language model interactions"""

    def __init__(self):
        # Populated by initialize(); None until the model has been loaded.
        self.model: Optional[Llama] = None

    async def initialize(self) -> None:
        """Initialize the text model.

        Downloads the GGUF file from the Hugging Face Hub (cached under
        ``config.HF_HOME``) and loads it with llama_cpp.

        Raises:
            Exception: re-raised after logging if download or load fails.
        """
        try:
            logger.info(f"Downloading text model: {config.TEXT_MODEL_FILE}...")
            model_path = hf_hub_download(
                repo_id=config.TEXT_MODEL_REPO,
                filename=config.TEXT_MODEL_FILE,
                cache_dir=config.HF_HOME
            )
            logger.info(f"Loading text model (Threads: {config.N_THREADS})...")
            self.model = Llama(
                model_path=model_path,
                n_ctx=config.TEXT_MODEL_CTX,
                n_threads=config.N_THREADS,
                n_batch=config.TEXT_MODEL_BATCH,
                verbose=False
            )
            logger.info("✓ Text model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to initialize text model: {e}")
            raise

    def is_ready(self) -> bool:
        """Check if the model is loaded and ready"""
        return self.model is not None

    async def generate_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.6,
        max_tokens: int = 512,
        stream: bool = False,
        return_json: bool = False
    ) -> Any:
        """
        Generate text completion

        Args:
            messages: List of message dictionaries with 'role' and 'content'
            temperature: Sampling temperature
            max_tokens: Maximum tokens to generate
            stream: Whether to stream the response
            return_json: Whether to extract JSON from response

        Returns:
            Generated completion (dict or stream)

        Raises:
            RuntimeError: if the model has not been initialized.
            ValueError: if both ``stream`` and ``return_json`` are requested.
        """
        if not self.is_ready():
            raise RuntimeError("Text model not initialized")

        # Validate conflicting parameters: a streamed response cannot be
        # post-processed for JSON extraction.
        if stream and return_json:
            raise ValueError("Cannot use both 'stream' and 'return_json' simultaneously")

        # Prepare messages for JSON extraction mode
        if return_json:
            system_prompt = {
                "role": "system",
                "content": (
                    "You are a strict JSON generator. "
                    "Convert the user's input into valid JSON format. "
                    "Output strictly in markdown code blocks like ```json ... ```. "
                    "Do not add conversational filler."
                )
            }
            # Copy each message dict so the caller's input list/dicts are
            # never mutated in place when we append the JSON instruction.
            messages = [system_prompt] + [dict(m) for m in messages]
            if messages[-1]['role'] == 'user':
                messages[-1]['content'] += "\n\nReturn structured JSON of this content."

        logger.info(f"Generating completion: {len(messages)} messages | Stream: {stream}")

        try:
            response = self.model.create_chat_completion(
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=stream
            )

            # Handle streaming response: wrap the chunk iterator as SSE lines.
            if stream:
                return self._create_stream_iterator(response)

            # Handle JSON extraction from the completed response text.
            if return_json:
                content_text = response['choices'][0]['message']['content']
                extracted_data = extract_json_from_content(content_text)
                return {
                    "status": "success",
                    "data": extracted_data
                }

            return response
        except Exception as e:
            logger.error(f"Error generating completion: {e}")
            raise

    async def _create_stream_iterator(self, response_stream) -> AsyncIterator[str]:
        """Create an async iterator for streaming responses.

        Yields each chunk as a Server-Sent-Events line, followed by a
        terminal "[DONE]" marker.

        NOTE(review): ``response_stream`` is a synchronous llama_cpp
        iterator, so each ``for`` step blocks the event loop while the
        model produces the next chunk — confirm this is acceptable for
        the serving setup.
        """
        for chunk in response_stream:
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"

    async def cleanup(self) -> None:
        """Cleanup resources"""
        if self.model:
            # Drop the reference so llama_cpp can free the model memory.
            del self.model
            self.model = None
            logger.info("Text model unloaded")


# Global instance
text_service = TextService()