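"""
Storage layer for activity workspaces.

Runs in two modes: local development (plain filesystem under data/activities/)
and production on Hugging Face Spaces, where data is persisted to the
NextGenTech/geneticWFM-activities dataset repo under activities/<name>/<file>.
"""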
import os
import json
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
DATASET_REPO_ID = "NextGenTech/geneticWFM-activities"

# --- ENVIRONMENT DETECTION ---
# Dev/Prod switch based on the SPACE_ID variable injected by the HF Spaces container.
IS_LOCAL = os.environ.get("SPACE_ID") is None

# Resolve the project root dynamically to handle I/O in the local fallback.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
LOCAL_BASE_PATH = os.path.join(PROJECT_ROOT, "data", "activities")

def get_hf_api():
    """Instantiate the HF client using the HF_TOKEN environment secret."""
    return HfApi(token=os.environ.get("HF_TOKEN"))

def list_activities():
    """
    Discover the available activity workspaces.
    Routes between Dev (local filesystem) and Prod (HF Datasets API).
    """
    if IS_LOCAL:
        if not os.path.exists(LOCAL_BASE_PATH):
            return []
        return sorted(
            d for d in os.listdir(LOCAL_BASE_PATH)
            if os.path.isdir(os.path.join(LOCAL_BASE_PATH, d))
        )

    # --- PROD LOGIC ---
    api = get_hf_api()
    try:
        files = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
        activities = set()
        for f in files:
            # Expected repo layout: activities/<activity_name>/<file>
            parts = f.split("/")
            if len(parts) > 2 and parts[0] == "activities":
                activities.add(parts[1])
        return sorted(activities)
    except Exception as e:
        print(f"[ERROR] Activity discovery failed on the HF layer: {e}")
        return []

def load_json(activity_name, filename):
    """
    Read side: deserialize a JSON payload for the given activity.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        if os.path.exists(path):
            with open(path, "r") as f:
                return json.load(f)
        return None

    # --- PROD LOGIC ---
    repo_path = f"activities/{activity_name}/{filename}"
    try:
        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=repo_path,
            token=os.environ.get("HF_TOKEN"),
        )
        with open(downloaded_path, "r") as f:
            return json.load(f)
    except EntryNotFoundError:
        # The file simply does not exist yet: treat it as missing, not as an error.
        return None
    except Exception as e:
        print(f"[ERROR] API download failed for {repo_path}: {e}")
        return None

def save_json(activity_name, filename, data):
    """
    Write side: serialize to disk, then replicate to object storage.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        return

    # --- PROD LOGIC ---
    repo_path = f"activities/{activity_name}/{filename}"
    local_tmp_path = f"/tmp/{activity_name}_{filename}"

    # Write to the container's ephemeral layer (/tmp) first.
    os.makedirs(os.path.dirname(local_tmp_path), exist_ok=True)
    with open(local_tmp_path, "w") as f:
        json.dump(data, f, indent=2)

    api = get_hf_api()
    api.upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=repo_path,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Sync {filename} -> {activity_name}",
    )
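
# NOTE: each save_json call produces one commit on the dataset repo. For a
# brand-new scenario, upload_new_scenario below batches all three files into
# a single upload_folder commit instead, avoiding partial writes.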

def upload_new_scenario(activity_name, activity_conf, employees, weekly_demand):
    """
    Bulk commit (atomic push) of a newly generated scenario.
    Avoids partial commits by grouping config, employee roster, and demand.
    """
    if IS_LOCAL:
        base_path = os.path.join(LOCAL_BASE_PATH, activity_name)
        os.makedirs(base_path, exist_ok=True)
        with open(os.path.join(base_path, "activity_config.json"), "w") as f:
            json.dump(activity_conf, f, indent=2)
        with open(os.path.join(base_path, "employees.json"), "w") as f:
            json.dump(employees, f, indent=2)
        with open(os.path.join(base_path, "demand.json"), "w") as f:
            json.dump(weekly_demand, f, indent=2)
        return

    # --- PROD LOGIC ---
    api = get_hf_api()
    local_dir = f"/tmp/activities/{activity_name}"
    os.makedirs(local_dir, exist_ok=True)

    # Write to the ephemeral layer.
    with open(os.path.join(local_dir, "activity_config.json"), "w") as f:
        json.dump(activity_conf, f, indent=2)
    with open(os.path.join(local_dir, "employees.json"), "w") as f:
        json.dump(employees, f, indent=2)
    with open(os.path.join(local_dir, "demand.json"), "w") as f:
        json.dump(weekly_demand, f, indent=2)

    # Bulk push of the temporary folder in a single commit.
    api.upload_folder(
        folder_path=local_dir,
        path_in_repo=f"activities/{activity_name}",
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Init workspace: {activity_name}",
    )
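
# Minimal smoke-test sketch (assumption: run from a local checkout, so IS_LOCAL
# is True and everything stays on the filesystem). The activity name and payload
# below are hypothetical and not part of the real dataset.
if __name__ == "__main__":
    demo = "demo_store"
    save_json(demo, "activity_config.json", {"opening_hours": [9, 18]})
    print(list_activities())                        # expected to include "demo_store"
    print(load_json(demo, "activity_config.json"))  # round-trips the payload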