""" RepoForge – Agentic Repo Conversion App ======================================== • Multi-key Gemini rotation (auto-switches when quota hits) • RAG over source repo (chunked, on-demand, never fully loaded) • Creates a HF Space (Docker preset), generates Dockerfile + vite.config.js • Surgical patch loop: read build logs → apply unified diffs → never full rewrites • Streams progress back to Gradio UI in real-time """ import gradio as gr import google.generativeai as genai from huggingface_hub import HfApi, SpaceHardware import os, re, json, time, math, hashlib, tempfile, subprocess, textwrap from pathlib import Path from dataclasses import dataclass, field from typing import Generator, Optional import threading # ─── Gemini Key Rotator ─────────────────────────────────────────────────────── class GeminiRotator: """Round-robin across multiple Gemini API keys; swaps on quota errors.""" QUOTA_ERRORS = ("429", "quota", "rate", "exhausted", "resource_exhausted") def __init__(self, keys: list[str]): self.keys = [k.strip() for k in keys if k.strip()] self.idx = 0 self.lock = threading.Lock() if not self.keys: raise ValueError("At least one Gemini API key is required.") def current_key(self) -> str: return self.keys[self.idx % len(self.keys)] def rotate(self): with self.lock: self.idx = (self.idx + 1) % len(self.keys) def generate(self, system: str, prompt: str, max_tokens: int = 8192) -> str: """Try each key once; raise if all exhausted.""" for attempt in range(len(self.keys)): key = self.current_key() try: genai.configure(api_key=key) model = genai.GenerativeModel( model_name="gemini-1.5-pro", system_instruction=system, ) resp = model.generate_content( prompt, generation_config=genai.types.GenerationConfig( max_output_tokens=max_tokens, temperature=0.2, ), ) return resp.text except Exception as e: err = str(e).lower() if any(q in err for q in self.QUOTA_ERRORS): self.rotate() continue raise raise RuntimeError("All Gemini API keys exhausted or errored.") # ─── Repo RAG Index ─────────────────────────────────────────────────────────── @dataclass class FileChunk: path: str start_line: int end_line: int content: str tokens: int # rough estimate def header(self) -> str: return f"### {self.path} (lines {self.start_line}-{self.end_line})" class RepoRAG: """ Lightweight RAG over a local repo directory. Files are chunked with configurable size (300–1200 lines) and a 30-line overlap so context is never lost at chunk boundaries. On query we return the top-k most relevant chunks (never the whole repo). """ MIN_CHUNK_LINES = 300 MAX_CHUNK_LINES = 1200 OVERLAP_LINES = 30 # lines shared between adjacent chunks IGNORE_DIRS = {".git", "node_modules", "__pycache__", ".venv", "dist", "build"} TEXT_EXTS = { ".py", ".js", ".ts", ".jsx", ".tsx", ".rs", ".go", ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".rb", ".php", ".swift", ".kt", ".toml", ".yaml", ".yml", ".json", ".md", ".txt", ".sh", ".bash", ".dockerfile", ".env", ".html", ".css", ".scss", ".sql", ".graphql", } def __init__(self, repo_path: str, chunk_lines: int = 600): self.repo_path = Path(repo_path) self.chunk_lines = max(self.MIN_CHUNK_LINES, min(self.MAX_CHUNK_LINES, chunk_lines)) self.chunks: list[FileChunk] = [] self.index: dict[str, list[int]] = {} # token → chunk indices self._build() def _build(self): step = self.chunk_lines - self.OVERLAP_LINES # stride with overlap for fpath in self.repo_path.rglob("*"): if fpath.is_dir(): continue if any(p in fpath.parts for p in self.IGNORE_DIRS): continue if fpath.suffix.lower() not in self.TEXT_EXTS: continue try: lines = fpath.read_text(errors="replace").splitlines() except Exception: continue rel = str(fpath.relative_to(self.repo_path)) total = max(len(lines), 1) for i in range(0, total, step): chunk_lines = lines[i : i + self.chunk_lines] content = "\n".join(chunk_lines) chunk = FileChunk( path=rel, start_line=i + 1, end_line=i + len(chunk_lines), content=content, tokens=len(content) // 4, # rough char/4 ≈ tokens ) self.chunks.append(chunk) # index words for word in re.findall(r"[a-zA-Z_]\w{2,}", content.lower()): self.index.setdefault(word, []).append(len(self.chunks) - 1) def query(self, query: str, top_k: int = 6, budget_tokens: int = 12000) -> str: """Return top_k relevant chunks as a formatted string, within token budget.""" words = re.findall(r"[a-zA-Z_]\w{2,}", query.lower()) scores: dict[int, int] = {} for w in words: for idx in self.index.get(w, []): scores[idx] = scores.get(idx, 0) + 1 ranked = sorted(scores, key=lambda i: -scores[i])[:top_k] # Pad with first chunks if nothing matched for i in range(len(self.chunks)): if len(ranked) >= top_k: break if i not in ranked: ranked.append(i) out, used = [], 0 for idx in ranked: c = self.chunks[idx] if used + c.tokens > budget_tokens: break out.append(f"{c.header()}\n```\n{c.content}\n```") used += c.tokens return "\n\n".join(out) if out else "(no relevant source chunks found)" def file_tree(self, max_lines: int = 200) -> str: paths = sorted(set(c.path for c in self.chunks)) lines = paths[:max_lines] if len(paths) > max_lines: lines.append(f"... and {len(paths) - max_lines} more files") return "\n".join(lines) def get_file(self, rel_path: str) -> Optional[str]: """Return full content of a specific file.""" target = Path(rel_path) full = self.repo_path / target if full.exists(): try: return full.read_text(errors="replace") except Exception: pass return None # ─── HuggingFace Space Manager ──────────────────────────────────────────────── class SpaceManager: def __init__(self, hf_token: str): self.api = HfApi(token=hf_token) self.token = hf_token self.space_id: Optional[str] = None def create_space(self, namespace: str, name: str) -> str: """Create a Docker-preset HF Space. Returns repo_id.""" repo_id = f"{namespace}/{name}" self.api.create_repo( repo_id=repo_id, repo_type="space", space_sdk="docker", exist_ok=True, private=False, ) self.space_id = repo_id return repo_id def upload_file(self, local_path: str, repo_path: str): if not self.space_id: raise RuntimeError("No space created yet.") self.api.upload_file( path_or_fileobj=local_path, path_in_repo=repo_path, repo_id=self.space_id, repo_type="space", ) def upload_content(self, content: str, repo_path: str): """Upload string content directly.""" with tempfile.NamedTemporaryFile(mode="w", suffix=Path(repo_path).suffix, delete=False) as f: f.write(content) tmp = f.name self.upload_file(tmp, repo_path) os.unlink(tmp) def get_logs(self, lines: int = 80) -> str: """Fetch build logs from the Space.""" if not self.space_id: return "" try: logs = self.api.get_space_runtime(self.space_id) # HF SDK returns runtime info; fetch build logs via jobs endpoint # We use the logs iterator log_text = [] for entry in self.api.get_space_logs(self.space_id, lines=lines): log_text.append(entry.get("text", "")) return "\n".join(log_text) except Exception as e: return f"(could not fetch logs: {e})" def space_url(self) -> str: if not self.space_id: return "" return f"https://huggingface.co/spaces/{self.space_id}" # ─── Patch Utilities ────────────────────────────────────────────────────────── def apply_unified_diff(original: str, patch: str) -> str: """Apply a unified diff string to original content. Falls back to whole-replace.""" with tempfile.TemporaryDirectory() as td: orig_file = os.path.join(td, "original") patch_file = os.path.join(td, "patch.diff") Path(orig_file).write_text(original) Path(patch_file).write_text(patch) result = subprocess.run( ["patch", "-u", orig_file, patch_file], capture_output=True, text=True ) if result.returncode == 0: return Path(orig_file).read_text() return original # patch failed, return unchanged def extract_files_from_response(text: str) -> dict[str, str]: """ Parse LLM response for fenced code blocks with filenames. Supports: ```language path/to/file or ### file: path/to/file ``` """ files = {} # Pattern 1: ```lang path/to/file\n...content...\n``` pattern1 = re.finditer( r"```[\w]*\s+([\w./\-]+)\n(.*?)```", text, re.DOTALL ) for m in pattern1: files[m.group(1).strip()] = m.group(2) # Pattern 2: ### FILE: path\n```\ncontent\n``` pattern2 = re.finditer( r"###\s+(?:FILE|file):\s*([\w./\-]+)\s*\n```[\w]*\n(.*?)```", text, re.DOTALL ) for m in pattern2: files[m.group(1).strip()] = m.group(2) return files def extract_patches(text: str) -> dict[str, str]: """Extract unified diff patches from LLM response.""" patches = {} pattern = re.finditer( r"###\s+PATCH:\s*([\w./\-]+)\s*\n```diff\n(.*?)```", text, re.DOTALL ) for m in pattern: patches[m.group(1).strip()] = m.group(2) return patches # ─── File Continuation System ───────────────────────────────────────────────── # Sentinel the model outputs when it runs out of tokens mid-file INCOMPLETE_SENTINEL = "##INCOMPLETE##" @dataclass class IncompleteFile: """Tracks a file the model started but didn't finish.""" path: str content_so_far: str # everything written up to the cutoff last_line: int # last line number written (1-indexed) resume_hint: str # last ~20 lines to show the model for context def detect_incomplete(files: dict[str, str]) -> list[IncompleteFile]: """ Scan parsed files for the INCOMPLETE sentinel. Returns a list of files that need continuation. """ incomplete = [] for path, content in files.items(): if INCOMPLETE_SENTINEL in content: # Strip the sentinel itself clean = content[: content.index(INCOMPLETE_SENTINEL)].rstrip() lines = clean.splitlines() tail = "\n".join(lines[-20:]) if len(lines) >= 20 else clean incomplete.append(IncompleteFile( path=path, content_so_far=clean, last_line=len(lines), resume_hint=tail, )) files[path] = clean # store what we have so far return incomplete def extract_continuation(text: str, inc: IncompleteFile) -> Optional[str]: """ Pull the continuation content for a specific file from a CONTINUE response. The model is instructed to emit: ### CONTINUE: path/to/file ```lang ``` Returns the continuation content, or None if not found. """ pattern = re.search( rf"###\s+CONTINUE:\s*{re.escape(inc.path)}\s*\n```[\w]*\n(.*?)```", text, re.DOTALL ) if pattern: return pattern.group(1) # Fallback: bare fenced block pattern2 = re.search( rf"```[\w]*\s+{re.escape(inc.path)}\n(.*?)```", text, re.DOTALL ) return pattern2.group(1) if pattern2 else None # ─── Prompts ────────────────────────────────────────────────────────────────── SYSTEM_ARCHITECT = """You are RepoForge, an expert software architect and full-stack engineer. You help users convert, rebuild, or transform code repositories by: 1. Analyzing the source repo structure (provided via RAG chunks) 2. Creating a HuggingFace Space with Docker preset 3. Generating a production-grade Dockerfile and vite.config.js 4. Iteratively patching code based on build logs — NEVER fully regenerating files after initial creation 5. Making SURGICAL unified-diff patches to fix errors FILE FORMAT RULES (strictly follow): - New files: fenced block with path on the opening line: ```typescript src/main.ts ``` - Patches: ### PATCH: src/main.ts then ```diff with unified diff syntax - Always prefer patches over full file rewrites after initial generation - Keep responses focused; do NOT repeat unchanged file contents - Files may be up to 1200 lines. Split larger logic into sub-modules. CONTINUATION PROTOCOL (critical): - If you cannot finish a file in one response, write as much as you can, then end the fenced block with the literal token ##INCOMPLETE## on its own line before the closing ```. - Example: ```rust src/handlers.rs // ... all content you managed to write ... ##INCOMPLETE## ``` - In the very next response (a CONTINUE prompt), resume from exactly where you left off. - For continuations output: ### CONTINUE: src/handlers.rs ```rust ``` - Never re-emit lines already written. Overlap by at most 2 lines for context. - A file is only considered complete when its fenced block closes WITHOUT ##INCOMPLETE##. MEMORY RULES: - Source repo context is provided as RAG chunks (not the full repo) - Ask for more context by outputting NEED_CONTEXT: on its own line - Be precise about line numbers in diffs """ PROMPT_INITIAL = """ ## Goal {goal} ## Source Repo File Tree {file_tree} ## Relevant Source Chunks (RAG) {rag_chunks} ## CodeConvert Agent Instructions (from HF Space) {agents_md} ## Task 1. Generate a `Dockerfile` (Docker preset for HF Spaces — port 7860, non-root user) 2. Generate a `vite.config.js` (or `vite.config.ts`) appropriate for this project 3. Generate a `README.md` for the HF Space (title, description, sdk: docker, app_port: 7860) 4. Begin converting/scaffolding the main entrypoint file(s) toward the goal 5. Files may be up to 1200 lines; split larger logic into sub-modules 6. If you run out of space mid-file, end that block with ##INCOMPLETE## — do NOT truncate silently Output each file using the fenced block format. Use ##INCOMPLETE## if needed. """ PROMPT_CONTINUE = """ ## Continuation Request You previously started writing `{path}` but ran out of space at line {last_line}. ## Last lines written (for context): ``` {resume_hint} ``` ## Goal (keep in mind) {goal} ## Task Continue writing `{path}` from exactly where you left off (line {next_line}). Output ONLY the continuation using: ### CONTINUE: {path} ``` ``` If you STILL cannot finish in this response, end again with ##INCOMPLETE##. Do NOT re-emit lines already written (overlap max 2 lines for context is fine). """ PROMPT_PATCH = """ ## Current Goal {goal} ## Build Logs (last 80 lines) ``` {logs} ``` ## Files Currently in Space {file_list} ## Relevant Source Chunks (RAG for context) {rag_chunks} ## Task Analyze the build logs. Identify errors. Output ONLY surgical patches (unified diff) to fix them. - Use ### PATCH: then ```diff blocks - Do NOT rewrite whole files - If you need more source context, output NEED_CONTEXT: on its own line - If the build succeeded and goal is met, output: GOAL_COMPLETE - If the build succeeded but goal isn't fully met, output next round of patches - If a new file is needed (not previously created), use the normal fenced block format - Use ##INCOMPLETE## if a new file can't fit in one response """ # ─── Main Agent Loop ────────────────────────────────────────────────────────── AGENTS_MD_FALLBACK = """ CodeConvert is an AI agent that converts codebases between programming languages. It uses a RAG approach to reference the original repo, generates idiomatic target-language code, and iteratively patches until the build passes. It focuses on: - Preserving logic and architecture - Generating idiomatic target-language code - Surgical patches based on compiler/runtime errors - Modular file structure """ def run_agent( hf_token: str, gemini_keys_raw: str, goal: str, source_input: str, # github url, hf space url, or local folder path space_name: str, max_iterations: int = 8, chunk_lines: int = 600, ) -> Generator[str, None, None]: """Main agentic loop. Yields log strings for Gradio streaming.""" def log(msg: str): return msg + "\n" yield log("🔧 Initializing RepoForge...") # Parse Gemini keys gemini_keys = [k.strip() for k in re.split(r"[,\n]+", gemini_keys_raw) if k.strip()] if not gemini_keys: yield log("❌ No Gemini API keys provided.") return try: rotator = GeminiRotator(gemini_keys) except Exception as e: yield log(f"❌ Gemini init error: {e}") return # Validate HF token try: api = HfApi(token=hf_token) user = api.whoami() namespace = user["name"] yield log(f"✅ HuggingFace authenticated as: {namespace}") except Exception as e: yield log(f"❌ HF token error: {e}") return # ── Clone / prepare source repo ── with tempfile.TemporaryDirectory() as tmpdir: repo_dir = os.path.join(tmpdir, "source_repo") if source_input.startswith("https://github.com"): yield log(f"📦 Cloning GitHub repo: {source_input}") result = subprocess.run( ["git", "clone", "--depth=1", source_input, repo_dir], capture_output=True, text=True ) if result.returncode != 0: yield log(f"❌ Git clone failed:\n{result.stderr}") return yield log("✅ Repo cloned.") elif "huggingface.co/spaces" in source_input: # Extract space id from URL m = re.search(r"huggingface\.co/spaces/([\w\-]+/[\w\-]+)", source_input) if not m: yield log("❌ Could not parse HF Space URL.") return space_id = m.group(1) yield log(f"📦 Cloning HF Space: {space_id}") result = subprocess.run( ["git", "clone", "--depth=1", f"https://huggingface.co/spaces/{space_id}", repo_dir], capture_output=True, text=True ) if result.returncode != 0: yield log(f"❌ HF Space clone failed:\n{result.stderr}") return yield log("✅ HF Space cloned.") elif os.path.isdir(source_input): repo_dir = source_input yield log(f"📁 Using local folder: {repo_dir}") else: yield log("❌ Source must be a GitHub URL, HF Space URL, or local folder path.") return # ── Build RAG index ── yield log("🔍 Building RAG index over source repo...") rag = RepoRAG(repo_dir, chunk_lines=chunk_lines) yield log(f"✅ Indexed {len(rag.chunks)} chunks from {len(set(c.path for c in rag.chunks))} files (chunk size: {rag.chunk_lines} lines).") file_tree = rag.file_tree() # ── Create HF Space ── space_mgr = SpaceManager(hf_token) safe_name = re.sub(r"[^a-zA-Z0-9\-]", "-", space_name.strip())[:50] or "repoforge-app" yield log(f"🚀 Creating HF Space: {namespace}/{safe_name}") try: repo_id = space_mgr.create_space(namespace, safe_name) yield log(f"✅ Space created: {space_mgr.space_url()}") except Exception as e: yield log(f"❌ Failed to create space: {e}") return # ── Track files in space ── space_files: dict[str, str] = {} # path → content # ── Initial generation ── yield log("\n🤖 Asking Gemini to generate initial files...") initial_rag = rag.query(goal, top_k=8, budget_tokens=14000) initial_prompt = PROMPT_INITIAL.format( goal=goal, file_tree=file_tree[:3000], rag_chunks=initial_rag, agents_md=AGENTS_MD_FALLBACK, ) try: response = rotator.generate(SYSTEM_ARCHITECT, initial_prompt, max_tokens=8192) except Exception as e: yield log(f"❌ Gemini error: {e}") return # Parse files from response new_files = extract_files_from_response(response) if not new_files: yield log("⚠️ Gemini didn't output any files. Showing raw response:") yield log(response[:2000]) return # ── Handle incomplete files from initial generation ── pending_continuations = detect_incomplete(new_files) if pending_continuations: yield log(f"📝 {len(pending_continuations)} file(s) incomplete — requesting continuations...") while pending_continuations: inc = pending_continuations.pop(0) yield log(f" ↩️ Continuing {inc.path} from line {inc.last_line}...") cont_prompt = PROMPT_CONTINUE.format( path=inc.path, last_line=inc.last_line, next_line=inc.last_line + 1, resume_hint=inc.resume_hint, goal=goal, ) try: cont_response = rotator.generate(SYSTEM_ARCHITECT, cont_prompt, max_tokens=8192) except Exception as e: yield log(f" ❌ Continuation error for {inc.path}: {e}") continue continuation = extract_continuation(cont_response, inc) if continuation: # Check if this continuation is itself incomplete if INCOMPLETE_SENTINEL in continuation: clean_cont = continuation[: continuation.index(INCOMPLETE_SENTINEL)].rstrip() appended = inc.content_so_far + "\n" + clean_cont tail_lines = appended.splitlines() tail = "\n".join(tail_lines[-20:]) pending_continuations.append(IncompleteFile( path=inc.path, content_so_far=appended, last_line=len(appended.splitlines()), resume_hint=tail, )) new_files[inc.path] = appended yield log(f" ↩️ {inc.path} still incomplete at line {len(appended.splitlines())} — queuing another continuation") else: new_files[inc.path] = inc.content_so_far + "\n" + continuation yield log(f" ✅ {inc.path} completed ({len(new_files[inc.path].splitlines())} lines total)") else: yield log(f" ⚠️ Could not parse continuation for {inc.path} — using partial content") # Ensure we have required files if "Dockerfile" not in new_files: yield log("⚠️ No Dockerfile generated — adding minimal one.") new_files["Dockerfile"] = textwrap.dedent(""" FROM node:20-slim WORKDIR /app COPY package*.json ./ RUN npm ci COPY . . RUN npm run build EXPOSE 7860 CMD ["npm", "run", "preview", "--", "--port", "7860", "--host"] """).strip() if "README.md" not in new_files: new_files["README.md"] = textwrap.dedent(f""" --- title: {safe_name} emoji: 🔧 colorFrom: blue colorTo: purple sdk: docker app_port: 7860 pinned: false --- # {safe_name} Built with RepoForge. """).strip() # Upload all initial files yield log(f"\n📤 Uploading {len(new_files)} initial files to HF Space...") for fpath, content in new_files.items(): try: space_mgr.upload_content(content, fpath) space_files[fpath] = content yield log(f" ✅ {fpath} ({len(content):,} chars)") except Exception as e: yield log(f" ❌ {fpath}: {e}") yield log(f"\n🔗 Space URL: {space_mgr.space_url()}") yield log("⏳ Waiting for initial build (60s)...") time.sleep(60) # ── Patch loop ── for iteration in range(1, max_iterations + 1): yield log(f"\n{'='*50}") yield log(f"🔁 Patch iteration {iteration}/{max_iterations}") # Fetch logs yield log("📋 Fetching build logs...") logs = space_mgr.get_logs(lines=80) if not logs: logs = "(no logs available yet — space may still be building)" yield log(f"Logs preview:\n{logs[:500]}...") # Check for NEED_CONTEXT in previous response need_ctx_matches = re.findall(r"NEED_CONTEXT:\s*(.+)", response) if need_ctx_matches: extra_query = " ".join(need_ctx_matches) yield log(f"🔍 Agent needs more context: '{extra_query[:100]}'") patch_rag = rag.query(extra_query, top_k=6, budget_tokens=10000) else: patch_rag = rag.query(goal + " " + logs, top_k=6, budget_tokens=10000) patch_prompt = PROMPT_PATCH.format( goal=goal, logs=logs[:3000], file_list="\n".join(space_files.keys()), rag_chunks=patch_rag, ) try: response = rotator.generate(SYSTEM_ARCHITECT, patch_prompt, max_tokens=8192) except Exception as e: yield log(f"❌ Gemini error: {e}") break if "GOAL_COMPLETE" in response: yield log("\n🎉 GOAL COMPLETE! Agent confirmed success.") break # Apply patches patches = extract_patches(response) new_in_patch = extract_files_from_response(response) if not patches and not new_in_patch: yield log("⚠️ No patches or new files found in response.") yield log(response[:1000]) for fpath, patch_str in patches.items(): original = space_files.get(fpath, "") patched = apply_unified_diff(original, patch_str) if patched != original: try: space_mgr.upload_content(patched, fpath) space_files[fpath] = patched yield log(f" 🩹 Patched: {fpath}") except Exception as e: yield log(f" ❌ Upload failed for {fpath}: {e}") else: yield log(f" ⚠️ Patch didn't apply cleanly for {fpath}") # New files from patch response (shouldn't be many after init) patch_new_incomplete = detect_incomplete(new_in_patch) if patch_new_incomplete: yield log(f" 📝 {len(patch_new_incomplete)} new file(s) incomplete in patch round — continuing...") while patch_new_incomplete: inc = patch_new_incomplete.pop(0) yield log(f" ↩️ Continuing new file {inc.path} from line {inc.last_line}...") cont_prompt = PROMPT_CONTINUE.format( path=inc.path, last_line=inc.last_line, next_line=inc.last_line + 1, resume_hint=inc.resume_hint, goal=goal, ) try: cont_r = rotator.generate(SYSTEM_ARCHITECT, cont_prompt, max_tokens=8192) except Exception as e: yield log(f" ❌ {e}") continue cont = extract_continuation(cont_r, inc) if cont: if INCOMPLETE_SENTINEL in cont: clean_c = cont[: cont.index(INCOMPLETE_SENTINEL)].rstrip() appended = inc.content_so_far + "\n" + clean_c tail_lines = appended.splitlines() patch_new_incomplete.append(IncompleteFile( path=inc.path, content_so_far=appended, last_line=len(appended.splitlines()), resume_hint="\n".join(tail_lines[-20:]), )) new_in_patch[inc.path] = appended else: new_in_patch[inc.path] = inc.content_so_far + "\n" + cont yield log(f" ✅ {inc.path} complete ({len(new_in_patch[inc.path].splitlines())} lines)") for fpath, content in new_in_patch.items(): if fpath not in space_files: try: space_mgr.upload_content(content, fpath) space_files[fpath] = content yield log(f" ➕ New file: {fpath}") except Exception as e: yield log(f" ❌ {fpath}: {e}") if patches or new_in_patch: yield log("⏳ Waiting for rebuild (45s)...") time.sleep(45) yield log(f"\n✨ Done! Space: {space_mgr.space_url()}") yield log(f"📊 Files in space: {', '.join(space_files.keys())}") # ─── Gradio UI ──────────────────────────────────────────────────────────────── CSS = """ @import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Syne:wght@400;700;800&display=swap'); :root { --bg: #0a0a0f; --surface: #111118; --border: #1e1e2e; --accent: #7c3aed; --accent2: #06b6d4; --text: #e2e8f0; --muted: #64748b; --success: #10b981; --danger: #ef4444; } body, .gradio-container { background: var(--bg) !important; font-family: 'Syne', sans-serif !important; color: var(--text) !important; } .gradio-container { max-width: 1100px !important; margin: 0 auto !important; } h1.title { font-family: 'Syne', sans-serif; font-size: 2.8rem; font-weight: 800; background: linear-gradient(135deg, #7c3aed, #06b6d4); -webkit-background-clip: text; -webkit-text-fill-color: transparent; margin: 0; letter-spacing: -0.04em; } .subtitle { color: var(--muted); font-size: 0.95rem; margin-top: 4px; font-family: 'JetBrains Mono', monospace; } .panel { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 20px; } label { color: var(--muted) !important; font-size: 0.82rem !important; font-family: 'JetBrains Mono', monospace !important; } input, textarea { background: var(--bg) !important; border: 1px solid var(--border) !important; color: var(--text) !important; font-family: 'JetBrains Mono', monospace !important; border-radius: 8px !important; } input:focus, textarea:focus { border-color: var(--accent) !important; box-shadow: 0 0 0 3px rgba(124,58,237,0.15) !important; } button.primary { background: linear-gradient(135deg, var(--accent), var(--accent2)) !important; border: none !important; border-radius: 10px !important; font-family: 'Syne', sans-serif !important; font-weight: 700 !important; font-size: 1rem !important; letter-spacing: 0.02em !important; padding: 14px 28px !important; color: white !important; cursor: pointer !important; transition: opacity 0.2s !important; } button.primary:hover { opacity: 0.88 !important; } .log-box textarea { font-family: 'JetBrains Mono', monospace !important; font-size: 0.78rem !important; background: #050508 !important; color: #a0f0b0 !important; border: 1px solid #1a2a1a !important; } .tip { font-family: 'JetBrains Mono', monospace; font-size: 0.75rem; color: var(--muted); border-left: 2px solid var(--accent); padding-left: 10px; margin-top: 8px; } """ def build_ui(): with gr.Blocks(css=CSS, title="RepoForge") as demo: gr.HTML("""

RepoForge

// agentic repo conversion · surgical patches · rag-powered context

""") with gr.Row(): # ── Left column: config ── with gr.Column(scale=1): gr.HTML('
') gr.HTML('

🔑 CREDENTIALS

') hf_token = gr.Textbox( label="HuggingFace Access Token", placeholder="hf_...", type="password", lines=1, ) gemini_keys = gr.Textbox( label="Gemini API Keys (one per line or comma-separated)", placeholder="AIzaSy...\nAIzaSy...", lines=4, type="password", ) gr.HTML('

Multiple keys → auto-rotates on quota exhaustion

') gr.HTML('

📦 SOURCE

') source_input = gr.Textbox( label="Source (GitHub URL / HF Space URL / Local Folder Path)", placeholder="https://github.com/owner/repo", lines=1, ) space_name = gr.Textbox( label="New HF Space Name", placeholder="my-converted-app", lines=1, ) gr.HTML('

🎯 GOAL

') goal = gr.Textbox( label="Conversion Goal", placeholder="Convert this Python Flask app to a Rust Axum web server with identical API endpoints", lines=4, ) max_iters = gr.Slider( label="Max patch iterations", minimum=2, maximum=20, step=1, value=8 ) chunk_lines_slider = gr.Slider( label="RAG chunk size (lines per chunk, 300–1200)", minimum=300, maximum=1200, step=100, value=600, info="Larger = more context per RAG hit, more tokens used" ) run_btn = gr.Button("⚡ Launch RepoForge", variant="primary", elem_classes=["primary"]) gr.HTML('
') # ── Right column: logs ── with gr.Column(scale=1): gr.HTML('
') gr.HTML('

📟 AGENT LOG

') log_output = gr.Textbox( label="", lines=30, max_lines=60, interactive=False, elem_classes=["log-box"], show_copy_button=True, ) gr.HTML('
') # ── How it works ── with gr.Accordion("ℹ️ How RepoForge works", open=False): gr.Markdown(""" **RepoForge** is a fully agentic repo-conversion loop: 1. **Clones** your source repo (GitHub, HF Space, or local folder) 2. **Indexes** it with a lightweight RAG system — only relevant chunks are sent to the LLM, never the whole repo 3. **Creates** a HuggingFace Space with the Docker preset 4. **Generates** `Dockerfile`, `vite.config.js`, `README.md`, and initial source files 5. **Iterates**: fetches build logs → asks Gemini to output surgical `unified diff` patches → applies them → waits for rebuild 6. **Never** fully regenerates a file after initial creation — only patches 7. **Rotates** Gemini API keys automatically when quota is hit **Source input formats:** - `https://github.com/owner/repo` — cloned via git - `https://huggingface.co/spaces/owner/name` — cloned from HF - `/home/user/myproject` — local folder (must be accessible) """) # ── Wire up ── accumulated_logs = gr.State("") def stream_wrapper(hf_tok, gem_keys, goal_txt, src, sname, iters, chunk_sz, prev_logs): all_logs = prev_logs or "" for chunk in run_agent( hf_token=hf_tok, gemini_keys_raw=gem_keys, goal=goal_txt, source_input=src, space_name=sname, max_iterations=int(iters), chunk_lines=int(chunk_sz), ): all_logs += chunk yield all_logs, all_logs run_btn.click( fn=stream_wrapper, inputs=[hf_token, gemini_keys, goal, source_input, space_name, max_iters, chunk_lines_slider, accumulated_logs], outputs=[log_output, accumulated_logs], ) return demo if __name__ == "__main__": app = build_ui() app.launch(server_port=7860, share=False)