import hashlib
import io
import json
import os
import tempfile
import zipfile
from typing import Any, Dict, List, Optional

import gradio as gr
from pydantic import BaseModel
from tenacity import RetryError, retry, stop_after_attempt, wait_exponential
|
|
| |
| try: |
| from dotenv import load_dotenv |
| load_dotenv() |
| except Exception: |
| pass |
|
|
| |
| try: |
| from openai import OpenAI |
| except Exception: |
| OpenAI = None |
|
|
| try: |
| import anthropic |
| from anthropic import NotFoundError as AnthropicNotFound |
| except Exception: |
| anthropic = None |
| AnthropicNotFound = Exception |
|
|
| from firecrawl import Firecrawl |
|
|
| |
def _to_dict(obj: Any) -> Any:
    """Recursively convert *obj* into plain JSON-friendly data.

    Pydantic models are dumped, dicts/lists/tuples are converted element-wise,
    and arbitrary objects carrying a ``__dict__`` are flattened via ``vars``.
    Anything else (including str/bytes) is returned unchanged.
    """
    if isinstance(obj, BaseModel):
        return obj.model_dump()
    if isinstance(obj, dict):
        return {key: _to_dict(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_dict(item) for item in obj]
    # str/bytes also have attributes; exclude them so text stays text.
    if not isinstance(obj, (str, bytes)) and hasattr(obj, "__dict__"):
        try:
            return {key: _to_dict(value) for key, value in vars(obj).items()}
        except Exception:
            # Some objects expose __dict__ oddly; fall through untouched.
            pass
    return obj
|
|
def _pretty_json(data: Any, limit: int = 300_000) -> str:
    """Render *data* as indented JSON text, truncated to *limit* characters.

    Serialization failures are reported as a display string instead of raising.
    """
    try:
        rendered = json.dumps(_to_dict(data), indent=2)
    except Exception as exc:
        return f"<!> Could not serialize to JSON: {exc}"
    return rendered[:limit]
|
|
| def _listify(x) -> List[Any]: |
| if x is None: |
| return [] |
| if isinstance(x, list): |
| return x |
| return [x] |
|
|
| |
class Keys(BaseModel):
    """Per-session API credentials; ``None`` means "fall back to env vars"."""

    # Values typed into the UI; resolve_keys() overlays OPENAI_API_KEY /
    # ANTHROPIC_API_KEY / FIRECRAWL_API_KEY from the environment when unset.
    openai: Optional[str] = None
    anthropic: Optional[str] = None
    firecrawl: Optional[str] = None
|
|
def resolve_keys(s: Keys) -> Keys:
    """Return effective keys: session values win, env vars fill the gaps."""
    env = os.getenv
    return Keys(
        openai=s.openai or env("OPENAI_API_KEY"),
        anthropic=s.anthropic or env("ANTHROPIC_API_KEY"),
        firecrawl=s.firecrawl or env("FIRECRAWL_API_KEY"),
    )
|
|
| |
def fc_client(s: Keys) -> Firecrawl:
    """Build a Firecrawl client, raising a user-facing error when the key is missing."""
    resolved = resolve_keys(s)
    if resolved.firecrawl:
        return Firecrawl(api_key=resolved.firecrawl)
    raise gr.Error("Missing FIRECRAWL_API_KEY. Enter it in Keys → Save.")
|
|
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8))
def fc_search(s: Keys, query: str, limit: int = 5, scrape_formats: Optional[List[str]] = None, location: Optional[str] = None) -> Dict[str, Any]:
    """Run a Firecrawl web search; optionally scrape each hit in *scrape_formats*.

    Retried up to 3 times with exponential backoff. Returns the response
    converted to plain dict/list data.
    """
    params: Dict[str, Any] = {"query": query, "limit": limit}
    if location:
        params["location"] = location
    if scrape_formats:
        params["scrape_options"] = {"formats": scrape_formats}
    return _to_dict(fc_client(s).search(**params))
|
|
@retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=10))
def fc_scrape(s: Keys, url: str, formats: Optional[List[str]] = None, timeout_ms: Optional[int] = None, mobile: bool = False) -> Dict[str, Any]:
    """Scrape a single URL via Firecrawl, retried once on failure.

    *timeout_ms* is clamped to 40 000 ms (the maximum advertised in the UI).
    """
    params: Dict[str, Any] = {"url": url}
    if formats:
        params["formats"] = formats
    if timeout_ms:
        # Cap the timeout at 40s to match the UI's stated maximum.
        params["timeout"] = min(int(timeout_ms), 40000)
    if mobile:
        params["mobile"] = True
    return _to_dict(fc_client(s).scrape(**params))
|
|
@retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=10))
def fc_crawl(s: Keys, url: str, max_pages: int = 25, formats: Optional[List[str]] = None) -> Dict[str, Any]:
    """Crawl up to *max_pages* pages starting at *url*, retried once on failure."""
    params: Dict[str, Any] = {"url": url, "limit": max_pages}
    if formats:
        params["scrape_options"] = {"formats": formats}
    return _to_dict(fc_client(s).crawl(**params))
|
|
| |
# Shared system prompt injected into every LLM call (see llm_summarize).
SYSTEM_STEER = (
    "You are ZEN's VibeCoder: extract web insights, generate clean scaffolds, "
    "and produce production-ready artifacts. Prefer structured outlines, code blocks, and checklists. "
    "When asked to clone or refactor, output file trees and exact text."
)
|
|
def use_openai(s: Keys):
    """Return a configured OpenAI client.

    Raises gr.Error when the key is missing or the SDK failed to import.
    """
    resolved = resolve_keys(s)
    if not resolved.openai:
        raise gr.Error("Missing OPENAI_API_KEY.")
    if OpenAI is None:
        raise gr.Error("OpenAI SDK not installed.")
    return OpenAI(api_key=resolved.openai)
|
|
def use_anthropic(s: Keys):
    """Return a configured Anthropic client.

    Raises gr.Error when the key is missing or the SDK failed to import.
    """
    resolved = resolve_keys(s)
    if not resolved.anthropic:
        raise gr.Error("Missing ANTHROPIC_API_KEY.")
    if anthropic is None:
        raise gr.Error("Anthropic SDK not installed.")
    return anthropic.Anthropic(api_key=resolved.anthropic)
|
|
# Candidate Anthropic models, tried in order until one succeeds.
# NOTE(review): these ids are hardcoded — verify them against Anthropic's
# current model list; "claude-3-7-sonnet-2025-06-13" does not match the usual
# YYYYMMDD snapshot naming of the other entries. TODO confirm.
ANTHROPIC_FALLBACKS = [
    "claude-3-7-sonnet-2025-06-13",
    "claude-3-7-sonnet",
    "claude-3-5-sonnet-20241022",
    "claude-3-5-sonnet-20240620",
]


# Candidate OpenAI models, tried in order until one succeeds.
OPENAI_FALLBACKS = [
    "gpt-5",
    "gpt-4.1",
    "gpt-4o",
    "gpt-4o-mini",
]
|
|
def _candidate_models(override: str, fallbacks: List[str]) -> List[str]:
    # Try an explicit override first, then the fallback chain. dict.fromkeys
    # drops duplicates while preserving order, so an override that is already
    # in the fallback list is not attempted twice.
    ordered = ([override] if override else []) + fallbacks
    return list(dict.fromkeys(ordered))


def llm_summarize(s: Keys, provider: str, model_name: str, prompt: str, ctx_md: str, temp: float=0.4) -> str:
    """Send *prompt* plus markdown context to the chosen LLM provider.

    Tries *model_name* first (when given), then each provider fallback model,
    and returns the first successful completion's text.

    Raises gr.Error when every candidate model fails (last error included).
    """
    # Cap context size to keep requests within provider input limits.
    ctx = (ctx_md or "")[:150000]
    user_content = f"{prompt}\n\n=== SOURCE (markdown) ===\n{ctx}"

    if provider == "openai":
        client = use_openai(s)
        last_err = None
        for m in _candidate_models(model_name, OPENAI_FALLBACKS):
            try:
                resp = client.chat.completions.create(
                    model=m,
                    temperature=temp,
                    messages=[
                        {"role": "system", "content": SYSTEM_STEER},
                        {"role": "user", "content": user_content},
                    ],
                )
                return (resp.choices[0].message.content or "").strip()
            except Exception as e:  # model-not-found, auth, rate limit — try next candidate
                last_err = e
        raise gr.Error(f"OpenAI failed across fallbacks: {last_err}")
    else:
        client = use_anthropic(s)
        last_err = None
        for m in _candidate_models(model_name, ANTHROPIC_FALLBACKS):
            try:
                resp = client.messages.create(
                    model=m,
                    max_tokens=4000,
                    temperature=temp,
                    system=SYSTEM_STEER,
                    messages=[{"role": "user", "content": user_content}],
                )
                # Concatenate only text blocks; non-text blocks have no .text.
                chunks = []
                for blk in resp.content:
                    t = getattr(blk, "text", None)
                    if t:
                        chunks.append(t)
                return "".join(chunks).strip()
            except Exception as e:
                # The original had separate NotFound/Exception handlers doing
                # identical work; one handler covers both cases.
                last_err = e
        raise gr.Error(f"Anthropic failed across fallbacks: {last_err}")
|
|
| |
def pack_zip(pages: List[Dict[str, Any]]) -> bytes:
    """Bundle crawled pages into an in-memory ZIP archive.

    Each page contributes a ``NNN_<sha1-slug>.md`` and/or ``.html`` entry when
    markdown/html content is present, plus one row in ``manifest.json``
    (url, title, links).

    Args:
        pages: Page dicts as returned by Firecrawl; content may live at the
            top level or nested under a ``data`` key, metadata under
            ``metadata``. Either nested dict may be missing or ``None``.

    Returns:
        The ZIP file contents as bytes.
    """
    mem = io.BytesIO()
    with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        manifest = []
        for i, p in enumerate(pages, start=1):
            # `or {}` (not .get(key, {})) — the key may exist with value None,
            # which would make the chained .get() raise AttributeError.
            meta = p.get("metadata") or {}
            data = p.get("data") or {}
            url = p.get("url") or meta.get("sourceURL") or f"page_{i}"
            # Short stable slug so filenames are unique per URL.
            slug = hashlib.sha1(str(url).encode("utf-8")).hexdigest()[:10]
            md = p.get("markdown") or data.get("markdown") or p.get("content") or ""
            html = p.get("html") or data.get("html") or ""
            links = p.get("links") or data.get("links") or []
            if md:
                zf.writestr(f"{i:03d}_{slug}.md", md)
            if html:
                zf.writestr(f"{i:03d}_{slug}.html", html)
            manifest.append({"url": url, "title": p.get("title") or meta.get("title"), "links": links})
        zf.writestr("manifest.json", json.dumps(manifest, indent=2))
    mem.seek(0)
    return mem.read()
|
|
| |
def save_keys(openai_key, anthropic_key, firecrawl_key):
    """Normalize the textbox values into a session Keys object.

    Blank or whitespace-only entries become None so environment variables can
    still take over inside resolve_keys().
    """
    def _norm(raw):
        cleaned = (raw or "").strip()
        return cleaned or None

    keys = Keys(
        openai=_norm(openai_key),
        anthropic=_norm(anthropic_key),
        firecrawl=_norm(firecrawl_key),
    )
    return keys, gr.Info("Keys saved to this session. (Env vars still apply if set.)")
|
|
def action_search(sess: Keys, query: str, limit: int, scrape_content: bool, location: str):
    """Run a Firecrawl search and render the result buckets as JSON text."""
    q = query.strip()
    if not q:
        raise gr.Error("Enter a search query.")
    formats = ["markdown", "links"] if scrape_content else None
    res = fc_search(sess, query=q, limit=limit, scrape_formats=formats, location=(location or None))
    data = res.get("data", res)
    if isinstance(data, dict):
        # Flatten the named result buckets into one list.
        items: List[Any] = []
        for bucket in ("web", "news", "images", "videos", "discussion"):
            found = data.get(bucket)
            if found:
                items.extend(_listify(_to_dict(found)))
    elif isinstance(data, list):
        items = _to_dict(data)
    else:
        items = _listify(_to_dict(data))
    # Fall back to the raw response when no recognizable items were found.
    return json.dumps(items, indent=2) if items else _pretty_json(res)
|
|
def action_scrape(sess: Keys, url: str, mobile: bool, formats_sel: List[str], timeout_ms: int):
    """Scrape one URL and return (raw JSON text, markdown preview).

    Scrape failures are returned as display strings (not raised) so both
    output panes stay populated in the UI.
    """
    if not url.strip():
        raise gr.Error("Enter a URL.")
    formats = formats_sel or ["markdown", "links"]
    try:
        out = fc_scrape(sess, url.strip(), formats=formats, timeout_ms=(timeout_ms or 15000), mobile=mobile)
        pretty = _pretty_json(out)
        # `or {}` (not .get("data", {})) — "data" may exist with value None,
        # which would make the chained .get("markdown") raise AttributeError.
        md = out.get("markdown") or (out.get("data") or {}).get("markdown") or out.get("content") or ""
        return pretty, md
    except RetryError as e:
        return f"<!> Scrape timed out after retries. Try increasing timeout, unchecking 'mobile', or limiting formats.\n\n{e}", ""
    except Exception as e:
        return f"<!> Scrape error: {e}", ""
|
|
def action_crawl(sess: Keys, base_url: str, max_pages: int, formats_sel: List[str]):
    """Crawl a site, pack the pages into a ZIP, and expose it for download.

    Returns (update for the gr.File component, status markdown). Raises
    gr.Error for user-input problems and empty crawls.
    """
    if not base_url.strip():
        raise gr.Error("Enter a base URL to crawl.")
    formats = formats_sel or ["markdown", "links"]
    try:
        out = fc_crawl(sess, base_url.strip(), max_pages=max_pages, formats=formats)
        pages = out.get("data")
        if not isinstance(pages, list) or not pages:
            raise gr.Error("Crawl returned no pages.")
        zip_bytes = pack_zip(pages)
        # gr.File needs a real file path — it has no `filename` kwarg and does
        # not accept a BytesIO value — and gr.File.update() was removed in
        # Gradio 4.x (gr.update is the portable spelling). Persist the archive
        # under its intended download name in a fresh temp dir.
        zip_path = os.path.join(tempfile.mkdtemp(prefix="zen_clone_"), "site_clone.zip")
        with open(zip_path, "wb") as fh:
            fh.write(zip_bytes)
        return gr.update(value=zip_path, visible=True), f"Crawled {len(pages)} pages. ZIP is ready."
    except gr.Error:
        # Let deliberate user-facing errors (e.g. "no pages") surface as error
        # modals instead of being swallowed by the generic handler below.
        raise
    except RetryError as e:
        return gr.update(visible=False), f"<!> Crawl timed out after retries. Reduce Max Pages or try again.\n\n{e}"
    except Exception as e:
        return gr.update(visible=False), f"<!> Crawl error: {e}"
|
|
def action_generate(sess: Keys, provider: str, model_name: str, sys_prompt: str, user_prompt: str, context_md: str, temp: float):
    """Compose the final prompt (optional system steer + user text) and call the LLM."""
    body = user_prompt.strip()
    if not body:
        raise gr.Error("Enter a prompt or click a starter tile.")
    steer = (sys_prompt or "").strip()
    full_prompt = f"SYSTEM:\n{steer}\n\n{body}" if steer else body
    return llm_summarize(sess, provider, (model_name or "").strip(), full_prompt, context_md or "", temp=temp)
|
|
| |
# ---------------------------------------------------------------------------
# Gradio UI wiring. Layout: a key-entry accordion, then three tabs (Search,
# Scrape/Crawl/Clone, Vibe Code). Every handler takes `session_state` (a Keys
# object) as its first input.
# NOTE(review): the css rule targets a generated svelte class name, which is
# brittle across Gradio versions — confirm it still matches anything.
# ---------------------------------------------------------------------------
with gr.Blocks(css="#keys .wrap.svelte-1ipelgc { filter: none !important; }") as demo:
    gr.Markdown("## ZEN VibeCoder — Web Clone & Research Foundry")
    # Per-session credential store; env vars remain a fallback (resolve_keys).
    session_state = gr.State(Keys())

    with gr.Accordion("🔐 Keys (session)", open=True):
        with gr.Row():
            # Prefilled from the environment so local runs need no typing.
            openai_key = gr.Textbox(label="OPENAI_API_KEY (GPT-5 / fallbacks)", type="password", placeholder="sk-...", value=os.getenv("OPENAI_API_KEY") or "")
            anthropic_key = gr.Textbox(label="ANTHROPIC_API_KEY (Claude Sonnet)", type="password", placeholder="anthropic-key...", value=os.getenv("ANTHROPIC_API_KEY") or "")
            firecrawl_key = gr.Textbox(label="FIRECRAWL_API_KEY", type="password", placeholder="fc-...", value=os.getenv("FIRECRAWL_API_KEY") or "")
        save_btn = gr.Button("Save keys", variant="primary")
        save_msg = gr.Markdown()
        save_btn.click(save_keys, [openai_key, anthropic_key, firecrawl_key], [session_state, save_msg])

    with gr.Tabs():
        with gr.Tab("🔎 Search"):
            query = gr.Textbox(label="Query", placeholder='ex: site:docs "vector database" 2025')
            with gr.Row():
                limit = gr.Slider(1, 20, value=6, step=1, label="Limit")
                scrape_content = gr.Checkbox(label="Also scrape results (markdown + links)", value=True)
                location = gr.Textbox(label="Location (optional)", placeholder="ex: Germany")
            go_search = gr.Button("Run Search", variant="primary")
            search_json = gr.Code(label="Results JSON", language="json")
            go_search.click(action_search, [session_state, query, limit, scrape_content, location], [search_json])

        with gr.Tab("🕸️ Scrape • Crawl • Clone"):
            # --- single-URL scrape ---
            with gr.Row():
                target_url = gr.Textbox(label="URL to Scrape", placeholder="https://example.com")
                timeout_ms = gr.Number(label="Timeout (ms, max 40000)", value=15000)
            with gr.Row():
                formats_sel = gr.CheckboxGroup(choices=["markdown","html","links","screenshot"], value=["markdown","links"], label="Formats")
                mobile = gr.Checkbox(label="Emulate mobile", value=False)
            run_scrape = gr.Button("Scrape URL", variant="primary")
            scrape_json = gr.Code(label="Raw Response (JSON)", language="json")
            scrape_md = gr.Markdown(label="Markdown Preview")
            run_scrape.click(action_scrape, [session_state, target_url, mobile, formats_sel, timeout_ms], [scrape_json, scrape_md])

            gr.Markdown("---")

            # --- multi-page crawl -> downloadable ZIP (see action_crawl) ---
            with gr.Row():
                base_url = gr.Textbox(label="Base URL to Crawl", placeholder="https://docs.firecrawl.dev")
                max_pages = gr.Slider(1, 200, value=25, step=1, label="Max Pages")
                formats_crawl = gr.CheckboxGroup(choices=["markdown","html","links"], value=["markdown","links"], label="Crawl Formats")
            run_crawl = gr.Button("Crawl & Build ZIP", variant="primary")
            # Hidden until a crawl succeeds and the ZIP exists.
            zip_file = gr.File(label="Clone ZIP", visible=False)
            crawl_status = gr.Markdown()
            run_crawl.click(action_crawl, [session_state, base_url, max_pages, formats_crawl], [zip_file, crawl_status])

        with gr.Tab("✨ Vibe Code (Synthesis)"):
            with gr.Row():
                provider = gr.Radio(choices=["openai","anthropic"], value="openai", label="Provider")
                model_name = gr.Textbox(label="Model (override)", placeholder="(blank = auto fallback)")
                temp = gr.Slider(0.0, 1.2, value=0.4, step=0.05, label="Temperature")

            sys_prompt = gr.Textbox(label="System Style (optional)",
                value="Return structured outputs with file trees, code blocks and ordered steps. Be concise and concrete.")
            user_prompt = gr.Textbox(label="User Prompt", lines=6)
            ctx_md = gr.Textbox(label="Context (paste markdown from Scrape/Crawl)", lines=10)

            with gr.Row():
                gen_btn = gr.Button("Generate", variant="primary")
            out_md = gr.Markdown()

            # Starter tiles: one-click prompt templates for common workflows.
            gr.Markdown("**Starter Tiles**")
            with gr.Row():
                t1 = gr.Button("🔧 Clone Docs ➜ Clean Markdown ➜ README")
                t2 = gr.Button("🧭 Competitor Teardown ➜ Features • Pricing • Moats")
                t3 = gr.Button("🧪 API Wrapper ➜ Python Client (requests + retries)")
                t4 = gr.Button("📐 Landing Page Rewrite ➜ ZEN Tone")
                t5 = gr.Button("📊 Dataset Outline ➜ Schema + Fields + ETL")

            def fill_tile(tile: str):
                """Return the canned prompt text for starter tile *tile*."""
                tiles = {
                    "t1": "Create a clean knowledge pack from the context, then output a README.md with:\n- Overview\n- Key features\n- Quickstart\n- API endpoints (if any)\n- Notes & gotchas\n- License\nAlso produce a /docs/ tree outline with suggested pages and headings.",
                    "t2": "From the context, produce a feature matrix, pricing table, ICP notes, moats/risks, and a market POV. Conclude with a ZEN playbook: 5 lever moves for advantage.",
                    "t3": "Using the context, design a Python client that wraps the target API with retry/backoff and typed responses. Output:\n- package layout\n- requirements\n- client.py\n- examples/\n- README with usage.\nInclude robust error handling.",
                    "t4": "Rewrite the landing page in ZEN brand voice: crisp headline, 3 value props, social proof, CTA, and a concise FAQ. Provide HTML sections and copy blocks.",
                    "t5": "Propose a dataset schema based on the context. Output a table of fields, types, constraints, and an ETL plan (sources, transforms, validation, freshness, monitoring).",
                }
                return tiles[tile]

            # Each tile fills the user prompt with its template.
            t1.click(lambda: fill_tile("t1"), outputs=[user_prompt])
            t2.click(lambda: fill_tile("t2"), outputs=[user_prompt])
            t3.click(lambda: fill_tile("t3"), outputs=[user_prompt])
            t4.click(lambda: fill_tile("t4"), outputs=[user_prompt])
            t5.click(lambda: fill_tile("t5"), outputs=[user_prompt])

            gen_btn.click(action_generate, [session_state, provider, model_name, sys_prompt, user_prompt, ctx_md, temp], [out_md])

    gr.Markdown("Built for **ZEN Arena** pipelines. Export ZIPs → ingest → credentialize via ZEN Cards.")
|
|
if __name__ == "__main__":
    # NOTE(review): ssr_mode is a Gradio 5 launch kwarg (disables server-side
    # rendering) — confirm the installed Gradio version accepts it.
    demo.launch(ssr_mode=False)
|
|