import os
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path

from smolagents import Model

try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama_cpp module not available; install llama-cpp-python to use LlamaCppModel")

logger = logging.getLogger("LlamaCppModel")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
class LlamaCppModel(Model):
    """smolagents-compatible wrapper around a local llama.cpp GGUF model."""

    def __init__(
        self,
        model_path: str,
        n_ctx: int = 2048,
        n_gpu_layers: int = 0,
        max_tokens: int = 512,
        temperature: float = 0.7,
        verbose: bool = True,
    ):
        super().__init__()
        self.model_path = model_path
        self.n_ctx = n_ctx
        self.n_gpu_layers = n_gpu_layers
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose
        self.llm = None

        if not LLAMA_CPP_AVAILABLE:
            logger.error("llama_cpp is not installed. Please install it with 'pip install llama-cpp-python'")
            raise ImportError("llama_cpp is required but not installed.")
        if not os.path.exists(model_path):
            logger.error(f"Model file not found at: {model_path}")
            raise FileNotFoundError(f"Model file not found at: {model_path}")

        try:
            logger.info(f"Loading Llama model from: {model_path}")
            self.llm = Llama(model_path=model_path, n_ctx=n_ctx, n_gpu_layers=n_gpu_layers, verbose=verbose)
            logger.info("Llama model loaded successfully.")
        except Exception as e:
            logger.exception(f"Failed to initialize Llama model: {e}")
            raise
    def generate(self, prompt: str, **kwargs) -> str:
        """Run a plain text completion and return the generated text."""
        try:
            logger.debug(f"Generating with prompt: {prompt[:100]}...")
            response = self.llm(prompt=prompt, max_tokens=self.max_tokens, temperature=self.temperature, echo=False)
            logger.debug(f"Raw response: {response}")
            # llama_cpp normally returns an OpenAI-style completion dict with a 'choices' list.
            if isinstance(response, dict) and 'choices' in response:
                text = response['choices'][0]['text'].strip()
            elif isinstance(response, list):
                text = response[0].get('text', '').strip()
            else:
                logger.warning("Unexpected response format from Llama.")
                text = str(response)
            logger.debug(f"Generated text: {text}")
            return text
        except Exception as e:
            logger.exception(f"Error generating text: {e}")
            return f"Error generating response: {e}"
    def generate_with_tools(self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]] = None, **kwargs) -> Dict[str, Any]:
        """Flatten chat messages (and optional tool specs) into a prompt and complete it."""
        try:
            prompt = self._format_messages_to_prompt(messages, tools)
            logger.debug(f"Formatted prompt: {prompt}")
            completion = self.generate(prompt)
            return {"message": {"role": "assistant", "content": completion}}
        except Exception as e:
            logger.exception(f"Error generating with tools: {e}")
            return {"message": {"role": "assistant", "content": f"Error: {e}"}}
    def _format_messages_to_prompt(self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]] = None) -> str:
        """Render a chat history (and an optional tool list) as a single text prompt."""
        formatted_prompt = ""
        if tools:
            tool_desc = "\n".join(f"Tool {i + 1}: {t['name']} - {t['description']}" for i, t in enumerate(tools))
            formatted_prompt += f"Available tools:\n{tool_desc}\n\n"
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            # Multi-part content (e.g. a list of {"type": "text", ...} blocks) is joined into one string.
            if isinstance(content, list):
                content = " ".join(c.get("text", str(c)) if isinstance(c, dict) else str(c) for c in content)
            formatted_prompt += f"{role.capitalize()}: {content}\n\n"
        formatted_prompt += "Assistant: "
        logger.debug(f"Constructed prompt: {formatted_prompt}")
        return formatted_prompt
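
# For reference, _format_messages_to_prompt flattens a history such as
#   [{"role": "user", "content": "Hi"}]
# with one tool {"name": "search", "description": "Web search"} into:
#
#   Available tools:
#   Tool 1: search - Web search
#
#   User: Hi
#
#   Assistant: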
# Example usage (for testing):
# model = LlamaCppModel(model_path="/path/to/your/llama-model.gguf")
# print(model.generate("Hello, how are you?"))
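
# A slightly fuller smoke test, offered as a sketch: it assumes llama-cpp-python
# is installed and that a GGUF model actually exists at the hypothetical path below.
if __name__ == "__main__":
    model = LlamaCppModel(model_path="/path/to/your/llama-model.gguf")  # hypothetical path
    print(model.generate("Hello, how are you?"))
    # generate_with_tools wraps the completion as {"message": {"role": "assistant", "content": ...}}
    result = model.generate_with_tools(
        messages=[{"role": "user", "content": "What is the capital of France?"}],
        tools=[{"name": "search", "description": "Web search"}],  # illustrative tool spec
    )
    print(result["message"]["content"])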