| import os |
| import json |
| import time |
| from flask import Flask, request, jsonify, render_template |
| from flask_cors import CORS |
| import logging |
| import threading |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError |
| import google.generativeai as genai |
|
|
| |
# --- Persistence: private HF dataset that stores per-user usage counters ---
DATASET_REPO = "Ezmary/Karbaran-rayegan-tedad"
DATASET_FILENAME = "text_video_usage_data.json"
USAGE_LIMIT = 5                          # free generations per user per week
HF_TOKEN = os.environ.get("HF_TOKEN")    # secret; None disables Hub access

# --- Gemini key pool: comma-separated secret, rotated round-robin ---
ALL_GEMINI_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
gemini_keys = []                         # parsed keys, filled at startup
key_index = 0                            # next key to hand out
key_rotation_lock = threading.Lock()     # guards key_index
|
|
| |
# App wiring: timestamped INFO logging, Flask app, CORS for browser clients.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
app = Flask(__name__)
CORS(app)
|
|
| |
| |
| |
# Translation table built once at import time instead of on every call.
_PERSIAN_DIGIT_MAP = str.maketrans('0123456789', '۰۱۲۳۴۵۶۷۸۹')


def to_persian_numerals(number):
    """Convert a number (or numeric string) to a string with Persian digits.

    Non-digit characters pass through unchanged.  Best-effort: if anything
    goes wrong (e.g. an exotic object whose str() misbehaves), fall back to
    ``str(number)`` rather than crash a cosmetic formatting step.
    """
    try:
        return str(number).translate(_PERSIAN_DIGIT_MAP)
    except Exception:
        return str(number)
| |
|
|
| |
# In-memory copy of the usage records; the Hub dataset file is the durable store.
usage_data_cache = []
cache_lock = threading.Lock()        # guards usage_data_cache
data_changed = threading.Event()     # set whenever the cache diverges from the Hub
api = None                           # HfApi handle; stays None without HF_TOKEN

if HF_TOKEN:
    api = HfApi(token=HF_TOKEN)
    logging.info("HfApi initialized successfully.")
else:
    logging.error("CRITICAL: Secret 'HF_TOKEN' not found. Cannot access the private dataset.")

# Parse the comma-separated Gemini key pool; blanks are dropped.
if ALL_GEMINI_API_KEYS:
    gemini_keys = [key.strip() for key in ALL_GEMINI_API_KEYS.split(',') if key.strip()]
    if not gemini_keys:
        logging.error("CRITICAL: 'ALL_GEMINI_API_KEYS' secret was found but contained no valid keys after parsing.")
    else:
        num_keys_persian = to_persian_numerals(len(gemini_keys))
        logging.info(f"✅ با موفقیت {num_keys_persian} کلید Gemini برای چرخش بارگیری شد.")
else:
    logging.error("CRITICAL: Secret 'ALL_GEMINI_API_KEYS' not found or is empty. Prompt enhancement will fail.")
|
|
def load_initial_data():
    """Download the usage-data JSON from the Hub into ``usage_data_cache``.

    Runs once at startup.  The (slow) network download happens OUTSIDE
    ``cache_lock`` so request handlers are never blocked behind Hub I/O;
    only the final cache assignment is guarded.  Any failure leaves the
    cache empty rather than crashing startup.
    """
    global usage_data_cache
    if not api:
        return
    try:
        logging.info(f"Attempting to load data from '{DATASET_REPO}'...")
        local_path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=DATASET_FILENAME,
            repo_type="dataset",
            token=HF_TOKEN,
            force_download=True,  # ignore any stale local cache copy
        )
        with open(local_path, 'r', encoding='utf-8') as f:
            content = f.read()
        loaded = json.loads(content) if content else []
        with cache_lock:
            usage_data_cache = loaded
        logging.info(f"تعداد {to_persian_numerals(len(loaded))} رکورد بارگیری شد.")
    except (RepositoryNotFoundError, EntryNotFoundError):
        # First run: the dataset file does not exist yet; start empty.
        logging.warning("فایل دیتاسیت پیدا نشد. یک فایل جدید ساخته خواهد شد.")
        with cache_lock:
            usage_data_cache = []
    except Exception as e:
        logging.warning(f"Could not load initial data (this is often normal in Spaces due to cache permissions): {e}")
        with cache_lock:
            usage_data_cache = []
|
|
def persist_data_to_hub():
    """Upload the cached usage data to the Hub dataset when it has changed.

    Snapshots the cache (deep-copying each record so concurrent handler
    mutations cannot tear a record mid-serialization) and clears the dirty
    flag under ``cache_lock``, then performs the slow upload OUTSIDE the
    lock so request handlers are not blocked behind network I/O.  On upload
    failure the dirty flag is re-set so the next persister cycle retries,
    and the temp file is always removed (the original leaked it on failure).
    """
    if not api:
        return
    with cache_lock:
        if not data_changed.is_set():
            return
        logging.info("تغییرات شناسایی شد، در حال آمادهسازی برای نوشتن در هاب...")
        # Deep-ish copy: one dict per record is enough (values are scalars).
        data_to_write = [dict(record) for record in usage_data_cache]
        # Clear now, while holding the lock: any update that lands during the
        # upload re-sets the flag and is picked up on the next cycle.
        data_changed.clear()

    temp_filepath = "/tmp/temp_usage_data.json"
    try:
        with open(temp_filepath, 'w', encoding='utf-8') as f:
            json.dump(data_to_write, f, ensure_ascii=False, indent=2)
        api.upload_file(
            path_or_fileobj=temp_filepath,
            path_in_repo=DATASET_FILENAME,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message="Update video usage data"
        )
        logging.info(f"با موفقیت {to_persian_numerals(len(data_to_write))} رکورد در هاب ذخیره شد.")
    except Exception as e:
        data_changed.set()  # retry on the next persister cycle
        logging.error(f"CRITICAL: Failed to persist data to Hub: {e}")
    finally:
        # Never leak the temp file, even when json.dump or the upload fails.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
def background_persister():
    """Daemon loop: flush dirty usage data to the Hub every 30 seconds."""
    flush_interval_seconds = 30
    while True:
        time.sleep(flush_interval_seconds)
        persist_data_to_hub()
|
|
| |
@app.route('/')
def index():
    """Serve the single-page frontend."""
    return render_template('index.html')
|
|
def get_user_identifier(data):
    """Pick the best identifier for rate limiting.

    Prefers the browser fingerprint from the request payload; falls back to
    the client IP (first hop of X-Forwarded-For when behind a proxy, else
    the raw remote address).
    """
    fingerprint = data.get('fingerprint')
    if fingerprint:
        return str(fingerprint)
    forwarded = request.headers.getlist("X-Forwarded-For")
    if forwarded:
        # The first entry's first comma-separated hop is the original client.
        return forwarded[0].split(',')[0].strip()
    return request.remote_addr
|
|
def _next_gemini_key():
    """Thread-safe round-robin over the configured Gemini keys.

    Returns ``(key, index)`` where *index* is the position of the returned
    key (used only for logging).
    """
    global key_index
    with key_rotation_lock:
        selected_index = key_index
        selected_key = gemini_keys[selected_index]
        key_index = (key_index + 1) % len(gemini_keys)
    return selected_key, selected_index


def _build_enhancement_prompt(user_prompt):
    """Build the master instruction prompt sent to Gemini for *user_prompt*."""
    return f"""
*** SAFETY PRE-CHECK ***
First, analyze the user's idea in Persian: "{user_prompt}"
If the user's idea contains any sexually explicit, pornographic, or highly inappropriate keywords such as 'سکس', 'سکسی', 'پورن', 'شهوانی' or similar themes, you MUST IGNORE all other instructions and ONLY output the following exact JSON object:
{{
"error": "inappropriate_content"
}}

If the user's idea is SAFE and does not contain any forbidden themes, then proceed with the main task below.
*** END SAFETY PRE-CHECK ***

You are an expert AI assistant for a Text-to-Video engine. Your task is to take a user's simple idea (in Persian) and expand it into a set of detailed, professional prompts in English.
The user's idea is: "{user_prompt}"

Based on this idea, generate the following four prompts:
1. **Video Prompt:** A highly detailed, cinematic, and visually rich description in English. Use evocative adjectives, specify lighting, camera angles, and style. Always add strong keywords like "cinematic, 8k, photorealistic, hyperdetailed, masterpiece, Unreal Engine 5, professional color grading".
2. **Negative Video Prompt:** A context-aware list of things to avoid in the video, in English. For example, if the prompt is about a realistic subject, include "cartoon, anime, deformed, ugly, bad anatomy". Always include general quality-related negatives like "blurry, low quality, worst quality, watermark, text, signature, ugly, tiling, poorly drawn hands, poorly drawn feet, poorly drawn face, out of frame, extra limbs, disfigured, body out of frame, bad anatomy, blurred, grainy, signature, cut off, draft".
3. **Audio Prompt:** A descriptive prompt for generating sound effects, in English. Describe the ambient sounds and key actions. Avoid asking for music or speech. Example: for a cat in space, it could be "sound of soft, muffled footsteps on a metallic surface, gentle humming of life support systems, quiet breathing, vast silence of space".
4. **Negative Audio Prompt:** A list of sounds to avoid. This should almost always be "music, speech, singing, narration, voice, low quality audio".

Provide the output ONLY in a clean JSON format, without any markdown formatting. The JSON must look exactly like this:
{{
"video_prompt": "...",
"video_negative_prompt": "...",
"audio_prompt": "...",
"audio_negative_prompt": "..."
}}
"""


@app.route('/api/enhance-prompt', methods=['POST'])
def enhance_prompt():
    """Expand a Persian user idea into four English video/audio prompts.

    Rotates through the Gemini key pool, runs a safety pre-check inside the
    model prompt, and returns the model's JSON verbatim on success.
    Responses: 200 with the prompt JSON, 400 for bad input or flagged
    content, 500 for configuration or upstream-AI failures.
    """
    if not gemini_keys:
        return jsonify({"error": "No Gemini API keys are configured on the server."}), 500

    # silent=True + explicit check: the other routes guard against a missing
    # body too; previously a non-JSON body crashed with an unhandled 500.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_prompt = data.get('prompt')
    if not user_prompt:
        return jsonify({"error": "Prompt is required."}), 400

    logging.info(f"Received prompt to enhance: '{user_prompt}'")

    current_key, key_log_index = _next_gemini_key()
    logging.info(f"چرخش به کلید Gemini با ایندکس: {to_persian_numerals(key_log_index)}")

    gemini_master_prompt = _build_enhancement_prompt(user_prompt)
    cleaned_response = ""
    try:
        genai.configure(api_key=current_key)
        model = genai.GenerativeModel('gemini-2.5-flash')
        response = model.generate_content(gemini_master_prompt)

        # Strip the markdown code fences Gemini sometimes wraps JSON in.
        cleaned_response = response.text.strip().replace("```json", "").replace("```", "").strip()
        logging.info(f"Gemini raw response: {cleaned_response}")

        enhanced_prompts = json.loads(cleaned_response)

        # The safety pre-check instructs the model to emit this sentinel.
        if "error" in enhanced_prompts and enhanced_prompts["error"] == "inappropriate_content":
            logging.warning(f"Inappropriate content detected for prompt: '{user_prompt}'")
            return jsonify({
                "error": "متن ورودی به عنوان محتوای نامناسب تشخیص داده شد.",
                "code": "INAPPROPRIATE_CONTENT"
            }), 400

        return jsonify(enhanced_prompts)

    except json.JSONDecodeError as e:
        logging.error(f"Failed to decode JSON from Gemini: {e}. Response was: {cleaned_response}")
        return jsonify({"error": "Failed to parse response from AI. The AI may have returned an invalid format."}), 500
    except Exception as e:
        logging.error(f"An error occurred with the Gemini API: {e}")
        return jsonify({"error": f"An error occurred while communicating with the enhancement AI: {str(e)}"}), 500
|
|
|
|
@app.route('/api/check-credit', methods=['POST'])
def check_credit():
    """Report the caller's remaining weekly credits without consuming one.

    Lazily rolls the weekly window forward if it has expired (marking the
    cache dirty so the reset is persisted).
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(data)
    if not user_id:
        return jsonify({"error": "User identifier is required."}), 400

    one_week_seconds = 7 * 24 * 60 * 60
    with cache_lock:
        now = time.time()
        user_record = None
        for record in usage_data_cache:
            if record.get('id') == user_id:
                user_record = record
                break

        credits_remaining = USAGE_LIMIT
        limit_reached = False
        reset_timestamp = 0
        if user_record:
            # Expired window: reset the counter and persist the change.
            if user_record.get('week_start', 0) < (now - one_week_seconds):
                user_record['count'] = 0
                user_record['week_start'] = now
                data_changed.set()
            credits_remaining = max(0, USAGE_LIMIT - user_record.get('count', 0))
            if credits_remaining == 0:
                limit_reached = True
                reset_timestamp = user_record.get('week_start', now) + one_week_seconds

    return jsonify({
        "credits_remaining": credits_remaining,
        "limit_reached": limit_reached,
        "reset_timestamp": reset_timestamp
    })
|
|
@app.route('/api/use-credit', methods=['POST'])
def use_credit():
    """Consume one weekly credit for the caller, creating a record on first use.

    Returns 429 with a reset timestamp when the weekly limit is exhausted;
    otherwise increments the counter and marks the cache dirty.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(data)
    if not user_id:
        return jsonify({"error": "User identifier is required."}), 400

    one_week_seconds = 7 * 24 * 60 * 60
    with cache_lock:
        now = time.time()
        user_record = None
        for record in usage_data_cache:
            if record.get('id') == user_id:
                user_record = record
                break

        if user_record is None:
            # First use: start this user's weekly window now.
            user_record = {"id": user_id, "count": 1, "week_start": now}
            usage_data_cache.append(user_record)
        else:
            # Expired window: reset the counter before checking the limit.
            if user_record.get('week_start', 0) < (now - one_week_seconds):
                user_record['count'] = 0
                user_record['week_start'] = now
            if user_record.get('count', 0) >= USAGE_LIMIT:
                reset_timestamp = user_record.get('week_start', now) + one_week_seconds
                return jsonify({
                    "status": "limit_reached",
                    "credits_remaining": 0,
                    "reset_timestamp": reset_timestamp
                }), 429
            user_record['count'] += 1

        credits_remaining = USAGE_LIMIT - user_record['count']
        data_changed.set()
    return jsonify({"status": "success", "credits_remaining": credits_remaining})
|
|
| |
# Under a WSGI server (e.g. gunicorn on Spaces) the module is imported, not
# run, so __name__ != '__main__': load persisted data and start the flush
# thread at import time.
if __name__ != '__main__':
    load_initial_data()
    persister_thread = threading.Thread(target=background_persister, daemon=True)
    persister_thread.start()
|
|
if __name__ == '__main__':
    # Local development entry point; 7860 is the Spaces default port.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)