| import gradio as gr |
| import asyncio |
| import aiohttp |
| import os |
| import shutil |
| import time |
| import random |
| import json |
| from PIL import Image |
| from aiohttp_socks import ProxyConnector |
| from typing import List, Dict, Optional, Tuple |
|
|
| |
| |
| TEXT_API_URL = "https://text.pollinations.ai/openai" |
| IMAGE_API_URL = "https://image.pollinations.ai/prompt" |
|
|
| |
| API_KEYS_FILE = "api_keys.txt" |
| TEMP_DIR = "temp_frames" |
| OUTPUT_DIR = "output_gifs" |
| ARCHIVE_DIR = "archives" |
|
|
| |
| IMAGE_MODELS_IMG2IMG = ["kontext", "gptimage", "flux"] |
|
|
|
|
| |
| class APIKeyRotator: |
| """Управляет загрузкой, ротацией и отслеживанием использования API ключей.""" |
| def __init__(self, filepath: str): |
| self.filepath = filepath |
| |
| self.keys: List[str] = self._load_keys_from_env_or_file() |
| self.current_index: int = 0 |
| self.usage_count: Dict[str, int] = {key: 0 for key in self.keys} |
|
|
| def _load_keys_from_env_or_file(self) -> List[str]: |
| """Загружает ключи из переменных окружения (секреты HF) или из файла.""" |
| |
| hf_secrets = os.environ.get("POLLINATIONS_API_KEYS") |
| if hf_secrets: |
| print("Загрузка API ключей из Секретов Hugging Face.") |
| return [key.strip() for key in hf_secrets.split(',') if key.strip()] |
| |
| |
| print("Секреты HF не найдены. Попытка загрузки из api_keys.txt.") |
| try: |
| with open(self.filepath, 'r', encoding='utf-8') as f: |
| return [line.strip() for line in f.readlines() if line.strip()] |
| except FileNotFoundError: |
| return [] |
|
|
| def get_next_key(self) -> Optional[str]: |
| if not self.keys: return None |
| key = self.keys[self.current_index] |
| self.usage_count[key] += 1 |
| self.current_index = (self.current_index + 1) % len(self.keys) |
| return key |
|
|
| def get_status(self) -> str: |
| if not self.keys: |
| return "🔑 **Статус**: API ключи для генерации изображений не найдены. Добавьте их в Секреты Hugging Face или в файл `api_keys.txt`." |
| status_lines = [f"🔑 **Загружено ключей (для изображений)**: {len(self.keys)}"] |
| for i, key in enumerate(self.keys): |
| masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else key |
| usage = self.usage_count.get(key, 0) |
| is_next = " ⬅️" if i == self.current_index else "" |
| status_lines.append(f" - `{masked_key}` (Исп: {usage}){is_next}") |
| return "\n".join(status_lines) |
|
|
| def refresh(self): |
| self.__init__(self.filepath) |
|
|
|
|
| |
| key_rotator = APIKeyRotator(API_KEYS_FILE) |
|
|
|
|
| |
| async def _call_director_api(session: aiohttp.ClientSession, prompt: str, seed: int) -> Tuple[Optional[List[str]], str]: |
| """Запрашивает сценарий у модели 'openai-fast'. Ключ не требуется.""" |
| payload = { |
| "model": "openai-fast", "response_format": {"type": "json_object"}, |
| "messages": [ |
| {"role": "system", "content": "You are a scriptwriter for a short animated GIF..."}, |
| {"role": "user", "content": prompt} |
| ], "seed": seed |
| } |
| try: |
| async with session.post(TEXT_API_URL, json=payload, headers={}, timeout=120) as response: |
| if response.status == 200: |
| response_text = await response.text() |
| prompts_str = json.loads(response_text).get('choices', [{}])[0].get('message', {}).get('content') |
| prompts = json.loads(prompts_str).get("prompts", []) |
| return prompts, f"✅ **Режиссёр (`openai-fast`)**: Сценарий на {len(prompts)} кадров получен." |
| return None, f"❌ **Режиссёр**: Ошибка API (Статус: {response.status})." |
| except Exception as e: |
| return None, f"❌ **Режиссёр**: Ошибка. {e}" |
|
|
|
|
| async def _call_artist_api(session: aiohttp.ClientSession, params: Dict, output_path: str, previous_image_path: Optional[str] = None) -> bool: |
| """Генерирует один кадр. Требует API-ключ.""" |
| max_retries = 3 |
| for attempt in range(max_retries): |
| headers = {} |
| api_key = key_rotator.get_next_key() |
| if api_key: headers["Authorization"] = f"Bearer {api_key}" |
| |
| form = aiohttp.FormData() |
| params["seed"] = params.get("seed", 0) + attempt * 10 |
| for key, value in params.items(): form.add_field(key, str(value)) |
| |
| if previous_image_path and os.path.exists(previous_image_path): |
| form.add_field('image', open(previous_image_path, 'rb')) |
| |
| try: |
| async with session.post(IMAGE_API_URL, data=form, headers=headers, timeout=240) as response: |
| if response.status == 200: |
| with open(output_path, "wb") as f: f.write(await response.read()) |
| return True |
| except Exception as e: |
| if attempt == max_retries - 1: print(f"Artist API System Error: {e}") |
| await asyncio.sleep(2) |
| return False |
|
|
|
|
| def _compile_gif_and_archive(image_files: List[str], fps: int, timestamp: str) -> str: |
| """Собирает GIF, архивирует сессию и возвращает финальный путь к GIF.""" |
| images = [Image.open(f) for f in image_files] |
| temp_gif_path = os.path.join(OUTPUT_DIR, f"animation_{timestamp}.gif") |
| images[0].save(temp_gif_path, save_all=True, append_images=images[1:], duration=1000 // fps, loop=0) |
| archive_path = os.path.join(ARCHIVE_DIR, timestamp) |
| frames_archive_path = os.path.join(archive_path, "frames") |
| shutil.move(os.path.dirname(image_files[0]), frames_archive_path) |
| final_gif_path = os.path.join(archive_path, os.path.basename(temp_gif_path)) |
| shutil.move(temp_gif_path, final_gif_path) |
| return final_gif_path |
|
|
|
|
| |
| async def generate_gif_flow( |
| proxy: str, main_idea: str, fps: int, duration: int, width: int, height: int, |
| artist_model: str, seed_val: int, delay: int, progress: gr.Progress |
| ) -> Tuple[Optional[str], str]: |
| status_log = ["🚀 **Старт**: Инициализация..."] |
| yield "\n".join(status_log) |
| if not main_idea: raise gr.Error("Пожалуйста, введите основную идею.") |
|
|
| timestamp = time.strftime("%Y%m%d-%H%M%S") |
| session_temp_dir = os.path.join(TEMP_DIR, timestamp) |
| for dir_path in [session_temp_dir, OUTPUT_DIR, ARCHIVE_DIR]: os.makedirs(dir_path, exist_ok=True) |
| |
| total_frames = int(fps * duration) |
| seed = int(seed_val) if seed_val != 0 else random.randint(1, 1000000) |
| status_log.append(f"🌱 **Параметры**: Seed: `{seed}`, Кадров: `{total_frames}`.") |
| yield "\n".join(status_log) |
|
|
| connector = ProxyConnector.from_url(proxy) if proxy else None |
| async with aiohttp.ClientSession(connector=connector) as session: |
| director_prompt = f"Based on the core idea '{main_idea}', generate {total_frames} prompts..." |
| prompts, msg = await _call_director_api(session, director_prompt, seed) |
| status_log.append(msg); yield "\n".join(status_log) |
| if not prompts: return None, "\n".join(status_log) |
|
|
| generated_files, previous_frame_path = [], None |
| for i in progress.tqdm(range(total_frames), desc="🎨 Генерация кадров"): |
| header = f"🎬 **Кадр {i + 1}/{total_frames}**" |
| frame_path = os.path.join(session_temp_dir, f"frame_{i:04d}.png") |
| params = {"width": width, "height": height, "seed": seed + i, "nologo": "true", "prompt": prompts[i % len(prompts)]} |
| |
| params["model"] = "flux" if i == 0 else artist_model |
| status_log.append(f"{header}: Режим `{'text2img' if i==0 else 'img2img'}` (модель `{params['model']}`).") |
| yield "\n".join(status_log) |
| |
| if await _call_artist_api(session, params, frame_path, previous_frame_path): |
| status_log.append(f"{header}: ✅ Успешно.") |
| generated_files.append(frame_path) |
| previous_frame_path = frame_path |
| else: |
| status_log.append(f"❌ **СТОП**: {header}: Не удалось сгенерировать."); yield "\n".join(status_log) |
| break |
| |
| yield "\n".join(status_log) |
| if i < total_frames - 1 and delay > 0: await asyncio.sleep(delay) |
|
|
| if not generated_files: |
| status_log.append("😭 **Результат**: Не создано ни одного кадра.") |
| return None, "\n".join(status_log) |
|
|
| status_log.append("🎞️ **Сборка**: Создание GIF и архивация...") |
| yield "\n".join(status_log) |
| final_gif_path = _compile_gif_and_archive(generated_files, fps, timestamp) |
| status_log.append(f"✅ **Готово**: GIF сохранена в `{final_gif_path}`.") |
| return final_gif_path, "\n".join(status_log) |
|
|
|
|
| |
| def create_ui(): |
| css = "h1 {text-align: center;} footer {display: none !important;}" |
| with gr.Blocks(css=css, title="🎬 GIF Animator Pro", theme=gr.themes.Soft()) as demo: |
| gr.HTML("""<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #89f7fe 0%, #66a6ff 100%); border-radius: 10px; margin-bottom: 20px;"><h1 style="color: white; margin: 0; font-size: 2.8em; text-shadow: 2px 2px 4px #00000040;">🎬 GIF Animator Pro</h1></div>""") |
| with gr.Row(): |
| with gr.Column(scale=2): |
| main_idea = gr.Textbox(label="💡 1. Главная идея анимации", lines=3) |
| with gr.Accordion("🤖 2. Выбор модели художника", open=True): |
| artist_model = gr.Dropdown(IMAGE_MODELS_IMG2IMG, label="Модель-художник", value="kontext") |
| with gr.Accordion("⚙️ 3. Параметры анимации", open=True): |
| fps = gr.Slider(5, 30, value=10, step=1, label="Кадров/сек") |
| duration = gr.Slider(1, 10, value=2, step=1, label="Длительность (сек)") |
| width = gr.Slider(256, 1024, value=512, step=64, label="Ширина") |
| height = gr.Slider(256, 1024, value=512, step=64, label="Высота") |
| with gr.Accordion("🔧 4. Дополнительные настройки", open=False): |
| seed = gr.Number(label="Seed (0 = случайный)", value=0) |
| delay = gr.Slider(0, 30, value=2, step=1, label="Пауза между кадрами") |
| proxy = gr.Textbox(label="SOCKS5 Прокси (опционально)") |
| generate_btn = gr.Button("✨ Сгенерировать GIF!", variant="primary", size="lg") |
| with gr.Column(scale=3): |
| output_gif = gr.Image(label="🎞️ Результат", interactive=False, height=400) |
| with gr.Tabs(): |
| with gr.TabItem("📋 Журнал событий"): status_box = gr.Markdown() |
| with gr.TabItem("🔑 Статус API ключей"): |
| keys_status = gr.Markdown(value=key_rotator.get_status) |
| refresh_btn = gr.Button("🔄 Обновить ключи") |
|
|
| def on_generate_start(): return {generate_btn: gr.update(value="⏳ Генерация...", interactive=False), status_box: None, output_gif: None} |
| def on_generate_finish(): return {generate_btn: gr.update(value="✨ Сгенерировать!", interactive=True)} |
|
|
| generate_btn.click(on_generate_start, outputs=[generate_btn, status_box, output_gif]).then( |
| fn=generate_gif_flow, |
| inputs=[proxy, main_idea, fps, duration, width, height, artist_model, seed, delay], |
| outputs=[output_gif, status_box] |
| ).then(on_generate_finish, outputs=[generate_btn]) |
| |
| refresh_btn.click(lambda: (key_rotator.refresh(), key_rotator.get_status())[1], outputs=[keys_status]) |
| |
| return demo |
|
|
| if __name__ == "__main__": |
| ui = create_ui() |
| |
| ui.launch() |