#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gradio app - robust asynchronous execution via status endpoints.

- Purely asynchronous, driven by status endpoints
- Keeps running on the server even when the UI/connection goes away
- Global HTTP session with keep-alive
- Polling with backoff + Retry-After
- Logging to the console + logs/<job_id>.log
- FIX: load and honour exclude_hash correctly (paginated Wholix search)
- FIX: hard error isolation per lead -> no Gradio crash
- NEW: ALWAYS stores in Wholix. If no real e-mail address is available, a
  placeholder address (example.com) is generated and a "no-email" tag is set.
- NEW: 422 robustness -> progressive degradation & minimal-record fallback
- NEW: URL normalisation (umlauts/IDNA) for path/query/fragment
"""

import json
import logging
import os
import random
import re
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import quote, urlsplit, urlunsplit

import gradio as gr
import requests
from requests.adapters import HTTPAdapter

# ============================== CONFIG ====================================
LEAD_BASE_URL = "https://wholixleadgenbackend.ngrok.io"
WHOLIX_BASE_URL = "https://api.wholix.ai"
MAX_LEADS = 100          # safety cap
MAX_WORKERS = 16         # concurrent pipelines on the server
JOB_TTL_SEC = 60 * 60    # how long finished jobs stay in memory (1h)

# ============================== LOGGING ====================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(threadName)s | %(message)s",
)
APP_LOG = logging.getLogger("app")


def get_job_logger(job_id: str) -> logging.Logger:
    """Return the logger ``job.<job_id>``, attaching its rotating file handler once.

    The handler writes to ``logs/<job_id>.log``. Records also propagate to the
    root logger, so they show up on the console as well.
    """
    lg = logging.getLogger(f"job.{job_id}")
    if not any(isinstance(h, RotatingFileHandler) for h in lg.handlers):
        fh = RotatingFileHandler(
            LOG_DIR / f"{job_id}.log",
            maxBytes=512_000,
            backupCount=2,
            encoding="utf-8",
        )
        fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
        lg.addHandler(fh)
    lg.setLevel(logging.INFO)
    lg.propagate = True
    return lg


def _close_job_logger(job_id: str) -> None:
    """Detach and close every handler of ``job.<job_id>`` to release its log file."""
    lg = logging.getLogger(f"job.{job_id}")
    for handler in list(lg.handlers):
        lg.removeHandler(handler)
        handler.close()


# ========================== GLOBAL HTTP SESSION ============================
# Shared by both adapters so that swapping in the keep-alive adapter below
# does not silently fall back to the default connection-pool size.
_POOL_KWARGS = {"pool_connections": 50, "pool_maxsize": 50, "max_retries": 0}

GLOBAL_SES = requests.Session()
GLOBAL_SES.headers.update({"Accept": "application/json, text/plain;q=0.9, */*;q=0.8"})
adapter = HTTPAdapter(**_POOL_KWARGS)
GLOBAL_SES.mount("https://", adapter)
GLOBAL_SES.mount("http://", adapter)
try:
    from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter

    ka = TCPKeepAliveAdapter(idle=30, interval=30, count=3, **_POOL_KWARGS)
    GLOBAL_SES.mount("https://", ka)
    GLOBAL_SES.mount("http://", ka)
except Exception:
    # requests_toolbelt is optional; without it the plain pooled adapter stays mounted.
    pass


# ================================ HTTP =====================================
class HTTPError(Exception):
    """Raised by :func:`req` for every non-2xx/3xx response.

    Attributes:
        status: HTTP status code.
        url: requested URL.
        body: parsed JSON body if available, otherwise the raw text (or None).
        headers: response headers as a plain dict.
    """

    def __init__(self, status: int, url: str, body: Any = None,
                 headers: Optional[Dict[str, str]] = None):
        msg = f"HTTP {status} on {url}"
        if body is not None:
            try:
                bs = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
                msg += f": {bs[:500]}"
            except Exception:
                # The body is only decoration for the message; never fail here.
                pass
        super().__init__(msg)
        self.status = status
        self.url = url
        self.body = body
        self.headers = headers or {}


def _is_json_ct(ct: str) -> bool:
    """Return True if the content-type string denotes JSON."""
    return bool(ct and ("application/json" in ct or ct.endswith("+json")))


def _safe_json(text: str):
    """Parse JSON, retrying once with NaN/Infinity literals replaced by ``null``.

    The repair is purely textual, so it also touches these words inside string
    values; it is only used after strict parsing has already failed.
    """
    try:
        return json.loads(text)
    except Exception:
        # "-Infinity" must be handled before "Infinity", otherwise it would
        # turn into the invalid token "-null".
        repaired = (
            text.replace("NaN", "null")
            .replace("-Infinity", "null")
            .replace("Infinity", "null")
        )
        return json.loads(repaired)


def req(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    json_body: Any = None,
    data: Any = None,
    timeout: Tuple[float, float] = (5.0, 15.0),
    session: Optional[requests.Session] = None,
) -> Any:
    """Perform one HTTP request and return the decoded response body.

    Returns:
        None for an empty body, the parsed JSON for JSON responses (detected
        via content-type, or via a leading ``{``/``[`` when no content-type
        is given), otherwise the response text.

    Raises:
        HTTPError: if the response status is not OK.
    """
    s = session or GLOBAL_SES
    r = s.request(
        method=method.upper(),
        url=url,
        headers=headers,
        json=json_body,
        data=data,
        timeout=timeout,
    )
    ct = r.headers.get("content-type", "")
    text = r.text or ""
    looks_json = _is_json_ct(ct) or (not ct and text.strip().startswith(("{", "[")))

    if not r.ok:
        body = None
        if text:
            try:
                body = r.json() if looks_json else text
            except Exception:
                body = text
        raise HTTPError(r.status_code, url, body=body, headers=dict(r.headers))

    if text == "":
        return None
    if looks_json:
        try:
            return r.json()
        except Exception:
            return _safe_json(text)
    return text


def _retry_after_seconds(err: HTTPError) -> float:
    """Extract a positive retry delay (seconds) from an HTTPError, else 0.0.

    Looks at ``body["retry_after"]`` first, then at the ``Retry-After`` header
    (case-insensitively, because the headers are stored as a plain dict).
    Non-numeric values such as HTTP dates are ignored.
    """
    candidates: List[Any] = []
    if isinstance(err.body, dict):
        candidates.append(err.body.get("retry_after"))
    for key, value in (err.headers or {}).items():
        if isinstance(key, str) and key.lower() == "retry-after":
            candidates.append(value)
    for raw in candidates:
        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            continue
        # Rejects 0, negatives and NaN, all of which would break time.sleep().
        if seconds > 0:
            return seconds
    return 0.0


# ========================= URL NORMALISATION ===============================
def _normalize_url(u: Optional[str]) -> Optional[str]:
    """
    Normalise URLs for strict validators:
    - add a scheme (https) when missing
    - host -> IDNA (punycode)
    - percent-encode path/query/fragment as UTF-8

    Existing percent-escapes are kept as they are ("%" is treated as safe) so
    that an already encoded URL is not double-encoded. Returns None when the
    input is empty or cannot be turned into a URL with a host.
    """
    if not u:
        return None
    u = str(u).strip()
    if not u:
        return None
    if not re.match(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://", u):
        u = "https://" + u
    try:
        parts = urlsplit(u)
        if not parts.scheme or not parts.netloc or not parts.hostname:
            return None
        # Host to IDNA; urlsplit() already lower-cases the hostname.
        try:
            host = parts.hostname.encode("idna").decode("ascii").lower()
        except Exception:
            return None
        if ":" in host:
            # IPv6 literal: urlsplit() strips the brackets, put them back.
            host = f"[{host}]"
        netloc = host
        if parts.port:
            netloc += f":{parts.port}"
        if parts.username:
            # Credentials are case-sensitive and therefore never lower-cased.
            auth = parts.username
            if parts.password:
                auth += f":{parts.password}"
            netloc = f"{auth}@{netloc}"
        path = quote(parts.path or "", safe="/-._~%")
        query = quote(parts.query or "", safe="=&?/-._~%+")
        fragment = quote(parts.fragment or "", safe="/-._~%")
        return urlunsplit((parts.scheme.lower(), netloc, path, query, fragment))
    except Exception:
        # Covers e.g. an invalid port number raised by parts.port.
        return None


# ========================= Async Start & Poll ==============================
def _looks_like_ngrok_html(body: Any) -> bool:
    """Heuristically detect an ngrok HTML error page in an error body."""
    try:
        s = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
    except Exception:
        s = str(body)
    s_lower = (s or "").lower()
    # NOTE(review): the second operand was garbled in the source this file was
    # recovered from; '"<html" in s_lower' is the reconstructed check.
    return ("cdn.ngrok.com" in s_lower) or ("ngrok" in s_lower and "<html" in s_lower)


def start_async_job(
    url: str,
    body: Any,
    headers: Optional[Dict[str, str]] = None,
    session: Optional[requests.Session] = None,
) -> str:
    """
    Start an asynchronous backend job and return its job_id.

    Retries internally (backoff + jitter) on 404/408/425/429/5xx and on ngrok
    HTML error pages, honouring Retry-After. Before the first POST a
    best-effort warm-up GET is sent to the base URL.

    Raises:
        HTTPError: non-transient status, or retries exhausted.
        RuntimeError: the backend answered without ``ok``/``job_id``.
    """
    s = session or GLOBAL_SES

    # --- Warm-up: wake up ngrok/server (errors are ignored on purpose) ---
    try:
        parts = urlsplit(url)
        if parts.scheme and parts.netloc:
            s.get(f"{parts.scheme}://{parts.netloc}", timeout=(3.0, 3.0))
    except Exception:
        pass

    # --- POST with retry ---
    transient_codes = {404, 408, 425, 429, 500, 502, 503, 504, 520, 522, 524}
    attempts = 0
    max_attempts = 6
    delay = 0.8
    while True:
        attempts += 1
        try:
            res = req(
                url=url,
                method="POST",
                headers={**(headers or {}), "Prefer": "respond-async"},
                json_body=body,
                timeout=(5.0, 15.0),
                session=s,
            )
        except HTTPError as e:
            is_transient = (e.status in transient_codes) or _looks_like_ngrok_html(e.body)
            if attempts >= max_attempts or not is_transient:
                # Not transient, or attempts used up.
                raise
            sleep_for = _retry_after_seconds(e) or (delay + random.uniform(0, 0.5 * delay))
            logging.warning(f"start_async_job: transient {e.status}, retry {attempts}/{max_attempts} in {sleep_for:.1f}s …")
            time.sleep(sleep_for)
            delay = min(8.0, delay * 1.8)
            continue

        # A plain-text answer must end up in the RuntimeError below, not in an
        # AttributeError.
        answer = res if isinstance(res, dict) else {}
        job_id = answer.get("job_id")
        ok = answer.get("ok", False)
        if not ok or not job_id:
            raise RuntimeError(f"Backend hat keine job_id geliefert (res={res!r})")
        return job_id


def poll_status(
    url: str,
    headers: dict,
    max_wait_sec: int = 60 * 20,
    min_delay: float = 1.0,
    max_delay: float = 30.0,
    session: Optional[requests.Session] = None,
    on_tick=None,
) -> dict:
    """Poll a status endpoint until it reports ``ok`` and ``status == "done"``.

    Transient HTTP errors (408/425/429/5xx/52x) are retried with exponential
    backoff and jitter, honouring Retry-After; other HTTP errors propagate.
    ``on_tick`` (if callable) receives every status payload; its exceptions
    are swallowed.

    NOTE(review): only "done" ends the loop. If the backend has a terminal
    failure status, it is not recognised here and the call runs into the
    timeout - confirm against the backend contract.

    Raises:
        TimeoutError: no "done" status within ``max_wait_sec``.
        HTTPError: non-transient HTTP status.
    """
    transient_codes = (408, 425, 429, 500, 502, 503, 504, 520, 522, 524)
    t0 = time.time()
    attempt = 0
    delay = min_delay
    while True:
        if time.time() - t0 > max_wait_sec:
            raise TimeoutError("Timeout beim Warten auf Ergebnis.")
        try:
            st = req(url, method="GET", headers=headers, timeout=(5.0, 6.0), session=session)
        except HTTPError as e:
            if e.status not in transient_codes:
                raise
            # Bounded so that 2 ** attempt cannot grow without limit.
            attempt = min(attempt + 1, 20)
            delay = min(max_delay, max(min_delay, 0.7 * (2 ** attempt)))
            time.sleep(_retry_after_seconds(e) or (delay + random.uniform(0, 0.4 * delay)))
            continue

        if callable(on_tick):
            try:
                on_tick(st)
            except Exception:
                pass

        if isinstance(st, dict) and st.get("ok") and st.get("status") == "done":
            return st

        attempt = min(attempt + 1, 20)
        delay = min(max_delay, max(min_delay, delay * 1.6))
        time.sleep(delay)


# ============================= Services ====================================
def unwrap_result(payload: Any) -> Any:
    """Unwrap ``{"ok": true, "result"|"data"|"content": ...}`` envelopes.

    Anything that is not such an envelope is returned unchanged (falsy input
    becomes an empty dict).
    """
    p = payload or {}
    if isinstance(p, dict) and p.get("ok") and any(k in p for k in ("result", "data", "content")):
        p = p.get("result") or p.get("data") or p.get("content")
    return p


def suggest_single_lead(token_id: str, filters: dict, icp_text: str, exclude_ids: List[str]) -> dict:
    """Run ``/lead/suggest`` asynchronously and return the first suggested lead.

    Returns the first element of ``items`` (or of a bare list), the result
    dict itself, or ``{"result": <value>}`` for any other shape.
    """
    res_job_id = start_async_job(
        url=f"{LEAD_BASE_URL}/lead/suggest?async=1",
        body={"filters": filters, "icp_text": icp_text or "", "exclude_ids": exclude_ids or []},
        headers={"X-Token-Id": token_id},
    )
    st = poll_status(
        url=f"{LEAD_BASE_URL}/lead/suggest/status/{res_job_id}?t={int(time.time()*1000)}",
        headers={"X-Token-Id": token_id},
        max_wait_sec=60 * 20,
        session=GLOBAL_SES,
    )
    res = unwrap_result(st)
    if isinstance(res, dict) and isinstance(res.get("items"), list) and res["items"]:
        return res["items"][0]
    if isinstance(res, list) and res:
        return res[0]
    return res if isinstance(res, dict) else {"result": res}


def _flatten_text(val: Any) -> str:
    """
    Collapse newlines/tabs/repeated whitespace into single spaces.
    No field guessing, plain string processing only.
    """
    s = "" if val is None else str(val)
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = re.sub(r"\s+", " ", s).strip()
    return s


def normalize_draft(raw: Any) -> Dict[str, Any]:
    """Map a loosely structured draft payload onto a fixed shape.

    Keys are matched case-insensitively. Returns
    ``{"email": {"to", "subject", "body"}, "followup1", "followup2"}`` with
    empty strings for anything that is missing.
    """
    r = unwrap_result(raw) or raw or {}
    if isinstance(r, dict) and "message" in r and isinstance(r["message"], dict):
        r = r["message"]
    email_obj = r.get("email") if isinstance(r, dict) else {}

    def pick(obj, *keys):
        # First non-empty value whose key matches one of `keys`, ignoring case.
        if not isinstance(obj, dict):
            return ""
        for k in keys:
            for kk, vv in obj.items():
                if isinstance(kk, str) and kk.lower() == k.lower():
                    if isinstance(vv, str) and vv.strip():
                        return vv.strip()
                    if vv:
                        return vv
        return ""

    subject = pick(email_obj, "subject", "email_subject", "Betreff") or pick(r, "subject", "email_subject", "Betreff")
    body = pick(email_obj, "body", "text", "content") or pick(r, "body", "text", "content")
    fu1 = pick(r, "followup1", "FollowUp1", "LinkedIn")
    fu2 = pick(r, "followup2", "FollowUp2", "Facebook")
    to = pick(email_obj, "to")
    return {"email": {"to": to, "subject": subject, "body": body}, "followup1": fu1, "followup2": fu2}


def email_generate_async(token_id: str, variables: dict, items: List[dict]) -> dict:
    """
    Start /email/generate (async) and return only REAL endpoint data.

    - Passes the whitelisted global variables 1:1 (incl. signature) at root level.
    - No field renaming, no heuristics.

    Returns:
        ``{"email": {"subject": str, "body": str}, "followup1": str,
        "followup2": str, "raw": <unwrapped endpoint result>}``. The strings
        come from ``results[0]["message"]`` (keys Betreff, Text, FollowUp1,
        FollowUp2) and are empty when absent.

    Raises:
        ValueError: ``items`` is empty.
    """
    if not items:
        raise ValueError("items fehlt (mindestens 1 Lead erforderlich).")

    root_allowed = {
        "Produkt_und_Dienstleistungsbeschreibung",
        "CTA",
        "Signatur",
        "Checkliste_Landingpage",
        "homepage_url",   # optional, in case the frontend uses it
        "tags",           # ignored server-side, but harmless
        "Touch_Point",    # optional - does not disturb the backend contract
    }
    safe_vars = {k: v for k, v in (variables or {}).items() if k in root_allowed}
    start_body = {**safe_vars, "items": items, "item_index": 0}

    try:
        sent_keys = sorted(start_body.keys())
        sig_preview = str(start_body.get("Signatur") or "")[:120]
        APP_LOG.info(f"/email/generate body keys: {sent_keys}")
        APP_LOG.info(f"/email/generate Signatur (preview): {sig_preview}")
    except Exception:
        # Diagnostics only; must never block the request.
        pass

    res_job_id = start_async_job(
        url=f"{LEAD_BASE_URL}/email/generate?async=1",
        body=start_body,
        headers={"X-Token-Id": token_id},
    )
    st = poll_status(
        url=f"{LEAD_BASE_URL}/email/generate/status/{res_job_id}?t={int(time.time()*1000)}",
        headers={"X-Token-Id": token_id},
        max_wait_sec=60 * 20,
        session=GLOBAL_SES,
    )

    raw = unwrap_result(st) or {}
    results = raw.get("results") if isinstance(raw, dict) else None
    first = (results[0] if isinstance(results, list) and results else {}) or {}
    msg = first.get("message") if isinstance(first, dict) else None
    if not isinstance(msg, dict):
        # A string/None message yields empty fields instead of an AttributeError.
        msg = {}

    return {
        "email": {"subject": str(msg.get("Betreff") or ""), "body": str(msg.get("Text") or "")},
        "followup1": str(msg.get("FollowUp1") or ""),
        "followup2": str(msg.get("FollowUp2") or ""),
        "raw": raw,
    }


def wholix_login(email: str, password: str) -> str:
    """
    Compatible with the old signature (still returns a token string), but
    additionally installs the global auto-re-login manager _WHOLIX_SES.
    """
    global _WHOLIX_SES
    _WHOLIX_SES = WholixSession(email, password, base_url=WHOLIX_BASE_URL, session=GLOBAL_SES)
    return _WHOLIX_SES.get_token()


# ===================== Placeholder e-mail helpers ==========================
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def _slug(val: Any, maxlen: int = 24) -> str:
    """Lower-case ``val`` and reduce it to ``[a-z0-9-]``; returns "x" if nothing is left."""
    s = "" if val is None else str(val)
    s = s.lower().strip()
    s = re.sub(r"[^a-z0-9]+", "-", s)
    s = re.sub(r"-+", "-", s).strip("-")
    return s[:maxlen] or "x"


def _make_placeholder_email(record: dict) -> str:
    """Build a short, syntactically valid placeholder address at example.com.

    The address is derived from first/last name and ``exclude_hash``. It is
    only stable across calls when ``exclude_hash`` is set; otherwise a random
    suffix is used.
    """
    fn = _slug(record.get("firstname"))
    ln = _slug(record.get("lastname"))
    base = (fn + "." + ln).strip(".") or "lead"
    cid = _slug(record.get("exclude_hash") or uuid.uuid4().hex[:8], maxlen=16)
    local = f"{base}.{cid}"[:48]
    return f"{local}@example.com"  # RFC-reserved domain


# ============================= Wholix Auth =================================
# Global session manager (set in wholix_login)
_WHOLIX_SES = None  # type: Optional["WholixSession"]


class WholixSession:
    """
    Encapsulates token handling:
    - login with email/password
    - authorised requests with auto-re-login on 401/403/419 (one retry)
    - token state is guarded by a re-entrant lock
    """

    def __init__(self, email: str, password: str, base_url: str = WHOLIX_BASE_URL,
                 session: Optional[requests.Session] = None):
        self.email = email
        self.password = password
        self.base_url = base_url.rstrip("/")
        self._token = None
        self._lock = threading.RLock()
        self._ses = session or GLOBAL_SES

    def _login(self) -> str:
        """POST the credentials and return the token; raises RuntimeError if none is returned."""
        res = req(
            f"{self.base_url}/api/v1/auth/login",
            method="POST",
            json_body={"email": self.email, "password": self.password},
            timeout=(5.0, 15.0),
            session=self._ses,
        )
        token = (res or {}).get("token")
        if not token:
            raise RuntimeError("Wholix-Login fehlgeschlagen.")
        return token

    def get_token(self) -> str:
        """Return the cached token, logging in first if there is none yet."""
        if self._token:
            return self._token
        with self._lock:
            # Re-check under the lock: another thread may have logged in meanwhile.
            if not self._token:
                self._token = self._login()
            return self._token

    def auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header for the current token."""
        return {"Authorization": f"Bearer {self.get_token()}"}

    def req_authed(self, path: str, *, method: str = "GET",
                   headers: Optional[Dict[str, str]] = None,
                   json_body: Any = None, data: Any = None,
                   timeout: Tuple[float, float] = (5.0, 30.0)):
        """Send an authorised request; on 401/403/419 log in again and retry once.

        ``path`` may be absolute (starting with "http") or relative to ``base_url``.
        """
        url = path if path.startswith("http") else f"{self.base_url}{path}"
        hdrs = {}
        hdrs.update(headers or {})
        hdrs.update(self.auth_headers())
        try:
            return req(url, method=method, headers=hdrs, json_body=json_body,
                       data=data, timeout=timeout, session=self._ses)
        except HTTPError as e:
            if e.status in (401, 403, 419):
                with self._lock:
                    self._token = None
                    self._token = self._login()
                hdrs = {}
                hdrs.update(headers or {})
                hdrs.update(self.auth_headers())
                return req(url, method=method, headers=hdrs, json_body=json_body,
                           data=data, timeout=timeout, session=self._ses)
            raise


# ===================== Wholix Store + Fallbacks ============================
def _store_with_fallbacks(token: str, payload: dict, module: str) -> dict:
    """
    Try three stages to get past a 422:
    1. the payload as given,
    2. the payload with linkedin_url/company_url repaired or removed,
    3. a minimal record.

    Uses the global WholixSession manager (_WHOLIX_SES) with auto-re-login
    when it is set, otherwise the passed-in token. Any status other than 422
    propagates immediately; so does a 422 from the last stage.
    """
    url_path = "/api/v1/table-object-data/store-objects"

    def _post_with_session(body):
        # Prefer the auto-re-login session.
        if _WHOLIX_SES is not None:
            return _WHOLIX_SES.req_authed(url_path, method="POST", json_body=body, timeout=(5.0, 30.0))
        # Fallback: manual token (legacy)
        url = f"{WHOLIX_BASE_URL}{url_path}"
        headers = {"Authorization": f"Bearer {token}"}
        return req(url, method="POST", headers=headers, json_body=body, timeout=(5.0, 30.0))

    # 1: original
    body1 = {"module": module, "action": "store", "data": [payload]}
    try:
        return _post_with_session(body1)
    except HTTPError as e:
        if e.status != 422:
            raise

    # 2: repair/remove URLs
    p2 = dict(payload)
    for k in ("linkedin_url", "company_url"):
        if p2.get(k):
            fixed = _normalize_url(p2.get(k))
            if fixed:
                p2[k] = fixed
            else:
                p2.pop(k, None)
    try:
        return _post_with_session({"module": module, "action": "store", "data": [p2]})
    except HTTPError as e:
        if e.status != 422:
            raise

    # 3: minimal record
    minimal = {
        "email": p2.get("email") or _make_placeholder_email(p2),
        "firstname": p2.get("firstname") or "",
        "lastname": p2.get("lastname") or "",
        "company_name": p2.get("company_name") or "",
        "exclude_hash": p2.get("exclude_hash") or _slug(uuid.uuid4().hex[:8]),
        "tags": p2.get("tags") or {"keys": ["no-email"], "values": ["no-email"]},
    }
    return _post_with_session({"module": module, "action": "store", "data": [minimal]})


# Fields that are sent to Wholix; everything else in a record is dropped.
_WHOLIX_ALLOWED_FIELDS = (
    "firstname",
    "lastname",
    "email",            # mandatory (placeholder if necessary)
    "adress",           # (sic) spelled exactly like this
    "city",
    "postcode",
    "phonenumber",
    "job_title",
    "departments",      # text
    "linkedin_url",
    "company_name",
    "company_url",
    "message_mail",
    "message_mail_subject",
    "message_followup1",
    "message_followup2",
    "exclude_hash",
    "status_field",     # multi-select: {keys:[], values:[]}
    "tags",             # multi-select: {keys:[], values:[]}
)


def _clean_str(v: Any) -> Optional[str]:
    """Return ``str(v)`` stripped, or None for None/blank values."""
    if v is None:
        return None
    s = str(v).strip()
    return s if s else None


def _coerce_departments(v: Any) -> Optional[str]:
    """Turn a departments value (list or string, possibly "[...]") into plain text."""
    if v is None:
        return None
    if isinstance(v, (list, tuple, set)):
        v = ", ".join(str(x).strip() for x in v if str(x).strip())
    else:
        v = str(v).strip()
        if v.startswith("[") and v.endswith("]"):
            v = v[1:-1].strip().strip("'\"")
    return _clean_str(v)


def wholix_store_contact(token: str, record: dict, module: str = "Contacts",
                         allow_placeholder: bool = True) -> dict:
    """
    Send ONLY the allowed fields to Wholix and sanitise problematic values.

    With ``allow_placeholder`` a missing/invalid e-mail is replaced by a
    placeholder address (example.com) and "no-email" is added to an existing
    tags multi-select. Uses _store_with_fallbacks (with auto-re-login via
    _WHOLIX_SES when available). The caller's ``record`` is not modified.

    Raises:
        ValueError: ``record`` is not a dict.
    """
    if not isinstance(record, dict):
        raise ValueError("Wholix: record muss ein dict sein.")
    # Work on a copy so the placeholder/tag handling does not leak to the caller.
    record = dict(record)

    # Check e-mail / generate one if necessary
    email = _clean_str(record.get("email"))
    if (not email or not EMAIL_RE.match(email)) and allow_placeholder:
        email = _make_placeholder_email(record)
        record["email"] = email
        tags = record.get("tags")
        if isinstance(tags, dict):
            tags = dict(tags)
            for k in ("keys", "values"):
                current = tags.get(k) or []
                entries = [current] if isinstance(current, str) else list(current)
                if "no-email" not in entries:
                    entries.append("no-email")
                tags[k] = entries
            record["tags"] = tags

    out = {}
    for k in _WHOLIX_ALLOWED_FIELDS:
        if k not in record:
            continue
        val = record[k]
        if k == "departments":
            val = _coerce_departments(val)
        elif k in ("linkedin_url", "company_url"):
            val = _normalize_url(val)
        elif k in ("status_field", "tags"):
            if not (isinstance(val, dict) and "keys" in val and "values" in val):
                val = None
        else:
            val = _clean_str(val)
        if val is not None:
            out[k] = val

    # Mandatory field (may now be a placeholder)
    out["email"] = _clean_str(email)

    # POST with fallback logic (uses auto-re-login if _WHOLIX_SES is set)
    return _store_with_fallbacks(token, out, module)


# ======= Load Wholix excludes page by page =================================
def wholix_fetch_excludes(token: str, module_name: str = "Contacts",
                          per_page: int = 500, max_pages: int = 100,
                          dedupe: bool = True) -> List[Dict[str, str]]:
    """
    Load excludes page by page.

    Uses the global WholixSession manager (_WHOLIX_SES) with auto-re-login
    when it is set, otherwise the passed-in token. Loading stops at the
    reported ``last_page``, at the first empty page, at ``max_pages`` or at
    the first request error (which is logged, not raised); whatever was
    collected up to then is returned.

    Returns:
        A list of ``{"exclude_hash": ..., "company_name": ...}`` dicts.
    """
    path = "/api/v1/table-object-data/fetch-paginated-results"
    out: List[Dict[str, str]] = []
    seen: Set[str] = set()
    last_page = 10**9

    for page in range(1, max_pages + 1):
        if page > last_page:
            break
        payload = {"module": module_name, "action": "search", "page": page, "per_page": per_page}
        try:
            if _WHOLIX_SES is not None:
                res = _WHOLIX_SES.req_authed(path, method="POST", json_body=payload, timeout=(5.0, 30.0))
            else:
                url = f"{WHOLIX_BASE_URL}{path}"
                headers = {"Authorization": f"Bearer {token}"}
                res = req(url, method="POST", headers=headers, json_body=payload, timeout=(5.0, 30.0))
        except Exception as e:
            APP_LOG.warning(f"Wholix-Excludes Page {page} Fehler: {e}")
            break

        data_block = res.get("data") if isinstance(res, dict) else None
        if not isinstance(data_block, dict):
            # Unexpected response shape: treat it like an empty page.
            break
        rows = data_block.get("data") or data_block.get("results") or []
        if isinstance(data_block.get("last_page"), (int, float)):
            last_page = int(data_block.get("last_page"))
        else:
            last_page = page

        if not isinstance(rows, list) or not rows:
            break

        for row in rows:
            if not isinstance(row, dict):
                continue
            # str(): a numeric hash must not abort the whole load.
            ex = str(row.get("exclude_hash") or row.get("excludeHash") or row.get("exclude_id") or "").strip()
            if not ex:
                continue
            if dedupe:
                if ex in seen:
                    continue
                seen.add(ex)
            cname = str(row.get("company_name") or row.get("companyName") or "").strip()
            out.append({"exclude_hash": ex, "company_name": cname})

    return out


# ========================== Job Management =================================
EXEC = ThreadPoolExecutor(max_workers=MAX_WORKERS)
JOBS: Dict[str, Dict[str, Any]] = {}


def _job_init(job_id: str):
    """Register a fresh, empty job state under ``job_id``."""
    JOBS[job_id] = {
        "log": [],
        "progress": 0,
        "rows": [],
        "done": False,
        "error": None,
        "lock": threading.RLock(),
        "created_at": time.time(),
        "finished_at": None,
    }


def _job_emit(job_id: str, msg: str = None, progress: Optional[int] = None,
              rows_append: Optional[dict] = None):
    """Append a log line and/or result row and/or set the progress of a job.

    Log and rows are each capped at the most recent 1000 entries; progress is
    clamped to 0..100. Unknown job ids are ignored.
    """
    st = JOBS.get(job_id)
    if not st:
        return
    lg = get_job_logger(job_id)
    with st["lock"]:
        if msg:
            st["log"].append(msg)
            st["log"] = st["log"][-1000:]
            APP_LOG.info(f"[{job_id}] {msg}")
            lg.info(msg)
        if isinstance(progress, int):
            st["progress"] = max(0, min(100, progress))
        if rows_append:
            st["rows"].append(rows_append)
            st["rows"] = st["rows"][-1000:]
            APP_LOG.info(f"[{job_id}] row: {rows_append}")
            lg.info(f"row: {rows_append}")


def _job_finish(job_id: str, error: Optional[str] = None):
    """Mark a job as done (optionally with an error) and log the final line."""
    st = JOBS.get(job_id)
    if not st:
        return
    lg = get_job_logger(job_id)
    with st["lock"]:
        st["error"] = error
        st["done"] = True
        st["finished_at"] = time.time()
        final = "✅ Fertig." if not error else f"❌ {error}"
        st["log"].append(final)
        APP_LOG.info(f"[{job_id}] {final}")
        (lg.error if error else lg.info)(final)


def _gc_jobs():
    """Drop finished jobs older than JOB_TTL_SEC and close their log files."""
    now = time.time()
    for jid, st in list(JOBS.items()):
        finished_at = st.get("finished_at")
        if st.get("done") and finished_at and now - finished_at > JOB_TTL_SEC:
            JOBS.pop(jid, None)
            # Without this every job would keep one open file handle forever.
            _close_job_logger(jid)


# ========================== Pipeline (Background) ==========================
# NOTE(review): values above MAX_LEADS are offered in the UI but are silently
# capped to MAX_LEADS when a job starts.
LEAD_COUNTS = [1, 2, 3, 4, 5, 10, 15, 20, 40, 80, 100, 200, 300, 400, 500, 1000]

# NOTE(review): the named groups of these two patterns were lost in the source
# this file was recovered from. "q" is fixed by the (?P=q) back-reference;
# "data" and "token" are reconstructed names - neither pattern is referenced
# elsewhere in this file.
CURL_DATA_RE = re.compile(r"""--data(?:-raw)?\s+(?P<q>['\"])(?P<data>.*?)(?P=q)""", re.DOTALL)
HDR_XTOKEN_RE = re.compile(r"""-H\s+(?P<q>['\"])X-Token-Id:\s*(?P<token>[^'\"]+)(?P=q)""", re.IGNORECASE)


def _find_data_quote_start(s: str) -> Tuple[int, Optional[str]]:
    """Locate the opening quote of ``--data``/``--data-raw``.

    Returns ``(index of the first payload character, quote character)`` or
    ``(-1, None)`` when no quoted data argument is present.
    """
    m = re.search(r"--data(?:-raw)?\s+(['\"])", s)
    if not m:
        return -1, None
    return m.end(1), m.group(1)


def _scan_quoted_payload(s: str, i: int, q: str) -> Tuple[str, int]:
    """Read a shell-quoted payload starting at index ``i``.

    Single quotes: the idiom ``'"'"'`` stands for a literal ``'``.
    Double quotes: a backslash escapes the following character.
    Returns ``(payload, index after the closing quote)``; an unterminated
    quote yields everything up to the end of ``s``.
    """
    out = []
    n = len(s)
    while i < n:
        ch = s[i]
        if q == "'":
            if i + 4 < n and s[i:i + 5] == "'\"'\"'":
                out.append("'")
                i += 5
                continue
            if ch == "'":
                return "".join(out), i + 1
            out.append(ch)
            i += 1
        else:
            if ch == "\\" and i + 1 < n:
                out.append(s[i + 1])
                i += 2
                continue
            if ch == '"':
                return "".join(out), i + 1
            out.append(ch)
            i += 1
    return "".join(out), i


def parse_curl(curl_text: str) -> Tuple[str, Dict[str, Any]]:
    """
    Extract X-Token-Id and the JSON body from a curl with --data-raw '...'.
    No field renaming, no heuristics - the JSON is taken over 1:1.

    The token is taken from a quoted ``-H`` header, then from any
    ``X-Token-Id:`` occurrence, then from the environment variable
    ``X_TOKEN_ID``.

    Raises:
        ValueError: empty input, no token, no data argument, or the JSON
            body is not an object.
        json.JSONDecodeError: the data argument is not valid JSON.
    """
    if not isinstance(curl_text, str) or not curl_text.strip():
        raise ValueError("Leerer curl-Text.")

    token_id = ""
    m = re.search(r"""-H\s+(["'])X-Token-Id:\s*([^"']+)\1""", curl_text, re.IGNORECASE)
    if m:
        token_id = m.group(2).strip()
    if not token_id:
        m2 = re.search(r"""X-Token-Id:\s*([A-Za-z0-9\-\._]+)""", curl_text, re.IGNORECASE)
        if m2:
            token_id = m2.group(1).strip()
    if not token_id:
        token_id = os.getenv("X_TOKEN_ID", "").strip()
    if not token_id:
        raise ValueError("Konnte keinen X-Token-Id Header im curl (oder env X_TOKEN_ID) finden.")

    start, quote_char = _find_data_quote_start(curl_text)
    if quote_char is None:
        raise ValueError("Konnte den JSON Body nicht finden (erwarte --data-raw '...').")
    body_str, _ = _scan_quoted_payload(curl_text, start, quote_char)
    body_str = body_str.strip()

    try:
        payload = json.loads(body_str)
    except json.JSONDecodeError:
        payload = json.loads(body_str.replace("\r\n", "\n").replace("\r", "\n"))
    if not isinstance(payload, dict):
        # Callers read settings via payload.get(...); fail with a clear message.
        raise ValueError("JSON Body muss ein Objekt sein.")
    return token_id, payload


def _pick(d: Any, *keys):
    """Return the first non-empty ``d[key]`` for the given keys, else None (no guessing)."""
    if not isinstance(d, dict):
        return None
    for k in keys:
        if k in d and d[k] not in (None, "", [], {}):
            return d[k]
    return None


def _norm_tags(raw: Any) -> dict:
    """Convert a tag value (string, "[...]" string, list, scalar) into a
    multi-select dict ``{"keys": [...], "values": [...]}``."""
    if raw is None:
        return {"keys": [], "values": []}
    vals: List[str] = []
    if isinstance(raw, str):
        s = raw.strip()
        if s.startswith("[") and s.endswith("]"):
            try:
                arr = json.loads(s)
                if isinstance(arr, list):
                    vals = [str(x).strip() for x in arr if str(x).strip()]
                else:
                    vals = [s.strip("[] ").strip().strip("'\"")]
            except Exception:
                # e.g. "[AI]" is not valid JSON -> take the bare content.
                vals = [s.strip("[] ").strip().strip("'\"")]
        else:
            vals = [s]
    elif isinstance(raw, (list, tuple, set)):
        vals = [str(x).strip() for x in raw if str(x).strip()]
    else:
        vals = [str(raw).strip()]
    vals = [v for v in vals if v]
    return {"keys": list(vals), "values": list(vals)}


def run_pipeline_bg(job_id: str, curl_text: str, n_leads_ui: int):
    """Background entry point: run the pipeline and ALWAYS finish the job.

    An exception escaping the pipeline would otherwise vanish inside the
    executor's Future and leave the job in "running" state forever.
    """
    try:
        _run_pipeline(job_id, curl_text, n_leads_ui)
    except Exception:
        logging.exception(f"[{job_id}] Unerwarteter Fehler im Job")
        st = JOBS.get(job_id)
        if st is not None and not st.get("done"):
            _job_finish(job_id, "Unerwarteter Fehler – Details in Logs.")


def _run_pipeline(job_id: str, curl_text: str, n_leads_ui: int):
    """Parse the curl, log in to Wholix, then per lead: suggest -> generate -> store."""
    lg = get_job_logger(job_id)
    APP_LOG.info(f"[{job_id}] Background job gestartet (n={n_leads_ui})")
    lg.info(f"Background job gestartet (n={n_leads_ui})")

    _gc_jobs()
    _job_emit(job_id, "Job gestartet.")

    try:
        token_id, payload = parse_curl(curl_text)
    except Exception:
        logging.exception(f"[{job_id}] Parse-Fehler")
        _job_finish(job_id, "Parse-Fehler: siehe Logs (Stacktrace).")
        return

    wh_email = payload.get("wholix_email") or payload.get("Wholix_email") or ""
    wh_pass = payload.get("wholix_passwort") or payload.get("wholix_password") or ""
    if not wh_email or not wh_pass:
        _job_finish(job_id, "In der JSON-Payload fehlen wholix_email / wholix_passwort.")
        return

    # Global fields exactly as in the curl (1:1)
    filters = payload.get("filters") or {}
    icp_text = payload.get("Produkt_und_Dienstleistungsbeschreibung") or ""
    checklist = payload.get("Checkliste_Landingpage") or ""
    signature = payload.get("Signatur") or ""
    cta = payload.get("CTA") or ""
    homepage_url = payload.get("icp_homepage_url") or ""
    raw_tag = payload.get("Wholic_tag") or payload.get("Wholix_tag") or "AI"

    # Number of leads
    try:
        n_leads = int(n_leads_ui)
    except Exception:
        n_leads = 1
    n_leads = max(1, min(n_leads, MAX_LEADS))

    # Progress: 1 (login) + 4 steps per lead
    total_steps = (n_leads * 4) + 1
    step = 0

    # 1) Wholix login
    step += 1
    _job_emit(job_id, "→ Wholix Login …", progress=int(step / total_steps * 100))
    try:
        wh_token = wholix_login(wh_email, wh_pass)
    except Exception:
        logging.exception(f"[{job_id}] Wholix-Login fehlgeschlagen")
        _job_finish(job_id, "Wholix-Login fehlgeschlagen (Details in Logs).")
        return

    # ======= LOAD / MERGE EXCLUDES =========================================
    initial_excludes = set()
    try:
        for x in (payload.get("exclude_ids") or []):
            if x:
                initial_excludes.add(str(x).strip())
    except Exception:
        # exclude_ids is not iterable: start without a seed.
        pass

    _job_emit(job_id, f"→ Wholix-Excludes laden … (Seed: {len(initial_excludes)})")
    try:
        wh_excludes = wholix_fetch_excludes(wh_token)  # paginated
        for row in wh_excludes:
            ex = (row.get("exclude_hash") or "").strip()
            if ex:
                initial_excludes.add(ex)
        _job_emit(job_id, f" {len(wh_excludes)} Excludes aus Wholix, gesamt {len(initial_excludes)}")
    except Exception:
        logging.exception(f"[{job_id}] Wholix-Excludes laden fehlgeschlagen")
        _job_emit(job_id, " Excludes laden übersprungen (Fehler).")

    excludes: List[str] = list(initial_excludes)
    excludes_set: Set[str] = set(initial_excludes)

    for i in range(1, n_leads + 1):
        try:
            # 2) Fetch lead
            step += 1
            _job_emit(job_id, f"→ [{i}/{n_leads}] Lead vorschlagen … (excludes={len(excludes_set)})",
                      progress=int(step / total_steps * 100))
            try:
                lead = suggest_single_lead(token_id, filters, icp_text, excludes)
            except Exception:
                logging.exception(f"[{job_id}] Lead-Fehler")
                _job_emit(job_id, "❌ Lead-Fehler: siehe Logs")
                continue

            if not isinstance(lead, dict):
                _job_emit(job_id, "❌ Ungültige Lead-Struktur")
                continue

            person = (lead.get("person") or {})
            company = (lead.get("company") or {})
            combined_id = str(lead.get("combined_id") or "").strip()

            if combined_id and combined_id in excludes_set:
                _job_emit(job_id, f" ⚠️ Lead übersprungen (bereits excluded): {combined_id}")
                continue

            _job_emit(job_id, f" Lead: {person.get('first_name','?')} {person.get('last_name','?')} @ {company.get('name') or company.get('company_name','?')}")

            # 3) Generate message (real endpoint data)
            step += 1
            _job_emit(job_id, " → Nachricht generieren …", progress=int(step / total_steps * 100))
            items = [{"combined_id": combined_id, "company": company, "person": person}]
            variables = {
                "Produkt_und_Dienstleistungsbeschreibung": icp_text,
                "CTA": cta,
                "Signatur": signature,
                "Checkliste_Landingpage": checklist,
                "homepage_url": homepage_url,     # optional
                "tags": raw_tag,                  # arrives as "[AI]" -> normalised later
                "Touch_Point": "LinkedIn DM",     # optional
            }
            try:
                draft = email_generate_async(token_id, variables, items)
            except Exception:
                logging.exception(f"[{job_id}] Email-Generate-Fehler")
                _job_emit(job_id, "❌ Email-Generate-Fehler: siehe Logs")
                continue

            subj_raw = (draft.get("email") or {}).get("subject", "")
            body_raw = (draft.get("email") or {}).get("body", "")
            subj_flat = _flatten_text(subj_raw)
            body_flat = _flatten_text(body_raw)
            _job_emit(job_id, f" ✉️ Subject: {subj_flat}")
            _job_emit(job_id, f" ✉️ Message: {body_flat}")

            # 4) Store in Wholix (ALWAYS - even without a real e-mail)
            step += 1
            _job_emit(job_id, " → Speichere in Wholix …", progress=int(step / total_steps * 100))
            email_to = str(person.get("email") or "").strip()

            try:
                # --- Fill fields ONLY when really present ------------------
                tags_ms = _norm_tags(raw_tag)
                if not email_to:
                    # Append in place so keys and values keep the same order.
                    for part in ("keys", "values"):
                        if "no-email" not in tags_ms[part]:
                            tags_ms[part].append("no-email")

                record = {
                    "firstname": _pick(person, "first_name"),
                    "lastname": _pick(person, "last_name"),
                    "email": email_to or None,  # may be None -> placeholder is created in the store
                    "phonenumber": _pick(person, "phone", "phonenumber")
                    or _pick(company, "phone", "company_phone", "phonenumber"),
                    "adress": _pick(person, "adress", "address", "street")
                    or _pick(company, "adress", "address", "street"),
                    "city": _pick(person, "city") or _pick(company, "city"),
                    "postcode": _pick(person, "postcode", "postal_code", "zip")
                    or _pick(company, "postcode", "postal_code", "zip"),
                    "job_title": _pick(person, "job_title", "title", "position"),
                    "departments": _coerce_departments(_pick(person, "departments")),
                    "linkedin_url": _pick(person, "linkedin_url"),
                    "company_name": _pick(company, "name", "company_name"),
                    "company_url": _normalize_url(_pick(company, "url", "website_url", "website", "domain")),
                    "message_mail_subject": subj_raw or None,
                    "message_mail": body_raw or None,
                    "message_followup1": draft.get("followup1") or None,
                    "message_followup2": draft.get("followup2") or None,
                    "exclude_hash": combined_id or None,
                    "status_field": {"keys": ["Kontakt aufgenommen"], "values": ["Kontakt aufgenommen"]},
                    "tags": tags_ms,
                }

                store_res = wholix_store_contact(wh_token, record, allow_placeholder=True)
                stored_ok = bool(store_res)
                if not email_to:
                    _job_emit(job_id, " ℹ️ Keine echte E-Mail – Platzhalter gespeichert und Tag 'no-email' gesetzt.")
            except Exception:
                logging.exception(f"[{job_id}] Wholix-Store-Fehler")
                _job_emit(job_id, "❌ Wholix-Store-Fehler: siehe Logs")
                stored_ok = False

            if combined_id:
                excludes_set.add(combined_id)
                excludes.append(combined_id)

            _job_emit(job_id, rows_append={
                "person": f"{person.get('first_name','')} {person.get('last_name','')}".strip(),
                "email": email_to or "(placeholder)",
                "company": company.get("name") or company.get("company_name") or "",
                "subject": subj_flat,
                "message": body_flat,
                "stored_ok": stored_ok,
            })

            # 5) Wrap up
            step += 1
            _job_emit(job_id, " ✓ Lead abgeschlossen.", progress=int(min(99, step / total_steps * 100)))

        except Exception:
            logging.exception(f"[{job_id}] Unerwarteter Fehler im Lead-Durchlauf")
            _job_emit(job_id, "❌ Unerwarteter Fehler – Details in Logs.")

    _job_emit(job_id, progress=100)
    _job_finish(job_id, None)


# ================================ UI =======================================
# Column order of the result table; also the keys of the rows emitted by the pipeline.
_RESULT_HEADERS = ["person", "email", "company", "subject", "message", "stored_ok"]


def _rows_to_table(rows: List[dict]) -> List[List[Any]]:
    """Convert job result rows (dicts) into list-of-lists in header order for gr.Dataframe."""
    return [[row.get(h, "") for h in _RESULT_HEADERS] for row in rows]


def build_ui():
    """Build the Gradio Blocks UI: curl input, start/refresh buttons, log, progress, results."""
    with gr.Blocks(theme=gr.themes.Soft(), css="""
    .logbox textarea {
        font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;
        font-size: 12.5px;
        line-height: 1.35;
    }
    """) as demo:
        gr.Markdown("## Wholix Lead → Message → Store (rein asynchron, robust für lange Jobs)")
        gr.Markdown(
            "Füge deinen kompletten **`curl`** (mit `X-Token-Id` und JSON `--data-raw`) ein, wähle die Anzahl Leads und klicke **Start**. "
            "Die Verarbeitung läuft serverseitig weiter – selbst wenn der Browser/Tab schließt. "
            "Mit **Aktualisieren** holst du den aktuellen Status ab."
        )

        with gr.Row():
            curl_in = gr.Textbox(
                label="curl Befehl",
                placeholder="curl -sS -N -X POST 'https://.../stream' -H 'X-Token-Id: ...' --data-raw '{...}'",
                lines=12,
            )
        with gr.Row():
            count = gr.Dropdown(choices=[str(x) for x in LEAD_COUNTS], value="1", label="Anzahl Leads")
        with gr.Row():
            start_btn = gr.Button("🚀 Start", variant="primary")
            poll_btn = gr.Button("🔄 Aktualisieren")
        with gr.Row():
            job_id_tb = gr.Textbox(label="Job-ID", interactive=True)
        with gr.Row():
            status = gr.Textbox(label="Status / Log", lines=18, interactive=False, elem_classes=["logbox"])
        with gr.Row():
            progress = gr.Slider(label="Progress", minimum=0, maximum=100, value=0, interactive=False)
        with gr.Row():
            out = gr.Dataframe(
                headers=_RESULT_HEADERS,
                label="Ergebnisse",
                interactive=False,
                wrap=True,
                row_count=(0, "dynamic"),
                col_count=(6, "fixed"),
            )

        def start_job(curl_text: str, n: str):
            # Create the job state, hand the work to the executor, return the initial snapshot.
            try:
                n_int = int(n)
            except Exception:
                n_int = 1
            n_int = max(1, min(n_int, MAX_LEADS))

            job_id = str(uuid.uuid4())
            _job_init(job_id)
            _job_emit(job_id, f"Job gestartet: {job_id}")
            EXEC.submit(run_pipeline_bg, job_id, curl_text, n_int)

            st = JOBS[job_id]
            with st["lock"]:
                log = "\n".join(st["log"])
                prog = int(st["progress"])
                rows = _rows_to_table(st["rows"])
            return log, prog, rows, job_id

        def poll_job(job_id: str):
            # Snapshot of the current job state; the id is stripped because it is pasted by hand.
            st = JOBS.get((job_id or "").strip())
            if not st:
                return "Unbekannte oder abgelaufene Job-ID.", 0, []
            with st["lock"]:
                log = "\n".join(st["log"][-500:])
                prog = int(st["progress"])
                rows = _rows_to_table(st["rows"])
            return log, prog, rows

        start_btn.click(start_job, inputs=[curl_in, count], outputs=[status, progress, out, job_id_tb])
        poll_btn.click(poll_job, inputs=[job_id_tb], outputs=[status, progress, out])

    return demo


# ============================== MAIN =======================================
if __name__ == "__main__":
    app = build_ui()
    # NOTE(review): binds to all interfaces and opens a public share link while
    # the UI accepts curl commands containing credentials - confirm this
    # exposure is intended.
    app.launch(server_name="0.0.0.0", debug=True, share=True)