scr / supabase_client.py
greeta's picture
Upload 8 files
4e285d0 verified
"""
Расширенный клиент Supabase с поддержкой векторного поиска и embeddings
"""
import os
import json
import requests
import torch
from typing import List, Dict, Optional, Any
from datetime import datetime
from dotenv import load_dotenv
# Загружаем переменные окружения
load_dotenv()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SUPABASE_ENABLED = bool(SUPABASE_URL and SUPABASE_KEY)
class SupabaseEmbeddings:
"""Генерация embeddings с помощью ruBERT"""
def __init__(self):
self.tokenizer = None
self.model = None
self._loaded = False
def load_model(self):
"""Загрузка модели ruBERT"""
if self._loaded:
return
try:
from transformers import AutoTokenizer, AutoModel
print("Загрузка ruBERT для embeddings...")
self.tokenizer = AutoTokenizer.from_pretrained("DeepPavlov/rubert-base-cased")
self.model = AutoModel.from_pretrained("DeepPavlov/rubert-base-cased")
self.model.eval()
self._loaded = True
print("[OK] ruBERT загружен")
except Exception as e:
print(f"[WARN] ruBERT не загружен: {e}")
def get_embedding(self, text: str, max_length: int = 512) -> Optional[List[float]]:
"""Получение векторного представления текста"""
if not self._loaded:
self.load_model()
if not self._loaded:
return None
try:
inputs = self.tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=max_length,
padding=True
)
with torch.no_grad():
outputs = self.model(**inputs)
# Mean pooling
token_embeddings = outputs.last_hidden_state
attention_mask = inputs["attention_mask"]
mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
embedding = torch.sum(token_embeddings * mask_expanded, 1) / torch.clamp(mask_expanded.sum(1), min=1e-9)
# Нормализация
embedding = torch.nn.functional.normalize(embedding, p=2, dim=1)
return embedding[0].tolist()
except Exception as e:
print(f"[ERROR] Ошибка генерации embeddings: {e}")
return None
class SupabaseClient:
"""Расширенный клиент Supabase с векторным поиском"""
def __init__(self):
self.embeddings = SupabaseEmbeddings()
self.session = requests.Session()
if SUPABASE_ENABLED:
print(f"[OK] Supabase подключен: {SUPABASE_URL}")
else:
print("[WARN] Supabase не настроен")
# ============================================================
# CRUD ОПЕРАЦИИ
# ============================================================
def create_task(self, task_data: Dict) -> Optional[int]:
"""Создание задания"""
if not SUPABASE_ENABLED:
return None
try:
url = f"{SUPABASE_URL}/rest/v1/tasks"
headers = self._get_headers()
# Генерируем embeddings для контента
content_text = f"{task_data.get('condition', '')} {task_data.get('content', '')}"
embedding = self.embeddings.get_embedding(content_text)
if embedding:
task_data['embeddings'] = json.dumps(embedding)
# Извлекаем ключевые слова
if 'keywords' not in task_data:
task_data['keywords'] = self._extract_keywords(content_text)
response = self.session.post(url, headers=headers, json=task_data, timeout=10)
if response.status_code in [200, 201]:
result = response.json()
if result:
return result[0].get("id")
print(f"[ERROR] Ошибка создания: {response.status_code}")
return None
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return None
def get_task(self, task_id: str) -> Optional[Dict]:
"""Получение задания по ID"""
if not SUPABASE_ENABLED:
return None
try:
url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}"
headers = self._get_headers()
response = self.session.get(url, headers=headers, timeout=10)
if response.status_code == 200:
tasks = response.json()
return tasks[0] if tasks else None
return None
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return None
def get_tasks(
self,
topic: Optional[str] = None,
limit: int = 100,
offset: int = 0
) -> List[Dict]:
"""Получение списка заданий с фильтрацией"""
if not SUPABASE_ENABLED:
return []
try:
url = f"{SUPABASE_URL}/rest/v1/tasks?limit={limit}&offset={offset}"
headers = self._get_headers()
if topic:
url += f"&topic=eq.{topic}"
response = self.session.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return []
def update_task(self, task_id: str, updates: Dict) -> bool:
"""Обновление задания"""
if not SUPABASE_ENABLED:
return False
try:
url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}"
headers = self._get_headers()
response = self.session.patch(url, headers=headers, json=updates, timeout=10)
return response.status_code in [200, 204]
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return False
def delete_task(self, task_id: str) -> bool:
"""Удаление задания"""
if not SUPABASE_ENABLED:
return False
try:
url = f"{SUPABASE_URL}/rest/v1/tasks?task_id=eq.{task_id}"
headers = self._get_headers()
response = self.session.delete(url, headers=headers, timeout=10)
return response.status_code in [200, 204]
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return False
# ============================================================
# ВЕКТОРНЫЙ ПОИСК
# ============================================================
def search_similar_tasks(
self,
query_text: str,
threshold: float = 0.7,
limit: int = 10
) -> List[Dict]:
"""Поиск похожих заданий с помощью векторного поиска"""
if not SUPABASE_ENABLED:
return []
# Генерируем embeddings для запроса
query_embedding = self.embeddings.get_embedding(query_text)
if not query_embedding:
# Fallback: текстовый поиск
return self._text_search(query_text, limit)
try:
# Используем RPC функцию для векторного поиска
url = f"{SUPABASE_URL}/rest/v1/rpc/find_similar_tasks"
headers = self._get_headers()
payload = {
"search_text": query_text,
"match_threshold": threshold,
"match_count": limit
}
response = self.session.post(url, headers=headers, json=payload, timeout=10)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"[ERROR] Ошибка векторного поиска: {e}")
return self._text_search(query_text, limit)
def _text_search(self, query: str, limit: int = 10) -> List[Dict]:
"""Текстовый поиск (fallback)"""
if not SUPABASE_ENABLED:
return []
try:
# Поиск по ключевым словам и теме
url = f"{SUPABASE_URL}/rest/v1/tasks?or=(topic.ilike.%{query}%,condition.ilike.%{query}%)&limit={limit}"
headers = self._get_headers()
response = self.session.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"[ERROR] Ошибка текстового поиска: {e}")
return []
# ============================================================
# МАССОВЫЕ ОПЕРАЦИИ
# ============================================================
def save_tasks_batch(self, tasks: List[Dict]) -> Dict:
"""Массовое сохранение заданий"""
if not SUPABASE_ENABLED:
return {"saved": 0, "failed": 0, "total": len(tasks), "error": "Supabase не подключен"}
stats = {"saved": 0, "failed": 0, "total": len(tasks)}
print(f"\nСохранение {len(tasks)} заданий в Supabase...")
for i, task in enumerate(tasks, 1):
print(f" [{i}/{len(tasks)}]")
result = self.create_task(task)
if result:
stats["saved"] += 1
else:
stats["failed"] += 1
print(f"\n[OK] Сохранено: {stats['saved']}, Ошибок: {stats['failed']}")
return stats
# ============================================================
# АНАЛИТИКА
# ============================================================
def get_topic_stats(self) -> List[Dict]:
"""Статистика по темам"""
if not SUPABASE_ENABLED:
return []
try:
url = f"{SUPABASE_URL}/rest/v1/rpc/get_topic_stats"
headers = self._get_headers()
response = self.session.post(url, headers=headers, json={}, timeout=10)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"[ERROR] Ошибка статистики: {e}")
return []
def get_random_tasks(self, topic: Optional[str] = None, limit: int = 10) -> List[Dict]:
"""Получение случайных заданий"""
if not SUPABASE_ENABLED:
return []
try:
url = f"{SUPABASE_URL}/rest/v1/rpc/get_random_tasks"
headers = self._get_headers()
payload = {"limit_count": limit}
if topic:
payload["topic_filter"] = topic
response = self.session.post(url, headers=headers, json=payload, timeout=10)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"[ERROR] Ошибка: {e}")
return []
# ============================================================
# УТИЛИТЫ
# ============================================================
def _get_headers(self) -> Dict:
"""Получение заголовков для API запросов"""
return {
"apikey": SUPABASE_KEY,
"Authorization": f"Bearer {SUPABASE_KEY}",
"Content-Type": "application/json",
"Prefer": "return=representation"
}
def _extract_keywords(self, text: str, max_keywords: int = 10) -> List[str]:
"""Извлечение ключевых слов (простая реализация)"""
# Стоп-слова для русского языка
stop_words = {
'и', 'в', 'во', 'не', 'что', 'он', 'на', 'я', 'с', 'со', 'как', 'а', 'то',
'все', 'она', 'так', 'его', 'но', 'да', 'ты', 'к', 'у', 'же', 'вы', 'за',
'бы', 'по', 'только', 'ее', 'мне', 'было', 'вот', 'от', 'меня', 'еще',
'нет', 'о', 'из', 'ему', 'теперь', 'когда', 'даже', 'ну', 'вдруг', 'ли',
'если', 'уже', 'или', 'ни', 'быть', 'был', 'него', 'до', 'вас', 'нибудь',
'опять', 'уж', 'вам', 'вед', 'пусть', 'тогда', 'кто', 'этой', 'того',
'потому', 'этот', 'какой', 'совсем', 'ним', 'здесь', 'этом', 'один',
'почти', 'мой', 'тем', 'чтобы', 'нее', 'сейчас', 'были', 'куда', 'зачем',
'всех', 'никогда', 'можно', 'при', 'наконец', 'два', 'об', 'другой',
'хоть', 'после', 'над', 'больше', 'тот', 'через', 'эти', 'нас', 'про',
'всего', 'них', 'какая', 'много', 'разве', 'три', 'эту', 'моя', 'впрочем',
'хорошо', 'у', 'для', 'че', 'лет', 'который', 'правда', 'место', 'слово'
}
words = text.lower().split()
keywords = []
for word in words:
# Очищаем от знаков препинания
word = ''.join(c for c in word if c.isalpha())
if len(word) > 3 and word not in stop_words and word not in keywords:
keywords.append(word)
if len(keywords) >= max_keywords:
break
return keywords
def test_connection(self) -> bool:
"""Проверка подключения"""
if not SUPABASE_ENABLED:
return False
try:
url = f"{SUPABASE_URL}/rest/v1/tasks?limit=1"
headers = self._get_headers()
response = self.session.get(url, headers=headers, timeout=10)
return response.status_code == 200
except Exception as e:
print(f"[ERROR] Ошибка подключения: {e}")
return False
# ============================================================
# ДЕКОРАТОР ДЛЯ АСИНХРОННОЙ ОЧЕРЕДИ
# ============================================================
class EmbeddingsQueue:
"""Очередь для асинхронной генерации embeddings"""
def __init__(self, supabase_client: SupabaseClient):
self.client = supabase_client
def enqueue(self, task_id: str, text: str) -> bool:
"""Добавление задачи в очередь"""
if not SUPABASE_ENABLED:
return False
try:
url = f"{SUPABASE_URL}/rest/v1/rpc/pgmq_send"
headers = self.client._get_headers()
payload = {
"queue_name": "embeddings_queue",
"message": {
"task_id": task_id,
"text": text,
"created_at": datetime.now().isoformat()
}
}
response = self.client.session.post(url, headers=headers, json=payload, timeout=10)
return response.status_code in [200, 201]
except Exception as e:
print(f"[ERROR] Ошибка очереди: {e}")
return False
# ============================================================
# ЗАПУСК
# ============================================================
if __name__ == "__main__":
print("="*60)
print("Тестирование Supabase клиента")
print("="*60)
client = SupabaseClient()
if client.test_connection():
print("\n[OK] Подключение к Supabase успешно!")
# Тест получения заданий
tasks = client.get_tasks(limit=5)
print(f"\nПолучено заданий: {len(tasks)}")
# Тест статистики
stats = client.get_topic_stats()
print(f"\nСтатистика по темам: {stats}")
# Тест векторного поиска
similar = client.search_similar_tasks("орфография корни слов", limit=3)
print(f"\nПохожие задания: {len(similar)}")
else:
print("\n[WARN] Supabase не подключен")
print("Настройте переменные окружения:")
print(" SUPABASE_URL=https://your-project.supabase.co")
print(" SUPABASE_KEY=your-anon-key")