# Ali Hmaou
# Version 1.94RC
# a0a02f2
from typing import Dict, Any, Union
import json
import os
from huggingface_hub import hf_hub_download, HfApi
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
from src.core.state.session_manager import SessionManager
from src.core.builder.code_generator import CodeGenerator
from src.core.deployer.huggingface import HFDeployer
from src.core.builder.proposal_generator import proposal_generator
# Singleton initialisation
session_manager = SessionManager()
# Note: HFDeployer is instantiated on demand so it always uses the most up-to-date
# token (or could be context-managed if needed). For now it is instantiated on each deployment.
def init_project(project_name: str, description: str, type: str = "adhoc") -> Dict[str, Any]:
    """Create a new empty project draft.

    Args:
        project_name: Technical name (e.g. strawberry-counter, ratp-api).
        description: Tool description, or a complete technical specification
            (e.g. the content of a Swagger/OpenAPI JSON).
        type: 'adhoc' (pure code) or 'api_wrapper' (REST).

    Returns:
        A dictionary containing the 'draft_id' required for next steps.
    """
    print(f"DEBUG [init_project]: project_name={project_name}, type={type}")
    draft = session_manager.create_draft(project_name, description, type)
    config = {
        "name": draft.name,
        "description": draft.description,
        "files": list(draft.code_files.keys()),
    }
    result = {
        "draft_id": draft.draft_id,
        "config": config,
        "message": f"Project '{project_name}' initialized. Draft ID: {draft.draft_id}",
    }
    print(f"DEBUG [init_project]: result={result}")
    return result
def propose_implementation(project_name: str, description: str) -> Dict[str, Any]:
    """Use the internal AI to propose a complete implementation.

    Works from a free-text description or a Swagger/OpenAPI document.

    Args:
        project_name: The project name.
        description: The description or Swagger/OpenAPI JSON.

    Returns:
        A dictionary containing the proposed Python code, detected inputs and
        requirements (or an 'error' entry on failure). The calling agent can
        validate or modify this code before calling define_logic.
    """
    print(f"DEBUG [propose_implementation]: project_name={project_name}")
    try:
        proposal = proposal_generator.generate_from_description(project_name, description)
        result: Dict[str, Any] = {
            "status": "success",
            "proposal": proposal,
            "message": "Implementation proposed. Please review 'python_code' and 'requirements' before calling define_logic.",
        }
        print(f"DEBUG [propose_implementation]: result={result.keys()}")
        return result
    except Exception as exc:
        print(f"DEBUG [propose_implementation]: error={str(exc)}")
        return {"error": f"Error during generation: {str(exc)}"}
def define_logic(draft_id: str, python_code: str, inputs: Union[Dict[str, str], str], output_desc: str, requirements: str = "", output_component: str = "text") -> Dict[str, Any]:
    """
    Defines the internal logic of the tool.

    Generates the tool module (tools/<name>.py), the master app.py and an
    updated requirements.txt inside the draft identified by draft_id.

    Args:
        draft_id: Identifier returned by init_project.
        python_code: Python implementation of the tool.
        inputs: Dictionary of inputs (e.g. {"word": "text"}). Can be a JSON string.
        output_desc: Description of the tool output.
        requirements: Extra pip requirements; accepts a list, a JSON list
            string, or a comma-separated string ("req1, req2").
        output_component: Output Gradio component type (text, image, audio, video, html, json, file).

    Returns:
        A status dictionary with a code preview, or an 'error' entry.
    """
    print(f"DEBUG [define_logic]: draft_id={draft_id}, output_component={output_component}")
    draft = session_manager.get_draft(draft_id)
    if not draft:
        print(f"DEBUG [define_logic]: Draft not found")
        return {"error": f"Draft {draft_id} not found."}

    # Inputs handling (dict or JSON string)
    if isinstance(inputs, str):
        try:
            inputs_dict = json.loads(inputs)
        except json.JSONDecodeError:
            print(f"DEBUG [define_logic]: Invalid JSON inputs: {inputs}")
            return {"error": "inputs must be a valid JSON string or dictionary"}
    else:
        inputs_dict = inputs

    # 1. Generate the tool module (e.g. tools/strawberry_counter.py),
    #    using the sanitized project name as the file name.
    tool_filename = draft.name.replace("-", "_").lower()
    tool_module_code = CodeGenerator.generate_tool_module(python_code, inputs_dict, output_desc, draft.name, output_component)

    # 2. Generate the master application (app.py).
    master_app_code = CodeGenerator.generate_master_app()

    # Save into the draft; the tool goes into a 'tools' sub-package.
    session_manager.update_code(draft_id, f"tools/{tool_filename}.py", tool_module_code)
    session_manager.update_code(draft_id, "tools/__init__.py", "")  # Package marker
    session_manager.update_code(draft_id, "app.py", master_app_code)

    # Update the requirements. BUGFIX: compare against whole requirement
    # lines, not raw substrings, so that e.g. an existing "gradio-client"
    # no longer masks the addition of "gradio".
    current_reqs = draft.code_files.get("requirements.txt", "")
    existing_lines = {line.strip() for line in current_reqs.splitlines() if line.strip()}
    new_reqs = current_reqs
    if "gradio" not in existing_lines:
        new_reqs += "\ngradio"
        existing_lines.add("gradio")
    for req in _parse_requirements(requirements):
        req_clean = str(req).strip()
        if req_clean and req_clean not in existing_lines:
            new_reqs += f"\n{req_clean}"
            existing_lines.add(req_clean)
    # Persist through the same update path as the other files for consistency.
    session_manager.update_code(draft_id, "requirements.txt", new_reqs.strip())

    return {
        "status": "success",
        "message": f"Logic generated for '{draft.name}'. Ready to deploy.",
        "preview": tool_module_code[:200] + "..."
    }


def _parse_requirements(requirements) -> list:
    """Normalize the 'requirements' argument (list, JSON list string, or CSV string) to a list."""
    if not requirements:
        return []
    if isinstance(requirements, list):
        return requirements
    if isinstance(requirements, str):
        text = requirements.strip()
        # It may arrive as JSON (e.g. from a gr.Code/JSON component)...
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # ...or as the classic CSV format: "req1, req2".
            return [r.strip() for r in text.split(",") if r.strip()]
        # Valid JSON but not a list: treat it as a single requirement.
        return parsed if isinstance(parsed, list) else [str(parsed)]
    return []
def _resolve_repo_id(deployer, space_name: str) -> str:
    """Return a full 'user/space' repo id, prefixing the HF username when missing."""
    if "/" in space_name:
        return space_name
    hf_user = os.environ.get("HF_USER")
    if not hf_user:
        # Fallback: ask the API for the current user when HF_USER is not configured.
        try:
            user_info = deployer.api.whoami()
            if user_info and "name" in user_info:
                hf_user = user_info["name"]
        except Exception:
            # Best-effort only; keep the short name if identity lookup fails.
            pass
    return f"{hf_user}/{space_name}" if hf_user else space_name


def _fetch_existing_requirements(deployer, repo_id: str):
    """Fetch the requirements.txt already deployed on a Space.

    Tries the HuggingFace hub API first, then falls back to the raw file URL
    (useful if the API fails or the local cache is inconsistent).

    Returns:
        (existing_reqs, fetch_success): set of requirement lines found, and
        whether a fetch actually succeeded.
    """
    import requests

    existing_reqs = set()
    fetch_success = False

    # Method 1: via the HuggingFace API (hf_hub_download).
    try:
        print(f"DEBUG: Tentative de récupération des requirements via API sur {repo_id}...")
        cached_path = hf_hub_download(
            repo_id=repo_id,
            filename="requirements.txt",
            repo_type="space",
            token=deployer.token
        )
        with open(cached_path, 'r') as f:
            existing_reqs = set(f.read().splitlines())
        fetch_success = True
        print(f"DEBUG: Requirements récupérés via API ({len(existing_reqs)} items).")
    except (EntryNotFoundError, RepositoryNotFoundError):
        print("DEBUG: Pas de requirements.txt via API (404/Not Found).")
    except Exception as e:
        print(f"DEBUG: Erreur API lors de la récupération requirements: {e}")

    # Method 2: direct raw URL fallback.
    if not fetch_success and "/" in repo_id:
        try:
            raw_url = f"https://huggingface.co/spaces/{repo_id}/resolve/main/requirements.txt"
            print(f"DEBUG: Tentative de récupération via URL Raw: {raw_url}")
            headers = {}
            if deployer.token:
                headers["Authorization"] = f"Bearer {deployer.token}"
            resp = requests.get(raw_url, headers=headers)
            if resp.status_code == 200:
                existing_reqs = set(resp.text.splitlines())
                fetch_success = True
                print(f"DEBUG: Requirements récupérés via URL Raw ({len(existing_reqs)} items).")
            elif resp.status_code == 404:
                print("DEBUG: requirements.txt non trouvé via URL Raw (404).")
            else:
                print(f"DEBUG: Erreur HTTP {resp.status_code} lors de la récupération via URL Raw.")
        except Exception as e:
            print(f"DEBUG: Exception lors de la récupération via URL Raw: {e}")

    return existing_reqs, fetch_success


def deploy_to_space(draft_id: str, visibility: str = "public", space_target: str = "new", target_space_name: str = "") -> Dict[str, Any]:
    """
    Deploys the project to Hugging Face Spaces.

    Args:
        draft_id: Identifier returned by init_project.
        visibility: 'public' or 'private'.
        space_target: 'new' to create a Space, 'existing' to add to one
            (forced to 'existing' when the DEFAULT_SPACE env var applies).
        target_space_name: Explicit target Space name; overrides DEFAULT_SPACE
            and the draft name.

    Returns:
        A status dictionary with the Space URL and a ready-to-paste Claude
        Desktop MCP configuration, or an 'error' entry.
    """
    print(f"DEBUG [deploy_to_space]: draft_id={draft_id}, target={space_target}, name={target_space_name}")
    draft = session_manager.get_draft(draft_id)
    if not draft:
        return {"error": f"Draft {draft_id} not found."}
    deployer = HFDeployer()

    # Target resolution: explicit name > DEFAULT_SPACE env var > draft name.
    default_space_env = os.environ.get("DEFAULT_SPACE")
    if target_space_name:
        final_space_name = target_space_name
    elif default_space_env:
        final_space_name = default_space_env
        print(f"DEBUG: Using DEFAULT_SPACE env var: {final_space_name}")
        space_target = "existing"
    else:
        final_space_name = draft.name

    files_to_deploy = draft.code_files.copy()

    # When adding to an existing Space, merge our requirements with the ones
    # already deployed so the other tools' dependencies are preserved.
    if space_target == "existing" and "requirements.txt" in files_to_deploy:
        new_reqs = set(files_to_deploy["requirements.txt"].splitlines())
        repo_id_to_fetch = _resolve_repo_id(deployer, final_space_name)
        existing_reqs, fetch_success = _fetch_existing_requirements(deployer, repo_id_to_fetch)
        if fetch_success or existing_reqs:
            merged_reqs = existing_reqs.union(new_reqs)
            cleaned_reqs = sorted([r.strip() for r in merged_reqs if r.strip()])
            files_to_deploy["requirements.txt"] = "\n".join(cleaned_reqs)
            print(f"DEBUG: Fusion terminée. Total requirements: {len(cleaned_reqs)}")
        else:
            print("DEBUG: Aucun requirements existant trouvé, déploiement des nouveaux uniquement.")

    try:
        url = deployer.deploy_space(
            space_name=final_space_name,
            files=files_to_deploy,
            sdk="gradio",
            private=(visibility == "private")
        )
    except Exception as e:
        return {"error": f"Deployment error: {str(e)}"}

    mode_msg = "added to toolbox" if space_target == "existing" else "deployed (new space)"
    # Standard MCP endpoint exposed by Gradio apps.
    mcp_endpoint = url.rstrip("/") + "/gradio_api/mcp/"
    # Server name for the Claude config: the Space name without the username
    # (e.g. alihmaou/mymcpserver -> mymcpserver).
    server_name = final_space_name.split("/")[-1]
    # Claude Desktop configuration using mcp-remote (via npx).
    claude_config = f"""
{{
  "mcpServers": {{
    "{server_name}": {{
      "command": "npx",
      "args": [
        "mcp-remote",
        "{mcp_endpoint}",
        "--transport",
        "streamable-http"
      ]
    }}
  }}
}}
"""
    return {
        "status": "success",
        "url": url,
        "instructions": f"Tool '{draft.name}' {mode_msg} !",
        "claude_config": claude_config
    }
def delete_tool(space_name: str, tool_name: str) -> Dict[str, Any]:
    """Delete a tool module from an existing Space.

    Args:
        space_name: Full Space name (e.g. user/space) or short name (if HF_USER configured).
        tool_name: Tool name (e.g. strawberry_counter).

    Returns:
        A status dictionary, or an 'error' entry on failure.
    """
    deployer = HFDeployer()
    api = HfApi(token=deployer.token)

    # Resolve the repo id: prepend HF_USER when only a short name was given.
    hf_user = os.environ.get("HF_USER")
    if "/" in space_name or not hf_user:
        repo_id = space_name
    else:
        repo_id = f"{hf_user}/{space_name}"

    file_path = f"tools/{tool_name}.py"
    try:
        print(f"DEBUG [delete_tool]: Deleting {file_path} from {repo_id}")
        api.delete_file(
            path_in_repo=file_path,
            repo_id=repo_id,
            repo_type="space",
            commit_message=f"Delete tool {tool_name} via Meta-MCP"
        )
    except Exception as e:
        print(f"DEBUG [delete_tool]: Error: {e}")
        return {"error": f"Error during deletion: {str(e)}"}
    return {"status": "success", "message": f"Tool '{tool_name}' deleted from '{repo_id}'."}
def get_tool_code(space_name: str, tool_name: str) -> Dict[str, Any]:
    """
    Retrieves the source code of an existing tool.

    Args:
        space_name: Full Space name (e.g. user/space) or short name (if HF_USER configured).
        tool_name: Tool name.

    Returns:
        {"status": "success", "code": ...} or an 'error' entry on failure.
    """
    deployer = HFDeployer()
    # Resolve the repo id: prepend HF_USER when only a short name was given.
    repo_id = space_name
    if "/" not in repo_id:
        hf_user = os.environ.get("HF_USER")
        if hf_user:
            repo_id = f"{hf_user}/{space_name}"
    filename = f"tools/{tool_name}.py"
    try:
        # BUGFIX: the debug message printed the literal "(unknown)" instead of
        # the file actually being fetched.
        print(f"DEBUG [get_tool_code]: Fetching {filename} from {repo_id}")
        path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="space",
            token=deployer.token
        )
        with open(path, "r", encoding="utf-8") as f:
            code = f.read()
        return {"status": "success", "code": code}
    except Exception as e:
        print(f"DEBUG [get_tool_code]: Error: {e}")
        return {"error": f"Error reading code: {str(e)}"}