# Testgen / app.py
# Kgshop's picture
# Update app.py
# 02746dc verified
import os
import uuid
from flask import Flask, request, jsonify, Response, Blueprint, render_template_string
import google.generativeai as genai
import json
import logging
import threading
import time
from datetime import datetime
from huggingface_hub import HfApi, hf_hub_download, create_repo, snapshot_download
from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError
from werkzeug.utils import secure_filename # Unused in final version, but kept for context if file uploads were needed
from dotenv import load_dotenv
import requests
import importlib.util
import sys
import shutil # For deleting directories
# Load environment variables from a local .env file, if one is present.
load_dotenv()

app = Flask(__name__)

# --- Configuration ---
# Hugging Face dataset repository used to persist the generated sites.
REPO_ID = "Kgshop/testsynk"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")  # Needed for repo creation and uploads.
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")  # Optional; downloads fall back to the write token.
GENERATED_SITES_DIR = 'generated_sites'
GENERATED_APPS_METADATA_FILE = os.path.join(GENERATED_SITES_DIR, 'generated_apps.json')

# Dynamically loaded blueprints.
# Key: app_uuid, Value: blueprint object
_GENERATED_BLUEPRINTS = {}

# Ensure the base directory for generated sites exists. `exist_ok=True` avoids
# the check-then-create race when several processes import this module at once.
os.makedirs(GENERATED_SITES_DIR, exist_ok=True)

# --- Logging setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Hugging Face sync settings ---
DOWNLOAD_RETRIES = 3  # Additional attempts made after the first download attempt.
DOWNLOAD_DELAY = 5  # Seconds to sleep between download attempts.
def create_hf_repo_if_not_exists():
    """Create the private Hugging Face dataset repo unless it already exists.

    Returns:
        True when the create call completed without raising; False when no
        write token is configured or the call failed (the failure is logged).
    """
    if HF_TOKEN_WRITE:
        try:
            # exist_ok=True turns "already exists" into a no-op instead of an error.
            HfApi(token=HF_TOKEN_WRITE).create_repo(
                repo_id=REPO_ID,
                repo_type="dataset",
                exist_ok=True,
                private=True,
            )
        except Exception as exc:
            logging.error(f"Failed to create/ensure Hugging Face repo '{REPO_ID}': {exc}")
            return False
        else:
            logging.info(f"Hugging Face dataset repo '{REPO_ID}' ensured to exist.")
            return True

    logging.warning("HF_TOKEN for writing not set. Cannot create Hugging Face repo.")
    return False
def _clear_directory(directory):
    """Delete every entry inside *directory* while keeping the directory itself."""
    for entry in os.listdir(directory):
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path) and not os.path.islink(entry_path):
            shutil.rmtree(entry_path)
        else:
            os.remove(entry_path)


def _move_directory_contents(source_dir, destination_dir):
    """Move each entry of *source_dir* into *destination_dir*, replacing same-named entries."""
    os.makedirs(destination_dir, exist_ok=True)
    for entry in os.listdir(source_dir):
        source_path = os.path.join(source_dir, entry)
        destination_path = os.path.join(destination_dir, entry)
        # Remove whatever currently occupies the destination, whether file or directory.
        if os.path.isdir(destination_path) and not os.path.islink(destination_path):
            shutil.rmtree(destination_path)
        elif os.path.lexists(destination_path):
            os.remove(destination_path)
        shutil.move(source_path, destination_path)


def download_data_from_hf(target_dir="."):
    """Restore the local generated-sites directory from the Hugging Face repo.

    The whole dataset repo is downloaded into the temporary directory
    ``hf_download_temp``; the contents of its ``generated_sites`` sub-folder are
    then moved into ``GENERATED_SITES_DIR`` and the temporary directory is removed.

    Local data is only cleared AFTER a snapshot containing a ``generated_sites``
    folder has been downloaded, so a failed or empty download no longer wipes
    the existing local copy.

    Args:
        target_dir: Only used to decide whether existing local content is
            cleared first (that happens when it equals ``GENERATED_SITES_DIR``).
            The restored files are always placed in ``GENERATED_SITES_DIR``.

    Returns:
        True if a snapshot was downloaded and processed, False otherwise.
    """
    if not REPO_ID:
        logging.warning("REPO_ID not set. Skipping Hugging Face download.")
        return False

    token_to_use = HF_TOKEN_READ or HF_TOKEN_WRITE
    if not token_to_use:
        logging.warning("No Hugging Face token available for download. Skipping.")
        return False

    logging.info(f"Attempting full repo snapshot download for '{REPO_ID}' to '{target_dir}'...")
    total_attempts = DOWNLOAD_RETRIES + 1

    for attempt in range(total_attempts):
        try:
            logging.info(f"Downloading snapshot of repo '{REPO_ID}' (Attempt {attempt + 1}/{total_attempts})...")
            downloaded_repo_path = snapshot_download(
                repo_id=REPO_ID,
                repo_type="dataset",
                token=token_to_use,
                local_dir="hf_download_temp",  # Download to a temp directory first
                local_dir_use_symlinks=False,
            )
            logging.info(f"Repo snapshot downloaded to {downloaded_repo_path}")

            source_generated_sites = os.path.join(downloaded_repo_path, 'generated_sites')
            if os.path.exists(source_generated_sites):
                # Safe to drop the local copy now: its replacement is already on disk.
                if os.path.exists(target_dir) and target_dir == GENERATED_SITES_DIR:
                    logging.info(f"Clearing existing local '{target_dir}' before restoring the downloaded copy.")
                    _clear_directory(target_dir)
                _move_directory_contents(source_generated_sites, GENERATED_SITES_DIR)
                logging.info(f"Moved generated_sites content from {source_generated_sites} to {GENERATED_SITES_DIR}")
            else:
                logging.warning(f"No 'generated_sites' folder found in the downloaded repo {downloaded_repo_path}.")

            # Clean up the temporary download directory
            if os.path.exists(downloaded_repo_path):
                shutil.rmtree(downloaded_repo_path)
                logging.info(f"Cleaned up temporary download directory {downloaded_repo_path}.")
            return True
        except RepositoryNotFoundError:
            # Must be handled before HfHubHTTPError, which is caught more generally below.
            logging.error(f"Repository {REPO_ID} not found on Hugging Face. Ensuring its creation...")
            if create_hf_repo_if_not_exists():
                logging.info(f"Repository {REPO_ID} created. Retrying download after creation.")
                continue  # Retry immediately: the repo exists now
            return False  # Cannot proceed without repo
        except HfHubHTTPError as e:
            logging.error(f"HTTP error downloading repo (Attempt {attempt + 1}): {e}. Retrying in {DOWNLOAD_DELAY}s...")
        except requests.exceptions.RequestException as e:
            logging.error(f"Network error downloading repo (Attempt {attempt + 1}): {e}. Retrying in {DOWNLOAD_DELAY}s...", exc_info=True)
        except Exception as e:
            logging.error(f"Unexpected error during repo snapshot download (Attempt {attempt + 1}): {e}. Retrying in {DOWNLOAD_DELAY}s...", exc_info=True)

        # Only wait when another attempt will actually follow.
        if attempt < DOWNLOAD_RETRIES:
            time.sleep(DOWNLOAD_DELAY)

    logging.error(f"Failed to download repo '{REPO_ID}' after {total_attempts} attempts.")
    return False
def upload_data_to_hf(target_dir="."):
    """Upload a local directory to the Hugging Face dataset repo.

    The directory is uploaded under its own name inside the repo (for example
    ``generated_sites/...``), which is the layout ``download_data_from_hf``
    looks for when restoring. Without an explicit ``path_in_repo`` the folder's
    *contents* would be placed at the repo root and never be found again on
    download. When ``target_dir`` is the current directory, its contents are
    uploaded to the repo root.

    Args:
        target_dir: Path of the local directory to upload.

    Returns:
        None. Failures are logged and not raised.
    """
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN for writing not set. Skipping upload to Hugging Face.")
        return

    normalized_dir = os.path.normpath(target_dir)
    if normalized_dir == os.curdir:
        path_in_repo = None  # Upload the working directory's contents to the repo root.
    else:
        path_in_repo = os.path.basename(os.path.abspath(normalized_dir))

    try:
        api = HfApi(token=HF_TOKEN_WRITE)
        logging.info(f"Starting upload of directory '{target_dir}' to HF repo '{REPO_ID}'...")
        api.upload_folder(
            folder_path=target_dir,
            path_in_repo=path_in_repo,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=f"Sync {target_dir} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
        logging.info(f"Directory '{target_dir}' successfully uploaded to Hugging Face.")
    except Exception as e:
        logging.error(f"Error during Hugging Face folder upload for '{target_dir}': {e}", exc_info=True)
def load_generated_apps_metadata():
    """Load the generated-apps metadata from ``GENERATED_APPS_METADATA_FILE``.

    Returns:
        The parsed metadata dict, guaranteed to contain an ``"apps"`` key whose
        value is a dict. A fresh ``{"apps": {}}`` is returned when the file is
        missing, unreadable, not valid JSON, or does not have that shape.
    """
    try:
        with open(GENERATED_APPS_METADATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        logging.warning(f"Metadata file {GENERATED_APPS_METADATA_FILE} not found. Creating empty structure.")
        return {"apps": {}}
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON from {GENERATED_APPS_METADATA_FILE}: {e}. File might be corrupt. Resetting to empty.")
        return {"apps": {}}
    except Exception as e:
        logging.error(f"Unexpected error loading metadata from {GENERATED_APPS_METADATA_FILE}: {e}. Resetting to empty.", exc_info=True)
        return {"apps": {}}

    # Callers index and iterate data["apps"] as a dict, so validate its type too,
    # not just the presence of the key.
    if not isinstance(data, dict) or not isinstance(data.get("apps"), dict):
        logging.warning(f"Invalid format in {GENERATED_APPS_METADATA_FILE}. Resetting to empty.")
        return {"apps": {}}
    return data
def save_generated_apps_metadata(metadata):
    """Write the metadata to ``GENERATED_APPS_METADATA_FILE`` and upload the sites directory.

    Args:
        metadata: JSON-serializable metadata structure to persist.

    Returns:
        None. Any failure is logged and not raised; the upload is skipped when
        saving fails.
    """
    try:
        # Serialize before opening the file: opening with 'w' truncates it, so a
        # serialization error must not be able to destroy the existing metadata.
        serialized = json.dumps(metadata, ensure_ascii=False, indent=4)
        with open(GENERATED_APPS_METADATA_FILE, 'w', encoding='utf-8') as f:
            f.write(serialized)
        logging.info(f"Metadata saved to {GENERATED_APPS_METADATA_FILE}")
        # Trigger upload of the entire generated_sites directory including the metadata file
        upload_data_to_hf(GENERATED_SITES_DIR)
    except Exception as e:
        logging.error(f"Error saving metadata to {GENERATED_APPS_METADATA_FILE}: {e}", exc_info=True)
def load_and_register_blueprint(app_uuid, app_path):
    """Import ``app.py`` from *app_path* and register its blueprint on the main app.

    The module must expose a ``flask.Blueprint`` instance named
    ``generated_blueprint``; it is mounted under ``/generated_sites/<app_uuid>``.

    SECURITY: this executes the file's code inside the server process. The file
    content comes from a language model driven by user-supplied prompts, so it
    must be treated as untrusted code.

    NOTE(review): recent Flask releases appear to reject ``register_blueprint``
    once the app has handled its first request; if so, registration triggered
    from a request handler fails here and is reported as a load failure —
    confirm against the Flask version in use.

    Args:
        app_uuid: Identifier of the generated app; used for the module name and URL prefix.
        app_path: Directory that contains the generated ``app.py``.

    Returns:
        True if the blueprint is registered (or already was), False otherwise.
    """
    if app_uuid in _GENERATED_BLUEPRINTS:
        logging.info(f"Blueprint {app_uuid} already loaded.")
        return True

    module_path = os.path.join(app_path, 'app.py')
    if not os.path.exists(module_path):
        logging.error(f"App module not found at {module_path} for UUID {app_uuid}. Skipping.")
        return False

    # Unique module name per app so generated modules never collide in sys.modules.
    module_name = f"generated_app_{app_uuid}"
    try:
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not create module spec for {module_path}")

        module = importlib.util.module_from_spec(spec)
        # Register before executing so the module can be resolved during its own import.
        sys.modules[module_name] = module
        spec.loader.exec_module(module)

        blueprint_found = getattr(module, 'generated_blueprint', None)
        if not isinstance(blueprint_found, Blueprint):
            logging.error(f"No Flask Blueprint named 'generated_blueprint' found in {module_path} for UUID {app_uuid}.")
            sys.modules.pop(module_name, None)  # Do not keep an unusable module around.
            return False

        app.register_blueprint(blueprint_found, url_prefix=f'/generated_sites/{app_uuid}')
        _GENERATED_BLUEPRINTS[app_uuid] = blueprint_found
        logging.info(f"Blueprint '{blueprint_found.name}' for UUID {app_uuid} loaded and registered under /generated_sites/{app_uuid}")
        return True
    except Exception as e:
        # Drop the half-initialised module so a failed load leaves no trace behind.
        sys.modules.pop(module_name, None)
        logging.error(f"Failed to load and register blueprint from {module_path} for UUID {app_uuid}: {e}", exc_info=True)
        return False
def load_all_generated_blueprints():
    """Register every app listed in the metadata file whose directory exists locally.

    Apps whose directory is missing under ``GENERATED_SITES_DIR`` are skipped
    with a warning.
    """
    logging.info("Attempting to load all previously generated blueprints...")

    known_apps = load_generated_apps_metadata().get("apps")
    if not known_apps:
        logging.info("No generated apps metadata found to load.")
        return

    for app_uuid, _app_info in known_apps.items():
        app_dir = os.path.join(GENERATED_SITES_DIR, app_uuid)
        if os.path.exists(app_dir):
            load_and_register_blueprint(app_uuid, app_dir)
        else:
            logging.warning(f"Directory for app {app_uuid} not found locally at {app_dir}. Skipping load.")

    logging.info("Completed loading of existing blueprints.")
def periodic_backup():
    """Upload the generated_sites directory every 30 minutes, forever.

    This function never returns; it is meant to be used as the target of a
    background thread.
    """
    interval_seconds = 30 * 60
    logging.info(f"Setting up periodic backup of generated_sites directory every {interval_seconds} seconds.")
    while True:
        # Sleep first: the first backup happens one full interval after start.
        time.sleep(interval_seconds)
        logging.info("Starting periodic backup of generated_sites directory...")
        upload_data_to_hf(GENERATED_SITES_DIR)
        logging.info("Periodic backup finished.")
# --- Flask App Routes and AI Generation Logic ---
# HTML template for the generator UI page (served by the index route).
html_template = """
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, viewport-fit=cover">
<title>EVA - Генератор Сайтов</title>
<style>
:root {
--system-gray-100-light: #f2f2f7;
--system-gray-75-light: #f8f8fa;
--system-gray-50-light: #ffffff;
--system-gray-dark-100-light: #000000;
--system-gray-dark-75-light: #1c1c1e;
--system-gray-dark-50-light: #3a3a3c;
--system-gray-light-75-light: #8e8e93;
--system-gray-light-50-light: #aeaeb2;
--system-blue-light: #007aff;
--system-blue-light-hover: #005ecf;
--system-red-light: #ff3b30;
--system-separator-light: rgba(60, 60, 67, 0.29);
--system-separator-opaque-light: #d1d1d6;
--system-gray-100-dark: #1c1c1e;
--system-gray-75-dark: #2c2c2e;
--system-gray-50-dark: #000000;
--system-gray-dark-100-dark: #ffffff;
--system-gray-dark-75-dark: #f2f2f7;
--system-gray-dark-50-dark: #e5e5ea;
--system-gray-light-75-dark: #8e8e93;
--system-gray-light-50-dark: #636366;
--system-blue-dark: #0a84ff;
--system-blue-dark-hover: #3b9eff;
--system-red-dark: #ff453a;
--system-separator-dark: rgba(84, 84, 88, 0.65);
--system-separator-opaque-dark: #38383a;
}
@media (prefers-color-scheme: dark) {
:root {
--bg-color: var(--system-gray-50-dark);
--content-bg: var(--system-gray-100-dark);
--text-color: var(--system-gray-dark-100-dark);
--secondary-text-color: var(--system-gray-light-75-dark);
--tertiary-text-color: var(--system-gray-light-50-dark);
--border-color: var(--system-separator-dark);
--border-color-opaque: var(--system-separator-opaque-dark);
--input-bg: var(--system-gray-75-dark);
--primary-color: var(--system-blue-dark);
--primary-color-hover: var(--system-blue-dark-hover);
--error-color: var(--system-red-dark);
}
}
@media (prefers-color-scheme: light) {
:root {
--bg-color: var(--system-gray-100-light);
--content-bg: var(--system-gray-50-light);
--text-color: var(--system-gray-dark-100-light);
--secondary-text-color: var(--system-gray-light-75-light);
--tertiary-text-color: var(--system-gray-light-50-light);
--border-color: var(--system-separator-light);
--border-color-opaque: var(--system-separator-opaque-light);
--input-bg: var(--system-gray-75-light);
--primary-color: var(--system-blue-light);
--primary-color-hover: var(--system-blue-light-hover);
--error-color: var(--system-red-light);
}
}
html {
height: -webkit-fill-available;
}
body {
font-family: -apple-system, BlinkMacSystemFont, "SF Pro Text", "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
margin: 0;
padding: 20px env(safe-area-inset-top) 20px env(safe-area-inset-bottom);
background-color: var(--bg-color);
color: var(--text-color);
display: flex;
justify-content: center;
align-items: flex-start;
min-height: 100vh;
min-height: -webkit-fill-available;
line-height: 1.45;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
.container {
background-color: var(--content-bg);
padding: 25px 30px 30px 30px;
border-radius: 24px;
box-shadow: 0 12px 28px rgba(0, 0, 0, 0.08), 0 2px 4px rgba(0,0,0,0.05);
max-width: 580px;
width: calc(100% - 40px);
box-sizing: border-box;
margin-top: 30px;
}
h1 {
font-size: 32px;
font-weight: 700;
text-align: center;
margin-bottom: 8px;
color: var(--text-color);
letter-spacing: -0.5px;
}
p.subtitle {
font-size: 17px;
color: var(--secondary-text-color);
text-align: center;
margin-bottom: 35px;
font-weight: 400;
}
.form-group {
margin-bottom: 28px;
}
label.input-label {
display: block;
font-weight: 500;
margin-bottom: 10px;
font-size: 15px;
color: var(--secondary-text-color);
padding-left: 5px;
}
textarea#prompt-input {
width: 100%;
padding: 14px 18px;
border: 1px solid var(--border-color-opaque);
border-radius: 12px;
font-size: 16px;
background-color: var(--input-bg);
color: var(--text-color);
box-sizing: border-box;
transition: border-color 0.2s ease, box-shadow 0.2s ease;
font-family: inherit;
resize: vertical;
min-height: 120px;
}
textarea#prompt-input:focus {
border-color: var(--primary-color);
box-shadow: 0 0 0 3px color-mix(in srgb, var(--primary-color) 25%, transparent);
outline: none;
}
button#generate-button {
width: 100%;
padding: 16px;
background-color: var(--primary-color);
color: white;
border: none;
border-radius: 12px;
font-size: 17px;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease, transform 0.1s ease;
margin-top: 15px;
}
button#generate-button:hover {
background-color: var(--primary-color-hover);
}
button#generate-button:active {
transform: scale(0.98);
}
button#generate-button:disabled {
background-color: var(--tertiary-text-color);
cursor: not-allowed;
}
.output-section {
margin-top: 35px;
}
.output-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
label#output-label {
font-weight: 500;
font-size: 15px;
color: var(--secondary-text-color);
padding-left: 5px;
}
button#copy-button {
background-color: transparent;
border: none;
color: var(--primary-color);
font-size: 14px;
font-weight: 500;
cursor: pointer;
padding: 5px 8px;
border-radius: 6px;
transition: background-color 0.2s ease, color 0.2s ease;
display: none;
}
button#copy-button:hover {
background-color: color-mix(in srgb, var(--primary-color) 15%, transparent);
}
button#copy-button:active {
background-color: color-mix(in srgb, var(--primary-color) 25%, transparent);
}
button#copy-button.copied {
color: #34c759;
}
@media (prefers-color-scheme: dark) {
button#copy-button.copied {
color: #30d158;
}
}
#output-container {
background-color: var(--input-bg);
padding: 18px 20px;
border-radius: 12px;
min-height: 60px;
border: 1px solid var(--border-color);
word-wrap: break-word;
font-size: 15px;
color: var(--text-color);
line-height: 1.5;
transition: border-color 0.2s ease, background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
#output-container a {
color: var(--primary-color);
text-decoration: none;
font-weight: 500;
}
#output-container a:hover {
text-decoration: underline;
}
#output-container.loading::before {
content: "Генерация сайта...";
display: block;
text-align: center;
font-style: italic;
color: var(--secondary-text-color);
animation: fadePulse 1.8s infinite ease-in-out;
}
#output-container.error {
color: var(--error-color);
font-weight: 500;
border-color: color-mix(in srgb, var(--error-color) 50%, transparent);
background-color: color-mix(in srgb, var(--error-color) 10%, var(--input-bg));
justify-content: flex-start;
}
@keyframes fadePulse {
0%, 100% { opacity: 0.6; }
50% { opacity: 1; }
}
@media (max-width: 620px) {
body {
padding: 15px env(safe-area-inset-top) 15px env(safe-area-inset-bottom);
align-items: flex-start;
}
.container {
padding: 20px 20px 25px 20px;
margin-top: 15px;
border-radius: 20px;
width: calc(100% - 30px);
}
h1 {
font-size: 28px;
}
p.subtitle {
font-size: 16px;
margin-bottom: 25px;
}
.form-group {
margin-bottom: 22px;
}
textarea#prompt-input {
padding: 12px 15px;
min-height: 100px;
}
button#generate-button {
padding: 15px;
font-size: 16px;
}
#output-container {
padding: 15px 18px;
font-size: 14px;
min-height: 50px;
}
.output-section {
margin-top: 30px;
}
}
</style>
</head>
<body>
<div class="container">
<h1>EVA</h1>
<p class="subtitle">Генератор сайтов на базе ИИ</p>
<form id="generate-form">
<div class="form-group">
<label for="prompt-input" class="input-label">Опишите сайт, который вы хотите создать</label>
<textarea id="prompt-input" name="prompt" rows="6" required placeholder="Например: создай одностраничный сайт-портфолио для веб-дизайнера по имени Алия, с секциями 'Обо мне', 'Мои работы' и 'Контакты'. Используй современный минималистичный дизайн."></textarea>
</div>
<button type="submit" id="generate-button">Создать сайт</button>
</form>
<div class="output-section">
<div class="output-header">
<label id="output-label">Ссылка на сайт</label>
<button id="copy-button">Копировать</button>
</div>
<div id="output-container" aria-live="polite">
</div>
</div>
</div>
<script>
const form = document.getElementById('generate-form');
const promptInput = document.getElementById('prompt-input');
const outputContainer = document.getElementById('output-container');
const generateButton = document.getElementById('generate-button');
const copyButton = document.getElementById('copy-button');
form.addEventListener('submit', async (event) => {
event.preventDefault();
if (!promptInput.value.trim()) {
showError("Пожалуйста, опишите сайт, который вы хотите создать.");
return;
}
const formData = new FormData(form);
generateButton.disabled = true;
generateButton.textContent = 'Генерация...';
outputContainer.innerHTML = '';
outputContainer.classList.add('loading');
outputContainer.classList.remove('error');
copyButton.style.display = 'none';
copyButton.textContent = 'Копировать';
copyButton.classList.remove('copied');
try {
const response = await fetch('/generate', {
method: 'POST',
body: formData
});
const result = await response.json();
if (!response.ok) {
throw new Error(result.error || `Ошибка сервера: ${response.status}`);
}
if (result.site_url) {
const link = document.createElement('a');
link.href = result.site_url;
link.textContent = "Открыть сгенерированный сайт";
link.target = "_blank";
outputContainer.innerHTML = '';
outputContainer.appendChild(link);
copyButton.style.display = 'block';
copyButton.dataset.copyText = window.location.origin + result.site_url;
} else if (result.error) {
showError(result.error);
} else {
showError("Не удалось получить ссылку на сайт. Ответ сервера не содержит URL.");
}
} catch (error) {
console.error("Fetch Error:", error);
showError(`Ошибка: ${error.message}`);
copyButton.style.display = 'none';
} finally {
generateButton.disabled = false;
generateButton.textContent = 'Создать сайт';
outputContainer.classList.remove('loading');
}
});
copyButton.addEventListener('click', () => {
const textToCopy = copyButton.dataset.copyText;
if (!textToCopy) return;
navigator.clipboard.writeText(textToCopy).then(() => {
copyButton.textContent = 'Скопировано!';
copyButton.classList.add('copied');
setTimeout(() => {
copyButton.textContent = 'Копировать';
copyButton.classList.remove('copied');
}, 1500);
}).catch(err => {
console.error('Ошибка копирования: ', err);
copyButton.textContent = 'Ошибка';
setTimeout(() => {
copyButton.textContent = 'Копировать';
}, 1500);
});
});
function showError(message) {
outputContainer.innerHTML = '';
const errorMessageElement = document.createElement('span');
errorMessageElement.textContent = message;
outputContainer.appendChild(errorMessageElement);
outputContainer.classList.add('error');
outputContainer.classList.remove('loading');
copyButton.style.display = 'none';
}
</script>
</body>
</html>
"""
# Gemini model used for code generation.
_GEMINI_MODEL_NAME = 'gemini-1.5-flash-latest'


def _strip_code_fences(text):
    """Remove a surrounding Markdown code fence (with or without a language tag) from *text*."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        body = cleaned[3:]
        first_line, newline, remainder = body.partition("\n")
        tag = first_line.strip()
        if newline and (not tag or tag.isalnum()):
            # The opening line holds only the fence and an optional language tag.
            body = remainder
        elif body.startswith("python"):
            body = body[len("python"):]
        cleaned = body.strip()
    if cleaned.endswith("```"):
        cleaned = cleaned[:-len("```")].strip()
    return cleaned


def _extract_response_text(response):
    """Return the text carried by a model response, or an empty string if there is none."""
    if hasattr(response, 'text') and response.text:
        return response.text
    if response.parts:
        return "".join(part.text for part in response.parts if hasattr(part, 'text'))
    return ""


def _describe_generation_error(error, response):
    """Map a generation failure to the user-facing message for the raised ValueError."""
    error_message = str(error)
    lowered = error_message.lower()
    block_reason = getattr(getattr(response, 'prompt_feedback', None), 'block_reason', None)

    if "API key not valid" in error_message:
        return "Внутренняя ошибка конфигурации API. Проверьте GEMINI_API_KEY в .env."
    if "Billing account not found" in error_message or "billing account" in lowered:
        return "Проблема с биллингом аккаунта Google Cloud. Проверьте статус вашего биллинг-аккаунта."
    if "Could not find model" in error_message:
        return f"Модель '{_GEMINI_MODEL_NAME}' не найдена или недоступна. Возможно, регион или квоты."
    if "resource has been exhausted" in lowered or "quota" in lowered:
        return "Квота запросов к AI-модели исчерпана. Попробуйте позже."
    if "content has been blocked" in lowered or block_reason:
        reason = "неизвестна"
        if block_reason:
            reason = block_reason
        elif "safety settings" in lowered:
            reason = "настройки безопасности"
        return f"Генерация контента заблокирована (причина: {reason}). Попробуйте другой запрос."
    return f"Ошибка при генерации Flask Blueprint кода: {error}"


def generate_website_code_from_prompt(user_prompt):
    """Generate Flask Blueprint source code for *user_prompt* using Gemini.

    Args:
        user_prompt: Free-text description of the site to generate.

    Returns:
        The generated Python source as a string, with any surrounding Markdown
        code fence removed. It contains ``from flask import Blueprint`` and
        ``generated_blueprint = Blueprint(``.

    Raises:
        ValueError: if the API key is missing or configuration fails, the prompt
            is empty, the model call fails or is blocked, the model returns
            nothing, or the output does not look like a blueprint definition.
    """
    try:
        # Using GEMINI_API_KEY from .env
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if not gemini_api_key:
            raise ValueError("GEMINI_API_KEY environment variable not set. Please add it to your .env file.")
        genai.configure(api_key=gemini_api_key)
    except Exception as e:
        logging.error(f"Error configuring GenAI: {e}")
        raise ValueError(f"Не удалось настроить Google AI. Проблема с конфигурацией. Ошибка: {e}") from e

    if not user_prompt or not user_prompt.strip():
        raise ValueError("Текстовый запрос (промпт) не может быть пустым.")

    # System instruction describing the required shape of the generated Blueprint
    system_instruction = (
        "You are an expert web developer specializing in Flask. Your task is to generate a complete, "
        "self-contained Python Flask Blueprint code string based on the user's request. "
        "This Blueprint will be dynamically loaded into a larger Flask application.\n"
        "The generated Python string must adhere to the following rules:\n"
        "1. It must start with necessary imports: `from flask import Blueprint, render_template_string, request, jsonify`.\n"
        " If managing data via JSON file, also include `import json` and `import os`.\n"
        "2. It must define a Flask Blueprint instance, named `generated_blueprint`. For example: `generated_blueprint = Blueprint('my_app_name', __name__)`.\n"
        "3. All HTML, CSS (in `<style>` tags), and JavaScript (in `<script>` tags) should be embedded directly as multi-line Python strings within the blueprint's route functions. Do NOT use external files for these. Use f-strings for dynamic content where needed.\n"
        "4. If the user requests 'database' functionality, simulate it with simple in-memory Python lists or dictionaries. If persistence is implied, provide a basic JSON file read/write mechanism within the Blueprint. The JSON file should be stored in the same directory as the `app.py` file using `os.path.join(os.path.dirname(__file__), 'data.json')`. Implement simple `load_data()` and `save_data()` functions inside the blueprint.\n"
        "5. Define at least one route, typically the root (`/`), within the Blueprint, like `@generated_blueprint.route('/')`.\n"
        "6. The code should be functional and visually appealing, using basic modern styling where appropriate.\n"
        "7. The code must NOT include `app.run()` or any top-level Flask application instance creation (only a Blueprint).\n"
        "8. For images or other assets, assume placeholder services (e.g., `https://via.placeholder.com/`) or base64 if small. Do not assume local static files that are not part of the `app.py` itself unless explicitly handled by Blueprint's `static_folder` (which you should define if needed, e.g., `static_folder='static', static_url_path='/static'`). For simplicity, prefer embedding or external links.\n"
        "9. Directly output ONLY the Python code string. Do not include any explanatory text, markdown formatting (like ```python), or anything else before or after the code itself."
    )
    full_prompt = f"{system_instruction}\n\nUser request: \"{user_prompt}\""

    # Only the model call and response decoding are guarded here, so the
    # validation errors raised further down reach the caller with their own
    # message instead of being re-wrapped as a generic generation failure.
    response = None
    try:
        model = genai.GenerativeModel(_GEMINI_MODEL_NAME)
        response = model.generate_content(full_prompt)
        # Models often wrap code in a Markdown fence despite the instruction not to.
        generated_text = _strip_code_fences(_extract_response_text(response))
    except Exception as e:
        logging.error(f"Error generating content with GenAI: {e}")
        raise ValueError(_describe_generation_error(e, response)) from e

    if not generated_text:
        block_reason = getattr(getattr(response, 'prompt_feedback', None), 'block_reason', None)
        if block_reason:
            raise ValueError(f"Генерация контента заблокирована из-за политики безопасности (причина: {block_reason}). Попробуйте другой запрос.")
        raise ValueError("Модель вернула пустой результат. Попробуйте изменить запрос.")

    # Basic validation for expected blueprint structure
    if "from flask import Blueprint" not in generated_text or "generated_blueprint = Blueprint(" not in generated_text:
        logging.warning(f"Generated text does not seem to contain a Flask Blueprint definition as expected. Preview: {generated_text[:500]}")
        raise ValueError("Модель не сгенерировала корректный код Flask Blueprint. Попробуйте другой запрос или переформулируйте.")
    return generated_text
@app.route('/')
def index():
    """Serve the generator UI page as an HTML response."""
    page = Response(html_template, mimetype='text/html')
    return page
@app.route('/generate', methods=['POST'])
def handle_generate():
    """Generate a site from the posted ``prompt`` form field and mount it.

    On success the generated blueprint code is written to
    ``GENERATED_SITES_DIR/<uuid>/app.py``, registered on the running app and
    recorded in the metadata file; the JSON response carries ``site_url``.
    On failure the JSON response carries ``error`` with status 400 (missing or
    empty prompt, or a ValueError from generation) or 500 (anything else).

    NOTE(review): this endpoint ends up executing model-generated code inside
    the server process on behalf of whoever posts a prompt.
    """
    form = request.form
    if 'prompt' not in form:
        return jsonify({"error": "Текстовый запрос (промпт) не найден."}), 400

    user_prompt = form['prompt']
    if not (user_prompt and user_prompt.strip()):
        return jsonify({"error": "Текстовый запрос не может быть пустым."}), 400

    app_uuid = str(uuid.uuid4())
    site_url = f"/generated_sites/{app_uuid}"
    app_dir = os.path.join(GENERATED_SITES_DIR, app_uuid)
    app_file_path = os.path.join(app_dir, 'app.py')

    try:
        logging.info(f"Generating blueprint for prompt: '{user_prompt[:100]}...' [UUID: {app_uuid}]")
        blueprint_code = generate_website_code_from_prompt(user_prompt)
        if not (blueprint_code and blueprint_code.strip()):
            return jsonify({"error": "Сгенерированный Python код Blueprint пуст."}), 500

        # Each generated app lives in its own directory named after its UUID.
        os.makedirs(app_dir, exist_ok=True)
        with open(app_file_path, "w", encoding="utf-8") as code_file:
            code_file.write(blueprint_code)
        logging.info(f"Generated blueprint saved to {app_file_path}")

        registered = load_and_register_blueprint(app_uuid, app_dir)
        if not registered:
            # Remove the unusable app before reporting the failure.
            if os.path.exists(app_dir):
                shutil.rmtree(app_dir)
            raise Exception("Не удалось загрузить или зарегистрировать сгенерированный Blueprint. Возможно, код содержит ошибки.")

        # Record the app so it can be reloaded on the next startup.
        app_record = {
            "created_at": datetime.now().isoformat(),
            "prompt": user_prompt,
            "path": os.path.relpath(app_dir, GENERATED_SITES_DIR),  # Stored relative to the sites dir
            "url_prefix": site_url
        }
        metadata = load_generated_apps_metadata()
        metadata["apps"][app_uuid] = app_record
        save_generated_apps_metadata(metadata)  # Also triggers the HF upload
        logging.info(f"Metadata updated and triggered Hugging Face upload for app: {app_uuid}.")

        return jsonify({"site_url": site_url})
    except ValueError as ve:
        logging.error(f"ValueError during site generation: {ve}", exc_info=True)
        # Only an empty leftover directory is removed on this path.
        if os.path.exists(app_dir) and not os.listdir(app_dir):
            os.rmdir(app_dir)
        return jsonify({"error": str(ve)}), 400
    except Exception as e:
        logging.error(f"Unexpected error during Flask Blueprint generation/registration: {e}", exc_info=True)
        # Remove whatever was written for this app.
        if os.path.exists(app_dir):
            shutil.rmtree(app_dir)
        return jsonify({"error": f"Внутренняя ошибка сервера при генерации сайта: {e}"}), 500
if __name__ == '__main__':
    logging.info("Application starting up. Performing initial setup and data synchronization...")

    # Step 1: make sure the Hugging Face repo exists (needs the write token).
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN for writing is not set. Cannot create Hugging Face repo or upload data. Periodic backup will not run.")
    else:
        create_hf_repo_if_not_exists()

    # Step 2: restore previously generated sites from the repo into the local directory.
    if not REPO_ID:
        logging.warning("REPO_ID not set, skipping Hugging Face snapshot download.")
    else:
        logging.info(f"Attempting to download latest generated sites from Hugging Face repo '{REPO_ID}'...")
        download_data_from_hf(GENERATED_SITES_DIR)

    # Step 3: register the blueprints found locally (after the optional download).
    load_all_generated_blueprints()
    logging.info("Initial blueprint loading complete.")

    # Step 4: background uploads only make sense with a write token.
    if not HF_TOKEN_WRITE:
        logging.warning("Periodic backup will NOT run (HF_TOKEN for writing not set).")
    else:
        # Daemon thread: it must not keep the process alive on shutdown.
        threading.Thread(target=periodic_backup, daemon=True).start()
        logging.info("Periodic backup thread started.")

    port = int(os.environ.get('PORT', 7860))
    logging.info(f"Starting Flask app on host 0.0.0.0 and port {port}")
    app.run(host='0.0.0.0', port=port, debug=False)