Spaces:
Running
Running
| from typing import Dict, Any, Union | |
| import json | |
| import os | |
| from huggingface_hub import hf_hub_download, HfApi | |
| from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError | |
| from src.core.state.session_manager import SessionManager | |
| from src.core.builder.code_generator import CodeGenerator | |
| from src.core.deployer.huggingface import HFDeployer | |
| from src.core.builder.proposal_generator import proposal_generator | |
| # Module-level shared instance, used as a singleton by the functions below | |
| session_manager = SessionManager() | |
| # Note: HFDeployer is instantiated on demand so that it picks up the most recent token (or could be managed per context if needed) | |
| # For now it is instantiated on every deployment. | |
def init_project(project_name: str, description: str, type: str = "adhoc") -> Dict[str, Any]:
    """
    Create a new, empty project draft.

    Args:
        project_name: Technical name (e.g. strawberry-counter, ratp-api).
        description: Tool description, or a complete technical specification
            (e.g. the content of a Swagger/OpenAPI JSON document).
        type: 'adhoc' (pure code) or 'api_wrapper' (REST).

    Returns:
        A dictionary holding the 'draft_id' required by the next steps, a
        'config' summary of the draft (name, description, file names) and a
        human-readable 'message'.
    """
    print(f"DEBUG [init_project]: project_name={project_name}, type={type}")

    new_draft = session_manager.create_draft(project_name, description, type)

    # Summarise the draft first, then assemble the response around it.
    # Key order is kept stable because the response is echoed in the debug log.
    draft_summary = {
        "name": new_draft.name,
        "description": new_draft.description,
        "files": [filename for filename in new_draft.code_files.keys()],
    }
    response = {
        "draft_id": new_draft.draft_id,
        "config": draft_summary,
        "message": f"Project '{project_name}' initialized. Draft ID: {new_draft.draft_id}",
    }

    print(f"DEBUG [init_project]: result={response}")
    return response
def propose_implementation(project_name: str, description: str) -> Dict[str, Any]:
    """
    Ask the internal AI for a complete implementation proposal.

    Args:
        project_name: The project name.
        description: The description or Swagger/OpenAPI JSON.

    Returns:
        On success, a dictionary with 'status', the generated 'proposal' and
        a 'message' asking the caller to review it before calling
        define_logic. On any failure, a dictionary with a single 'error' key.
    """
    print(f"DEBUG [propose_implementation]: project_name={project_name}")
    try:
        generated = proposal_generator.generate_from_description(project_name, description)
        response = dict(
            status="success",
            proposal=generated,
            message="Implementation proposed. Please review 'python_code' and 'requirements' before calling define_logic.",
        )
        print(f"DEBUG [propose_implementation]: result={response.keys()}")
    except Exception as exc:
        # Tool boundary: report the failure to the caller instead of raising.
        failure = str(exc)
        print(f"DEBUG [propose_implementation]: error={failure}")
        return {"error": f"Error during generation: {failure}"}
    return response
def _split_requirement(requirement: str):
    """Return (normalised package name, is_bare) for one requirement line.

    The name is the leading run of name characters, lower-cased, with runs
    of '-', '_' and '.' collapsed to '-'. ``is_bare`` is True when the line
    is nothing but that name (no extras, version specifier or marker).
    Lines that do not start with a name (comments, pip options) yield
    ``("", False)``.
    """
    import re  # local import: the module does not import re at file level

    text = requirement.strip()
    match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", text)
    if match is None:
        return "", False
    raw_name = match.group(0)
    return re.sub(r"[-_.]+", "-", raw_name).lower(), raw_name == text


def _parse_requirement_list(requirements) -> list:
    """Turn the caller-supplied requirements (list, JSON text or CSV text) into a list."""
    if isinstance(requirements, list):
        return requirements
    if not isinstance(requirements, str):
        return []
    text = requirements.strip()
    try:
        # JSON is tried first (e.g. when the value comes from a JSON/code widget).
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fallback: classic "req1, req2" text; newlines also separate entries.
        return [
            part.strip()
            for line in text.splitlines()
            for part in line.split(",")
            if part.strip()
        ]
    if isinstance(parsed, list):
        return parsed
    # Valid JSON that is not a list is treated as one single requirement.
    return [str(parsed)]


def _merge_requirements(current: str, extra: list) -> str:
    """Append 'gradio' and the extra requirements to the current requirements text.

    A candidate is skipped when the exact same line is already present, or
    when it is a bare package name and that package is already listed in any
    form. Comparison is done per line / per package name, never by substring,
    so 'torch' is not mistaken for 'torchvision'.
    """
    known_lines = {line.strip() for line in current.splitlines() if line.strip()}
    known_names = {_split_requirement(line)[0] for line in known_lines}
    known_names.discard("")

    parts = [current]
    # 'gradio' is always required and is handled like any other bare candidate.
    for candidate in ["gradio", *extra]:
        requirement = str(candidate).strip()
        if not requirement or requirement in known_lines:
            continue
        name, is_bare = _split_requirement(requirement)
        if is_bare and name in known_names:
            continue
        parts.append(requirement)
        known_lines.add(requirement)
        if name:
            known_names.add(name)
    return "\n".join(parts).strip()


def define_logic(draft_id: str, python_code: str, inputs: Union[Dict[str, str], str], output_desc: str, requirements: str = "", output_component: str = "text") -> Dict[str, Any]:
    """
    Defines the internal logic of the tool.

    Generates the tool module and the master app, stores them in the draft
    and updates the draft's requirements.txt.

    Args:
        draft_id: Identifier of the draft returned by init_project.
        python_code: Source code implementing the tool logic.
        inputs: Dictionary of inputs (e.g. {"word": "text"}). Can be a JSON string.
        output_desc: Description of the tool output.
        requirements: Extra pip requirements, as comma- or newline-separated
            text, a JSON list string, or a list.
        output_component: Output Gradio component type (text, image, audio, video, html, json, file).

    Returns:
        A dictionary with 'status', 'message' and a 'preview' of the
        generated module, or a dictionary with an 'error' key when the draft
        is unknown or 'inputs' is invalid.
    """
    print(f"DEBUG [define_logic]: draft_id={draft_id}, output_component={output_component}")
    draft = session_manager.get_draft(draft_id)
    if not draft:
        print("DEBUG [define_logic]: Draft not found")
        return {"error": f"Draft {draft_id} not found."}

    # Inputs may arrive as a dict or as a JSON string.
    if isinstance(inputs, str):
        try:
            inputs_dict = json.loads(inputs)
        except json.JSONDecodeError:
            print(f"DEBUG [define_logic]: Invalid JSON inputs: {inputs}")
            return {"error": "inputs must be a valid JSON string or dictionary"}
    else:
        inputs_dict = inputs
    # Valid JSON that is not an object (e.g. a list) is not a usable mapping.
    if not isinstance(inputs_dict, dict):
        print(f"DEBUG [define_logic]: Invalid JSON inputs: {inputs}")
        return {"error": "inputs must be a valid JSON string or dictionary"}

    # 1. Generate the tool module (e.g. tools/strawberry_counter.py).
    #    The cleaned project name is used as the file name.
    tool_filename = draft.name.replace("-", "_").lower()
    tool_module_code = CodeGenerator.generate_tool_module(python_code, inputs_dict, output_desc, draft.name, output_component)

    # 2. Generate the master application (app.py).
    master_app_code = CodeGenerator.generate_master_app()

    # Store everything in the draft; the tool lives in a 'tools' sub-package.
    session_manager.update_code(draft_id, f"tools/{tool_filename}.py", tool_module_code)
    session_manager.update_code(draft_id, "tools/__init__.py", "")  # Package marker
    session_manager.update_code(draft_id, "app.py", master_app_code)

    # Update the requirements: gradio plus whatever the caller asked for.
    current_reqs = draft.code_files.get("requirements.txt", "")
    extra_reqs = _parse_requirement_list(requirements) if requirements else []
    draft.code_files["requirements.txt"] = _merge_requirements(current_reqs, extra_reqs)

    return {
        "status": "success",
        "message": f"Logic generated for '{draft.name}'. Ready to deploy.",
        "preview": tool_module_code[:200] + "...",
    }
# Upper bound for the raw-URL fallback request, so a stalled connection
# cannot block a deployment indefinitely.
_RAW_FETCH_TIMEOUT_SECONDS = 10


def _resolve_space_repo_id(space_name: str, deployer) -> str:
    """Return 'user/space' for a short Space name when the user can be determined."""
    if "/" in space_name:
        return space_name
    hf_user = os.environ.get("HF_USER")
    if not hf_user:
        # HF_USER is not configured: try to get the user name from the API.
        try:
            user_info = deployer.api.whoami()
            if user_info and "name" in user_info:
                hf_user = user_info["name"]
        except Exception as e:
            # Best effort only; the short name is used as-is below.
            print(f"DEBUG: Unable to resolve HF user via whoami: {e}")
    return f"{hf_user}/{space_name}" if hf_user else space_name


def _fetch_space_requirements(repo_id_to_fetch: str, token) -> Union[set, None]:
    """Fetch the lines of the Space's requirements.txt, or None when unavailable."""
    # Method 1: through the Hugging Face API (hf_hub_download).
    try:
        print(f"DEBUG: Tentative de récupération des requirements via API sur {repo_id_to_fetch}...")
        cached_path = hf_hub_download(
            repo_id=repo_id_to_fetch,
            filename="requirements.txt",
            repo_type="space",
            token=token,
        )
        with open(cached_path, "r", encoding="utf-8") as f:
            existing_reqs = set(f.read().splitlines())
        print(f"DEBUG: Requirements récupérés via API ({len(existing_reqs)} items).")
        return existing_reqs
    except (EntryNotFoundError, RepositoryNotFoundError):
        print("DEBUG: Pas de requirements.txt via API (404/Not Found).")
    except Exception as e:
        print(f"DEBUG: Erreur API lors de la récupération requirements: {e}")

    # Method 2: direct "raw" URL, used when the API call did not succeed.
    # It needs a fully qualified 'user/space' identifier.
    if "/" not in repo_id_to_fetch:
        return None
    try:
        import requests  # local import, only needed for this fallback

        raw_url = f"https://huggingface.co/spaces/{repo_id_to_fetch}/resolve/main/requirements.txt"
        print(f"DEBUG: Tentative de récupération via URL Raw: {raw_url}")
        headers = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"
        resp = requests.get(raw_url, headers=headers, timeout=_RAW_FETCH_TIMEOUT_SECONDS)
        if resp.status_code == 200:
            existing_reqs = set(resp.text.splitlines())
            print(f"DEBUG: Requirements récupérés via URL Raw ({len(existing_reqs)} items).")
            return existing_reqs
        if resp.status_code == 404:
            print("DEBUG: requirements.txt non trouvé via URL Raw (404).")
        else:
            print(f"DEBUG: Erreur HTTP {resp.status_code} lors de la récupération via URL Raw.")
    except Exception as e:
        print(f"DEBUG: Exception lors de la récupération via URL Raw: {e}")
    return None


def _build_claude_config(space_name: str, mcp_endpoint: str) -> str:
    """Build the Claude Desktop configuration snippet (mcp-remote through npx)."""
    # The server is named after the Space without its owner,
    # e.g. alihmaou/mymcpserver -> mymcpserver.
    server_name = space_name.split("/")[-1]
    config = {
        "mcpServers": {
            server_name: {
                "command": "npx",
                "args": [
                    "mcp-remote",
                    mcp_endpoint,
                    "--transport",
                    "streamable-http",
                ],
            }
        }
    }
    # json.dumps guarantees valid, correctly escaped JSON whatever the names contain.
    return json.dumps(config, indent=2)


def deploy_to_space(draft_id: str, visibility: str = "public", space_target: str = "new", target_space_name: str = "") -> Dict[str, Any]:
    """
    Deploys the project to Hugging Face Spaces.

    Args:
        draft_id: Identifier of the draft to deploy.
        visibility: "private" creates a private Space; any other value is public.
        space_target: "existing" merges the draft's requirements.txt with the
            one already in the target Space; forced to "existing" when the
            DEFAULT_SPACE environment variable selects the target.
        target_space_name: Explicit target Space. When empty, DEFAULT_SPACE is
            used if set, otherwise the draft name.

    Returns:
        A dictionary with 'status', the Space 'url', 'instructions' and a
        'claude_config' JSON snippet, or a dictionary with an 'error' key.
    """
    print(f"DEBUG [deploy_to_space]: draft_id={draft_id}, target={space_target}, name={target_space_name}")
    draft = session_manager.get_draft(draft_id)
    if not draft:
        return {"error": f"Draft {draft_id} not found."}

    deployer = HFDeployer()

    # Target selection (centralised toolbox vs. new Space).
    default_space_env = os.environ.get("DEFAULT_SPACE")
    if target_space_name:
        final_space_name = target_space_name
    elif default_space_env:
        final_space_name = default_space_env
        print(f"DEBUG: Using DEFAULT_SPACE env var: {final_space_name}")
        space_target = "existing"
    else:
        final_space_name = draft.name

    files_to_deploy = draft.code_files.copy()

    # NOTE(review): app.py is still uploaded to existing Spaces; an earlier
    # exclusion of it was disabled. Confirm which behaviour is intended.
    if space_target == "existing" and "requirements.txt" in files_to_deploy:
        # Merge with the requirements already present in the target Space so
        # that the other tools deployed there keep their dependencies.
        new_reqs = set(files_to_deploy["requirements.txt"].splitlines())
        repo_id_to_fetch = _resolve_space_repo_id(final_space_name, deployer)
        existing_reqs = _fetch_space_requirements(repo_id_to_fetch, deployer.token)
        if existing_reqs is not None:
            cleaned_reqs = sorted(r.strip() for r in existing_reqs.union(new_reqs) if r.strip())
            files_to_deploy["requirements.txt"] = "\n".join(cleaned_reqs)
            print(f"DEBUG: Fusion terminée. Total requirements: {len(cleaned_reqs)}")
        else:
            print("DEBUG: Aucun requirements existant trouvé, déploiement des nouveaux uniquement.")

    try:
        url = deployer.deploy_space(
            space_name=final_space_name,
            files=files_to_deploy,
            sdk="gradio",
            private=(visibility == "private"),
        )
        mode_msg = "added to toolbox" if space_target == "existing" else "deployed (new space)"
        # Standard MCP URL for Gradio.
        mcp_endpoint = url.rstrip("/") + "/gradio_api/mcp/"
        return {
            "status": "success",
            "url": url,
            "instructions": f"Tool '{draft.name}' {mode_msg} !",
            "claude_config": _build_claude_config(final_space_name, mcp_endpoint),
        }
    except Exception as e:
        return {"error": f"Deployment error: {str(e)}"}
def delete_tool(space_name: str, tool_name: str) -> Dict[str, Any]:
    """
    Remove a tool module from an existing Space.

    Args:
        space_name: Full Space name (e.g. user/space) or short name (if HF_USER configured).
        tool_name: Tool name (e.g. strawberry_counter).

    Returns:
        A dictionary with 'status' and 'message' on success, or a dictionary
        with an 'error' key when the deletion fails.
    """
    hub_client = HfApi(token=HFDeployer().token)

    # A short name is qualified with HF_USER when that variable is set;
    # otherwise the name is used exactly as given.
    owner = None if "/" in space_name else os.environ.get("HF_USER")
    repo_id = f"{owner}/{space_name}" if owner else space_name

    target_file = f"tools/{tool_name}.py"
    try:
        print(f"DEBUG [delete_tool]: Deleting {target_file} from {repo_id}")
        hub_client.delete_file(
            path_in_repo=target_file,
            repo_id=repo_id,
            repo_type="space",
            commit_message=f"Delete tool {tool_name} via Meta-MCP",
        )
    except Exception as exc:
        print(f"DEBUG [delete_tool]: Error: {exc}")
        return {"error": f"Error during deletion: {str(exc)}"}
    return {"status": "success", "message": f"Tool '{tool_name}' deleted from '{repo_id}'."}
def get_tool_code(space_name: str, tool_name: str) -> Dict[str, Any]:
    """
    Retrieves the source code of an existing tool.

    Args:
        space_name: Full Space name (e.g. user/space) or short name (if HF_USER configured).
        tool_name: Tool name.

    Returns:
        A dictionary with 'status' and the tool source under 'code', or a
        dictionary with an 'error' key when the file cannot be fetched or read.
    """
    deployer = HFDeployer()

    # A short name is qualified with HF_USER when that variable is set.
    repo_id = space_name
    if "/" not in repo_id:
        hf_user = os.environ.get("HF_USER")
        if hf_user:
            repo_id = f"{hf_user}/{space_name}"

    filename = f"tools/{tool_name}.py"
    try:
        print(f"DEBUG [get_tool_code]: Fetching {filename} from {repo_id}")
        path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="space",
            token=deployer.token,
        )
        # Read as UTF-8 explicitly (Python's default source encoding) so the
        # result does not depend on the platform's default encoding.
        with open(path, "r", encoding="utf-8") as f:
            code = f.read()
        return {"status": "success", "code": code}
    except Exception as e:
        print(f"DEBUG [get_tool_code]: Error: {e}")
        return {"error": f"Error reading code: {str(e)}"}