Zirnavis9 / app.py
Hamed744's picture
Upload app.py
ed72784 verified
import os
import json
import time
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import logging
import threading
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError
import google.generativeai as genai
# --- تنظیمات اصلی ---
DATASET_REPO = "Ezmary/Karbaran-rayegan-tedad"
DATASET_FILENAME = "text_video_usage_data.json"
USAGE_LIMIT = 5
HF_TOKEN = os.environ.get("HF_TOKEN")
# --- تنظیمات چرخش کلیدهای Gemini ---
ALL_GEMINI_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
gemini_keys = []
key_index = 0
key_rotation_lock = threading.Lock()
# --- راه‌اندازی Flask و لاگ‌ها ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__)
CORS(app)
# ======================================================================
# ✅✅✅ بخش جدید: تابع تبدیل اعداد به فارسی ✅✅✅
# ======================================================================
def to_persian_numerals(number):
"""یک عدد (یا رشته عددی) را به رشته‌ای با اعداد فارسی تبدیل می‌کند."""
try:
# بهترین و بهینه‌ترین روش برای جایگزینی کاراکترها در پایتون
persian_map = str.maketrans('0123456789', '۰۱۲۳۴۵۶۷۸۹')
return str(number).translate(persian_map)
except Exception:
# اگر به هر دلیلی خطا رخ داد، خود عدد اصلی را برگردان
return str(number)
# ======================================================================
# --- مدیریت داده‌های کاربران ---
usage_data_cache = []
cache_lock = threading.Lock()
data_changed = threading.Event()
api = None
if not HF_TOKEN:
logging.error("CRITICAL: Secret 'HF_TOKEN' not found. Cannot access the private dataset.")
else:
api = HfApi(token=HF_TOKEN)
logging.info("HfApi initialized successfully.")
# --- راه‌اندازی و لاگ کردن کلیدهای Gemini ---
if not ALL_GEMINI_API_KEYS:
logging.error("CRITICAL: Secret 'ALL_GEMINI_API_KEYS' not found or is empty. Prompt enhancement will fail.")
else:
gemini_keys = [key.strip() for key in ALL_GEMINI_API_KEYS.split(',') if key.strip()]
if gemini_keys:
# ======================================================================
# ✅✅✅ بخش اصلاح شده: استفاده از تابع تبدیل و پیام فارسی ✅✅✅
# ======================================================================
num_keys_persian = to_persian_numerals(len(gemini_keys))
logging.info(f"✅ با موفقیت {num_keys_persian} کلید Gemini برای چرخش بارگیری شد.")
# ======================================================================
else:
logging.error("CRITICAL: 'ALL_GEMINI_API_KEYS' secret was found but contained no valid keys after parsing.")
def load_initial_data():
global usage_data_cache
with cache_lock:
if not api: return
try:
logging.info(f"Attempting to load data from '{DATASET_REPO}'...")
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=DATASET_FILENAME,
repo_type="dataset",
token=HF_TOKEN,
force_download=True
)
with open(local_path, 'r', encoding='utf-8') as f:
content = f.read()
usage_data_cache = json.loads(content) if content else []
# لاگ تعداد رکوردهای بارگیری شده را هم فارسی می‌کنیم
logging.info(f"تعداد {to_persian_numerals(len(usage_data_cache))} رکورد بارگیری شد.")
except (RepositoryNotFoundError, EntryNotFoundError):
logging.warning("فایل دیتاسیت پیدا نشد. یک فایل جدید ساخته خواهد شد.")
usage_data_cache = []
except Exception as e:
logging.warning(f"Could not load initial data (this is often normal in Spaces due to cache permissions): {e}")
usage_data_cache = []
def persist_data_to_hub():
with cache_lock:
if not data_changed.is_set() or not api:
return
logging.info("تغییرات شناسایی شد، در حال آماده‌سازی برای نوشتن در هاب...")
try:
data_to_write = list(usage_data_cache)
temp_filepath = "/tmp/temp_usage_data.json"
with open(temp_filepath, 'w', encoding='utf-8') as f:
json.dump(data_to_write, f, ensure_ascii=False, indent=2)
api.upload_file(
path_or_fileobj=temp_filepath,
path_in_repo=DATASET_FILENAME,
repo_id=DATASET_REPO,
repo_type="dataset",
commit_message="Update video usage data"
)
os.remove(temp_filepath)
data_changed.clear()
logging.info(f"با موفقیت {to_persian_numerals(len(data_to_write))} رکورد در هاب ذخیره شد.")
except Exception as e:
logging.error(f"CRITICAL: Failed to persist data to Hub: {e}")
def background_persister():
while True:
time.sleep(30)
persist_data_to_hub()
# --- روت‌های API ---
@app.route('/')
def index():
return render_template('index.html')
def get_user_identifier(data):
fingerprint = data.get('fingerprint')
if fingerprint:
return str(fingerprint)
if request.headers.getlist("X-Forwarded-For"):
return request.headers.getlist("X-Forwarded-For")[0].split(',')[0].strip()
return request.remote_addr
@app.route('/api/enhance-prompt', methods=['POST'])
def enhance_prompt():
global key_index
if not gemini_keys:
return jsonify({"error": "No Gemini API keys are configured on the server."}), 500
data = request.get_json()
user_prompt = data.get('prompt')
if not user_prompt:
return jsonify({"error": "Prompt is required."}), 400
logging.info(f"Received prompt to enhance: '{user_prompt}'")
current_key = None
with key_rotation_lock:
current_key = gemini_keys[key_index]
key_index = (key_index + 1) % len(gemini_keys)
current_key_log_index = (key_index - 1 + len(gemini_keys)) % len(gemini_keys)
logging.info(f"چرخش به کلید Gemini با ایندکس: {to_persian_numerals(current_key_log_index)}")
# ======================================================================
# ✅✅✅ تغییر اصلی اینجا در شاه پرامپت اعمال شده است ✅✅✅
# ======================================================================
gemini_master_prompt = f"""
*** SAFETY PRE-CHECK ***
First, analyze the user's idea in Persian: "{user_prompt}"
If the user's idea contains any sexually explicit, pornographic, or highly inappropriate keywords such as 'سکس', 'سکسی', 'پورن', 'شهوانی' or similar themes, you MUST IGNORE all other instructions and ONLY output the following exact JSON object:
{{
"error": "inappropriate_content"
}}
If the user's idea is SAFE and does not contain any forbidden themes, then proceed with the main task below.
*** END SAFETY PRE-CHECK ***
You are an expert AI assistant for a Text-to-Video engine. Your task is to take a user's simple idea (in Persian) and expand it into a set of detailed, professional prompts in English.
The user's idea is: "{user_prompt}"
Based on this idea, generate the following four prompts:
1. **Video Prompt:** A highly detailed, cinematic, and visually rich description in English. Use evocative adjectives, specify lighting, camera angles, and style. Always add strong keywords like "cinematic, 8k, photorealistic, hyperdetailed, masterpiece, Unreal Engine 5, professional color grading".
2. **Negative Video Prompt:** A context-aware list of things to avoid in the video, in English. For example, if the prompt is about a realistic subject, include "cartoon, anime, deformed, ugly, bad anatomy". Always include general quality-related negatives like "blurry, low quality, worst quality, watermark, text, signature, ugly, tiling, poorly drawn hands, poorly drawn feet, poorly drawn face, out of frame, extra limbs, disfigured, body out of frame, bad anatomy, blurred, grainy, signature, cut off, draft".
3. **Audio Prompt:** A descriptive prompt for generating sound effects, in English. Describe the ambient sounds and key actions. Avoid asking for music or speech. Example: for a cat in space, it could be "sound of soft, muffled footsteps on a metallic surface, gentle humming of life support systems, quiet breathing, vast silence of space".
4. **Negative Audio Prompt:** A list of sounds to avoid. This should almost always be "music, speech, singing, narration, voice, low quality audio".
Provide the output ONLY in a clean JSON format, without any markdown formatting. The JSON must look exactly like this:
{{
"video_prompt": "...",
"video_negative_prompt": "...",
"audio_prompt": "...",
"audio_negative_prompt": "..."
}}
"""
try:
genai.configure(api_key=current_key)
model = genai.GenerativeModel('gemini-2.5-flash') # Updated to a more recent model
response = model.generate_content(gemini_master_prompt)
cleaned_response = response.text.strip().replace("```json", "").replace("```", "").strip()
logging.info(f"Gemini raw response: {cleaned_response}")
enhanced_prompts = json.loads(cleaned_response)
# ✅ بررسی پاسخ برای خطای محتوای نامناسب
if "error" in enhanced_prompts and enhanced_prompts["error"] == "inappropriate_content":
logging.warning(f"Inappropriate content detected for prompt: '{user_prompt}'")
# یک کد خطای مشخص به فرانت‌اند ارسال می‌کنیم
return jsonify({
"error": "متن ورودی به عنوان محتوای نامناسب تشخیص داده شد.",
"code": "INAPPROPRIATE_CONTENT"
}), 400
return jsonify(enhanced_prompts)
except json.JSONDecodeError as e:
logging.error(f"Failed to decode JSON from Gemini: {e}. Response was: {cleaned_response}")
return jsonify({"error": "Failed to parse response from AI. The AI may have returned an invalid format."}), 500
except Exception as e:
logging.error(f"An error occurred with the Gemini API: {e}")
return jsonify({"error": f"An error occurred while communicating with the enhancement AI: {str(e)}"}), 500
@app.route('/api/check-credit', methods=['POST'])
def check_credit():
# این بخش بدون تغییر باقی می‌ماند
data = request.get_json()
if not data: return jsonify({"error": "Invalid request"}), 400
user_id = get_user_identifier(data)
if not user_id: return jsonify({"error": "User identifier is required."}), 400
with cache_lock:
now = time.time()
one_week_seconds = 7 * 24 * 60 * 60
user_record = next((user for user in usage_data_cache if user.get('id') == user_id), None)
credits_remaining = USAGE_LIMIT
limit_reached = False
reset_timestamp = 0
if user_record:
if user_record.get('week_start', 0) < (now - one_week_seconds):
user_record['count'] = 0
user_record['week_start'] = now
data_changed.set()
credits_remaining = max(0, USAGE_LIMIT - user_record.get('count', 0))
if credits_remaining == 0:
limit_reached = True
reset_timestamp = user_record.get('week_start', now) + one_week_seconds
return jsonify({
"credits_remaining": credits_remaining,
"limit_reached": limit_reached,
"reset_timestamp": reset_timestamp
})
@app.route('/api/use-credit', methods=['POST'])
def use_credit():
# این بخش بدون تغییر باقی می‌ماند
data = request.get_json()
if not data: return jsonify({"error": "Invalid request"}), 400
user_id = get_user_identifier(data)
if not user_id: return jsonify({"error": "User identifier is required."}), 400
with cache_lock:
now = time.time()
one_week_seconds = 7 * 24 * 60 * 60
user_record = next((user for user in usage_data_cache if user.get('id') == user_id), None)
if user_record:
if user_record.get('week_start', 0) < (now - one_week_seconds):
user_record['count'] = 0
user_record['week_start'] = now
if user_record.get('count', 0) >= USAGE_LIMIT:
reset_timestamp = user_record.get('week_start', now) + one_week_seconds
return jsonify({
"status": "limit_reached",
"credits_remaining": 0,
"reset_timestamp": reset_timestamp
}), 429
user_record['count'] += 1
else:
user_record = {"id": user_id, "count": 1, "week_start": now}
usage_data_cache.append(user_record)
credits_remaining = USAGE_LIMIT - user_record['count']
data_changed.set()
return jsonify({"status": "success", "credits_remaining": credits_remaining})
# --- اجرای برنامه ---
if __name__ != '__main__':
load_initial_data()
persister_thread = threading.Thread(target=background_persister, daemon=True)
persister_thread.start()
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port)