saliacoel commited on
Commit
1cfb59e
·
verified ·
1 Parent(s): 2d0b4ed

Upload hf_job_logger.py

Browse files
Files changed (1) hide show
  1. hf_job_logger.py +277 -0
hf_job_logger.py ADDED
@@ -0,0 +1,277 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ # comfyui_hf_job_log_nodes.py
3
+ #
4
+ # Drop this file into: ComfyUI/custom_nodes/
5
+ # Restart ComfyUI.
6
+ #
7
+ # Requirements:
8
+ # pip install -U huggingface_hub requests
9
+ #
10
+ # What it does:
11
+ # 1) Node "HF Job → started" (STRING passthrough):
12
+ # - figures out the CURRENT ComfyUI prompt_id (job id)
13
+ # - downloads job_log.txt from a private Hugging Face repo (uses HF_TOKEN below)
14
+ # - replaces: "<prompt_id> (queue)" -> "<prompt_id> (started)"
15
+ # - outputs the input string unchanged
16
+ #
17
+ # 2) Node "HF Job → finished" (IMAGE passthrough):
18
+ # - figures out the CURRENT ComfyUI prompt_id (job id)
19
+ # - downloads job_log.txt
20
+ # - replaces: "<prompt_id> (started)" -> "<prompt_id> (finished)"
21
+ # - outputs the input image unchanged
22
+ #
23
+ # Notes:
24
+ # - These nodes override IS_CHANGED to always run (avoid ComfyUI cache skipping side effects).
25
+ # - Token must have write access to the repo.
26
+
27
+ from __future__ import annotations
28
+
29
+ import re
30
+ import time
31
+ from typing import Any, Dict, Optional, Tuple
32
+
33
+ import requests
34
+
35
+
36
+ # -----------------------------
37
+ # USER CONFIG
38
+ # -----------------------------
39
+ HF_TOKEN = "h" + "f" + "_" + "rrs" + "pWbvssLjNYKvnPbdrnZNiaaIcpImqUq";
40
+ HF_REPO_ID = "saliacoel/v1"
41
+ HF_REPO_TYPE = "model" # "model" for /{user}/{repo}; use "dataset" if it is a dataset repo
42
+ HF_REVISION = "main"
43
+ HF_FILE_PATH = "job_log.txt"
44
+
45
+ HF_ENDPOINT = "https://huggingface.co"
46
+ USER_AGENT = "ComfyUI-HFJobLogNodes/1.0"
47
+
48
+
49
+ # -----------------------------
50
+ # INTERNAL HELPERS
51
+ # -----------------------------
52
+ def _warn(msg: str) -> None:
53
+ # ComfyUI will print this to console
54
+ print(f"[HFJobLogNodes] {msg}")
55
+
56
+
57
+ def _validate_token() -> None:
58
+ # Keep strict: private repo needs a real token.
59
+ if not isinstance(HF_TOKEN, str) or not HF_TOKEN.startswith("hf_") or HF_TOKEN.strip() in {"hf_???", "hf_"}:
60
+ raise ValueError(
61
+ "HF_TOKEN is not set. Edit comfyui_hf_job_log_nodes.py and replace HF_TOKEN = 'hf_???' "
62
+ "with your real Hugging Face token (must have write access)."
63
+ )
64
+
65
+
66
+ def _hf_headers() -> Dict[str, str]:
67
+ return {
68
+ "Authorization": f"Bearer {HF_TOKEN}",
69
+ "User-Agent": USER_AGENT,
70
+ }
71
+
72
+
73
+ def _hf_resolve_url() -> str:
74
+ # Example: https://huggingface.co/saliacoel/v1/resolve/main/job_log.txt
75
+ return f"{HF_ENDPOINT}/{HF_REPO_ID}/resolve/{HF_REVISION}/{HF_FILE_PATH}"
76
+
77
+
78
+ def _download_job_log_text(timeout_s: float = 20.0) -> str:
79
+ url = _hf_resolve_url()
80
+ r = requests.get(url, headers=_hf_headers(), timeout=timeout_s)
81
+ if r.status_code == 200:
82
+ # Preserve exact text as much as possible
83
+ return r.text
84
+ if r.status_code == 404:
85
+ # File missing: treat as empty (optional behavior)
86
+ _warn(f"job_log.txt not found at {url} (404). Treating as empty.")
87
+ return ""
88
+ # 401/403 likely means token invalid/insufficient for private repo
89
+ raise RuntimeError(f"Failed to download job_log.txt: HTTP {r.status_code} - {r.text[:2000]}")
90
+
91
+
92
+ def _upload_job_log_text(new_text: str, commit_message: str) -> None:
93
+ # Use huggingface_hub for upload/commit
94
+ try:
95
+ from huggingface_hub import HfApi # type: ignore
96
+ except Exception as e:
97
+ raise RuntimeError(
98
+ "huggingface_hub is required for uploading. Install it with: pip install -U huggingface_hub"
99
+ ) from e
100
+
101
+ api = HfApi(token=HF_TOKEN)
102
+ # upload_file accepts bytes, file-like, or a local path.
103
+ # We pass bytes so we don't need temp files.
104
+ api.upload_file(
105
+ repo_id=HF_REPO_ID,
106
+ repo_type=HF_REPO_TYPE,
107
+ revision=HF_REVISION,
108
+ path_in_repo=HF_FILE_PATH,
109
+ path_or_fileobj=new_text.encode("utf-8"),
110
+ commit_message=commit_message,
111
+ )
112
+
113
+
114
+ def _get_current_prompt_id(prompt: Optional[Dict[str, Any]], unique_id: Optional[str]) -> str:
115
+ """
116
+ "Must somehow get the CURRENT job ID":
117
+ - We read it from ComfyUI's PromptServer queue state (currently_running).
118
+ - Structure is a dict: worker_id -> (queue_number, prompt_id, prompt_graph, extra_data)
119
+ """
120
+ from server import PromptServer # ComfyUI module
121
+
122
+ ps = getattr(PromptServer, "instance", None)
123
+ if ps is None:
124
+ raise RuntimeError("PromptServer.instance is None (ComfyUI server not initialized?).")
125
+
126
+ pq = getattr(ps, "prompt_queue", None)
127
+ if pq is None:
128
+ raise RuntimeError("PromptServer.instance.prompt_queue is missing.")
129
+
130
+ running = getattr(pq, "currently_running", None) or {}
131
+ if not isinstance(running, dict) or len(running) == 0:
132
+ raise RuntimeError("prompt_queue.currently_running is empty; cannot determine current job id.")
133
+
134
+ # Prefer a job whose prompt_graph contains this node unique_id (helps if multi-worker).
135
+ if unique_id:
136
+ for _worker_id, job in running.items():
137
+ try:
138
+ prompt_graph = job[2]
139
+ if isinstance(prompt_graph, dict) and unique_id in prompt_graph:
140
+ return str(job[1])
141
+ except Exception:
142
+ pass
143
+
144
+ # Fallback: take the first running job.
145
+ _worker_id, job = next(iter(running.items()))
146
+ return str(job[1])
147
+
148
+
149
+ def _replace_status_once(text: str, job_id: str, from_status: str, to_status: str) -> Tuple[str, bool]:
150
+ """
151
+ Replace the first occurrence of:
152
+ "<job_id> (from_status)" -> "<job_id> (to_status)"
153
+ allowing flexible whitespace.
154
+ """
155
+ # Example target: 12345 (queue)
156
+ pattern = re.compile(rf"(^|\n)({re.escape(job_id)})\s*\(\s*{re.escape(from_status)}\s*\)", re.MULTILINE)
157
+ repl = rf"\1{job_id} ({to_status})"
158
+ new_text, n = pattern.subn(repl, text, count=1)
159
+ return new_text, (n > 0)
160
+
161
+
162
+ def _update_remote_status(job_id: str, from_status: str, to_status: str) -> None:
163
+ """
164
+ Best-effort update:
165
+ - download
166
+ - replace once
167
+ - upload if changed
168
+ Retries a few times for transient network issues.
169
+ """
170
+ _validate_token()
171
+
172
+ attempts = 3
173
+ last_err: Optional[Exception] = None
174
+ for i in range(attempts):
175
+ try:
176
+ current = _download_job_log_text()
177
+ updated, changed = _replace_status_once(current, job_id, from_status, to_status)
178
+
179
+ if not changed:
180
+ # Don't fail the whole prompt; just warn.
181
+ _warn(f"No match found for '{job_id} ({from_status})' (nothing to update).")
182
+ return
183
+
184
+ msg = f"Job {job_id}: {from_status} -> {to_status}"
185
+ _upload_job_log_text(updated, commit_message=msg)
186
+ _warn(f"Updated HF job_log.txt: {msg}")
187
+ return
188
+ except Exception as e:
189
+ last_err = e
190
+ _warn(f"Attempt {i+1}/{attempts} failed: {e}")
191
+ time.sleep(0.5 * (i + 1))
192
+
193
+ # After retries, raise (so you notice misconfig), but you can change this to just warn if desired.
194
+ raise RuntimeError(f"Failed to update status after {attempts} attempts.") from last_err
195
+
196
+
197
+ # -----------------------------
198
+ # COMFYUI NODES
199
+ # -----------------------------
200
+ class HFJobMarkStarted:
201
+ """
202
+ Input: STRING (trigger)
203
+ Output: STRING (same as input)
204
+ Side-effect: <prompt_id> (queue) -> <prompt_id> (started) in job_log.txt
205
+ """
206
+
207
+ @classmethod
208
+ def INPUT_TYPES(cls):
209
+ return {
210
+ "required": {
211
+ "trigger": ("STRING", {"default": "", "multiline": True}),
212
+ },
213
+ "hidden": {
214
+ "prompt": "PROMPT",
215
+ "unique_id": "UNIQUE_ID",
216
+ },
217
+ }
218
+
219
+ RETURN_TYPES = ("STRING",)
220
+ FUNCTION = "run"
221
+ CATEGORY = "HF Job Log"
222
+
223
+ @classmethod
224
+ def IS_CHANGED(cls, **kwargs):
225
+ # Force execution every run (avoid cache skipping side-effects)
226
+ return float("nan")
227
+
228
+ def run(self, trigger: str, prompt: Optional[Dict[str, Any]] = None, unique_id: Optional[str] = None):
229
+ job_id = _get_current_prompt_id(prompt, unique_id)
230
+ _update_remote_status(job_id, "queue", "started")
231
+ return (trigger,)
232
+
233
+
234
+ class HFJobMarkFinished:
235
+ """
236
+ Input: IMAGE
237
+ Output: IMAGE (same as input)
238
+ Side-effect: <prompt_id> (started) -> <prompt_id> (finished) in job_log.txt
239
+ """
240
+
241
+ @classmethod
242
+ def INPUT_TYPES(cls):
243
+ return {
244
+ "required": {
245
+ "image": ("IMAGE",),
246
+ },
247
+ "hidden": {
248
+ "prompt": "PROMPT",
249
+ "unique_id": "UNIQUE_ID",
250
+ },
251
+ }
252
+
253
+ RETURN_TYPES = ("IMAGE",)
254
+ FUNCTION = "run"
255
+ CATEGORY = "HF Job Log"
256
+
257
+ @classmethod
258
+ def IS_CHANGED(cls, **kwargs):
259
+ # Force execution every run (avoid cache skipping side-effects)
260
+ return float("nan")
261
+
262
+ def run(self, image, prompt: Optional[Dict[str, Any]] = None, unique_id: Optional[str] = None):
263
+ job_id = _get_current_prompt_id(prompt, unique_id)
264
+ _update_remote_status(job_id, "started", "finished")
265
+ return (image,)
266
+
267
+
268
+ NODE_CLASS_MAPPINGS = {
269
+ "HFJobMarkStarted": HFJobMarkStarted,
270
+ "HFJobMarkFinished": HFJobMarkFinished,
271
+ }
272
+
273
+ NODE_DISPLAY_NAME_MAPPINGS = {
274
+ "HFJobMarkStarted": "HF Job → started (queue→started) [STRING passthrough]",
275
+ "HFJobMarkFinished": "HF Job → finished (started→finished) [IMAGE passthrough]",
276
+ }
277
+