| import os |
| from datetime import datetime, timedelta |
| from typing import Optional, Union |
| from urllib.parse import urlparse |
|
|
| from pymongo import MongoClient, ReturnDocument |
| from pymongo.errors import DuplicateKeyError |
| from bson import ObjectId |
|
|
| import config |
|
|
| |
# Module-level cache of connected MongoClient instances, keyed by a short
# backend name ("selfi_short", "collage_maker", "ai_enhancer", "logs").
# The get_*_client() helpers fill it in; entries are removed when a ping or a
# database operation fails so that the next call reconnects.
mongodb_clients = {}
|
|
|
|
def _normalize_date(dt: datetime) -> datetime:
    """Return the naive UTC midnight of the calendar day that contains ``dt``.

    Naive datetimes are treated as already being in UTC (this module creates
    them with ``datetime.utcnow()``) and simply lose their time of day.
    Timezone-aware datetimes are shifted to UTC first, so a non-zero offset
    cannot place the value on the wrong calendar day.

    Args:
        dt: The moment to truncate.

    Returns:
        A naive ``datetime`` at 00:00:00 on the UTC day of ``dt``.
    """
    offset = dt.utcoffset()
    if offset is not None:
        # Wall-clock time minus the UTC offset is the UTC wall-clock time.
        dt = dt.replace(tzinfo=None) - offset
    return datetime(dt.year, dt.month, dt.day)


def _build_ai_edit_daily_count(existing: Optional[list], now: datetime) -> list:
    """Build the ``ai_edit_daily_count`` list to store for a usage at ``now``.

    Every usable entry of ``existing`` is normalized to
    ``{"date": <UTC midnight>, "count": <int>}``; entries that are not dicts
    or whose ``date`` is neither a ``datetime`` nor an ISO-8601 string are
    dropped.

    - If nothing usable exists: return a single entry for today with count=1.
    - If today's date exists: return the normalized list as is.
    - If today is missing: fill the days between the latest entry and today
      with count=0 and add today with count=1.
    - Always keep only the most recent 32 entries (oldest removed first).

    NOTE(review): when an entry for today already exists its count is left
    untouched, so repeated calls on the same day do not raise it. This is the
    documented behavior; confirm it is what the consumers of the counter want.

    Args:
        existing: The list currently stored on the document, or ``None``.
        now: The current time (naive values are taken as UTC).

    Returns:
        A new list sorted by ascending date with at most 32 entries.
    """
    max_entries = 32
    one_day = timedelta(days=1)
    today = _normalize_date(now)

    normalized = []
    seen_dates = set()
    for entry in existing or []:
        if not isinstance(entry, dict):
            continue
        date_val = entry.get("date")
        try:
            if isinstance(date_val, datetime):
                day = _normalize_date(date_val)
            elif isinstance(date_val, str):
                day = _normalize_date(datetime.fromisoformat(date_val))
            else:
                continue
        except (ValueError, OverflowError):
            # Unparseable string or a value at the edge of the datetime range.
            continue
        seen_dates.add(day)
        normalized.append({"date": day, "count": entry.get("count", 0)})

    if not normalized:
        # Missing, empty, or entirely malformed history: start fresh instead
        # of indexing into an empty list below.
        return [{"date": today, "count": 1}]

    normalized.sort(key=lambda item: item["date"])

    if today not in seen_dates:
        # Only days inside the retained window can survive the trim below, so
        # never generate filler entries older than that window.
        window_start = today - (max_entries - 1) * one_day
        gap_day = max(normalized[-1]["date"] + one_day, window_start)
        while gap_day < today:
            normalized.append({"date": gap_day, "count": 0})
            gap_day += one_day

        normalized.append({"date": today, "count": 1})
        # Re-sort: the latest stored entry may lie in the future.
        normalized.sort(key=lambda item: item["date"])

    return normalized[-max_entries:]
|
|
|
|
def get_selfi_short_client():
    """Get MongoDB client for the Selfie Short database (DEFAULT).

    A cached client is reused only while it still answers a ``ping``. A stale
    client is evicted from ``mongodb_clients`` and closed before a new
    connection is attempted.

    Returns:
        A connected ``MongoClient``, or ``None`` when
        ``config.MONGODB_SELFI_SHORT`` is empty or the connection fails.
    """
    cached = mongodb_clients.get("selfi_short")
    if cached is not None:
        try:
            cached.admin.command('ping')
            return cached
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt/SystemExit propagate.
            mongodb_clients.pop("selfi_short", None)
            try:
                # Release the sockets and monitor threads of the dead client.
                cached.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing stale Selfie Short client: {str(close_error)[:200]}")

    if not config.MONGODB_SELFI_SHORT:
        print("[MongoDB] MONGODB_SELFI_SHORT connection string not set")
        return None

    client = None
    try:
        client = MongoClient(
            config.MONGODB_SELFI_SHORT,
            # NOTE(review): certificate validation is disabled here, which
            # allows man-in-the-middle attacks; confirm this is intentional.
            tlsAllowInvalidCertificates=True,
            serverSelectionTimeoutMS=15000,
            connectTimeoutMS=20000,
            socketTimeoutMS=30000,
        )
        client.admin.command('ping')
    except Exception as e:
        print(f"[MongoDB] Error connecting to Selfie Short MongoDB: {str(e)[:200]}")
        if client is not None:
            try:
                # The client was constructed but is unusable; do not leak it.
                client.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing failed Selfie Short client: {str(close_error)[:200]}")
        return None

    mongodb_clients["selfi_short"] = client
    print("[MongoDB] Successfully connected to Selfie Short database")
    return client
|
|
|
|
def get_collage_maker_client():
    """Get MongoDB client for the collage-maker database.

    A cached client is reused only while it still answers a ``ping``. A stale
    client is evicted from ``mongodb_clients`` and closed before a new
    connection is attempted.

    Returns:
        A connected ``MongoClient``, or ``None`` when
        ``config.MONGODB_COLLAGE_MAKER`` is empty or the connection fails.
    """
    cached = mongodb_clients.get("collage_maker")
    if cached is not None:
        try:
            cached.admin.command('ping')
            return cached
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt/SystemExit propagate.
            mongodb_clients.pop("collage_maker", None)
            try:
                # Release the sockets and monitor threads of the dead client.
                cached.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing stale collage-maker client: {str(close_error)[:200]}")

    if not config.MONGODB_COLLAGE_MAKER:
        print("[MongoDB] MONGODB_COLLAGE_MAKER connection string not set")
        return None

    client = None
    try:
        client = MongoClient(
            config.MONGODB_COLLAGE_MAKER,
            # NOTE(review): certificate validation is disabled here, which
            # allows man-in-the-middle attacks; confirm this is intentional.
            tlsAllowInvalidCertificates=True,
            serverSelectionTimeoutMS=15000,
            connectTimeoutMS=20000,
            socketTimeoutMS=30000,
        )
        client.admin.command('ping')
    except Exception as e:
        print(f"[MongoDB] Error connecting to collage-maker MongoDB: {str(e)[:200]}")
        if client is not None:
            try:
                # The client was constructed but is unusable; do not leak it.
                client.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing failed collage-maker client: {str(close_error)[:200]}")
        return None

    mongodb_clients["collage_maker"] = client
    print("[MongoDB] Successfully connected to collage-maker database")
    return client
|
|
|
|
def get_ai_enhancer_client():
    """Get MongoDB client for the AI-Enhancer database.

    A cached client is reused only while it still answers a ``ping``. A stale
    client is evicted from ``mongodb_clients`` and closed before a new
    connection is attempted.

    Returns:
        A connected ``MongoClient``, or ``None`` when
        ``config.MONGODB_AI_ENHANCER`` is empty or the connection fails.
    """
    cached = mongodb_clients.get("ai_enhancer")
    if cached is not None:
        try:
            cached.admin.command('ping')
            return cached
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt/SystemExit propagate.
            mongodb_clients.pop("ai_enhancer", None)
            try:
                # Release the sockets and monitor threads of the dead client.
                cached.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing stale AI-Enhancer client: {str(close_error)[:200]}")

    if not config.MONGODB_AI_ENHANCER:
        print("[MongoDB] MONGODB_AI_ENHANCER connection string not set")
        return None

    client = None
    try:
        client = MongoClient(
            config.MONGODB_AI_ENHANCER,
            # NOTE(review): certificate validation is disabled here, which
            # allows man-in-the-middle attacks; confirm this is intentional.
            tlsAllowInvalidCertificates=True,
            serverSelectionTimeoutMS=15000,
            connectTimeoutMS=20000,
            socketTimeoutMS=30000,
        )
        client.admin.command('ping')
    except Exception as e:
        print(f"[MongoDB] Error connecting to AI-Enhancer MongoDB: {str(e)[:200]}")
        if client is not None:
            try:
                # The client was constructed but is unusable; do not leak it.
                client.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing failed AI-Enhancer client: {str(close_error)[:200]}")
        return None

    mongodb_clients["ai_enhancer"] = client
    print("[MongoDB] Successfully connected to AI-Enhancer database")
    return client
|
|
|
|
def save_media_click(user_id: Optional[Union[int, str]], category_id: str, appname: Optional[str] = None):
    """
    Save or update media click in the appropriate MongoDB database based on appname.

    Args:
        user_id: Optional integer or ObjectId string. If None, a new ObjectId will be generated automatically.
        category_id: Required MongoDB ObjectId string for category.
        appname: Optional app name (case-insensitive).
            - If "collage-maker", uses the collage-maker MongoDB
              (config.MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME) with "media_clicks" collection
            - If "AI-Enhancer", uses the AI-Enhancer MongoDB
              (config.MONGODB_AI_ENHANCER_ADMIN_DB_NAME) with "media_clicks" collection
            - Otherwise (DEFAULT), uses the Selfie Short MongoDB
              (config.MONGODB_SELFI_SHORT_ADMIN_DB_NAME) with "media_clicks" collection

    Returns:
        True when the write was issued without error; False when the backend
        is not configured or unreachable, ``category_id`` is missing or not a
        valid ObjectId, or any exception occurred (the cached client for the
        backend is dropped in that case so the next call reconnects).

    Document Structure:
    {
        userId: ObjectId,
        categories: [
            {
                categoryId: ObjectId,
                click_count: int,
                lastClickedAt: Date
            }
        ],
        ai_edit_complete: int,  # Count of times user used any model
        ai_edit_last_date: Date,  # Last date user used any model
        ai_edit_daily_count: [  # Daily usage tracking (last 32 days)
            {"date": Date, "count": int},
            ...
        ],
        createdAt: Date,
        updatedAt: Date
    }
    """
    print(f"[MongoDB] save_media_click called - appname: '{appname}', user_id: {user_id}, category_id: {category_id}")

    use_collage_maker = (appname and appname.lower() == "collage-maker")
    use_ai_enhancer = (appname and appname.lower() == "ai-enhancer")

    print(f"[MongoDB] Database selection - use_collage_maker: {use_collage_maker}, use_ai_enhancer: {use_ai_enhancer}, default: Selfie Short")

    # Resolve the per-backend constants once so the rest of the function does
    # not have to repeat the three-way branch for every message/cache access.
    if use_collage_maker:
        cache_key = "collage_maker"
        backend_label = "collage-maker"
        setting_name = "MONGODB_COLLAGE_MAKER"
        connection_string = config.MONGODB_COLLAGE_MAKER
    elif use_ai_enhancer:
        cache_key = "ai_enhancer"
        backend_label = "AI-Enhancer"
        setting_name = "MONGODB_AI_ENHANCER"
        connection_string = config.MONGODB_AI_ENHANCER
    else:
        cache_key = "selfi_short"
        backend_label = "Selfie Short"
        setting_name = "MONGODB_SELFI_SHORT"
        connection_string = config.MONGODB_SELFI_SHORT

    if not connection_string:
        print(f"[MongoDB] {setting_name} not configured, skipping media_clicks logging")
        return False

    if not category_id:
        print("[MongoDB] category_id not provided, skipping media_clicks logging")
        return False

    try:
        if use_collage_maker:
            mongo_client = get_collage_maker_client()
        elif use_ai_enhancer:
            mongo_client = get_ai_enhancer_client()
        else:
            mongo_client = get_selfi_short_client()

        if mongo_client is None:
            print(f"[MongoDB] ERROR: Could not connect to {backend_label} MongoDB")
            mongodb_clients.pop(cache_key, None)
            return False

        collection_name = "media_clicks"
        if use_collage_maker:
            db = mongo_client.get_database(config.MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME)
        elif use_ai_enhancer:
            db = mongo_client.get_database(config.MONGODB_AI_ENHANCER_ADMIN_DB_NAME)
            print(f"[MongoDB] AI-Enhancer: Using database '{config.MONGODB_AI_ENHANCER_ADMIN_DB_NAME}', collection '{collection_name}'")
        else:
            db = mongo_client.get_database(config.MONGODB_SELFI_SHORT_ADMIN_DB_NAME)
            print(f"[MongoDB] Selfie Short (DEFAULT): Using database '{config.MONGODB_SELFI_SHORT_ADMIN_DB_NAME}', collection '{collection_name}'")

        collection = db.get_collection(collection_name)

        # --- Resolve the user's ObjectId -----------------------------------
        if user_id is None:
            user_object_id = ObjectId()
            print(f"[MongoDB] user_id not provided, generated new ObjectId: {user_object_id}")
        else:
            try:
                user_id_str = str(user_id).strip()
                if len(user_id_str) == 24:
                    user_object_id = ObjectId(user_id_str)
                    print(f"[MongoDB] Using provided user_id as ObjectId: {user_object_id}")
                else:
                    # NOTE(review): the hex form is right-padded with zeros, so
                    # distinct integers can collide (1, 16 and 256 all become
                    # "1000...0"), and negative integers fail and fall through
                    # to a random id. Left unchanged because existing
                    # documents are keyed by this mapping; fixing it needs a
                    # data migration.
                    user_id_int = int(user_id_str)
                    user_id_hex = hex(user_id_int)[2:].ljust(24, "0")[:24]
                    user_object_id = ObjectId(user_id_hex)
                    print(f"[MongoDB] Converted integer user_id '{user_id_str}' -> ObjectId: {user_object_id}")
            except Exception as e:
                print(f"[MongoDB] Error converting user_id to ObjectId: {e}")
                user_object_id = ObjectId()
                print(f"[MongoDB] Generated new ObjectId due to conversion error: {user_object_id}")

        try:
            category_object_id = ObjectId(category_id)
        except Exception as e:
            print(f"[MongoDB] Error converting category_id to ObjectId: {e}, category_id: {category_id}")
            return False

        now = datetime.utcnow()

        existing_doc = collection.find_one({"userId": user_object_id})

        if existing_doc:
            today_daily_counts = _build_ai_edit_daily_count(existing_doc.get("ai_edit_daily_count"), now)
            print(f"[MongoDB] Found existing document for userId: {user_object_id}")

            # Look for the category, remembering the value exactly as stored.
            # Entries saved as strings are accepted by the comparison below,
            # so the update filter must use that same stored value; filtering
            # on the ObjectId would match nothing and silently skip the write.
            category_exists = False
            stored_category_id = category_object_id
            for cat in existing_doc.get("categories") or []:
                cat_id = cat.get("categoryId")
                if isinstance(cat_id, ObjectId):
                    is_match = cat_id == category_object_id
                else:
                    is_match = str(cat_id) == str(category_object_id)
                if is_match:
                    category_exists = True
                    stored_category_id = cat_id
                    break

            if category_exists:
                # `categories.$` targets the array element matched by the filter.
                collection.update_one(
                    {
                        "userId": user_object_id,
                        "categories.categoryId": stored_category_id
                    },
                    {
                        "$inc": {
                            "categories.$.click_count": 1,
                            "ai_edit_complete": 1
                        },
                        "$set": {
                            "categories.$.lastClickedAt": now,
                            "ai_edit_last_date": now,
                            "updatedAt": now,
                            "ai_edit_daily_count": today_daily_counts
                        }
                    }
                )
                print(f"[MongoDB] Updated category click_count - userId: {user_object_id}, categoryId: {category_object_id}")
            else:
                collection.update_one(
                    {"userId": user_object_id},
                    {
                        "$push": {
                            "categories": {
                                "categoryId": category_object_id,
                                "click_count": 1,
                                "lastClickedAt": now
                            }
                        },
                        "$inc": {
                            "ai_edit_complete": 1
                        },
                        "$set": {
                            "ai_edit_last_date": now,
                            "updatedAt": now,
                            "ai_edit_daily_count": today_daily_counts
                        }
                    }
                )
                print(f"[MongoDB] Added new category - userId: {user_object_id}, categoryId: {category_object_id}")
        else:
            # No document yet: upsert one. The $setOnInsert fields are only
            # written if this call is the one that actually creates it.
            today_daily_counts_new = _build_ai_edit_daily_count(None, now)
            collection.find_one_and_update(
                {"userId": user_object_id},
                {
                    "$setOnInsert": {
                        "userId": user_object_id,
                        "createdAt": now,
                        "ai_edit_complete": 1,
                        "ai_edit_daily_count": today_daily_counts_new
                    },
                    "$push": {
                        "categories": {
                            "categoryId": category_object_id,
                            "click_count": 1,
                            "lastClickedAt": now
                        }
                    },
                    "$set": {
                        "ai_edit_last_date": now,
                        "updatedAt": now
                    }
                },
                upsert=True,
                return_document=ReturnDocument.AFTER
            )
            print(f"[MongoDB] Created new document - userId: {user_object_id}, categoryId: {category_object_id}")

        return True

    except Exception as e:
        import traceback
        print(f"[MongoDB] ERROR saving media_clicks: {str(e)}")
        print(f"[MongoDB] Traceback: {traceback.format_exc()}")

        # Drop the cached client so the next call performs a fresh connect.
        mongodb_clients.pop(cache_key, None)
        return False
|
|
|
|
def get_category_name(category_id: str, appname: Optional[str] = None) -> Optional[str]:
    """
    Resolve a category's name from the "categories" collection of the admin
    database that belongs to the given app.

    Args:
        category_id: MongoDB ObjectId string for the category
        appname: App name (case-insensitive) selecting the MongoDB to query
            - "collage-maker" → MONGODB_COLLAGE_MAKER
            - "AI-Enhancer" → MONGODB_AI_ENHANCER
            - anything else → MONGODB_SELFI_SHORT

    Returns:
        The document's "name" value, or None when the id is empty or invalid,
        the backend is not configured or unreachable, the category does not
        exist, or the lookup raised.
    """
    if not category_id:
        return None

    app_key = appname.lower() if appname else None

    try:
        # Pick the backend: check its connection string, then fetch its client.
        if app_key == "collage-maker":
            if not config.MONGODB_COLLAGE_MAKER:
                print("[MongoDB] MONGODB_COLLAGE_MAKER not configured, cannot lookup category name")
                return None
            client = get_collage_maker_client()
            db_name = config.MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME
        elif app_key == "ai-enhancer":
            if not config.MONGODB_AI_ENHANCER:
                print("[MongoDB] MONGODB_AI_ENHANCER not configured, cannot lookup category name")
                return None
            client = get_ai_enhancer_client()
            db_name = config.MONGODB_AI_ENHANCER_ADMIN_DB_NAME
        else:
            if not config.MONGODB_SELFI_SHORT:
                print("[MongoDB] MONGODB_SELFI_SHORT not configured, cannot lookup category name")
                return None
            client = get_selfi_short_client()
            db_name = config.MONGODB_SELFI_SHORT_ADMIN_DB_NAME

        if client is None:
            print("[MongoDB] Could not connect to MongoDB for category lookup")
            return None

        categories = client.get_database(db_name).get_collection("categories")

        try:
            wanted_id = ObjectId(category_id)
        except Exception as e:
            print(f"[MongoDB] Error converting category_id to ObjectId: {e}")
            return None

        found = categories.find_one({"_id": wanted_id})
        if not found:
            print(f"[MongoDB] Category not found for id: {category_id}")
            return None

        name = found.get("name")
        print(f"[MongoDB] Found category name: {name} for id: {category_id} (appname: {appname})")
        return name

    except Exception as e:
        print(f"[MongoDB] Error looking up category name: {str(e)}")
        return None
|
|
|
|
| |
| |
# Prompt keyword -> category name. get_category_by_prompt() scans the keys in
# insertion order and stops at the first one contained in the lower-cased
# prompt, so the order of the entries is significant.
PIXVERSE_CATEGORIES = {
    "kiss": "Ai Kiss Video",
    "hug": "Ai Hug Video",
    "boyfriend": "Ai BF",
    "bf": "Ai BF",
    "girlfriend": "Ai GF",
    "gf": "Ai GF",
}
|
|
|
|
def get_category_by_prompt(prompt_text: str, appname: Optional[str] = None) -> tuple:
    """
    Map a prompt to a Pixverse category and look up that category's id.

    The first PIXVERSE_CATEGORIES keyword contained in the lower-cased prompt
    decides the category name; the id is then read from the "categories"
    collection of the admin database that belongs to the given app.

    NOTE(review): matching is a plain substring test, so short keywords can
    also hit inside longer words (e.g. "hug" in "huge") — confirm that this
    is acceptable for the callers.

    Args:
        prompt_text: The prompt text to match against categories
        appname: App name (case-insensitive) selecting the MongoDB to query

    Returns:
        (category_id, category_name) on success, (None, category_name) when
        the keyword matched but the id could not be looked up, and
        (None, None) when the prompt is empty or matches no keyword.
    """
    if not prompt_text:
        return None, None

    prompt_lower = prompt_text.lower()
    matched_category_name = None

    first_hit = next(
        (pair for pair in PIXVERSE_CATEGORIES.items() if pair[0] in prompt_lower),
        None,
    )
    if first_hit is not None:
        keyword, matched_category_name = first_hit
        print(f"[MongoDB] Matched prompt keyword '{keyword}' to category: {matched_category_name}")

    if not matched_category_name:
        print(f"[MongoDB] No category match found for prompt: {prompt_text[:50]}...")
        return None, None

    app_key = appname.lower() if appname else None
    # Every failure from here on still reports the matched name.
    lookup_failed = (None, matched_category_name)

    try:
        # Pick the backend: check its connection string, then fetch its client.
        if app_key == "collage-maker":
            if not config.MONGODB_COLLAGE_MAKER:
                print("[MongoDB] MONGODB_COLLAGE_MAKER not configured")
                return lookup_failed
            client = get_collage_maker_client()
            db_name = config.MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME
        elif app_key == "ai-enhancer":
            if not config.MONGODB_AI_ENHANCER:
                print("[MongoDB] MONGODB_AI_ENHANCER not configured")
                return lookup_failed
            client = get_ai_enhancer_client()
            db_name = config.MONGODB_AI_ENHANCER_ADMIN_DB_NAME
        else:
            if not config.MONGODB_SELFI_SHORT:
                print("[MongoDB] MONGODB_SELFI_SHORT not configured")
                return lookup_failed
            client = get_selfi_short_client()
            db_name = config.MONGODB_SELFI_SHORT_ADMIN_DB_NAME

        if client is None:
            print("[MongoDB] Could not connect to MongoDB for category lookup by prompt")
            return lookup_failed

        categories = client.get_database(db_name).get_collection("categories")
        found = categories.find_one({"name": matched_category_name})

        if not found:
            print(f"[MongoDB] Category not found in database for name: {matched_category_name}")
            return lookup_failed

        category_id = str(found.get("_id"))
        print(f"[MongoDB] Found category_id: {category_id} for name: {matched_category_name}")
        return category_id, matched_category_name

    except Exception as e:
        print(f"[MongoDB] Error looking up category by prompt: {str(e)}")
        return lookup_failed
|
|
|
|
def get_logs_client():
    """Get MongoDB client for the Logs database.

    A cached client is reused only while it still answers a ``ping``. A stale
    client is evicted from ``mongodb_clients`` and closed before a new
    connection is attempted.

    Returns:
        A connected ``MongoClient``, or ``None`` when ``config.MONGODB_LOGS``
        is empty or the connection fails.
    """
    cached = mongodb_clients.get("logs")
    if cached is not None:
        try:
            cached.admin.command('ping')
            return cached
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt/SystemExit propagate.
            mongodb_clients.pop("logs", None)
            try:
                # Release the sockets and monitor threads of the dead client.
                cached.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing stale Logs client: {str(close_error)[:200]}")

    if not config.MONGODB_LOGS:
        print("[MongoDB] MONGODB_LOGS connection string not set")
        return None

    client = None
    try:
        client = MongoClient(
            config.MONGODB_LOGS,
            # NOTE(review): certificate validation is disabled here, which
            # allows man-in-the-middle attacks; confirm this is intentional.
            tlsAllowInvalidCertificates=True,
            serverSelectionTimeoutMS=15000,
            connectTimeoutMS=20000,
            socketTimeoutMS=30000,
        )
        client.admin.command('ping')
    except Exception as e:
        print(f"[MongoDB] Error connecting to Logs MongoDB: {str(e)[:200]}")
        if client is not None:
            try:
                # The client was constructed but is unusable; do not leak it.
                client.close()
            except Exception as close_error:
                print(f"[MongoDB] Error closing failed Logs client: {str(close_error)[:200]}")
        return None

    mongodb_clients["logs"] = client
    print("[MongoDB] Successfully connected to Logs database")
    return client
|
|
|
|
def save_request_log(
    user_id: Optional[str],
    subcategory: str,
    endpoint: str,
    status: str,
    response_time: float,
    model: str = "PixVerse",
    appname: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Insert one request-log document into the Logs MongoDB
    (database config.MONGODB_LOGS_DB_NAME, collection
    config.MONGODB_LOGS_COLLECTION).

    Args:
        user_id: User ID string
        subcategory: Category/subcategory name
        endpoint: API endpoint called
        status: "success" or "error"
        response_time: Time taken in seconds
        model: Model name (default: "PixVerse")
        appname: App name if provided
        error: Error message if status is "error"

    Returns:
        True when the document was inserted; False when MONGODB_LOGS is not
        configured, the client could not be obtained, or the insert raised
        (the cached "logs" client is dropped in the latter two cases).

    Document Structure:
    {
        user_id: string,
        subcategory: string,
        endpoint: string,
        status: string,
        response_time: float,
        model: string,
        timestamp: Date,
        appname: string or null,
        error: string or null
    }
    """
    if not config.MONGODB_LOGS:
        print("[MongoDB] MONGODB_LOGS not configured, skipping request logging")
        return False

    try:
        logs_client = get_logs_client()
        if logs_client is None:
            print("[MongoDB] ERROR: Could not connect to Logs MongoDB")
            mongodb_clients.pop("logs", None)
            return False

        logs_db = logs_client.get_database(config.MONGODB_LOGS_DB_NAME)
        logs_collection = logs_db.get_collection(config.MONGODB_LOGS_COLLECTION)

        # The timestamp is taken at write time, as a naive UTC datetime.
        logs_collection.insert_one(
            {
                "user_id": user_id,
                "subcategory": subcategory,
                "endpoint": endpoint,
                "status": status,
                "response_time": response_time,
                "model": model,
                "timestamp": datetime.utcnow(),
                "appname": appname,
                "error": error,
            }
        )
        print(f"[MongoDB] Request log saved - user_id: {user_id}, status: {status}, endpoint: {endpoint}")
        return True

    except Exception as e:
        import traceback
        print(f"[MongoDB] ERROR saving request log: {str(e)}")
        print(f"[MongoDB] Traceback: {traceback.format_exc()}")
        # Drop the cached client so the next call performs a fresh connect.
        mongodb_clients.pop("logs", None)
        return False
|
|