""" Расширенный клиент Supabase с поддержкой векторного поиска и embeddings """ import os import json import requests import torch from typing import List, Dict, Optional, Any from datetime import datetime from dotenv import load_dotenv # Загружаем переменные окружения load_dotenv() SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") SUPABASE_ENABLED = bool(SUPABASE_URL and SUPABASE_KEY) class SupabaseEmbeddings: """Генерация embeddings с помощью ruBERT""" def __init__(self): self.tokenizer = None self.model = None self._loaded = False def load_model(self): """Загрузка модели ruBERT""" if self._loaded: return try: from transformers import AutoTokenizer, AutoModel print("Загрузка ruBERT для embeddings...") self.tokenizer = AutoTokenizer.from_pretrained("DeepPavlov/rubert-base-cased") self.model = AutoModel.from_pretrained("DeepPavlov/rubert-base-cased") self.model.eval() self._loaded = True print("[OK] ruBERT загружен") except Exception as e: print(f"[WARN] ruBERT не загружен: {e}") def get_embedding(self, text: str, max_length: int = 512) -> Optional[List[float]]: """Получение векторного представления текста""" if not self._loaded: self.load_model() if not self._loaded: return None try: inputs = self.tokenizer( text, return_tensors="pt", truncation=True, max_length=max_length, padding=True ) with torch.no_grad(): outputs = self.model(**inputs) # Mean pooling token_embeddings = outputs.last_hidden_state attention_mask = inputs["attention_mask"] mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() embedding = torch.sum(token_embeddings * mask_expanded, 1) / torch.clamp(mask_expanded.sum(1), min=1e-9) # Нормализация embedding = torch.nn.functional.normalize(embedding, p=2, dim=1) return embedding[0].tolist() except Exception as e: print(f"[ERROR] Ошибка генерации embeddings: {e}") return None class SupabaseClient: """Расширенный клиент Supabase с векторным поиском""" def __init__(self): self.embeddings = SupabaseEmbeddings() self.session = requests.Session() if SUPABASE_ENABLED: print(f"[OK] Supabase подключен: {SUPABASE_URL}") else: print("[WARN] Supabase не настроен") # ============================================================ # CRUD ОПЕРАЦИИ # ============================================================ def create_task(self, task_data: Dict) -> Optional[int]: """Создание задания""" if not SUPABASE_ENABLED: return None try: url = f"{SUPABASE_URL}/rest/v1/tasks" headers = self._get_headers() # Генерируем embeddings для контента content_text = f"{task_data.get('condition', '')} {task_data.get('content', '')}" embedding = self.embeddings.get_embedding(content_text) if embedding: task_data['embeddings'] = json.dumps(embedding) # Извлекаем ключевые слова if 'keywords' not in task_data: task_data['keywords'] = self._extract_keywords(content_text) response = self.session.post(url, headers=headers, json=task_data, timeout=10) if response.status_code in [200, 201]: result = response.json() if result: return result[0].get("id") print(f"[ERROR] Ошибка создания: {response.status_code}") return None except Exception as e: print(f"[ERROR] Ошибка: {e}") return None def get_task(self, task_id: str) -> Optional[Dict]: """Получение задания по ID""" if not SUPABASE_ENABLED: return None try: url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}" headers = self._get_headers() response = self.session.get(url, headers=headers, timeout=10) if response.status_code == 200: tasks = response.json() return tasks[0] if tasks else None return None except Exception as e: print(f"[ERROR] Ошибка: {e}") return None def get_tasks( self, topic: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict]: """Получение списка заданий с фильтрацией""" if not SUPABASE_ENABLED: return [] try: url = f"{SUPABASE_URL}/rest/v1/tasks?limit={limit}&offset={offset}" headers = self._get_headers() if topic: url += f"&topic=eq.{topic}" response = self.session.get(url, headers=headers, timeout=10) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"[ERROR] Ошибка: {e}") return [] def update_task(self, task_id: str, updates: Dict) -> bool: """Обновление задания""" if not SUPABASE_ENABLED: return False try: url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}" headers = self._get_headers() response = self.session.patch(url, headers=headers, json=updates, timeout=10) return response.status_code in [200, 204] except Exception as e: print(f"[ERROR] Ошибка: {e}") return False def delete_task(self, task_id: str) -> bool: """Удаление задания""" if not SUPABASE_ENABLED: return False try: url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}" headers = self._get_headers() response = self.session.delete(url, headers=headers, timeout=10) return response.status_code in [200, 204] except Exception as e: print(f"[ERROR] Ошибка: {e}") return False # ============================================================ # ВЕКТОРНЫЙ ПОИСК # ============================================================ def search_similar_tasks( self, query_text: str, threshold: float = 0.7, limit: int = 10 ) -> List[Dict]: """Поиск похожих заданий с помощью векторного поиска""" if not SUPABASE_ENABLED: return [] # Генерируем embeddings для запроса query_embedding = self.embeddings.get_embedding(query_text) if not query_embedding: # Fallback: текстовый поиск return self._text_search(query_text, limit) try: # Используем RPC функцию для векторного поиска url = f"{SUPABASE_URL}/rest/v1/rpc/find_similar_tasks" headers = self._get_headers() payload = { "search_text": query_text, "match_threshold": threshold, "match_count": limit } response = self.session.post(url, headers=headers, json=payload, timeout=10) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"[ERROR] Ошибка векторного поиска: {e}") return self._text_search(query_text, limit) def _text_search(self, query: str, limit: int = 10) -> List[Dict]: """Текстовый поиск (fallback)""" if not SUPABASE_ENABLED: return [] try: # Поиск по ключевым словам и теме url = f"{SUPABASE_URL}/rest/v1/tasks?or=(topic.ilike.%{query}%,condition.ilike.%{query}%)&limit={limit}" headers = self._get_headers() response = self.session.get(url, headers=headers, timeout=10) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"[ERROR] Ошибка текстового поиска: {e}") return [] # ============================================================ # МАССОВЫЕ ОПЕРАЦИИ # ============================================================ def save_tasks_batch(self, tasks: List[Dict]) -> Dict: """Массовое сохранение заданий""" if not SUPABASE_ENABLED: return {"saved": 0, "failed": 0, "total": len(tasks), "error": "Supabase не подключен"} stats = {"saved": 0, "failed": 0, "total": len(tasks)} print(f"\nСохранение {len(tasks)} заданий в Supabase...") for i, task in enumerate(tasks, 1): print(f" [{i}/{len(tasks)}]") result = self.create_task(task) if result: stats["saved"] += 1 else: stats["failed"] += 1 print(f"\n[OK] Сохранено: {stats['saved']}, Ошибок: {stats['failed']}") return stats # ============================================================ # АНАЛИТИКА # ============================================================ def get_topic_stats(self) -> List[Dict]: """Статистика по темам""" if not SUPABASE_ENABLED: return [] try: url = f"{SUPABASE_URL}/rest/v1/rpc/get_topic_stats" headers = self._get_headers() response = self.session.post(url, headers=headers, json={}, timeout=10) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"[ERROR] Ошибка статистики: {e}") return [] def get_random_tasks(self, topic: Optional[str] = None, limit: int = 10) -> List[Dict]: """Получение случайных заданий""" if not SUPABASE_ENABLED: return [] try: url = f"{SUPABASE_URL}/rest/v1/rpc/get_random_tasks" headers = self._get_headers() payload = {"limit_count": limit} if topic: payload["topic_filter"] = topic response = self.session.post(url, headers=headers, json=payload, timeout=10) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"[ERROR] Ошибка: {e}") return [] # ============================================================ # УТИЛИТЫ # ============================================================ def _get_headers(self) -> Dict: """Получение заголовков для API запросов""" return { "apikey": SUPABASE_KEY, "Authorization": f"Bearer {SUPABASE_KEY}", "Content-Type": "application/json", "Prefer": "return=representation" } def _extract_keywords(self, text: str, max_keywords: int = 10) -> List[str]: """Извлечение ключевых слов (простая реализация)""" # Стоп-слова для русского языка stop_words = { 'и', 'в', 'во', 'не', 'что', 'он', 'на', 'я', 'с', 'со', 'как', 'а', 'то', 'все', 'она', 'так', 'его', 'но', 'да', 'ты', 'к', 'у', 'же', 'вы', 'за', 'бы', 'по', 'только', 'ее', 'мне', 'было', 'вот', 'от', 'меня', 'еще', 'нет', 'о', 'из', 'ему', 'теперь', 'когда', 'даже', 'ну', 'вдруг', 'ли', 'если', 'уже', 'или', 'ни', 'быть', 'был', 'него', 'до', 'вас', 'нибудь', 'опять', 'уж', 'вам', 'вед', 'пусть', 'тогда', 'кто', 'этой', 'того', 'потому', 'этот', 'какой', 'совсем', 'ним', 'здесь', 'этом', 'один', 'почти', 'мой', 'тем', 'чтобы', 'нее', 'сейчас', 'были', 'куда', 'зачем', 'всех', 'никогда', 'можно', 'при', 'наконец', 'два', 'об', 'другой', 'хоть', 'после', 'над', 'больше', 'тот', 'через', 'эти', 'нас', 'про', 'всего', 'них', 'какая', 'много', 'разве', 'три', 'эту', 'моя', 'впрочем', 'хорошо', 'у', 'для', 'че', 'лет', 'который', 'правда', 'место', 'слово' } words = text.lower().split() keywords = [] for word in words: # Очищаем от знаков препинания word = ''.join(c for c in word if c.isalpha()) if len(word) > 3 and word not in stop_words and word not in keywords: keywords.append(word) if len(keywords) >= max_keywords: break return keywords def test_connection(self) -> bool: """Проверка подключения""" if not SUPABASE_ENABLED: return False try: url = f"{SUPABASE_URL}/rest/v1/tasks?limit=1" headers = self._get_headers() response = self.session.get(url, headers=headers, timeout=10) return response.status_code == 200 except Exception as e: print(f"[ERROR] Ошибка подключения: {e}") return False # ============================================================ # ДЕКОРАТОР ДЛЯ АСИНХРОННОЙ ОЧЕРЕДИ # ============================================================ class EmbeddingsQueue: """Очередь для асинхронной генерации embeddings""" def __init__(self, supabase_client: SupabaseClient): self.client = supabase_client def enqueue(self, task_id: str, text: str) -> bool: """Добавление задачи в очередь""" if not SUPABASE_ENABLED: return False try: url = f"{SUPABASE_URL}/rest/v1/rpc/pgmq_send" headers = self.client._get_headers() payload = { "queue_name": "embeddings_queue", "message": { "task_id": task_id, "text": text, "created_at": datetime.now().isoformat() } } response = self.client.session.post(url, headers=headers, json=payload, timeout=10) return response.status_code in [200, 201] except Exception as e: print(f"[ERROR] Ошибка очереди: {e}") return False # ============================================================ # ЗАПУСК # ============================================================ if __name__ == "__main__": print("="*60) print("Тестирование Supabase клиента") print("="*60) client = SupabaseClient() if client.test_connection(): print("\n[OK] Подключение к Supabase успешно!") # Тест получения заданий tasks = client.get_tasks(limit=5) print(f"\nПолучено заданий: {len(tasks)}") # Тест статистики stats = client.get_topic_stats() print(f"\nСтатистика по темам: {stats}") # Тест векторного поиска similar = client.search_similar_tasks("орфография корни слов", limit=3) print(f"\nПохожие задания: {len(similar)}") else: print("\n[WARN] Supabase не подключен") print("Настройте переменные окружения:") print(" SUPABASE_URL=https://your-project.supabase.co") print(" SUPABASE_KEY=your-anon-key")