import json
import logging
import re
from typing import Dict, List, Optional

import requests

from core.providers.base import LLMProvider
from utils.config import config
|
|
logger = logging.getLogger(__name__)
|
|
class OllamaProvider(LLMProvider):
    """Ollama LLM provider implementation with commentary support."""
|
|
    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 3):
        super().__init__(model_name, timeout, max_retries)
        self.host = self._sanitize_host(config.ollama_host or "http://localhost:11434")
        self.headers = {
            # ngrok's free tier serves a browser-warning interstitial unless
            # this header is present.
            "ngrok-skip-browser-warning": "true",
            "User-Agent": "CosmicCat-AI-Assistant",
        }
|
|
    def _sanitize_host(self, host: str) -> str:
        """Sanitize a host URL: trim whitespace, strip control characters, and ensure a scheme."""
        if not host:
            return "http://localhost:11434"
        host = host.strip()
        host = re.sub(r'[\r\n\t\0]+', '', host)
        if not host.startswith(('http://', 'https://')):
            host = 'http://' + host
        return host
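    # Illustrative examples of the sanitization above (hostnames are made up):
    #   "  my-tunnel.ngrok.io\n"  -> "http://my-tunnel.ngrok.io"
    #   "https://my-host:11434"   -> "https://my-host:11434" (unchanged)
    #   ""                        -> "http://localhost:11434"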
|
|
    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a response synchronously."""
        try:
            return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama generation failed: {e}")
            return None
|
|
    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[List[str]]:
        """Generate a response with streaming support; returns the collected chunks, or None on failure."""
        try:
            return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            return None
|
|
    def validate_model(self) -> bool:
        """Check whether the configured model is available on the Ollama server."""
        try:
            response = requests.get(
                f"{self.host}/api/tags",
                headers=self.headers,
                timeout=self.timeout,
            )
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [model.get("name", "") for model in models]
                # Accept both exact matches and tag-less matches (e.g. a
                # configured "llama3" should match "llama3:latest").
                return any(
                    self.model_name == name or self.model_name == name.split(":")[0]
                    for name in model_names
                )
            elif response.status_code == 404:
                # Some proxies hide /api/tags; fall back to checking that the
                # server root is reachable at all.
                root_response = requests.get(
                    self.host,
                    headers=self.headers,
                    timeout=self.timeout,
                )
                return root_response.status_code == 200
            return False
        except Exception as e:
            logger.warning(f"Model validation failed: {e}")
            return False
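    # For reference, a successful /api/tags response is shaped roughly like
    # (abridged):
    #   {"models": [{"name": "llama3:latest", ...}, ...]}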
|
|
    def generate_commentary(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate commentary on an HF response."""
        try:
            commentary_prompt = self._create_commentary_prompt(user_prompt, hf_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama commentary generation failed: {e}")
            return None
|
|
    def generate_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate self-commentary on the provider's own response."""
        try:
            commentary_prompt = self._create_self_commentary_prompt(user_prompt, ollama_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama self-commentary generation failed: {e}")
            return None
|
|
    def _create_commentary_prompt(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> str:
        """Create a prompt asking Ollama to comment on an HF response."""
        conversation_context = "\n".join(
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]
        )

        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze the interaction between a user and an expert AI, then provide insightful commentary.

ANALYZE THIS INTERACTION:
User Question: "{user_prompt}"
Expert Response: "{hf_response}"

Recent Conversation Context:
{conversation_context}

PROVIDE YOUR COMMENTARY IN THIS FORMAT:

I've reviewed the HF expert's response and here's my insight:

Key Points Observed:
- [Point 1]
- [Point 2]

My Perspective: [Your commentary on the HF response]

Suggestions:
- [Suggestion 1]
- [Suggestion 2]

Keep your analysis concise but insightful. Focus on helping the user achieve their goals through better questioning and information gathering.
"""
        return prompt
|
|
    def _create_self_commentary_prompt(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> str:
        """Create a prompt asking Ollama to comment on its own response."""
        conversation_context = "\n".join(
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]
        )

        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze your own response to a user question, then provide insightful self-reflection.

ANALYZE YOUR RESPONSE:
User Question: "{user_prompt}"
Your Response: "{ollama_response}"

Recent Conversation Context:
{conversation_context}

PROVIDE YOUR SELF-COMMENTARY IN THIS FORMAT:

I've reviewed my own response and here's my self-reflection:

Key Points Addressed:
- [Point 1]
- [Point 2]

My Self-Assessment: [Your reflection on your own response quality]

Areas for Improvement:
- [Area 1]
- [Area 2]

Keep your analysis honest and constructive. Focus on how you could have provided better assistance.
"""
        return prompt
|
|
    def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Implementation of synchronous generation via /api/chat."""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})

            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": False,
            }

            logger.info(f"Ollama request URL: {url}")
            # Payloads and response bodies can contain user content; keep them
            # at DEBUG so routine logs do not leak conversations.
            logger.debug(f"Ollama request payload: {payload}")
            logger.debug(f"Ollama headers: {self.headers}")

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
            )

            logger.info(f"Ollama response status: {response.status_code}")
            logger.debug(f"Ollama response headers: {dict(response.headers)}")

            response.raise_for_status()
            result = response.json()
            logger.debug(f"Ollama response body: {result}")

            # /api/chat returns {"message": {"content": ...}}; /api/generate-style
            # responses use a top-level "response" key instead.
            if "message" in result and "content" in result["message"]:
                content = result["message"]["content"]
            elif "response" in result:
                content = result["response"]
            else:
                content = str(result)

            logger.info(f"Extracted content length: {len(content) if content else 0}")
            return content or ""

        except Exception as e:
            logger.error(f"Ollama API request error: {e}")
            raise RuntimeError(f"Ollama API error: {e}") from e
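    # For reference, a non-streaming /api/chat response is shaped roughly like
    # (abridged):
    #   {"model": "...", "message": {"role": "assistant", "content": "..."}, "done": true}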
|
|
    def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming generation; collects content chunks from the NDJSON stream."""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})

            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": True,
            }

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
                stream=True,
            )
            response.raise_for_status()

            chunks = []
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    # Each line is a standalone JSON object; parse it safely
                    # rather than eval()-ing raw server output.
                    data = json.loads(line.decode("utf-8"))
                except (UnicodeDecodeError, json.JSONDecodeError):
                    continue
                content = data.get("message", {}).get("content", "")
                if content:
                    chunks.append(content)
            return chunks
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            raise
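    # For reference, the stream is NDJSON, one JSON object per line, e.g.
    # (abridged):
    #   {"message": {"role": "assistant", "content": "Hel"}, "done": false}
    #   {"message": {"role": "assistant", "content": "lo"}, "done": false}
    #   {"done": true}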
|
|
ollama_provider = OllamaProvider(config.local_model_name)
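
# Minimal usage sketch (illustrative, not part of the provider API): assumes an
# Ollama server is reachable at config.ollama_host and that
# config.local_model_name refers to a model that has already been pulled.
if __name__ == "__main__":
    if ollama_provider.validate_model():
        print(ollama_provider.generate("Hello!", []))
        for chunk in ollama_provider.stream_generate("Tell me a short story", []) or []:
            print(chunk, end="", flush=True)
    else:
        print(f"Model {ollama_provider.model_name!r} is not available")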
|
|