# ai_chat_api/services/text_service.py
# Author: Soumik Bose (branch: go, commit: cde2f6e)
import logging
from typing import Optional, Dict, Any, List, AsyncIterator
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import json
from config import config
from utils.json_extractor import extract_json_from_content
logger = logging.getLogger("text-service")
class TextService:
    """Service for text-based language model interactions.

    Wraps a llama.cpp chat model downloaded from the Hugging Face Hub and
    exposes completion generation with optional SSE streaming or
    JSON-extraction post-processing.
    """

    def __init__(self):
        # Populated by initialize(); stays None until the model is loaded.
        self.model: Optional[Llama] = None

    async def initialize(self) -> None:
        """Download (if not cached) and load the text model.

        Raises:
            Exception: re-raised after logging if download or load fails.
        """
        try:
            logger.info("Downloading text model: %s...", config.TEXT_MODEL_FILE)
            model_path = hf_hub_download(
                repo_id=config.TEXT_MODEL_REPO,
                filename=config.TEXT_MODEL_FILE,
                cache_dir=config.HF_HOME,
            )
            logger.info("Loading text model (Threads: %s)...", config.N_THREADS)
            self.model = Llama(
                model_path=model_path,
                n_ctx=config.TEXT_MODEL_CTX,
                n_threads=config.N_THREADS,
                n_batch=config.TEXT_MODEL_BATCH,
                verbose=False,
            )
            logger.info("✓ Text model loaded successfully")
        except Exception as e:
            logger.error("Failed to initialize text model: %s", e)
            raise

    def is_ready(self) -> bool:
        """Check if the model is loaded and ready."""
        return self.model is not None

    async def generate_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.6,
        max_tokens: int = 512,
        stream: bool = False,
        return_json: bool = False
    ) -> Any:
        """
        Generate text completion.

        Args:
            messages: List of message dictionaries with 'role' and 'content'.
                The caller's list and dicts are never mutated.
            temperature: Sampling temperature.
            max_tokens: Maximum tokens to generate.
            stream: Whether to stream the response as SSE-formatted chunks.
            return_json: Whether to extract JSON from the response.

        Returns:
            Generated completion (dict), a status/data dict when
            ``return_json`` is set, or an async iterator of SSE strings
            when ``stream`` is set.

        Raises:
            RuntimeError: if the model has not been initialized.
            ValueError: if both ``stream`` and ``return_json`` are requested.
        """
        if not self.is_ready():
            raise RuntimeError("Text model not initialized")

        # Validate conflicting parameters (streamed chunks cannot be
        # post-processed into a single JSON document).
        if stream and return_json:
            raise ValueError("Cannot use both 'stream' and 'return_json' simultaneously")

        # Prepare messages for JSON extraction mode.
        if return_json:
            system_prompt = {
                "role": "system",
                "content": (
                    "You are a strict JSON generator. "
                    "Convert the user's input into valid JSON format. "
                    "Output strictly in markdown code blocks like ```json ... ```. "
                    "Do not add conversational filler."
                )
            }
            # Shallow-copy each message dict so the caller's arguments are
            # not mutated when we append the JSON instruction below
            # (the original code modified the caller's dict in place).
            messages = [system_prompt] + [dict(m) for m in messages]
            if messages[-1]['role'] == 'user':
                messages[-1]['content'] += "\n\nReturn structured JSON of this content."

        logger.info("Generating completion: %d messages | Stream: %s", len(messages), stream)

        try:
            response = self.model.create_chat_completion(
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=stream
            )

            # Handle streaming response.
            if stream:
                return self._create_stream_iterator(response)

            # Handle JSON extraction.
            if return_json:
                content_text = response['choices'][0]['message']['content']
                extracted_data = extract_json_from_content(content_text)
                return {
                    "status": "success",
                    "data": extracted_data
                }

            return response
        except Exception as e:
            logger.error("Error generating completion: %s", e)
            raise

    async def _create_stream_iterator(self, response_stream) -> AsyncIterator[str]:
        """Yield SSE-formatted JSON chunks, terminated by a [DONE] sentinel.

        NOTE(review): iteration of the llama.cpp stream is synchronous and
        will block the event loop between chunks.
        """
        for chunk in response_stream:
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"

    async def cleanup(self) -> None:
        """Cleanup resources by dropping the model reference."""
        if self.model:
            # Rebinding to None releases the only reference held here;
            # the redundant `del` from the original is unnecessary.
            self.model = None
            logger.info("Text model unloaded")
# Global singleton instance created at import time; the model itself is
# loaded lazily via `await text_service.initialize()`.
text_service = TextService()