import streamlit as st import asyncio import httpx from pathlib import Path import os import base64 # Inicjalizacja katalogu wyjściowego OUTPUT_DIR = "generated_videos" Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True) # Aktualne URL dla API Hailuo CREATE_URL = "https://api.minimaxi.chat/v1/video_generation" STATUS_URL = "https://api.minimaxi.chat/v1/query/video_generation" RETRIEVE_URL = "https://api.minimaxi.chat/v1/files/retrieve" def encode_image(image_path): """Konwertuje obraz na base64.""" with open(image_path, "rb") as image_file: encoded = base64.b64encode(image_file.read()).decode('utf-8') st.write(f"Długość base64 obrazu: {len(encoded)} znaków") # Debugowanie return encoded async def generate_video(api_key: str, prompt: str, optimize_prompt: bool = True, image: str = None) -> str: """Funkcja generuje film na podstawie promptu i opcjonalnego obrazu.""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } data = { "model": "video-01", "prompt": prompt, "prompt_optimizer": optimize_prompt # Dodana kontrola optymalizacji promptu } if image: if os.path.exists(image): # Jeśli to plik, przekonwertuj na base64 image = encode_image(image) data["first_frame_image"] = f"data:image/png;base64,{image}" # Poprawione przekazywanie base64 async with httpx.AsyncClient() as client: try: response = await client.post(CREATE_URL, headers=headers, json=data, timeout=60.0) response.raise_for_status() response_data = response.json() st.write("**Pełna odpowiedź API:**") st.json(response_data) # Wyświetlanie pełnej odpowiedzi API task_id = response_data.get('task_id') if not task_id: st.error("Brak task_id w odpowiedzi API") return None max_retries = 60 # maksymalnie 10 minut przy 10s odstępie retry_count = 0 while retry_count < max_retries: status_response = await client.get(f"{STATUS_URL}?task_id={task_id}", headers=headers) status_response.raise_for_status() status_data = status_response.json() status = status_data.get('status') if status == "Success": file_id = status_data.get('file_id') if not file_id: st.error("Brak file_id w odpowiedzi API") return None retrieve_response = await client.get(f"{RETRIEVE_URL}?file_id={file_id}", headers=headers) retrieve_response.raise_for_status() retrieve_data = retrieve_response.json() download_url = retrieve_data.get('file', {}).get('download_url') if not download_url: st.error("Nie można pobrać URL dla pliku") return None file_response = await client.get(download_url) file_response.raise_for_status() output_path = os.path.join(OUTPUT_DIR, "generated_video.mp4") with open(output_path, 'wb') as f: f.write(file_response.content) return output_path elif status == "Fail": st.error(f"Generowanie filmu nie powiodło się: {status_data.get('status_msg')}") return None await asyncio.sleep(10) retry_count += 1 st.error("Przekroczono limit czasu podczas generowania filmu") return None except httpx.HTTPStatusError as http_err: st.error(f"HTTP error: {http_err.response.status_code} - {http_err.response.text}") return None except httpx.RequestError as req_err: st.error(f"Request error: {req_err}") return None except Exception as e: st.error(f"Wystąpił błąd: {e}") return None def main(): """Interfejs użytkownika aplikacji.""" st.title("Generator Filmików Hailuo") st.write("Wprowadź swój klucz API, opis filmu i opcjonalnie obraz jako pierwszą klatkę.") api_key = st.text_input("Wprowadź swój klucz API", type="password") prompt = st.text_area("Wprowadź opis filmu", "prompt") # Dodana kontrola optymalizacji promptu optimize_prompt = st.checkbox("Optymalizuj prompt", value=True, help="Gdy włączone, model automatycznie optymalizuje prompt dla lepszych rezultatów") image_option = st.radio("Wybierz sposób dodawania obrazu", ("Brak obrazu", "Wklej adres URL obrazu", "Prześlij obraz")) image = None if image_option == "Wklej adres URL obrazu": image = st.text_input("Wprowadź adres URL obrazu") elif image_option == "Prześlij obraz": uploaded_file = st.file_uploader("Prześlij obraz", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: image_path = os.path.join(OUTPUT_DIR, uploaded_file.name) with open(image_path, "wb") as f: f.write(uploaded_file.getbuffer()) image = image_path if st.button("Generuj Film"): if not api_key: st.error("Proszę wprowadź swój klucz API.") return if not prompt: st.error("Proszę wprowadź opis filmu.") return st.info("Rozpoczęto generowanie filmu...") output_path = asyncio.run(generate_video(api_key, prompt, optimize_prompt, image)) if output_path: st.success("Film został wygenerowany!") st.video(output_path) else: st.error("Wystąpił błąd podczas generowania filmu.") if __name__ == "__main__": main()