import gradio as gr import requests import asyncio import aiohttp from aiohttp_socks import ProxyConnector import os import shutil import time import random import json import base64 from PIL import Image from urllib.parse import quote, urlencode # --- Конфигурация --- TEXT_API_URL = "https://text.pollinations.ai/openai" IMAGE_API_URL = "https://image.pollinations.ai/prompt/" IMGBB_API_URL = "https://api.imgbb.com/1/upload" IMGBB_API_KEY = "ed401abea23acbc672ce915ae0f53cf1" TEMP_DIR = "temp_frames" OUTPUT_DIR = "output_gifs" ARCHIVE_DIR = "archives" API_KEYS_FILE = "api_keys.txt" # --- Загрузка API ключей из файла --- def load_api_keys(): """Загружает API ключи из текстового файла""" try: with open(API_KEYS_FILE, 'r', encoding='utf-8') as f: keys = [line.strip() for line in f.readlines() if line.strip()] return keys except FileNotFoundError: print(f"WARNING: {API_KEYS_FILE} не найден. Работаем без ключей.") return [] # --- Класс для ротации API ключей --- class APIKeyRotator: def __init__(self): self.keys = load_api_keys() self.current_index = 0 self.usage_count = {} def get_next_key(self): """Возвращает следующий API ключ в ротации""" if not self.keys: return None key = self.keys[self.current_index] self.usage_count[key] = self.usage_count.get(key, 0) + 1 self.current_index = (self.current_index + 1) % len(self.keys) return key def get_status(self): """Возвращает статус использования ключей""" if not self.keys: return "🔑 API ключи не загружены" status = f"🔑 Загружено {len(self.keys)} API ключей\n" for i, key in enumerate(self.keys): masked_key = key[:8] + "..." + key[-4:] if len(key) > 12 else key usage = self.usage_count.get(key, 0) current_marker = " ← Текущий" if i == self.current_index else "" status += f" {i+1}. {masked_key} (использован {usage} раз){current_marker}\n" return status # Глобальный объект ротатора key_rotator = APIKeyRotator() # --- Загрузка доступных моделей из файлов --- try: with open('modelsTEXT.json', 'r', encoding='utf-8') as f: all_text_models = json.load(f) TEXT_MODELS = sorted([ m['name'] for m in all_text_models if "text" in m.get("input_modalities", []) and not m.get("audio") ]) except FileNotFoundError: print("WARNING: modelsTEXT.json not found. Using default list.") TEXT_MODELS = ["openai", "grok", "mistral-roblox", "deepseek-reasoning"] IMAGE_MODELS_IMG2IMG = ["kontext", "gptimage", "flux"] # --- Функция загрузки изображения на ImgBB --- async def upload_to_imgbb(session, image_path, log_status_func, frame_num): """Загружает изображение на ImgBB и возвращает прямую ссылку""" try: # Читаем изображение и конвертируем в base64 with open(image_path, 'rb') as image_file: image_data = base64.b64encode(image_file.read()).decode('utf-8') # Подготавливаем данные для отправки payload = { 'key': IMGBB_API_KEY, 'image': image_data, 'name': f'frame_{frame_num}_{int(time.time())}' } log_status_func(f" 📤 Кадр {frame_num}: загрузка на ImgBB...") # Отправляем запрос на ImgBB async with session.post(IMGBB_API_URL, data=payload, timeout=60) as response: if response.status == 200: result = await response.json() if result.get('success'): image_url = result['data']['url'] log_status_func(f" ✅ Кадр {frame_num}: загружен на ImgBB ({image_url[:50]}...)") return image_url else: log_status_func(f" ❌ Кадр {frame_num}: ImgBB ошибка - {result.get('error', {}).get('message', 'Unknown error')}") return None else: error_text = await response.text() log_status_func(f" ❌ Кадр {frame_num}: ImgBB HTTP {response.status} - {error_text[:100]}") return None except Exception as e: log_status_func(f" ❌ Кадр {frame_num}: ошибка загрузки на ImgBB - {str(e)[:100]}") return None # --- Асинхронные функции для работы с API --- async def call_director_api(session, prompt, director_model, seed, headers, log_status_func): """Отправляет запрос к API модели-режиссёра для получения сценария""" payload = { "model": director_model, "response_format": {"type": "json_object"}, "messages": [ { "role": "system", "content": "You are a scriptwriter for a short animated GIF. You describe how a scene should EVOLVE and TRANSFORM frame by frame. Generate a JSON object with a key 'prompts', containing a list of strings. The first prompt is a full description of the initial scene. Subsequent prompts are short, describing the CHANGES to the previous frame (e.g., 'the man starts smiling', 'a bird flies into the frame', 'the colors become more vibrant')." }, {"role": "user", "content": prompt} ], "seed": seed } max_retries = 3 for attempt in range(max_retries): api_key = key_rotator.get_next_key() if api_key: headers["Authorization"] = f"Bearer {api_key}" log_status_func(f"🔑 Используем API ключ #{key_rotator.current_index} для режиссёра") try: async with session.post(TEXT_API_URL, json=payload, headers=headers, timeout=120) as response: if response.status == 200: resp_json = await response.json() content_str = resp_json.get('choices', [{}])[0].get('message', {}).get('content') if content_str: return content_str return "{}" else: error_text = await response.text() log_status_func(f"⚠️ Режиссёр API попытка {attempt + 1}: статус {response.status}") if attempt == max_retries - 1: return f"API Error: {response.status}" except Exception as e: log_status_func(f"⚠️ Режиссёр API попытка {attempt + 1}: ошибка {e}") if attempt == max_retries - 1: return f"System Error: {e}" if attempt < max_retries - 1: await asyncio.sleep(3) return "Error: All attempts failed" async def call_artist_api_with_retries(session, base_url, params, frame_path, headers, log_status_func, frame_num): """Скачивает изображение с ротацией API ключей""" max_retries = 3 for attempt in range(max_retries): api_key = key_rotator.get_next_key() if api_key: headers["Authorization"] = f"Bearer {api_key}" key_info = f"ключ #{key_rotator.current_index}" else: key_info = "без ключа" params_copy = params.copy() params_copy["seed"] = params["seed"] + attempt * 1000 full_url = f"{base_url}?{urlencode(params_copy)}" try: log_status_func(f" 🎨 Кадр {frame_num}: попытка {attempt + 1}/{max_retries} ({key_info})") async with session.get(full_url, timeout=240, headers=headers) as response: if response.status == 200: content = await response.read() with open(frame_path, "wb") as f: f.write(content) log_status_func(f" ✅ Кадр {frame_num}: успешно сгенерирован ({key_info})") return frame_path else: log_status_func(f" ❌ Кадр {frame_num}: статус {response.status} ({key_info})") except Exception as e: log_status_func(f" ❌ Кадр {frame_num}: ошибка {str(e)[:50]}... ({key_info})") if attempt < max_retries - 1: wait_time = 5 + attempt * 3 log_status_func(f" ⏳ Пауза {wait_time} сек перед следующей попыткой...") await asyncio.sleep(wait_time) return None # --- Основная логика --- async def generate_gif_with_rotation( proxy_string, main_idea, fps, duration, width, height, director_model, artist_model, seed, delay_between_frames, progress=gr.Progress(track_tqdm=True) ): """Главная функция с ротацией API ключей и загрузкой на ImgBB""" status_updates = [] def log_status(message): print(message) status_updates.append(message) log_status("🚀 Запуск GIF-генератора с ImgBB загрузкой!") log_status(key_rotator.get_status()) if not main_idea: raise gr.Error("Основная идея не может быть пустой!") # Настройка прокси connector = None if proxy_string: try: connector = ProxyConnector.from_url(proxy_string) log_status(f"🔗 Прокси: {proxy_string.split('@')[-1]}") except Exception as e: raise gr.Error(f"Ошибка прокси: {e}") # Подготовка директорий for dir_path in [TEMP_DIR, OUTPUT_DIR, ARCHIVE_DIR]: os.makedirs(dir_path, exist_ok=True) for filename in os.listdir(TEMP_DIR): os.remove(os.path.join(TEMP_DIR, filename)) total_frames = int(fps * duration) seed = int(seed) if seed != 0 else random.randint(1, 1000000) log_status(f"🌱 Seed: {seed} | 🎬 Кадров: {total_frames}") headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession(connector=connector) as session: # 1. Получение сценария от режиссёра log_status(f"🧠 Запрос к режиссёру ({director_model})...") director_prompt = f"Based on the core idea '{main_idea}', generate a list of exactly {total_frames} prompts for an evolving animation." director_result_str = await call_director_api(session, director_prompt, director_model, seed, headers, log_status) # 2. Парсинг сценария key_prompts = [] try: director_data = json.loads(director_result_str) key_prompts = director_data.get("prompts", []) except (json.JSONDecodeError, AttributeError): log_status(f"❌ Ошибка парсинга от режиссёра: {director_result_str}") return None, "\n".join(status_updates) log_status(f"📝 Получено {len(key_prompts)} промптов от режиссёра") if not key_prompts: return None, "\n".join(status_updates) # 3. Генерация кадров с загрузкой на ImgBB generated_files = [] previous_frame_url = None for i in progress.tqdm(range(total_frames), desc="🎨 Генерация кадров"): log_status(f"🖼️ Кадр {i + 1}/{total_frames}") frame_path = os.path.join(TEMP_DIR, f"frame_{i:04d}.png") frame_seed = seed + i * 10 current_prompt = key_prompts[i % len(key_prompts)] # Базовые параметры params = { "seed": frame_seed, "width": width, "height": height, "nologo": "true" } if i == 0: # Первый кадр: text-to-image params["model"] = "flux" base_url = f"{IMAGE_API_URL}{quote(current_prompt)}" else: # Последующие кадры: img-to-img с прямой ссылкой на ImgBB params["model"] = artist_model if previous_frame_url: params["image"] = previous_frame_url log_status(f" 🔗 Используем ImgBB ссылку: {previous_frame_url[:50]}...") base_url = f"{IMAGE_API_URL}{quote(current_prompt)}" # Генерация кадра с ротацией ключей file_path = await call_artist_api_with_retries( session, base_url, params, frame_path, headers, log_status, i + 1 ) if file_path: generated_files.append(file_path) # Загружаем сгенерированный кадр на ImgBB для следующего кадра imgbb_url = await upload_to_imgbb(session, file_path, log_status, i + 1) if imgbb_url: previous_frame_url = imgbb_url else: log_status(f"⚠️ Не удалось загрузить кадр {i + 1} на ImgBB, продолжаем без img2img...") previous_frame_url = None else: log_status(f"💥 Генерация прервана на кадре {i + 1}") break # Пауза между кадрами if i < total_frames - 1 and delay_between_frames > 0: log_status(f"⏸️ Пауза {delay_between_frames} сек...") await asyncio.sleep(delay_between_frames) if not generated_files: log_status("❌ Не удалось сгенерировать ни одного кадра") return None, "\n".join(status_updates) # 4. Сборка GIF log_status(f"🎞️ Сборка GIF из {len(generated_files)} кадров...") images = [Image.open(f) for f in generated_files] timestamp = time.strftime("%Y%m%d-%H%M%S") gif_path = os.path.join(OUTPUT_DIR, f"animation_{timestamp}.gif") images[0].save( gif_path, save_all=True, append_images=images[1:], duration=1000 // fps, loop=0 ) # 5. Финальная статистика log_status(f"✅ GIF готова: {gif_path}") log_status("📊 Финальная статистика использования ключей:") log_status(key_rotator.get_status()) return gif_path, "\n".join(status_updates) # --- Улучшенный интерфейс Gradio --- def refresh_keys(): """Обновляет список API ключей""" global key_rotator key_rotator = APIKeyRotator() return key_rotator.get_status() with gr.Blocks(theme=gr.themes.Soft(), title="🎬 GIF Generator Pro") as demo: gr.HTML("""
Генерация анимированных GIF с ImgBB загрузкой и ротацией API ключей