#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gradio app – robust async execution via status endpoints
- Purely asynchronous, driven by status endpoints
- Keeps running server-side even if the UI/connection goes away
- Global HTTP session with keep-alive
- Polling with backoff + Retry-After
- Logging to console + logs/<job_id>.log
- FIX: load and honor exclude_hash correctly (paginated Wholix search)
- FIX: hard per-lead error isolation -> no Gradio crash
- NEW: ALWAYS stores to Wholix. If no real e-mail address is available,
       a stable placeholder address is generated (example.com) and
       a "no-email" tag is set.
- NEW: 422 robustness -> progressive degradation & minimal-record fallback
- NEW: URL normalization (umlauts/IDNA) for path/query/fragment
"""
import json
import os
import random
import re
import time
import uuid
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple, Set
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

import gradio as gr
import requests
from requests.adapters import HTTPAdapter
# ============================== CONFIG ====================================
LEAD_BASE_URL = "https://wholixleadgenbackend.ngrok.io"
WHOLIX_BASE_URL = "https://api.wholix.ai"
MAX_LEADS = 100        # safety cap
MAX_WORKERS = 16       # concurrent pipelines on the server
JOB_TTL_SEC = 60 * 60  # how long finished jobs stay in memory (1h)
# ============================== LOGGING ====================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(threadName)s | %(message)s",
)
APP_LOG = logging.getLogger("app")
def get_job_logger(job_id: str) -> logging.Logger:
    lg = logging.getLogger(f"job.{job_id}")
    if not any(isinstance(h, RotatingFileHandler) for h in lg.handlers):
        fh = RotatingFileHandler(
            LOG_DIR / f"{job_id}.log",
            maxBytes=512_000,
            backupCount=2,
            encoding="utf-8",
        )
        fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
        lg.addHandler(fh)
        lg.setLevel(logging.INFO)
        lg.propagate = True
    return lg
# ========================== GLOBAL HTTP SESSION ============================
GLOBAL_SES = requests.Session()
GLOBAL_SES.headers.update({"Accept": "application/json, text/plain;q=0.9, */*;q=0.8"})
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=0)
GLOBAL_SES.mount("https://", adapter)
GLOBAL_SES.mount("http://", adapter)
try:
    # Optional: OS-level TCP keep-alive if requests_toolbelt is installed.
    from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter
    ka = TCPKeepAliveAdapter(idle=30, interval=30, count=3)
    GLOBAL_SES.mount("https://", ka)
    GLOBAL_SES.mount("http://", ka)
except Exception:
    pass
# ================================ HTTP =====================================
class HTTPError(Exception):
    def __init__(self, status: int, url: str, body: Any = None, headers: Optional[Dict[str, str]] = None):
        msg = f"HTTP {status} on {url}"
        if body is not None:
            try:
                bs = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
                msg += f": {bs[:500]}"
            except Exception:
                pass
        super().__init__(msg)
        self.status = status
        self.url = url
        self.body = body
        self.headers = headers or {}


def _is_json_ct(ct: str) -> bool:
    return bool(ct and ("application/json" in ct or ct.endswith("+json")))


def _safe_json(text: str):
    try:
        return json.loads(text)
    except Exception:
        # Replace "-Infinity" before "Infinity", otherwise the first pass
        # would leave an unparsable "-null" behind.
        repaired = (text.replace("-Infinity", "null")
                        .replace("Infinity", "null")
                        .replace("NaN", "null"))
        return json.loads(repaired)
def req(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    json_body: Any = None,
    data: Any = None,
    timeout: Tuple[float, float] = (5.0, 15.0),
    session: Optional[requests.Session] = None,
) -> Any:
    s = session or GLOBAL_SES
    r = s.request(
        method=method.upper(),
        url=url,
        headers=headers,
        json=json_body,
        data=data,
        timeout=timeout,
    )
    ct = r.headers.get("content-type", "")
    text = r.text or ""
    if not r.ok:
        body = None
        if text:
            try:
                body = r.json() if _is_json_ct(ct) or (not ct and text.strip().startswith(("{", "["))) else text
            except Exception:
                body = text
        raise HTTPError(r.status_code, url, body=body, headers=dict(r.headers))
    if text == "":
        return None
    if _is_json_ct(ct) or (not ct and text.strip().startswith(("{", "["))):
        try:
            return r.json()
        except Exception:
            return _safe_json(text)
    return text
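# Illustrative usage (URL is assumed, not part of this app):
#   data = req("https://api.example.com/health")                       # dict / str / None
#   req("https://api.example.com/jobs", method="POST",
#       headers={"X-Token-Id": "tok"}, json_body={"n": 1})
# Non-2xx responses raise HTTPError with the parsed body and headers attached.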
# ========================= URL NORMALIZATION ===============================
from urllib.parse import urlsplit, urlunsplit, quote


def _normalize_url(u: Optional[str]) -> Optional[str]:
    """
    Normalizes URLs for strict validators:
    - add a scheme if missing (https)
    - host -> IDNA (punycode)
    - percent-encode path/query/fragment as UTF-8
    """
    if not u:
        return None
    u = u.strip()
    if not re.match(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://", u):
        u = "https://" + u
    try:
        parts = urlsplit(u)
        if not parts.scheme or not parts.netloc:
            return None
        # Host to IDNA (the idna codec also lowercases)
        try:
            host = parts.hostname.encode("idna").decode("ascii") if parts.hostname else ""
        except Exception:
            return None
        netloc = host
        if parts.port:
            netloc += f":{parts.port}"
        if parts.username:
            auth = parts.username
            if parts.password:
                auth += f":{parts.password}"
            netloc = f"{auth}@{netloc}"
        path = quote(parts.path or "", safe="/-._~")
        query = quote(parts.query or "", safe="=&?/-._~")
        fragment = quote(parts.fragment or "", safe="/-._~")
        # Do not lowercase netloc: credentials are case-sensitive and the
        # host was already lowercased by the idna codec.
        return urlunsplit((parts.scheme.lower(), netloc, path, query, fragment))
    except Exception:
        return None
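# Worked examples (illustrative inputs):
#   _normalize_url("müller.de/über uns")
#       -> "https://xn--mller-kva.de/%C3%BCber%20uns"   (scheme added, IDNA host, %-encoded path)
#   _normalize_url("HTTP://Example.com/a b?q=ä")
#       -> "http://example.com/a%20b?q=%C3%A4"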
# ========================= Async Start & Poll ==============================
def _looks_like_ngrok_html(body: Any) -> bool:
    try:
        s = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
    except Exception:
        s = str(body)
    s_lower = (s or "").lower()
    return ("cdn.ngrok.com" in s_lower) or ("ngrok" in s_lower and "<html" in s_lower)


def start_async_job(url: str, body: dict, headers: dict, session: Optional[requests.Session] = None) -> str:
    """
    Starts an asynchronous backend job and returns the job_id.
    New: internal retries (backoff + jitter) on 404/408/425/429/5xx and ngrok HTML.
    Performs a warmup ping to the base URL first.
    """
    s = session or GLOBAL_SES
    # --- Warmup: "wake up" ngrok/server (errors ignored) ---
    # Note: the previous variable-width lookbehind split is not supported by
    # Python's re module; a plain match on "https://host.tld" works.
    try:
        m = re.match(r"^(?P<base>[a-zA-Z][a-zA-Z0-9+\-.]*://[^/]+)", url)
        if m:
            s.get(m.group("base"), timeout=(3.0, 3.0))
    except Exception:
        pass
    # --- POST with retry ---
    attempts = 0
    max_attempts = 6
    delay = 0.8
    while True:
        attempts += 1
        try:
            res = req(
                url=url,
                method="POST",
                headers={**headers, "Prefer": "respond-async"},
                json_body=body,
                timeout=(5.0, 15.0),
                session=s,
            )
            job_id = (res or {}).get("job_id") if isinstance(res, dict) else None
            ok = (res or {}).get("ok", False) if isinstance(res, dict) else False
            if not ok or not job_id:
                raise RuntimeError(f"Backend did not return a job_id (res={res!r})")
            return job_id
        except HTTPError as e:
            transient_codes = {404, 408, 425, 429, 500, 502, 503, 504, 520, 522, 524}
            is_transient = (e.status in transient_codes) or _looks_like_ngrok_html(e.body)
            if attempts < max_attempts and is_transient:
                # Honor Retry-After
                ra = 0.0
                try:
                    if isinstance(e.body, dict):
                        ra = float(e.body.get("retry_after", 0) or 0)
                except Exception:
                    ra = 0.0
                if not ra:
                    try:
                        ra_hdr = (e.headers or {}).get("Retry-After", "")
                        ra = float(ra_hdr) if ra_hdr else 0.0
                    except Exception:
                        ra = 0.0
                sleep_for = ra or (delay + random.uniform(0, 0.5 * delay))
                logging.warning(f"start_async_job: transient {e.status}, retry {attempts}/{max_attempts} in {sleep_for:.1f}s …")
                time.sleep(sleep_for)
                delay = min(8.0, delay * 1.8)
                continue
            # not transient, or attempts exhausted
            raise
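# Retry cadence when the server sends no Retry-After (base delays before jitter):
#   0.8s, 1.44s, 2.59s, 4.67s, 8.0s (capped at 8.0s, at most max_attempts-1 sleeps);
# jitter adds up to 50% of the base delay on top.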
def poll_status(
    url: str,
    headers: dict,
    max_wait_sec: int = 60 * 20,
    min_delay: float = 1.0,
    max_delay: float = 30.0,
    session: Optional[requests.Session] = None,
    on_tick=None,
) -> dict:
    t0 = time.time()
    attempt = 0
    delay = min_delay
    while True:
        if time.time() - t0 > max_wait_sec:
            raise TimeoutError("Timed out waiting for the result.")
        try:
            st = req(url, method="GET", headers=headers, timeout=(5.0, 6.0), session=session)
        except HTTPError as e:
            if e.status in (408, 425, 429, 500, 502, 503, 504, 520, 522, 524):
                ra = 0.0
                if isinstance(e.body, dict):
                    try:
                        ra = float(e.body.get("retry_after", 0))
                    except Exception:
                        ra = 0.0
                if not ra:
                    try:
                        ra_hdr = (e.headers or {}).get("Retry-After", "")
                        ra = float(ra_hdr) if ra_hdr else 0.0
                    except Exception:
                        ra = 0.0
                attempt += 1
                delay = min(max_delay, max(min_delay, 0.7 * (2 ** attempt)))
                time.sleep(ra or (delay + random.uniform(0, 0.4 * delay)))
                continue
            raise
        if callable(on_tick):
            try:
                on_tick(st)
            except Exception:
                pass
        if isinstance(st, dict) and st.get("ok") and st.get("status") == "done":
            return st
        attempt = min(attempt + 1, 20)
        delay = min(max_delay, max(min_delay, delay * 1.6))
        time.sleep(delay)
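# Illustrative usage (mirrors the call sites below; job_id/token_id assumed):
#   st = poll_status(
#       url=f"{LEAD_BASE_URL}/lead/suggest/status/{job_id}",
#       headers={"X-Token-Id": token_id},
#       on_tick=lambda s: APP_LOG.info(f"tick: {s}"),
#   )
# Returns the status dict once it reports ok=True and status == "done".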
# ============================= Services ====================================
def unwrap_result(payload: Any) -> Any:
    p = payload or {}
    if isinstance(p, dict) and p.get("ok") and any(k in p for k in ("result", "data", "content")):
        p = p.get("result") or p.get("data") or p.get("content")
    return p


def suggest_single_lead(token_id: str, filters: dict, icp_text: str, exclude_ids: List[str]) -> dict:
    res_job_id = start_async_job(
        url=f"{LEAD_BASE_URL}/lead/suggest?async=1",
        body={"filters": filters, "icp_text": icp_text or "", "exclude_ids": exclude_ids or []},
        headers={"X-Token-Id": token_id},
    )
    st = poll_status(
        url=f"{LEAD_BASE_URL}/lead/suggest/status/{res_job_id}?t={int(time.time()*1000)}",
        headers={"X-Token-Id": token_id},
        max_wait_sec=60 * 20,
        session=GLOBAL_SES,
    )
    res = unwrap_result(st)
    if isinstance(res, dict) and isinstance(res.get("items"), list) and res["items"]:
        return res["items"][0]
    if isinstance(res, list) and res:
        return res[0]
    return res if isinstance(res, dict) else {"result": res}
def _flatten_text(val: Any) -> str:
    """
    Collapse newlines/tabs/repeated whitespace into a single space.
    No field guessing, plain string processing only.
    """
    s = "" if val is None else str(val)
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = re.sub(r"\s+", " ", s).strip()
    return s
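# Example:
#   _flatten_text("Hello\r\n  world\t!") -> "Hello world !"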
def normalize_draft(raw: Any) -> Dict[str, Any]:
    r = unwrap_result(raw) or raw or {}
    if isinstance(r, dict) and "message" in r and isinstance(r["message"], dict):
        r = r["message"]
    email_obj = r.get("email") if isinstance(r, dict) else {}

    def pick(obj, *keys):
        if not isinstance(obj, dict):
            return ""
        for k in keys:
            for kk, vv in obj.items():
                if isinstance(kk, str) and kk.lower() == k.lower():
                    if isinstance(vv, str) and vv.strip():
                        return vv.strip()
                    if vv:
                        return vv
        return ""

    subject = pick(email_obj, "subject", "email_subject", "Betreff") or pick(r, "subject", "email_subject", "Betreff")
    body = pick(email_obj, "body", "text", "content") or pick(r, "body", "text", "content")
    fu1 = pick(r, "followup1", "FollowUp1", "LinkedIn")
    fu2 = pick(r, "followup2", "FollowUp2", "Facebook")
    to = pick(email_obj, "to")
    return {"email": {"to": to, "subject": subject, "body": body}, "followup1": fu1, "followup2": fu2}
def email_generate_async(token_id: str, variables: dict, items: List[dict]) -> dict:
    """
    Starts /email/generate (async) and returns ONLY real endpoint data.
    - Passes global variables through 1:1 (including signature) at root level.
    - No field renaming, no heuristics.
    Returns:
        {
            "email": {"subject": <Betreff>, "body": <Text>},
            "followup1": <FollowUp1>,
            "followup2": <FollowUp2>,
            "raw": <unwrap_result(...)>
        }
    """
    if not items:
        raise ValueError("items is missing (at least 1 lead required).")
    root_allowed = {
        "Produkt_und_Dienstleistungsbeschreibung",
        "CTA",
        "Signatur",
        "Checkliste_Landingpage",
        "homepage_url",   # optional, if used by the frontend
        "tags",           # ignored server-side, but harmless
        "Touch_Point",    # optional – does not disturb the backend contract
    }
    safe_vars = {}
    for k, v in (variables or {}).items():
        if k in root_allowed:
            safe_vars[k] = v
    start_body = {**safe_vars, "items": items, "item_index": 0}
    try:
        sent_keys = sorted(list(start_body.keys()))
        sig_preview = str(start_body.get("Signatur") or "")[:120]
        APP_LOG.info(f"/email/generate body keys: {sent_keys}")
        APP_LOG.info(f"/email/generate Signatur (preview): {sig_preview}")
    except Exception:
        pass
    res_job_id = start_async_job(
        url=f"{LEAD_BASE_URL}/email/generate?async=1",
        body=start_body,
        headers={"X-Token-Id": token_id},
    )
    st = poll_status(
        url=f"{LEAD_BASE_URL}/email/generate/status/{res_job_id}?t={int(time.time()*1000)}",
        headers={"X-Token-Id": token_id},
        max_wait_sec=60 * 20,
        session=GLOBAL_SES,
    )
    raw = unwrap_result(st) or {}
    results = raw.get("results") if isinstance(raw, dict) else None
    first = (results[0] if isinstance(results, list) and results else {}) or {}
    msg = first.get("message") if isinstance(first, dict) else {}
    subj = str((msg or {}).get("Betreff") or "")
    text = str((msg or {}).get("Text") or "")
    fu1 = str((msg or {}).get("FollowUp1") or "")
    fu2 = str((msg or {}).get("FollowUp2") or "")
    return {
        "email": {"subject": subj, "body": text},
        "followup1": fu1,
        "followup2": fu2,
        "raw": raw,
    }
def wholix_login(email: str, password: str) -> str:
    """
    Compatible with the old signature (still returns a token string),
    but additionally sets up the global auto-relogin manager _WHOLIX_SES.
    """
    global _WHOLIX_SES
    _WHOLIX_SES = WholixSession(email, password, base_url=WHOLIX_BASE_URL, session=GLOBAL_SES)
    return _WHOLIX_SES.get_token()
# ===================== Placeholder e-mail helpers ==========================
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def _slug(val: Any, maxlen: int = 24) -> str:
    s = "" if val is None else str(val)
    s = s.lower().strip()
    s = re.sub(r"[^a-z0-9]+", "-", s)
    s = re.sub(r"-+", "-", s).strip("-")
    return s[:maxlen] or "x"
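# Examples:
#   _slug("Müller & Söhne GmbH") -> "m-ller-s-hne-gmbh"
#   _slug("")                    -> "x"   (never returns an empty slug)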
def _make_placeholder_email(record: dict) -> str:
    # Stable, valid, short
    fn = _slug(record.get("firstname"))
    ln = _slug(record.get("lastname"))
    base = (fn + "." + ln).strip(".") or "lead"
    cid = _slug(record.get("exclude_hash") or uuid.uuid4().hex[:8], maxlen=16)
    local = f"{base}.{cid}"[:48]
    return f"{local}@example.com"  # RFC-reserved domain, accepted by validators
# ============================= Wholix Auth =================================
# Global session manager (set in wholix_login)
_WHOLIX_SES: Optional["WholixSession"] = None
class WholixSession:
    """
    Encapsulates token handling:
    - login with email/password
    - authorized requests with auto-relogin on 401/403/419 (one retry)
    - thread-safe
    """
    def __init__(self, email: str, password: str, base_url: str = WHOLIX_BASE_URL, session: Optional[requests.Session] = None):
        self.email = email
        self.password = password
        self.base_url = base_url.rstrip("/")
        self._token = None
        self._lock = threading.RLock()
        self._ses = session or GLOBAL_SES

    def _login(self) -> str:
        res = req(f"{self.base_url}/api/v1/auth/login", method="POST",
                  json_body={"email": self.email, "password": self.password},
                  timeout=(5.0, 15.0), session=self._ses)
        token = (res or {}).get("token")
        if not token:
            raise RuntimeError("Wholix login failed.")
        return token

    def get_token(self) -> str:
        if self._token:
            return self._token
        with self._lock:
            if not self._token:
                self._token = self._login()
            return self._token

    def auth_headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.get_token()}"}

    def req_authed(self, path: str, *, method: str = "GET", headers: Optional[Dict[str, str]] = None,
                   json_body: Any = None, data: Any = None, timeout: Tuple[float, float] = (5.0, 30.0)):
        url = path if path.startswith("http") else f"{self.base_url}{path}"
        hdrs = {}
        hdrs.update(headers or {})
        hdrs.update(self.auth_headers())
        try:
            return req(url, method=method, headers=hdrs, json_body=json_body, data=data, timeout=timeout, session=self._ses)
        except HTTPError as e:
            if e.status in (401, 403, 419):
                with self._lock:
                    self._token = None
                    self._token = self._login()
                hdrs = {}
                hdrs.update(headers or {})
                hdrs.update(self.auth_headers())
                return req(url, method=method, headers=hdrs, json_body=json_body, data=data, timeout=timeout, session=self._ses)
            raise
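# Illustrative usage (credentials assumed):
#   ses = WholixSession("user@example.com", "secret")
#   res = ses.req_authed(
#       "/api/v1/table-object-data/fetch-paginated-results", method="POST",
#       json_body={"module": "Contacts", "action": "search", "page": 1, "per_page": 50},
#   )
# A 401/403/419 triggers exactly one transparent re-login + retry.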
# ===================== Wholix Store + Fallbacks ============================
def _store_with_fallbacks(token: str, payload: dict, module: str) -> dict:
    """
    Tries 3 stages to avoid a 422.
    NEW: uses the global WholixSession manager (_WHOLIX_SES) with auto-relogin
    when available; otherwise falls back to the provided token.
    """
    url_path = "/api/v1/table-object-data/store-objects"

    def _post_with_session(body):
        # prefer the auto-relogin session
        if _WHOLIX_SES is not None:
            return _WHOLIX_SES.req_authed(url_path, method="POST", json_body=body, timeout=(5.0, 30.0))
        # fallback: manual token (legacy)
        url = f"{WHOLIX_BASE_URL}{url_path}"
        headers = {"Authorization": f"Bearer {token}"}
        return req(url, method="POST", headers=headers, json_body=body, timeout=(5.0, 30.0))

    # Stage 1: original payload
    body1 = {"module": module, "action": "store", "data": [payload]}
    try:
        return _post_with_session(body1)
    except HTTPError as e:
        if e.status != 422:
            raise
    # Stage 2: repair/remove URLs
    p2 = dict(payload)
    for k in ("linkedin_url", "company_url"):
        if p2.get(k):
            fixed = _normalize_url(p2.get(k))
            if fixed:
                p2[k] = fixed
            else:
                p2.pop(k, None)
    try:
        return _post_with_session({"module": module, "action": "store", "data": [p2]})
    except HTTPError as e:
        if e.status != 422:
            raise
    # Stage 3: minimal record
    minimal = {
        "email": p2.get("email") or _make_placeholder_email(p2),
        "firstname": p2.get("firstname") or "",
        "lastname": p2.get("lastname") or "",
        "company_name": p2.get("company_name") or "",
        "exclude_hash": p2.get("exclude_hash") or _slug(uuid.uuid4().hex[:8]),
        "tags": p2.get("tags") or {"keys": ["no-email"], "values": ["no-email"]},
    }
    return _post_with_session({"module": module, "action": "store", "data": [minimal]})
def wholix_store_contact(token: str, record: dict, module: str = "Contacts", allow_placeholder: bool = True) -> dict:
    """
    Sends ONLY allowed fields to Wholix and sanitizes problematic values.
    ALWAYS storable: generates a placeholder mail (example.com) + tag if needed.
    Uses _store_with_fallbacks (with auto-relogin via _WHOLIX_SES, if available).
    """
    if not isinstance(record, dict):
        raise ValueError("Wholix: record must be a dict.")

    def _clean_str(v):
        if v is None:
            return None
        s = str(v).strip()
        return s if s else None

    # Check the e-mail / generate one if needed
    email = _clean_str(record.get("email"))
    if not email or not EMAIL_RE.match(email):
        if allow_placeholder:
            email = _make_placeholder_email(record)
            record["email"] = email
            if isinstance(record.get("tags"), dict):
                for k in ("keys", "values"):
                    record["tags"].setdefault(k, [])
                    if "no-email" not in record["tags"][k]:
                        record["tags"][k].append("no-email")
    ALLOWED = {
        "firstname",
        "lastname",
        "email",          # required field (placeholder if needed)
        "adress",         # (sic) exactly like this
        "city",
        "postcode",
        "phonenumber",
        "job_title",
        "departments",    # text
        "linkedin_url",
        "company_name",
        "company_url",
        "message_mail",
        "message_mail_subject",
        "message_followup1",
        "message_followup2",
        "exclude_hash",
        "status_field",   # multi-select: {keys:[], values:[]}
        "tags",           # multi-select: {keys:[], values:[]}
    }

    def _coerce_departments(v):
        if v is None:
            return None
        if isinstance(v, (list, tuple, set)):
            v = ", ".join(str(x).strip() for x in v if str(x).strip())
        else:
            v = str(v).strip()
            if v.startswith("[") and v.endswith("]"):
                v = v[1:-1].strip().strip("'\"")
        return _clean_str(v)

    out = {}
    for k in ALLOWED:
        if k not in record:
            continue
        val = record[k]
        if k == "departments":
            val = _coerce_departments(val)
        elif k in ("linkedin_url", "company_url"):
            val = _normalize_url(val)
        elif k in ("status_field", "tags"):
            if not (isinstance(val, dict) and "keys" in val and "values" in val):
                val = None
        else:
            val = _clean_str(val)
        if val is not None:
            out[k] = val
    # Ensure the required field (may now be a placeholder)
    out["email"] = _clean_str(email)
    # POST with fallback logic (uses auto-relogin if _WHOLIX_SES is set)
    return _store_with_fallbacks(token, out, module)
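# Illustrative call (token/record assumed) showing the sanitation steps:
#   wholix_store_contact(token, {
#       "firstname": "Anna", "lastname": "Schmidt",
#       "email": "not-an-email",                 # fails EMAIL_RE -> placeholder is generated
#       "departments": ["Sales", "Marketing"],   # coerced to "Sales, Marketing"
#       "company_url": "firma.de/über",          # -> "https://firma.de/%C3%BCber"
#       "unknown_field": "dropped",              # not in ALLOWED -> silently removed
#   })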
# ======= NEW: load Wholix excludes paginated (as in the JS version) ========
def wholix_fetch_excludes(token: str,
                          module_name: str = "Contacts",
                          per_page: int = 500,
                          max_pages: int = 100,
                          dedupe: bool = True) -> List[Dict[str, str]]:
    """
    Load excludes page by page.
    NEW: uses the global WholixSession manager (_WHOLIX_SES) with auto-relogin
    when available; otherwise falls back to the provided token.
    """
    path = "/api/v1/table-object-data/fetch-paginated-results"
    out: List[Dict[str, str]] = []
    seen: Set[str] = set()
    last_page = 10**9
    for page in range(1, max_pages + 1):
        if page > last_page:
            break
        payload = {"module": module_name, "action": "search", "page": page, "per_page": per_page}
        try:
            if _WHOLIX_SES is not None:
                res = _WHOLIX_SES.req_authed(path, method="POST", json_body=payload, timeout=(5.0, 30.0))
            else:
                url = f"{WHOLIX_BASE_URL}{path}"
                headers = {"Authorization": f"Bearer {token}"}
                res = req(url, method="POST", headers=headers, json_body=payload, timeout=(5.0, 30.0))
        except Exception as e:
            APP_LOG.warning(f"Wholix excludes page {page} error: {e}")
            break
        data_block = (res or {}).get("data") if isinstance(res, dict) else {}
        data_block = data_block if isinstance(data_block, dict) else {}
        rows = data_block.get("data") or data_block.get("results") or []
        if isinstance(data_block.get("last_page"), (int, float)):
            last_page = int(data_block.get("last_page"))
        else:
            last_page = page
        if not isinstance(rows, list) or not rows:
            break
        for row in rows:
            if not isinstance(row, dict):
                continue
            ex = (row.get("exclude_hash") or row.get("excludeHash") or row.get("exclude_id") or "").strip()
            if not ex:
                continue
            if dedupe and ex in seen:
                continue
            if dedupe:
                seen.add(ex)
            cname = (row.get("company_name") or row.get("companyName") or "").strip()
            out.append({"exclude_hash": ex, "company_name": cname})
    return out
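# Illustrative return shape (values assumed):
#   [{"exclude_hash": "a1b2c3", "company_name": "Acme GmbH"}, ...]
# Pagination stops at the server-reported last_page, an empty page, or max_pages.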
# ========================== Job Management =================================
EXEC = ThreadPoolExecutor(max_workers=MAX_WORKERS)
JOBS: Dict[str, Dict[str, Any]] = {}


def _job_init(job_id: str):
    JOBS[job_id] = {
        "log": [],
        "progress": 0,
        "rows": [],
        "done": False,
        "error": None,
        "lock": threading.RLock(),
        "created_at": time.time(),
        "finished_at": None,
    }


def _job_emit(job_id: str, msg: Optional[str] = None, progress: Optional[int] = None, rows_append: Optional[dict] = None):
    st = JOBS.get(job_id)
    if not st:
        return
    lg = get_job_logger(job_id)
    with st["lock"]:
        if msg:
            st["log"].append(msg)
            st["log"] = st["log"][-1000:]
            APP_LOG.info(f"[{job_id}] {msg}")
            lg.info(msg)
        if isinstance(progress, int):
            st["progress"] = max(0, min(100, progress))
        if rows_append:
            st["rows"].append(rows_append)
            st["rows"] = st["rows"][-1000:]
            APP_LOG.info(f"[{job_id}] row: {rows_append}")
            lg.info(f"row: {rows_append}")


def _job_finish(job_id: str, error: Optional[str] = None):
    st = JOBS.get(job_id)
    if not st:
        return
    lg = get_job_logger(job_id)
    with st["lock"]:
        st["error"] = error
        st["done"] = True
        st["finished_at"] = time.time()
        final = "✅ Done." if not error else f"❌ {error}"
        st["log"].append(final)
        APP_LOG.info(f"[{job_id}] {final}")
        (lg.error if error else lg.info)(final)


def _gc_jobs():
    now = time.time()
    for jid, st in list(JOBS.items()):
        if st.get("done") and st.get("finished_at") and now - st["finished_at"] > JOB_TTL_SEC:
            JOBS.pop(jid, None)
# ========================== Pipeline (Background) ==========================
LEAD_COUNTS = [1, 2, 3, 4, 5, 10, 15, 20, 40, 80, 100, 200, 300, 400, 500, 1000]
CURL_DATA_RE = re.compile(r"""--data(?:-raw)?\s+(?P<q>['\"])(?P<body>.*?)(?P=q)""", re.DOTALL)
HDR_XTOKEN_RE = re.compile(r"""-H\s+(?P<q>['\"])X-Token-Id:\s*(?P<val>[^'\"]+)(?P=q)""", re.IGNORECASE)


def _find_data_quote_start(s: str) -> Tuple[int, Optional[str]]:
    m = re.search(r"--data(?:-raw)?\s+(['\"])", s)
    if not m:
        return -1, None
    return m.end(1), m.group(1)


def _scan_quoted_payload(s: str, i: int, q: str) -> Tuple[str, int]:
    out = []
    n = len(s)
    while i < n:
        ch = s[i]
        if q == "'":
            # Shell-escaped single quote inside a single-quoted string: '"'"'
            if i + 4 < n and s[i:i+5] == "'\"'\"'":
                out.append("'")
                i += 5
                continue
            if ch == "'":
                return "".join(out), i + 1
            out.append(ch)
            i += 1
        else:
            if ch == "\\" and i + 1 < n:
                out.append(s[i+1])
                i += 2
                continue
            if ch == '"':
                return "".join(out), i + 1
            out.append(ch)
            i += 1
    return "".join(out), i
def parse_curl(curl_text: str) -> Tuple[str, Dict[str, Any]]:
    """
    Extracts X-Token-Id and the JSON body from a curl with --data-raw '...'.
    No field renaming, no heuristics – the JSON is taken over 1:1.
    """
    if not isinstance(curl_text, str) or not curl_text.strip():
        raise ValueError("Empty curl text.")
    token_id = ""
    m = re.search(r"""-H\s+(["'])X-Token-Id:\s*([^"']+)\1""", curl_text, re.IGNORECASE)
    if m:
        token_id = m.group(2).strip()
    if not token_id:
        m2 = re.search(r"""X-Token-Id:\s*([A-Za-z0-9\-\._]+)""", curl_text, re.IGNORECASE)
        if m2:
            token_id = m2.group(1).strip()
    if not token_id:
        env_token = os.getenv("X_TOKEN_ID", "").strip()
        if env_token:
            token_id = env_token
    if not token_id:
        raise ValueError("Could not find an X-Token-Id header in the curl (or env X_TOKEN_ID).")
    qmatch = re.search(r"""--data(?:-raw)?\s+(['\"])""", curl_text)
    if not qmatch:
        raise ValueError("Could not find the JSON body (expecting --data-raw '...').")
    quote = qmatch.group(1)
    start = qmatch.end(1)
    out = []
    i = start
    n = len(curl_text)
    if quote == "'":
        while i < n:
            if i + 4 < n and curl_text[i:i+5] == "'\"'\"'":
                out.append("'")
                i += 5
                continue
            ch = curl_text[i]
            if ch == "'":
                break
            out.append(ch)
            i += 1
    else:
        while i < n:
            ch = curl_text[i]
            if ch == "\\" and i + 1 < n:
                out.append(curl_text[i+1])
                i += 2
                continue
            if ch == '"':
                break
            out.append(ch)
            i += 1
    body_str = "".join(out).strip()
    try:
        payload = json.loads(body_str)
    except json.JSONDecodeError:
        payload = json.loads(body_str.replace("\r\n", "\n").replace("\r", "\n"))
    return token_id, payload
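# Worked example (token/URL assumed):
#   token, payload = parse_curl(
#       "curl -X POST 'https://host/x' -H 'X-Token-Id: tok-123' "
#       "--data-raw '{\"filters\": {}, \"CTA\": \"Demo\"}'"
#   )
#   # token == "tok-123", payload == {"filters": {}, "CTA": "Demo"}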
def run_pipeline_bg(job_id: str, curl_text: str, n_leads_ui: int):
    lg = get_job_logger(job_id)
    APP_LOG.info(f"[{job_id}] Background job started (n={n_leads_ui})")
    lg.info(f"Background job started (n={n_leads_ui})")
    _gc_jobs()
    _job_emit(job_id, "Job started.")
    try:
        token_id, payload = parse_curl(curl_text)
    except Exception:
        logging.exception(f"[{job_id}] Parse error")
        _job_finish(job_id, "Parse error: see logs (stack trace).")
        return
    wh_email = payload.get("wholix_email") or payload.get("Wholix_email") or ""
    wh_pass = payload.get("wholix_passwort") or payload.get("wholix_password") or ""
    if not wh_email or not wh_pass:
        _job_finish(job_id, "The JSON payload is missing wholix_email / wholix_passwort.")
        return
    # Global fields exactly as in the curl (1:1)
    filters = payload.get("filters") or {}
    icp_text = payload.get("Produkt_und_Dienstleistungsbeschreibung") or ""
    checklist = payload.get("Checkliste_Landingpage") or ""
    signature = payload.get("Signatur") or ""
    cta = payload.get("CTA") or ""
    homepage_url = payload.get("icp_homepage_url") or ""
    raw_tag = payload.get("Wholic_tag") or payload.get("Wholix_tag") or "AI"
    # Number of leads
    try:
        n_leads = int(n_leads_ui)
    except Exception:
        n_leads = 1
    n_leads = max(1, min(n_leads, MAX_LEADS))
    # Progress: 1 (login) + 4 steps per lead
    total_steps = (n_leads * 4) + 1
    step = 0
    # 1) Wholix login
    step += 1
    _job_emit(job_id, "→ Wholix login …", progress=int(step / total_steps * 100))
    try:
        wh_token = wholix_login(wh_email, wh_pass)
    except Exception:
        logging.exception(f"[{job_id}] Wholix login failed")
        _job_finish(job_id, "Wholix login failed (details in logs).")
        return
    # ======= LOAD / MERGE EXCLUDES =========================================
    initial_excludes = set()
    try:
        for x in (payload.get("exclude_ids") or []):
            if x:
                initial_excludes.add(str(x).strip())
    except Exception:
        pass
    _job_emit(job_id, f"→ Loading Wholix excludes … (seed: {len(initial_excludes)})")
    try:
        wh_excludes = wholix_fetch_excludes(wh_token)  # paginated
        for row in wh_excludes:
            ex = (row.get("exclude_hash") or "").strip()
            if ex:
                initial_excludes.add(ex)
        _job_emit(job_id, f"   {len(wh_excludes)} excludes from Wholix, {len(initial_excludes)} total")
    except Exception:
        logging.exception(f"[{job_id}] Loading Wholix excludes failed")
        _job_emit(job_id, "   Skipped loading excludes (error).")
    excludes: List[str] = list(initial_excludes)
    excludes_set: Set[str] = set(initial_excludes)
    # ---- Helpers for safely reading real fields (no guessing) -------------
    def pick(d: dict, *keys):
        for k in keys:
            if isinstance(d, dict) and k in d and d[k] not in (None, "", [], {}):
                return d[k]
        return None

    def norm_tags(raw) -> dict:
        if raw is None:
            return {"keys": [], "values": []}
        vals: List[str] = []
        if isinstance(raw, str):
            s = raw.strip()
            if s.startswith("[") and s.endswith("]"):
                try:
                    arr = json.loads(s)
                    if isinstance(arr, list):
                        vals = [str(x).strip() for x in arr if str(x).strip()]
                    else:
                        vals = [s.strip("[] ").strip().strip("'\"")]
                except Exception:
                    vals = [s.strip("[] ").strip().strip("'\"")]
            else:
                vals = [s]
        elif isinstance(raw, (list, tuple, set)):
            vals = [str(x).strip() for x in raw if str(x).strip()]
        else:
            vals = [str(raw).strip()]
        vals = [v for v in vals if v]
        return {"keys": vals, "values": vals}

    def norm_departments(v) -> Optional[str]:
        if v is None:
            return None
        if isinstance(v, (list, tuple, set)):
            return ", ".join(str(x).strip() for x in v if str(x).strip()) or None
        s = str(v).strip()
        if s.startswith("[") and s.endswith("]"):
            s = s[1:-1].strip().strip("'\"")
        return s or None

    def norm_url(u: Any) -> Optional[str]:
        return _normalize_url(u)
    for i in range(1, n_leads + 1):
        try:
            # 2) Fetch a lead
            step += 1
            _job_emit(job_id, f"→ [{i}/{n_leads}] Suggesting lead … (excludes={len(excludes_set)})",
                      progress=int(step / total_steps * 100))
            try:
                lead = suggest_single_lead(token_id, filters, icp_text, excludes)
            except Exception:
                logging.exception(f"[{job_id}] Lead error")
                _job_emit(job_id, "❌ Lead error: see logs")
                continue
            if not isinstance(lead, dict):
                _job_emit(job_id, "❌ Invalid lead structure")
                continue
            person = (lead.get("person") or {})
            company = (lead.get("company") or {})
            combined_id = str(lead.get("combined_id") or "").strip()
            if combined_id and combined_id in excludes_set:
                _job_emit(job_id, f"   ⚠️ Lead skipped (already excluded): {combined_id}")
                continue
            _job_emit(job_id, f"   Lead: {person.get('first_name','?')} {person.get('last_name','?')} @ {company.get('name') or company.get('company_name','?')}")
            # 3) Generate the message (real endpoint data)
            step += 1
            _job_emit(job_id, "   → Generating message …", progress=int(step / total_steps * 100))
            items = [{"combined_id": combined_id, "company": company, "person": person}]
            variables = {
                "Produkt_und_Dienstleistungsbeschreibung": icp_text,
                "CTA": cta,
                "Signatur": signature,
                "Checkliste_Landingpage": checklist,
                "homepage_url": homepage_url,  # optional
                "tags": raw_tag,               # arrives as "[AI]" → normalized later
                "Touch_Point": "LinkedIn DM",  # optional
            }
            try:
                draft = email_generate_async(token_id, variables, items)
            except Exception:
                logging.exception(f"[{job_id}] Email-generate error")
                _job_emit(job_id, "❌ Email-generate error: see logs")
                continue
            subj_raw = (draft.get("email") or {}).get("subject", "")
            body_raw = (draft.get("email") or {}).get("body", "")
            subj_flat = _flatten_text(subj_raw)
            body_flat = _flatten_text(body_raw)
            _job_emit(job_id, f"   ✉️ Subject: {subj_flat}")
            _job_emit(job_id, f"   ✉️ Message: {body_flat}")
            # 4) Store to Wholix (ALWAYS – even without a real e-mail)
            step += 1
            _job_emit(job_id, "   → Storing to Wholix …", progress=int(step / total_steps * 100))
            email_to = str(person.get("email") or "").strip()
            try:
                # --- Fill fields ONLY if actually present -------------------
                firstname = pick(person, "first_name")
                lastname = pick(person, "last_name")
                job_title = pick(person, "job_title", "title", "position")
                departments = norm_departments(pick(person, "departments"))
                linkedin = pick(person, "linkedin_url")
                phonenumber = pick(person, "phone", "phonenumber") or pick(company, "phone", "company_phone", "phonenumber")
                adress = pick(person, "adress", "address", "street") or pick(company, "adress", "address", "street")
                city = pick(person, "city") or pick(company, "city")
                postcode = pick(person, "postcode", "postal_code", "zip") or pick(company, "postcode", "postal_code", "zip")
                company_name = pick(company, "name", "company_name")
                company_url = norm_url(pick(company, "url", "website_url", "website", "domain"))
                tags_ms = norm_tags(raw_tag)
                # If there is no real e-mail: add the tag
                if not email_to:
                    tags_ms = {"keys": list({*tags_ms.get("keys", []), "no-email"}),
                               "values": list({*tags_ms.get("values", []), "no-email"})}
                record = {
                    "firstname": firstname,
                    "lastname": lastname,
                    "email": email_to or None,  # may be None → placeholder is created in the store
                    "phonenumber": phonenumber,
                    "adress": adress,
                    "city": city,
                    "postcode": postcode,
                    "job_title": job_title,
                    "departments": departments,
                    "linkedin_url": linkedin,
                    "company_name": company_name,
                    "company_url": company_url,
                    "message_mail_subject": subj_raw or None,
                    "message_mail": body_raw or None,
                    "message_followup1": draft.get("followup1") or None,
                    "message_followup2": draft.get("followup2") or None,
                    "exclude_hash": combined_id or None,
                    "status_field": {"keys": ["Kontakt aufgenommen"], "values": ["Kontakt aufgenommen"]},
                    "tags": tags_ms,
                }
                store_res = wholix_store_contact(wh_token, record, allow_placeholder=True)
                stored_ok = bool(store_res)
                if not email_to:
                    _job_emit(job_id, "   ℹ️ No real e-mail – stored a placeholder and set the 'no-email' tag.")
            except Exception:
                logging.exception(f"[{job_id}] Wholix store error")
                _job_emit(job_id, "❌ Wholix store error: see logs")
                stored_ok = False
            if combined_id:
                excludes_set.add(combined_id)
                excludes.append(combined_id)
            _job_emit(job_id, rows_append={
                "person": f"{person.get('first_name','')} {person.get('last_name','')}".strip(),
                "email": email_to or "(placeholder)",
                "company": company.get("name") or company.get("company_name") or "",
                "subject": subj_flat,
                "message": body_flat,
                "stored_ok": stored_ok,
            })
            # 5) Wrap up
            step += 1
            _job_emit(job_id, "   ✓ Lead completed.", progress=int(min(99, step / total_steps * 100)))
        except Exception:
            logging.exception(f"[{job_id}] Unexpected error in lead iteration")
            _job_emit(job_id, "❌ Unexpected error – details in logs.")
    _job_emit(job_id, progress=100)
    _job_finish(job_id, None)
# ================================ UI =======================================
def build_ui():
    with gr.Blocks(theme=gr.themes.Soft(), css="""
    .logbox textarea { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace; font-size: 12.5px; line-height: 1.35; }
    """) as demo:
        gr.Markdown("## Wholix Lead → Message → Store (purely asynchronous, robust for long jobs)")
        gr.Markdown(
            "Paste your complete **`curl`** (with `X-Token-Id` and JSON `--data-raw`), choose the number of leads and click **Start**. "
            "Processing continues server-side – even if the browser/tab closes. "
            "Use **Refresh** to fetch the current status."
        )
        with gr.Row():
            curl_in = gr.Textbox(
                label="curl command",
                placeholder="curl -sS -N -X POST 'https://.../stream' -H 'X-Token-Id: ...' --data-raw '{...}'",
                lines=12
            )
        with gr.Row():
            count = gr.Dropdown(choices=[str(x) for x in LEAD_COUNTS], value="1", label="Number of leads")
        with gr.Row():
            start_btn = gr.Button("🚀 Start", variant="primary")
            poll_btn = gr.Button("🔄 Refresh")
        with gr.Row():
            job_id_tb = gr.Textbox(label="Job ID", interactive=True)
        with gr.Row():
            status = gr.Textbox(label="Status / Log", lines=18, interactive=False, elem_classes=["logbox"])
        with gr.Row():
            progress = gr.Slider(label="Progress", minimum=0, maximum=100, value=0, interactive=False)
        result_cols = ["person", "email", "company", "subject", "message", "stored_ok"]
        with gr.Row():
            out = gr.Dataframe(
                headers=result_cols,
                label="Results",
                interactive=False,
                wrap=True,
                row_count=(0, "dynamic"),
                col_count=(6, "fixed"),
            )

        def _rows_to_table(rows):
            # gr.Dataframe expects list-of-lists; the job keeps rows as dicts.
            return [[r.get(c, "") for c in result_cols] for r in rows]

        def start_job(curl_text: str, n: str):
            try:
                n_int = int(n)
            except Exception:
                n_int = 1
            n_int = max(1, min(n_int, MAX_LEADS))
            job_id = str(uuid.uuid4())
            _job_init(job_id)
            _job_emit(job_id, f"Job started: {job_id}")
            EXEC.submit(run_pipeline_bg, job_id, curl_text, n_int)
            st = JOBS[job_id]
            with st["lock"]:
                log = "\n".join(st["log"])
                prog = int(st["progress"])
                rows = _rows_to_table(st["rows"])
            return log, prog, rows, job_id

        def poll_job(job_id: str):
            st = JOBS.get(job_id)
            if not st:
                return "Unknown or expired job ID.", 0, []
            with st["lock"]:
                log = "\n".join(st["log"][-500:])
                prog = int(st["progress"])
                rows = _rows_to_table(st["rows"])
            return log, prog, rows

        start_btn.click(start_job, inputs=[curl_in, count],
                        outputs=[status, progress, out, job_id_tb])
        poll_btn.click(poll_job, inputs=[job_id_tb],
                       outputs=[status, progress, out])
    return demo
# ============================== MAIN =======================================
if __name__ == "__main__":
    app = build_ui()
    app.launch(server_name="0.0.0.0", debug=True, share=True)