File size: 4,651 Bytes
1b8d07e
 
 
 
 
 
 
a0a02f2
 
 
1b8d07e
 
 
a0a02f2
1b8d07e
 
 
 
a0a02f2
1b8d07e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a0a02f2
1b8d07e
 
a0a02f2
 
 
 
 
1b8d07e
 
a0a02f2
1b8d07e
 
a0a02f2
1b8d07e
 
 
 
a0a02f2
1b8d07e
 
a0a02f2
1b8d07e
a0a02f2
1b8d07e
 
 
 
 
 
 
 
a0a02f2
1b8d07e
a0a02f2
1b8d07e
a0a02f2
1b8d07e
 
a0a02f2
1b8d07e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a0a02f2
1b8d07e
 
 
 
 
 
 
 
a0a02f2
1b8d07e
a0a02f2
1b8d07e
a0a02f2
 
1b8d07e
 
a0a02f2
1b8d07e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
from typing import Dict, Optional
from huggingface_hub import HfApi, get_token

class HFDeployer:
    """Create Hugging Face Spaces and push in-memory files to them."""

    def __init__(self, token: Optional[str] = None):
        """
        Initializes the Hugging Face deployer.

        Args:
            token: Hugging Face access token. If falsy, the HF_TOKEN
                environment variable is tried, then ``get_token()``.

        Raises:
            ValueError: If no token can be found from any of those sources.
        """
        self.token = token or os.environ.get("HF_TOKEN") or get_token()
        if not self.token:
            raise ValueError("No Hugging Face token found. Please set HF_TOKEN.")

        self.api = HfApi(token=self.token)

    def _sanitize_repo_id(self, input_name: str, current_username: str) -> str:
        """
        Normalizes a user-supplied Space reference into an ``owner/name`` id.

        Accepted forms:
            * full URL:    ``https://huggingface.co/spaces/user/repo[/...]``
            * prefixed id: ``spaces/user/repo``
            * full id:     ``user/repo``
            * bare name:   ``repo`` (prefixed with ``current_username``)

        Args:
            input_name: Raw Space name, id, or URL.
            current_username: Owner to use when ``input_name`` has none.

        Returns:
            The normalized repo id. No validation is performed here; an
            unusable value is left for the Hub API call to reject.
        """
        name = input_name.strip()

        host_marker = "huggingface.co/"
        is_url = host_marker in name
        if is_url:
            # Keep only what follows the host, then drop any query string or
            # fragment (e.g. '?logs=build') that is not part of the repo id.
            name = name.split(host_marker, 1)[1]
            name = name.split("?", 1)[0].split("#", 1)[0]

        # Splitting and filtering removes leading/trailing/duplicate slashes.
        segments = [segment for segment in name.split("/") if segment]

        # 'spaces' is the repo-type prefix, not an owner. Outside of a URL it
        # is only dropped when an owner and a name still remain after it, so
        # a plain 'owner/name' pair is never altered.
        if segments and segments[0] == "spaces" and (is_url or len(segments) > 2):
            segments = segments[1:]

        if is_url:
            # A Space URL may point at a sub-page (.../tree/main, .../settings);
            # only the first two path segments identify the repo.
            segments = segments[:2]

        if len(segments) >= 2:
            return "/".join(segments)

        # Bare repo name (or nothing usable) -> owner defaults to current user.
        bare_name = segments[0] if segments else ""
        return f"{current_username}/{bare_name}"

    def deploy_space(self,
                     space_name: str,
                     files: Dict[str, str],
                     username: Optional[str] = None,
                     sdk: str = "gradio",
                     private: bool = False) -> str:
        """
        Creates a Space and deploys files.

        Each file is uploaded exactly once, sequentially, one commit per file.

        Args:
            space_name: Space name (e.g. 'strawberry-counter'), 'user/repo'
                id, or full Space URL.
            files: Dictionary {filename: content} (e.g. {'app.py': '...'}).
                Content is encoded as UTF-8 before upload.
            username: Target username or organization. If None, uses current user.
            sdk: 'gradio', 'streamlit', or 'docker'
            private: If True, creates a private repo

        Returns:
            The deployed Space URL.

        Raises:
            RuntimeError: If creating the repo or uploading a file fails. The
                underlying exception is attached as ``__cause__``.
        """

        # 1. Determine full repo_id
        if not username:
            user_info = self.api.whoami()
            username = user_info["name"]

        repo_id = self._sanitize_repo_id(space_name, username)

        print(f"🚀 Preparing deployment to {repo_id}...")

        # 2. Repo creation (exist_ok=True: does not fail if it already exists)
        try:
            self.api.create_repo(
                repo_id=repo_id,
                repo_type="space",
                space_sdk=sdk,
                private=private,
                exist_ok=True
            )
            print(f"✅ Repo {repo_id} ready.")
        except Exception as e:
            raise RuntimeError(f"Error creating repo: {str(e)}") from e

        # 3. File upload
        # Uploads are deliberately synchronous. An earlier draft also queued
        # the same files through run_as_future and never awaited the futures,
        # which pushed every file twice and hid background failures.
        try:
            for filename, content in files.items():
                print(f"📤 Uploading {filename}...")
                content_bytes = content.encode("utf-8")
                self.api.upload_file(
                    path_or_fileobj=content_bytes,
                    path_in_repo=filename,
                    repo_id=repo_id,
                    repo_type="space",
                    commit_message=f"Deploy {filename} via Meta-MCP"
                )
            print("✅ All files uploaded.")
        except Exception as e:
            raise RuntimeError(f"Error uploading files: {str(e)}") from e

        # 4. URL Construction
        # Standard URL is https://huggingface.co/spaces/USERNAME/SPACE_NAME
        space_url = f"https://huggingface.co/spaces/{repo_id}"

        print(f"🎉 Deployment finished! Space accessible here: {space_url}")
        return space_url