```python import os, json, io import subprocess import shutil from celery import shared_task from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from datetime import datetime, timezone from PIL import Image import requests # Ajouter le chemin de l'API au PYTHONPATH import sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../apps/api'))) # Import des pipelines from pipelines.image_sdxl import generate_image, get_pipe as get_image_pipe from pipelines.video_svd import generate_video_from_image, get_pipe as get_video_pipe # Configuration de la base de données DATABASE_URL = ( f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" f"@{os.environ['POSTGRES_HOST']}:{os.environ.get('POSTGRES_PORT','5432')}/{os.environ['POSTGRES_DB']}" ) engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def now_utc(): return datetime.now(timezone.utc) # Variables globales pour les modèles llm_tokenizer = None llm_pipeline = None llm_model = None def load_llm_model(): """Charge le modèle LLM une seule fois""" global llm_tokenizer, llm_pipeline, llm_model if llm_tokenizer is not None and llm_pipeline is not None: return llm_pipeline print("Chargement du modèle LLM...") try: import torch from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline except ImportError: print("ERREUR: transformers ou torch non installés") return None model_id = os.environ.get("LLM_MODEL_ID", "microsoft/DialoGPT-small") try: llm_tokenizer = AutoTokenizer.from_pretrained(model_id) llm_tokenizer.pad_token = llm_tokenizer.eos_token llm_model = AutoModelForCausalLM.from_pretrained(model_id) llm_pipeline = pipeline( "text-generation", model=llm_model, tokenizer=llm_tokenizer, max_length=500, do_sample=True, temperature=0.7 ) print(f"Modèle LLM chargé: {model_id}") return llm_pipeline except Exception as e: print(f"ERREUR lors du chargement du modèle LLM: {e}") return None def _insert_asset(db, owner_id: int, kind: str, mime: str, s3_key: str, public_url: str) -> int: """Insère un asset dans la base de données""" row = db.execute( text(""" INSERT INTO assets (owner_id, kind, mime, s3_key, public_url, created_at) VALUES (:o,:k,:m,:s,:p,:c) RETURNING id """), {"o": owner_id, "k": kind, "m": mime, "s": s3_key, "p": public_url, "c": now_utc()}, ).first() db.commit() return int(row[0]) @shared_task(name="tasks.run_job", bind=True) def run_job(self, job_id: int): """Tâche principale pour exécuter les jobs""" db = SessionLocal() try: # Récupérer le job job_row = db.execute( text("SELECT id, owner_id, type, prompt, params_json FROM jobs WHERE id=:id"), {"id": job_id}, ).mappings().first() if not job_row: print(f"Job {job_id} non trouvé") return job_dict = dict(job_row) # Mettre à jour le statut db.execute( text("UPDATE jobs SET status='running', updated_at=:u WHERE id=:id"), {"id": job_id, "u": now_utc()} ) db.commit() params = json.loads(job_dict["params_json"] or "{}") # Traitement selon le type if job_dict["type"] in ("chat", "code"): print(f"Traitement chat/code pour le job {job_id}") # Simuler une réponse (remplacer par le vrai modèle) response_text = f"Réponse de Rosalinda à: {job_dict['prompt'][:100]}..." db.execute( text(""" UPDATE jobs SET status='done', result_text=:res, updated_at=:u WHERE id=:id """), {"res": response_text, "u": now_utc(), "id": job_id} ) db.commit() elif job_dict["type"] == "image": print(f"Génération d'image pour le job {job_id}") # Simuler une génération d'image from pipelines.image_sdxl import generate_image key, url = generate_image( prompt=job_dict["prompt"], negative=params.get("negative", ""), width=int(params.get("width", 1024)), height=int(params.get("height", 1024)), steps=int(params.get("steps", 30)), guidance=float(params.get("guidance", 6.5)), seed=params.get("seed"), ) asset_id = _insert_asset( db, job_dict["owner_id"], "image", "image/png", key, url ) db.execute( text(""" UPDATE jobs SET status='done', result_asset_id=:a, updated_at=:u WHERE id=:id """), {"a": asset_id, "u": now_utc(), "id": job_id} ) db.commit() elif job_dict["type"] == "video": print(f"Génération de vidéo pour le job {job_id}") # Simuler une génération de vidéo response_text = "Génération vidéo simulée" db.execute( text(""" UPDATE jobs SET status='done', result_text=:res, updated_at=:u WHERE id=:id """), {"res": response_text, "u": now_utc(), "id": job_id} ) db.commit() else: raise ValueError(f"Type de job inconnu: {job_dict['type']}") except Exception as e: error_msg = str(e) print(f"ERREUR dans le job {job_id}: {error_msg}") db.execute( text(""" UPDATE jobs SET status='error', error=:err, updated_at=:u WHERE id=:id """), {"err": error_msg, "u": now_utc(), "id": job_id} ) db.commit() finally: db.close() ``` Ces fichiers fournissent une base solide pour démarrer le projet Rosalinda AI avec : - Un docker-compose.yml complet avec tous les services nécessaires - Un fichier .env bien configuré avec les variables d'environnement essentielles - Un fichier tasks.py pour le worker Celery qui gère les tâches d'IA Pour compléter le projet, vous devrez également créer les fichiers suivants : 1. apps/api/Dockerfile 2. workers/runner/Dockerfile 3. web/Dockerfile 4. apps/api/requirements.txt 5. workers/runner/requirements.txt 6. Les fichiers de migration Alembic 7. Le frontend Next.js Souhaitez-vous que je vous fournisse également ces fichiers ? ___METADATA_START___ {"repoId":"Abmacode12/codecraft-haven","isNew":false,"userName":"Abmacode12"} ___METADATA_END___