Spaces:
Sleeping
Sleeping
| # src/llm/client.py | |
| """ | |
| Модуль клиента для взаимодействия с локальным API-сервером vLLM. | |
| Отвечает за чтение промпта из внешнего текстового файла конфигурации, | |
| отправку запросов к модели, отказоустойчивый парсинг JSON-ответов | |
| (с индустриальным авто-восстановлением через json-repair) и сохранение истории. | |
| """ | |
| import json | |
| import logging | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import requests | |
| import json_repair | |
| from omegaconf import DictConfig | |
| logger: logging.Logger = logging.getLogger(__name__) | |
| # Определение базовых путей проекта относительно расположения файла | |
| CURRENT_FILE_PATH: Path = Path(__file__).resolve() | |
| PROJECT_DIR: Path = CURRENT_FILE_PATH.parent.parent.parent | |
| LOGS_DIR: Path = PROJECT_DIR / "logs" | |
| CONFIGS_DIR: Path = PROJECT_DIR / "configs" | |
| class InventoryLLMClient: | |
| """ | |
| Класс-клиент для отправки запросов к локальному LLM-бэкенду. | |
| Генерирует графовые структуры торгового зала на основе внешнего текстового промпта. | |
| """ | |
| def __init__(self, cfg: DictConfig) -> None: | |
| """ | |
| Инициализация клиента на основе конфигурации проекта. | |
| Args: | |
| cfg (DictConfig): Конфигурация проекта (OmegaConf). | |
| """ | |
| llm_cfg: Any = cfg.get("llm", {}) | |
| # Настройки API (адрес, модель, параметры генерации) | |
| self.api_url: str = llm_cfg.get("api_url", "http://127.0.0.1:8000/v1") | |
| self.model_name: str = llm_cfg.get("served_model_name", "qwen") | |
| self.temperature: float = llm_cfg.get("temperature", 0.1) | |
| self.max_tokens: int = llm_cfg.get("max_tokens", 2048) | |
| self.headers: Dict[str, str] = { | |
| "Content-Type": "application/json" | |
| } | |
| # Пути к файлам системного промпта и истории | |
| self.prompt_file_path: Path = CONFIGS_DIR / "llm_prompt.txt" | |
| LOGS_DIR.mkdir(parents=True, exist_ok=True) | |
| self.history_file: Path = LOGS_DIR / "llm_history.jsonl" | |
| logger.info(f"🚀 Инициализирован LLM-клиент (API: {self.api_url}, Модель: {self.model_name})") | |
| def _save_to_history(self, prompt: str, raw_response: str, parsed_json: Optional[Dict[str, Any]]) -> None: | |
| """ | |
| Сохраняет артефакты запроса и ответа в лог-файл (JSON Lines). | |
| Это необходимо для анализа галлюцинаций модели в пост-обработке. | |
| Args: | |
| prompt (str): Отправленный промпт. | |
| raw_response (str): Сырой текстовый ответ от LLM. | |
| parsed_json (Optional[Dict[str, Any]]): Распарсенный JSON или None. | |
| """ | |
| log_entry: Dict[str, Any] = { | |
| "timestamp": datetime.now().isoformat(), | |
| "prompt": prompt, | |
| "raw_response": raw_response, | |
| "parsed_json": parsed_json, | |
| "success": parsed_json is not None | |
| } | |
| try: | |
| with open(self.history_file, "a", encoding="utf-8") as f: | |
| f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") | |
| logger.debug(f"📝 Запись добавлена в историю LLM: {self.history_file.name}") | |
| except Exception as e: | |
| logger.error(f"❌ Ошибка при записи в файл истории: {e}") | |
| def _extract_json(self, text: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Отказоустойчивое извлечение JSON-объекта из текстового ответа нейросети. | |
| Использует json-repair для лечения синтаксических ошибок (запятые, кавычки и т.д.). | |
| Args: | |
| text (str): Текст ответа LLM. | |
| Returns: | |
| Optional[Dict[str, Any]]: Извлеченный словарь или None при критической ошибке. | |
| """ | |
| if not text: | |
| return None | |
| text = text.strip() | |
| json_str: str = "" | |
| bt = chr(96) * 3 | |
| json_pattern = f"{re.escape(bt)}(?:json)?\\s*(\\{{.*?\\}})\\s*{re.escape(bt)}" | |
| match = re.search(json_pattern, text, re.DOTALL | re.IGNORECASE) | |
| if match: | |
| json_str = match.group(1) | |
| logger.debug("🔍 JSON успешно извлечен из Markdown-блока.") | |
| else: | |
| start_idx = text.find('{') | |
| end_idx = text.rfind('}') | |
| if start_idx != -1 and end_idx != -1 and end_idx > start_idx: | |
| json_str = text[start_idx:end_idx + 1] | |
| logger.debug("🔍 JSON извлечен методом поиска границ объекта (фигурные скобки).") | |
| else: | |
| logger.warning("⚠️ В ответе LLM не обнаружено валидной JSON-структуры.") | |
| return None | |
| try: | |
| # 1. Пробуем стандартный строгий парсинг | |
| return json.loads(json_str) | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"⚠️ Ошибка строгого парсинга JSON: {e}. Запускаю json-repair...") | |
| try: | |
| # 2. Мощное авто-восстановление синтаксиса | |
| result = json_repair.loads(json_str) | |
| if isinstance(result, dict): | |
| logger.info("🛠️ JSON успешно восстановлен с помощью json-repair!") | |
| return result | |
| else: | |
| logger.error("❌ json-repair вернул данные не в формате словаря.") | |
| return None | |
| except Exception as e2: | |
| logger.error(f"❌ Авто-восстановление json-repair не помогло: {e2}") | |
| logger.debug(f"Текст до лечения:\n{json_str}") | |
| return None | |
| def generate_layout( | |
| self, | |
| user_request: str, | |
| available_categories: List[str], | |
| equipment_catalog: str, | |
| size_n: float, | |
| size_m: float | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Генерирует JSON-граф расстановки через LLM. | |
| Args: | |
| user_request (str): Текстовый промпт пользователя. | |
| available_categories (List[str]): Список доступных SKU/категорий. | |
| equipment_catalog (str): Строка со списком оборудования и его габаритами. | |
| size_n (float): Ширина комнаты по оси X. | |
| size_m (float): Глубина комнаты по оси Y. | |
| Returns: | |
| Optional[Dict[str, Any]]: Словарь с узлами и связями, или None при ошибке. | |
| """ | |
| endpoint: str = f"{self.api_url}/chat/completions" | |
| if not self.prompt_file_path.exists(): | |
| logger.error(f"⚠️ Файл промпта не найден: {self.prompt_file_path}") | |
| return None | |
| try: | |
| with open(self.prompt_file_path, "r", encoding="utf-8") as f: | |
| raw_prompt = f.read() | |
| except Exception as e: | |
| logger.error(f"❌ Ошибка при чтении файла промпта: {e}") | |
| return None | |
| entrance_coords: str = f"({size_n / 2.0:.1f}, 0.0)" | |
| try: | |
| system_prompt: str = raw_prompt.format( | |
| size_n=size_n, | |
| size_m=size_m, | |
| entrance_coords=entrance_coords, | |
| equipment_catalog=equipment_catalog, | |
| available_categories=", ".join(available_categories) | |
| ) | |
| except KeyError as e: | |
| logger.error(f"❌ Ошибка форматирования промпта (пропущен ключ {e}). Проверьте фигурные скобки в llm_prompt.txt!") | |
| return None | |
| logger.info(f"🧠 Отправляем запрос в LLM. Габариты: {size_n}x{size_m}, Дверь: {entrance_coords}") | |
| payload: Dict[str, Any] = { | |
| "model": self.model_name, | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_request} | |
| ], | |
| "temperature": self.temperature, | |
| "max_tokens": self.max_tokens | |
| } | |
| raw_content: str = "" | |
| result: Optional[Dict[str, Any]] = None | |
| try: | |
| response = requests.post(endpoint, headers=self.headers, json=payload, timeout=120) | |
| response.raise_for_status() | |
| data: dict = response.json() | |
| raw_content = data["choices"][0]["message"]["content"] | |
| result = self._extract_json(raw_content) | |
| if result: | |
| # Узлы (nodes) — это критически важно. Без них графа нет. | |
| if "nodes" not in result: | |
| logger.warning("⚠️ Полученный JSON не содержит обязательного ключа 'nodes'.") | |
| result = None | |
| else: | |
| # Связи (edges) — опциональны. Если ИИ их не дописал, просто создаем пустой список. | |
| if "edges" not in result: | |
| logger.info("ℹ️ Ключ 'edges' отсутствует (обрыв генерации или нет связей). Подставляем пустой список.") | |
| result["edges"] = [] | |
| logger.info(f"✅ Успешно получен граф: {len(result['nodes'])} узлов, {len(result['edges'])} связей.") | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"❌ Ошибка сети при запросе к LLM API: {e}") | |
| except Exception as e: | |
| logger.error(f"❌ Непредвиденная ошибка в LLM-клиенте: {e}", exc_info=True) | |
| finally: | |
| self._save_to_history(user_request, raw_content, result) | |
| return result |