Spaces:
Sleeping
Sleeping
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import bcrypt
from dotenv import load_dotenv
from supabase import create_client
# Load variables from a local .env file (if any) before the settings are read.
load_dotenv()

# Supabase connection settings; each is None when its variable is not set.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")

# Module-wide client, created once at import time.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
def get_supabase_client():
    """Return the module-level Supabase client created at import time."""
    client = supabase
    return client
async def save_card(supabase_client, card_data: dict):
    """Insert the data of a generated horoscope card into the ``cards`` table.

    Args:
        supabase_client: Client object used to run the insert.
        card_data: Card fields. ``terms``, ``card_date``, ``card_text``,
            ``image_filename``, ``qr_code_filename``, ``qr_code_link`` and
            ``session_id`` are required. ``lang`` defaults to ``"de"``;
            ``prompt_text``, ``ml_model_info`` and ``generation_params``
            default to None.

    Returns:
        Whatever ``execute()`` returns for the insert.

    Raises:
        Exception: Any error (including a missing required key) is printed
            and then re-raised unchanged.
    """
    mandatory_columns = (
        "terms",
        "card_date",
        "card_text",
        "image_filename",
        "qr_code_filename",
        "qr_code_link",
    )
    nullable_columns = ("prompt_text", "ml_model_info", "generation_params")
    try:
        # Copy the card fields into a row shaped like the database table.
        row = {column: card_data[column] for column in mandatory_columns}
        # Stored as text, so a non-string id (e.g. a UUID) is stringified.
        row["session_id"] = str(card_data["session_id"])
        row["lang"] = card_data.get("lang", "de")
        for column in nullable_columns:
            row[column] = card_data.get(column)
        # created_at is not sent; presumably Supabase fills it in when the
        # table is configured to do so.
        return supabase_client.table("cards").insert(row).execute()
    except Exception as e:
        # Better logging / error handling would be good here.
        print(f"Error saving to Supabase: {e}")
        raise
# User management functions
async def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Fetch the first ``users`` row whose ``username`` equals *username*.

    Returns:
        The row as returned by the client, or None when nothing matches or
        the lookup raised (the error is printed, not propagated).
    """
    try:
        lookup = supabase.table("users").select("*").eq("username", username)
        rows = lookup.execute().data
        return rows[0] if rows else None
    except Exception as e:
        print(f"Error getting user by username: {e}")
        return None
async def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Fetch the first ``users`` row whose ``email`` equals *email*.

    Returns:
        The row as returned by the client, or None when nothing matches or
        the lookup raised (the error is printed, not propagated).
    """
    try:
        lookup = supabase.table("users").select("*").eq("email", email)
        rows = lookup.execute().data
        return rows[0] if rows else None
    except Exception as e:
        print(f"Error getting user by email: {e}")
        return None
async def create_user(username: str, email: str, password: str) -> Optional[Dict[str, Any]]:
    """Insert a new ``users`` row, storing a bcrypt hash instead of the password.

    Args:
        username: Value for the ``username`` column.
        email: Value for the ``email`` column.
        password: Plain-text password; only its salted bcrypt hash is stored.

    Returns:
        The first row reported back by the insert, or None when the insert
        returned no data or any step raised (the error is printed).
    """
    try:
        # Only the hash is stored, decoded to text for the insert payload.
        secret = password.encode('utf-8')
        hashed = bcrypt.hashpw(secret, bcrypt.gensalt())
        new_user = dict(
            username=username,
            email=email,
            password_hash=hashed.decode('utf-8'),
        )
        inserted = supabase.table("users").insert(new_user).execute().data
        return inserted[0] if inserted else None
    except Exception as e:
        print(f"Error creating user: {e}")
        return None
async def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Returns:
        The result of ``bcrypt.checkpw``. Any error during the check (for
        example a malformed hash or a non-string argument) is printed and
        reported as False.
    """
    try:
        candidate = plain_password.encode('utf-8')
        stored = hashed_password.encode('utf-8')
        matches = bcrypt.checkpw(candidate, stored)
    except Exception as e:
        print(f"Error verifying password: {e}")
        return False
    return matches
async def update_last_login(user_id: str) -> None:
    """Set ``last_login`` to the current UTC time for the given user.

    Args:
        user_id: Value matched against the ``id`` column of ``users``.

    Failures are printed and swallowed; nothing is raised to the caller.
    """
    try:
        # Use an explicit, timezone-aware UTC timestamp. datetime.utcnow() is
        # deprecated and returns a naive value that carries no UTC marker.
        current_time = datetime.now(timezone.utc).isoformat()
        supabase.table("users").update({"last_login": current_time}).eq("id", user_id).execute()
    except Exception as e:
        print(f"Error updating last login: {e}")
# Session management functions
async def create_user_session(user_id: str, token_jti: str, expires_at: datetime) -> Optional[Dict[str, Any]]:
    """Insert a new, non-revoked row into ``user_sessions``.

    Args:
        user_id: Value for the ``user_id`` column.
        token_jti: Value for the ``token_jti`` column.
        expires_at: Expiry time; stored as its ``isoformat()`` string.

    Returns:
        The first row reported back by the insert, or None when the insert
        returned no data or any step raised (the error is printed).
    """
    try:
        new_session = dict(
            user_id=user_id,
            token_jti=token_jti,
            expires_at=expires_at.isoformat(),
            is_revoked=False,
        )
        inserted = supabase.table("user_sessions").insert(new_session).execute().data
        return inserted[0] if inserted else None
    except Exception as e:
        print(f"Error creating user session: {e}")
        return None
async def get_user_session(token_jti: str) -> Optional[Dict[str, Any]]:
    """Fetch the non-revoked ``user_sessions`` row for a token JTI.

    Only ``token_jti`` and ``is_revoked`` are filtered on; expiry is not
    checked here.

    Returns:
        The first matching row, or None when nothing matches or the lookup
        raised (the error is printed, not propagated).
    """
    try:
        active_sessions = (
            supabase.table("user_sessions")
            .select("*")
            .eq("token_jti", token_jti)
            .eq("is_revoked", False)
        )
        found = active_sessions.execute().data
        if not found:
            return None
        return found[0]
    except Exception as e:
        print(f"Error getting user session: {e}")
        return None
async def revoke_user_session(token_jti: str):
    """Mark every ``user_sessions`` row with this token JTI as revoked.

    Failures are printed and swallowed; nothing is raised to the caller.
    """
    try:
        revocation = {"is_revoked": True}
        sessions = supabase.table("user_sessions")
        sessions.update(revocation).eq("token_jti", token_jti).execute()
    except Exception as e:
        print(f"Error revoking user session: {e}")
async def cleanup_expired_sessions() -> None:
    """Delete every ``user_sessions`` row whose ``expires_at`` is in the past.

    The cutoff is the current UTC time. Failures are printed and swallowed;
    nothing is raised to the caller.
    """
    try:
        # Use an explicit, timezone-aware UTC cutoff. datetime.utcnow() is
        # deprecated and returns a naive value that carries no UTC marker.
        current_time = datetime.now(timezone.utc).isoformat()
        supabase.table("user_sessions").delete().lt("expires_at", current_time).execute()
    except Exception as e:
        print(f"Error cleaning up expired sessions: {e}")