Spaces:
Runtime error
Runtime error
"""
RepoForge — Agentic Repo Conversion App
========================================
• Multi-key Gemini rotation (auto-switches when quota hits)
• RAG over source repo (chunked, on-demand, never fully loaded)
• Creates a HF Space (Docker preset), generates Dockerfile + vite.config.js
• Surgical patch loop: read build logs → apply unified diffs → never full rewrites
• Streams progress back to Gradio UI in real-time
"""
| import gradio as gr | |
| import google.generativeai as genai | |
| from huggingface_hub import HfApi, SpaceHardware | |
| import os, re, json, time, math, hashlib, tempfile, subprocess, textwrap | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import Generator, Optional | |
| import threading | |
# ─── Gemini Key Rotator ───────────────────────────────────────────────────────
class GeminiRotator:
    """Round-robin across multiple Gemini API keys; swaps on quota errors.

    Blank keys are discarded and the rest are stripped. ``generate`` tries
    each key at most once per call, starting from the key currently
    selected, and advances to the next key only when the failure looks
    like a quota / rate-limit error.
    """

    # Lower-case substrings that mark an error as quota related.  A bare
    # "rate" is deliberately not listed: it is a substring of "generate",
    # which appears in unrelated API error messages.
    QUOTA_ERRORS = (
        "429",
        "quota",
        "rate limit",
        "rate_limit",
        "ratelimit",
        "too many requests",
        "exhausted",
        "resource_exhausted",
    )

    def __init__(self, keys: list[str]):
        """Store the usable keys.

        Raises:
            ValueError: if no non-blank key is supplied.
        """
        self.keys = [k.strip() for k in keys if k.strip()]
        self.idx = 0
        self.lock = threading.Lock()
        if not self.keys:
            raise ValueError("At least one Gemini API key is required.")

    @classmethod
    def _is_quota_error(cls, err: BaseException) -> bool:
        """Return True when the error text contains a quota marker."""
        message = str(err).lower()
        return any(marker in message for marker in cls.QUOTA_ERRORS)

    def current_key(self) -> str:
        """Return the key currently selected by the rotation index."""
        with self.lock:
            return self.keys[self.idx % len(self.keys)]

    def rotate(self):
        """Advance to the next key, wrapping around at the end."""
        with self.lock:
            self.idx = (self.idx + 1) % len(self.keys)

    def generate(self, system: str, prompt: str, max_tokens: int = 8192) -> str:
        """Try each key once; raise if all exhausted.

        Args:
            system: system instruction passed to the model.
            prompt: user prompt.
            max_tokens: upper bound on generated tokens.

        Returns:
            The text of the model response.

        Raises:
            RuntimeError: every key failed with a quota-style error; the
                last such error is attached as ``__cause__``.
            Exception: any non-quota error is re-raised unchanged.
        """
        last_error = None
        for _ in range(len(self.keys)):
            key = self.current_key()
            try:
                genai.configure(api_key=key)
                model = genai.GenerativeModel(
                    model_name="gemini-1.5-pro",
                    system_instruction=system,
                )
                resp = model.generate_content(
                    prompt,
                    generation_config=genai.types.GenerationConfig(
                        max_output_tokens=max_tokens,
                        temperature=0.2,
                    ),
                )
                return resp.text
            except Exception as e:
                if not self._is_quota_error(e):
                    raise
                last_error = e
                self.rotate()
        raise RuntimeError("All Gemini API keys exhausted or errored.") from last_error
# ─── Repo RAG Index ───────────────────────────────────────────────────────────
@dataclass
class FileChunk:
    """One contiguous slice of a source file held in the RAG index.

    Declared as a dataclass because the indexer constructs instances with
    keyword arguments; a plain class with only annotations has no such
    constructor.
    """

    path: str        # file path relative to the repo root
    start_line: int  # 1-indexed first line of the slice
    end_line: int    # 1-indexed last line of the slice
    content: str     # text of the slice
    tokens: int      # rough estimate

    def header(self) -> str:
        """Return the markdown heading that labels this chunk in prompts."""
        return f"### {self.path} (lines {self.start_line}-{self.end_line})"
class RepoRAG:
    """
    Lightweight RAG over a local repo directory.

    Files are chunked with configurable size (300-1200 lines) and a 30-line
    overlap so context is never lost at chunk boundaries.
    On query we return the top-k most relevant chunks (never the whole repo).
    Relevance is a plain keyword count: every identifier-like word of the
    query adds one point per occurrence in a chunk.
    """

    MIN_CHUNK_LINES = 300
    MAX_CHUNK_LINES = 1200
    OVERLAP_LINES = 30  # lines shared between adjacent chunks
    IGNORE_DIRS = {".git", "node_modules", "__pycache__", ".venv", "dist", "build"}
    TEXT_EXTS = {
        ".py", ".js", ".ts", ".jsx", ".tsx", ".rs", ".go", ".java", ".c", ".cpp",
        ".h", ".hpp", ".cs", ".rb", ".php", ".swift", ".kt", ".toml", ".yaml",
        ".yml", ".json", ".md", ".txt", ".sh", ".bash", ".dockerfile", ".env",
        ".html", ".css", ".scss", ".sql", ".graphql",
    }

    def __init__(self, repo_path: str, chunk_lines: int = 600):
        """Index ``repo_path``; ``chunk_lines`` is clamped to the class limits."""
        self.repo_path = Path(repo_path)
        self.chunk_lines = max(self.MIN_CHUNK_LINES, min(self.MAX_CHUNK_LINES, chunk_lines))
        self.chunks: list[FileChunk] = []
        self.index: dict[str, list[int]] = {}  # word -> chunk indices (one per occurrence)
        self._build()

    def _build(self):
        """Walk the repo and fill ``self.chunks`` and ``self.index``."""
        step = self.chunk_lines - self.OVERLAP_LINES  # stride with overlap
        # Sorted so chunk numbering (and therefore padding order in query)
        # does not depend on filesystem enumeration order.
        for fpath in sorted(self.repo_path.rglob("*")):
            if fpath.is_dir():
                continue
            rel_path = fpath.relative_to(self.repo_path)
            # Test only the parts *inside* the repo: the checkout itself may
            # live under a directory called "build" or "dist".
            if any(part in self.IGNORE_DIRS for part in rel_path.parts):
                continue
            if fpath.suffix.lower() not in self.TEXT_EXTS:
                continue
            try:
                lines = fpath.read_text(encoding="utf-8", errors="replace").splitlines()
            except OSError:
                continue  # unreadable file: leave it out of the index
            rel = str(rel_path)
            total = max(len(lines), 1)  # an empty file still yields one chunk
            for start in range(0, total, step):
                window = lines[start:start + self.chunk_lines]
                content = "\n".join(window)
                self.chunks.append(FileChunk(
                    path=rel,
                    start_line=start + 1,
                    end_line=start + len(window),
                    content=content,
                    tokens=len(content) // 4,  # rough chars/4 estimate
                ))
                chunk_idx = len(self.chunks) - 1
                for word in re.findall(r"[a-zA-Z_]\w{2,}", content.lower()):
                    self.index.setdefault(word, []).append(chunk_idx)
                if start + self.chunk_lines >= len(lines):
                    # This window already reached end of file; another one
                    # would only repeat the overlap region.
                    break

    def query(self, query: str, top_k: int = 6, budget_tokens: int = 12000) -> str:
        """Return top_k relevant chunks as a formatted string, within token budget.

        Chunks that do not fit in the remaining budget are skipped rather
        than ending the selection. If not even the best chunk fits, it is
        returned truncated so the caller never gets an empty context while
        the index has content.
        """
        words = re.findall(r"[a-zA-Z_]\w{2,}", query.lower())
        scores: dict[int, int] = {}
        for w in words:
            for idx in self.index.get(w, []):
                scores[idx] = scores.get(idx, 0) + 1
        ranked = sorted(scores, key=lambda i: -scores[i])[:top_k]
        # Pad with first chunks if nothing (or too little) matched
        already = set(ranked)
        for i in range(len(self.chunks)):
            if len(ranked) >= top_k:
                break
            if i not in already:
                ranked.append(i)
        out, used = [], 0
        for idx in ranked:
            c = self.chunks[idx]
            if used + c.tokens > budget_tokens:
                continue
            out.append(f"{c.header()}\n```\n{c.content}\n```")
            used += c.tokens
        if not out and ranked and budget_tokens > 0:
            best = self.chunks[ranked[0]]
            clipped = best.content[: budget_tokens * 4]  # inverse of the chars/4 estimate
            out.append(f"{best.header()} [truncated to fit token budget]\n```\n{clipped}\n```")
        return "\n\n".join(out) if out else "(no relevant source chunks found)"

    def file_tree(self, max_lines: int = 200) -> str:
        """Return the sorted indexed file paths, one per line, capped at ``max_lines``."""
        paths = sorted(set(c.path for c in self.chunks))
        lines = paths[:max_lines]
        if len(paths) > max_lines:
            lines.append(f"... and {len(paths) - max_lines} more files")
        return "\n".join(lines)

    def get_file(self, rel_path: str) -> Optional[str]:
        """Return full content of a specific file.

        Returns None when the file is missing, unreadable, or resolves to
        a location outside the repo root (e.g. via ``..``).
        """
        root = self.repo_path.resolve()
        full = (root / rel_path).resolve()
        if full != root and root not in full.parents:
            return None
        if not full.is_file():
            return None
        try:
            return full.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None
# ─── HuggingFace Space Manager ────────────────────────────────────────────────
class SpaceManager:
    """Thin wrapper around ``HfApi`` for the single Space the agent works on."""

    def __init__(self, hf_token: str):
        self.api = HfApi(token=hf_token)
        self.token = hf_token
        self.space_id: Optional[str] = None  # set by create_space()

    def create_space(self, namespace: str, name: str) -> str:
        """Create a Docker-preset HF Space. Returns repo_id.

        ``exist_ok=True`` is passed, so an already existing Space with the
        same id is reused instead of raising.
        """
        repo_id = f"{namespace}/{name}"
        self.api.create_repo(
            repo_id=repo_id,
            repo_type="space",
            space_sdk="docker",
            exist_ok=True,
            private=False,
        )
        self.space_id = repo_id
        return repo_id

    def upload_file(self, local_path: str, repo_path: str):
        """Upload a local file to ``repo_path`` inside the Space.

        Raises:
            RuntimeError: if ``create_space`` has not been called yet.
        """
        if not self.space_id:
            raise RuntimeError("No space created yet.")
        self.api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=repo_path,
            repo_id=self.space_id,
            repo_type="space",
        )

    def upload_content(self, content: str, repo_path: str):
        """Upload string content directly.

        The text is staged in a temporary file, written as UTF-8 so the
        result does not depend on the platform default encoding. The file
        is removed even when the upload fails.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=Path(repo_path).suffix,
                                         delete=False, encoding="utf-8") as f:
            f.write(content)
            tmp = f.name
        try:
            self.upload_file(tmp, repo_path)
        finally:
            os.unlink(tmp)

    def get_logs(self, lines: int = 80) -> str:
        """Fetch build logs from the Space.

        Returns an empty string when no Space exists yet, and a
        parenthesised error note when the logs cannot be fetched.
        """
        if not self.space_id:
            return ""
        try:
            # NOTE(review): get_space_logs is not part of every huggingface_hub
            # release - confirm against the pinned version. If the method is
            # missing, the AttributeError is reported through the branch below.
            log_text = [
                entry.get("text", "")
                for entry in self.api.get_space_logs(self.space_id, lines=lines)
            ]
            return "\n".join(log_text)
        except Exception as e:
            return f"(could not fetch logs: {e})"

    def space_url(self) -> str:
        """Return the public URL of the Space, or "" if none was created."""
        if not self.space_id:
            return ""
        return f"https://huggingface.co/spaces/{self.space_id}"
# ─── Patch Utilities ──────────────────────────────────────────────────────────
def apply_unified_diff(original: str, patch: str) -> str:
    """Apply a unified diff to ``original`` using the system ``patch`` tool.

    Returns the patched text on success. On any failure - the diff does not
    apply, ``patch`` is not installed, or it times out - ``original`` is
    returned unchanged, so callers detect failure by comparing the result
    with their input.
    """
    # Model-written diffs often lack the final newline.
    if patch and not patch.endswith("\n"):
        patch += "\n"
    with tempfile.TemporaryDirectory() as td:
        orig_file = os.path.join(td, "original")
        patch_file = os.path.join(td, "patch.diff")
        Path(orig_file).write_text(original, encoding="utf-8")
        Path(patch_file).write_text(patch, encoding="utf-8")
        try:
            result = subprocess.run(
                ["patch", "-u", orig_file, patch_file],
                capture_output=True,
                text=True,
                stdin=subprocess.DEVNULL,  # never block on an interactive prompt
                timeout=30,
            )
        except (OSError, subprocess.TimeoutExpired):
            return original  # patch tool unavailable or stuck
        if result.returncode == 0:
            return Path(orig_file).read_text(encoding="utf-8")
    return original  # patch failed, return unchanged
def extract_files_from_response(text: str) -> dict[str, str]:
    """
    Parse LLM response for fenced code blocks with filenames.

    Supports two layouts:
      1. the path on the fence line itself:  ```language path/to/file
      2. a heading before the fence:         ### FILE: path/to/file
    Returns a mapping of path -> block content. When the same path occurs
    more than once, the later block wins.
    """
    files = {}
    # Pattern 1: ```lang path/to/file\n...content...\n```
    # Only spaces/tabs may separate the language from the path. Allowing
    # any whitespace would let the match cross the newline and take the
    # first content line of an ordinary block (e.g. "---") as the filename.
    pattern1 = re.finditer(
        r"```[\w]*[ \t]+([\w./\-]+)\n(.*?)```",
        text, re.DOTALL
    )
    for m in pattern1:
        files[m.group(1).strip()] = m.group(2)
    # Pattern 2: ### FILE: path\n```\ncontent\n```
    pattern2 = re.finditer(
        r"###\s+(?:FILE|file):\s*([\w./\-]+)\s*\n```[\w]*\n(.*?)```",
        text, re.DOTALL
    )
    for m in pattern2:
        files[m.group(1).strip()] = m.group(2)
    return files
def extract_patches(text: str) -> dict[str, str]:
    """Collect the unified diffs announced in an LLM response.

    A patch is a ``### PATCH: <path>`` heading followed directly by a
    fenced ``diff`` block. The result maps each path to the diff body;
    a path that appears twice keeps its last diff.
    """
    patch_block = re.compile(
        r"###\s+PATCH:\s*([\w./\-]+)\s*\n```diff\n(.*?)```",
        re.DOTALL,
    )
    return {
        hit.group(1).strip(): hit.group(2)
        for hit in patch_block.finditer(text)
    }
# ─── File Continuation System ─────────────────────────────────────────────────
# Sentinel the model outputs when it runs out of tokens mid-file.
# The prompts instruct the model to place it on its own line just before the
# closing fence; parsed files that contain it are queued for continuation.
INCOMPLETE_SENTINEL = "##INCOMPLETE##"
@dataclass
class IncompleteFile:
    """Tracks a file the model started but didn't finish.

    Declared as a dataclass because instances are created with keyword
    arguments; annotations alone do not provide such a constructor.
    """

    path: str             # file path as emitted by the model
    content_so_far: str   # everything written up to the cutoff
    last_line: int        # last line number written (1-indexed)
    resume_hint: str      # last ~20 lines to show the model for context
def detect_incomplete(files: dict[str, str]) -> "list[IncompleteFile]":
    """
    Find parsed files that were cut off by the INCOMPLETE sentinel.

    For each such file, everything from the sentinel onward is dropped, the
    remaining text is right-stripped and written back into ``files``, and
    an IncompleteFile describing the resume point is returned.
    """
    unfinished = []
    for path in list(files):
        head, marker, _rest = files[path].partition(INCOMPLETE_SENTINEL)
        if not marker:
            continue  # sentinel absent: this file is complete
        kept = head.rstrip()
        kept_lines = kept.splitlines()
        # Show the model at most the last 20 lines as resume context
        if len(kept_lines) >= 20:
            hint = "\n".join(kept_lines[-20:])
        else:
            hint = kept
        unfinished.append(IncompleteFile(
            path=path,
            content_so_far=kept,
            last_line=len(kept_lines),
            resume_hint=hint,
        ))
        files[path] = kept  # keep the partial text until continuation arrives
    return unfinished
def extract_continuation(text: str, inc: "IncompleteFile") -> Optional[str]:
    """
    Locate the continuation of ``inc.path`` inside a CONTINUE response.

    The preferred layout, as requested from the model, is:
        ### CONTINUE: path/to/file
        ```lang
        <rest of file from where it left off>
        ```
    If that heading is absent, a fenced block carrying the path on its
    opening line is accepted instead.
    Returns the fenced content, or None when neither layout is present.
    """
    escaped = re.escape(inc.path)
    layouts = (
        rf"###\s+CONTINUE:\s*{escaped}\s*\n```[\w]*\n(.*?)```",  # heading form
        rf"```[\w]*\s+{escaped}\n(.*?)```",                      # bare fenced block
    )
    for layout in layouts:
        found = re.search(layout, text, re.DOTALL)
        if found is not None:
            return found.group(1)
    return None
# ─── Prompts ──────────────────────────────────────────────────────────────────
# System instruction sent with every Gemini call. It defines the output
# formats the parsers in this file rely on: fenced blocks carrying a path,
# "### PATCH:" diffs, "### CONTINUE:" blocks, the ##INCOMPLETE## sentinel
# and the NEED_CONTEXT: marker.
SYSTEM_ARCHITECT = """You are RepoForge, an expert software architect and full-stack engineer.
You help users convert, rebuild, or transform code repositories by:
1. Analyzing the source repo structure (provided via RAG chunks)
2. Creating a HuggingFace Space with Docker preset
3. Generating a production-grade Dockerfile and vite.config.js
4. Iteratively patching code based on build logs — NEVER fully regenerating files after initial creation
5. Making SURGICAL unified-diff patches to fix errors
FILE FORMAT RULES (strictly follow):
- New files: fenced block with path on the opening line:
```typescript src/main.ts
<content>
```
- Patches: ### PATCH: src/main.ts then ```diff with unified diff syntax
- Always prefer patches over full file rewrites after initial generation
- Keep responses focused; do NOT repeat unchanged file contents
- Files may be up to 1200 lines. Split larger logic into sub-modules.
CONTINUATION PROTOCOL (critical):
- If you cannot finish a file in one response, write as much as you can, then end the
fenced block with the literal token ##INCOMPLETE## on its own line before the closing ```.
- Example:
```rust src/handlers.rs
// ... all content you managed to write ...
##INCOMPLETE##
```
- In the very next response (a CONTINUE prompt), resume from exactly where you left off.
- For continuations output:
### CONTINUE: src/handlers.rs
```rust
<rest of file, picking up at the exact next line>
```
- Never re-emit lines already written. Overlap by at most 2 lines for context.
- A file is only considered complete when its fenced block closes WITHOUT ##INCOMPLETE##.
MEMORY RULES:
- Source repo context is provided as RAG chunks (not the full repo)
- Ask for more context by outputting NEED_CONTEXT: <query> on its own line
- Be precise about line numbers in diffs
"""

# First-turn prompt. Placeholders: goal, file_tree, rag_chunks, agents_md.
PROMPT_INITIAL = """
## Goal
{goal}
## Source Repo File Tree
{file_tree}
## Relevant Source Chunks (RAG)
{rag_chunks}
## CodeConvert Agent Instructions (from HF Space)
{agents_md}
## Task
1. Generate a `Dockerfile` (Docker preset for HF Spaces — port 7860, non-root user)
2. Generate a `vite.config.js` (or `vite.config.ts`) appropriate for this project
3. Generate a `README.md` for the HF Space (title, description, sdk: docker, app_port: 7860)
4. Begin converting/scaffolding the main entrypoint file(s) toward the goal
5. Files may be up to 1200 lines; split larger logic into sub-modules
6. If you run out of space mid-file, end that block with ##INCOMPLETE## — do NOT truncate silently
Output each file using the fenced block format. Use ##INCOMPLETE## if needed.
"""

# Follow-up prompt for a file that ended with the sentinel.
# Placeholders: path, last_line, next_line, resume_hint, goal.
PROMPT_CONTINUE = """
## Continuation Request
You previously started writing `{path}` but ran out of space at line {last_line}.
## Last lines written (for context):
```
{resume_hint}
```
## Goal (keep in mind)
{goal}
## Task
Continue writing `{path}` from exactly where you left off (line {next_line}).
Output ONLY the continuation using:
### CONTINUE: {path}
```<lang>
<rest of file from line {next_line} onward>
```
If you STILL cannot finish in this response, end again with ##INCOMPLETE##.
Do NOT re-emit lines already written (overlap max 2 lines for context is fine).
"""

# Prompt for each patch iteration. Placeholders: goal, logs, file_list, rag_chunks.
PROMPT_PATCH = """
## Current Goal
{goal}
## Build Logs (last 80 lines)
```
{logs}
```
## Files Currently in Space
{file_list}
## Relevant Source Chunks (RAG for context)
{rag_chunks}
## Task
Analyze the build logs. Identify errors. Output ONLY surgical patches (unified diff) to fix them.
- Use ### PATCH: <filepath> then ```diff blocks
- Do NOT rewrite whole files
- If you need more source context, output NEED_CONTEXT: <specific query> on its own line
- If the build succeeded and goal is met, output: GOAL_COMPLETE
- If the build succeeded but goal isn't fully met, output next round of patches
- If a new file is needed (not previously created), use the normal fenced block format
- Use ##INCOMPLETE## if a new file can't fit in one response
"""
# ─── Main Agent Loop ──────────────────────────────────────────────────────────
# Built-in description of the CodeConvert agent. run_agent passes it as the
# {agents_md} section of PROMPT_INITIAL.
AGENTS_MD_FALLBACK = """
CodeConvert is an AI agent that converts codebases between programming languages.
It uses a RAG approach to reference the original repo, generates idiomatic target-language
code, and iteratively patches until the build passes. It focuses on:
- Preserving logic and architecture
- Generating idiomatic target-language code
- Surgical patches based on compiler/runtime errors
- Modular file structure
"""
| def run_agent( | |
| hf_token: str, | |
| gemini_keys_raw: str, | |
| goal: str, | |
| source_input: str, # github url, hf space url, or local folder path | |
| space_name: str, | |
| max_iterations: int = 8, | |
| chunk_lines: int = 600, | |
| ) -> Generator[str, None, None]: | |
| """Main agentic loop. Yields log strings for Gradio streaming.""" | |
| def log(msg: str): | |
| return msg + "\n" | |
| yield log("π§ Initializing RepoForge...") | |
| # Parse Gemini keys | |
| gemini_keys = [k.strip() for k in re.split(r"[,\n]+", gemini_keys_raw) if k.strip()] | |
| if not gemini_keys: | |
| yield log("β No Gemini API keys provided.") | |
| return | |
| try: | |
| rotator = GeminiRotator(gemini_keys) | |
| except Exception as e: | |
| yield log(f"β Gemini init error: {e}") | |
| return | |
| # Validate HF token | |
| try: | |
| api = HfApi(token=hf_token) | |
| user = api.whoami() | |
| namespace = user["name"] | |
| yield log(f"β HuggingFace authenticated as: {namespace}") | |
| except Exception as e: | |
| yield log(f"β HF token error: {e}") | |
| return | |
| # ββ Clone / prepare source repo ββ | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| repo_dir = os.path.join(tmpdir, "source_repo") | |
| if source_input.startswith("https://github.com"): | |
| yield log(f"π¦ Cloning GitHub repo: {source_input}") | |
| result = subprocess.run( | |
| ["git", "clone", "--depth=1", source_input, repo_dir], | |
| capture_output=True, text=True | |
| ) | |
| if result.returncode != 0: | |
| yield log(f"β Git clone failed:\n{result.stderr}") | |
| return | |
| yield log("β Repo cloned.") | |
| elif "huggingface.co/spaces" in source_input: | |
| # Extract space id from URL | |
| m = re.search(r"huggingface\.co/spaces/([\w\-]+/[\w\-]+)", source_input) | |
| if not m: | |
| yield log("β Could not parse HF Space URL.") | |
| return | |
| space_id = m.group(1) | |
| yield log(f"π¦ Cloning HF Space: {space_id}") | |
| result = subprocess.run( | |
| ["git", "clone", "--depth=1", | |
| f"https://huggingface.co/spaces/{space_id}", repo_dir], | |
| capture_output=True, text=True | |
| ) | |
| if result.returncode != 0: | |
| yield log(f"β HF Space clone failed:\n{result.stderr}") | |
| return | |
| yield log("β HF Space cloned.") | |
| elif os.path.isdir(source_input): | |
| repo_dir = source_input | |
| yield log(f"π Using local folder: {repo_dir}") | |
| else: | |
| yield log("β Source must be a GitHub URL, HF Space URL, or local folder path.") | |
| return | |
| # ββ Build RAG index ββ | |
| yield log("π Building RAG index over source repo...") | |
| rag = RepoRAG(repo_dir, chunk_lines=chunk_lines) | |
| yield log(f"β Indexed {len(rag.chunks)} chunks from {len(set(c.path for c in rag.chunks))} files (chunk size: {rag.chunk_lines} lines).") | |
| file_tree = rag.file_tree() | |
| # ββ Create HF Space ββ | |
| space_mgr = SpaceManager(hf_token) | |
| safe_name = re.sub(r"[^a-zA-Z0-9\-]", "-", space_name.strip())[:50] or "repoforge-app" | |
| yield log(f"π Creating HF Space: {namespace}/{safe_name}") | |
| try: | |
| repo_id = space_mgr.create_space(namespace, safe_name) | |
| yield log(f"β Space created: {space_mgr.space_url()}") | |
| except Exception as e: | |
| yield log(f"β Failed to create space: {e}") | |
| return | |
| # ββ Track files in space ββ | |
| space_files: dict[str, str] = {} # path β content | |
| # ββ Initial generation ββ | |
| yield log("\nπ€ Asking Gemini to generate initial files...") | |
| initial_rag = rag.query(goal, top_k=8, budget_tokens=14000) | |
| initial_prompt = PROMPT_INITIAL.format( | |
| goal=goal, | |
| file_tree=file_tree[:3000], | |
| rag_chunks=initial_rag, | |
| agents_md=AGENTS_MD_FALLBACK, | |
| ) | |
| try: | |
| response = rotator.generate(SYSTEM_ARCHITECT, initial_prompt, max_tokens=8192) | |
| except Exception as e: | |
| yield log(f"β Gemini error: {e}") | |
| return | |
| # Parse files from response | |
| new_files = extract_files_from_response(response) | |
| if not new_files: | |
| yield log("β οΈ Gemini didn't output any files. Showing raw response:") | |
| yield log(response[:2000]) | |
| return | |
| # ββ Handle incomplete files from initial generation ββ | |
| pending_continuations = detect_incomplete(new_files) | |
| if pending_continuations: | |
| yield log(f"π {len(pending_continuations)} file(s) incomplete β requesting continuations...") | |
| while pending_continuations: | |
| inc = pending_continuations.pop(0) | |
| yield log(f" β©οΈ Continuing {inc.path} from line {inc.last_line}...") | |
| cont_prompt = PROMPT_CONTINUE.format( | |
| path=inc.path, | |
| last_line=inc.last_line, | |
| next_line=inc.last_line + 1, | |
| resume_hint=inc.resume_hint, | |
| goal=goal, | |
| ) | |
| try: | |
| cont_response = rotator.generate(SYSTEM_ARCHITECT, cont_prompt, max_tokens=8192) | |
| except Exception as e: | |
| yield log(f" β Continuation error for {inc.path}: {e}") | |
| continue | |
| continuation = extract_continuation(cont_response, inc) | |
| if continuation: | |
| # Check if this continuation is itself incomplete | |
| if INCOMPLETE_SENTINEL in continuation: | |
| clean_cont = continuation[: continuation.index(INCOMPLETE_SENTINEL)].rstrip() | |
| appended = inc.content_so_far + "\n" + clean_cont | |
| tail_lines = appended.splitlines() | |
| tail = "\n".join(tail_lines[-20:]) | |
| pending_continuations.append(IncompleteFile( | |
| path=inc.path, | |
| content_so_far=appended, | |
| last_line=len(appended.splitlines()), | |
| resume_hint=tail, | |
| )) | |
| new_files[inc.path] = appended | |
| yield log(f" β©οΈ {inc.path} still incomplete at line {len(appended.splitlines())} β queuing another continuation") | |
| else: | |
| new_files[inc.path] = inc.content_so_far + "\n" + continuation | |
| yield log(f" β {inc.path} completed ({len(new_files[inc.path].splitlines())} lines total)") | |
| else: | |
| yield log(f" β οΈ Could not parse continuation for {inc.path} β using partial content") | |
| # Ensure we have required files | |
| if "Dockerfile" not in new_files: | |
| yield log("β οΈ No Dockerfile generated β adding minimal one.") | |
| new_files["Dockerfile"] = textwrap.dedent(""" | |
| FROM node:20-slim | |
| WORKDIR /app | |
| COPY package*.json ./ | |
| RUN npm ci | |
| COPY . . | |
| RUN npm run build | |
| EXPOSE 7860 | |
| CMD ["npm", "run", "preview", "--", "--port", "7860", "--host"] | |
| """).strip() | |
| if "README.md" not in new_files: | |
| new_files["README.md"] = textwrap.dedent(f""" | |
| --- | |
| title: {safe_name} | |
| emoji: π§ | |
| colorFrom: blue | |
| colorTo: purple | |
| sdk: docker | |
| app_port: 7860 | |
| pinned: false | |
| --- | |
| # {safe_name} | |
| Built with RepoForge. | |
| """).strip() | |
| # Upload all initial files | |
| yield log(f"\nπ€ Uploading {len(new_files)} initial files to HF Space...") | |
| for fpath, content in new_files.items(): | |
| try: | |
| space_mgr.upload_content(content, fpath) | |
| space_files[fpath] = content | |
| yield log(f" β {fpath} ({len(content):,} chars)") | |
| except Exception as e: | |
| yield log(f" β {fpath}: {e}") | |
| yield log(f"\nπ Space URL: {space_mgr.space_url()}") | |
| yield log("β³ Waiting for initial build (60s)...") | |
| time.sleep(60) | |
| # ββ Patch loop ββ | |
| for iteration in range(1, max_iterations + 1): | |
| yield log(f"\n{'='*50}") | |
| yield log(f"π Patch iteration {iteration}/{max_iterations}") | |
| # Fetch logs | |
| yield log("π Fetching build logs...") | |
| logs = space_mgr.get_logs(lines=80) | |
| if not logs: | |
| logs = "(no logs available yet β space may still be building)" | |
| yield log(f"Logs preview:\n{logs[:500]}...") | |
| # Check for NEED_CONTEXT in previous response | |
| need_ctx_matches = re.findall(r"NEED_CONTEXT:\s*(.+)", response) | |
| if need_ctx_matches: | |
| extra_query = " ".join(need_ctx_matches) | |
| yield log(f"π Agent needs more context: '{extra_query[:100]}'") | |
| patch_rag = rag.query(extra_query, top_k=6, budget_tokens=10000) | |
| else: | |
| patch_rag = rag.query(goal + " " + logs, top_k=6, budget_tokens=10000) | |
| patch_prompt = PROMPT_PATCH.format( | |
| goal=goal, | |
| logs=logs[:3000], | |
| file_list="\n".join(space_files.keys()), | |
| rag_chunks=patch_rag, | |
| ) | |
| try: | |
| response = rotator.generate(SYSTEM_ARCHITECT, patch_prompt, max_tokens=8192) | |
| except Exception as e: | |
| yield log(f"β Gemini error: {e}") | |
| break | |
| if "GOAL_COMPLETE" in response: | |
| yield log("\nπ GOAL COMPLETE! Agent confirmed success.") | |
| break | |
| # Apply patches | |
| patches = extract_patches(response) | |
| new_in_patch = extract_files_from_response(response) | |
| if not patches and not new_in_patch: | |
| yield log("β οΈ No patches or new files found in response.") | |
| yield log(response[:1000]) | |
| for fpath, patch_str in patches.items(): | |
| original = space_files.get(fpath, "") | |
| patched = apply_unified_diff(original, patch_str) | |
| if patched != original: | |
| try: | |
| space_mgr.upload_content(patched, fpath) | |
| space_files[fpath] = patched | |
| yield log(f" π©Ή Patched: {fpath}") | |
| except Exception as e: | |
| yield log(f" β Upload failed for {fpath}: {e}") | |
| else: | |
| yield log(f" β οΈ Patch didn't apply cleanly for {fpath}") | |
| # New files from patch response (shouldn't be many after init) | |
| patch_new_incomplete = detect_incomplete(new_in_patch) | |
| if patch_new_incomplete: | |
| yield log(f" π {len(patch_new_incomplete)} new file(s) incomplete in patch round β continuing...") | |
| while patch_new_incomplete: | |
| inc = patch_new_incomplete.pop(0) | |
| yield log(f" β©οΈ Continuing new file {inc.path} from line {inc.last_line}...") | |
| cont_prompt = PROMPT_CONTINUE.format( | |
| path=inc.path, | |
| last_line=inc.last_line, | |
| next_line=inc.last_line + 1, | |
| resume_hint=inc.resume_hint, | |
| goal=goal, | |
| ) | |
| try: | |
| cont_r = rotator.generate(SYSTEM_ARCHITECT, cont_prompt, max_tokens=8192) | |
| except Exception as e: | |
| yield log(f" β {e}") | |
| continue | |
| cont = extract_continuation(cont_r, inc) | |
| if cont: | |
| if INCOMPLETE_SENTINEL in cont: | |
| clean_c = cont[: cont.index(INCOMPLETE_SENTINEL)].rstrip() | |
| appended = inc.content_so_far + "\n" + clean_c | |
| tail_lines = appended.splitlines() | |
| patch_new_incomplete.append(IncompleteFile( | |
| path=inc.path, | |
| content_so_far=appended, | |
| last_line=len(appended.splitlines()), | |
| resume_hint="\n".join(tail_lines[-20:]), | |
| )) | |
| new_in_patch[inc.path] = appended | |
| else: | |
| new_in_patch[inc.path] = inc.content_so_far + "\n" + cont | |
| yield log(f" β {inc.path} complete ({len(new_in_patch[inc.path].splitlines())} lines)") | |
| for fpath, content in new_in_patch.items(): | |
| if fpath not in space_files: | |
| try: | |
| space_mgr.upload_content(content, fpath) | |
| space_files[fpath] = content | |
| yield log(f" β New file: {fpath}") | |
| except Exception as e: | |
| yield log(f" β {fpath}: {e}") | |
| if patches or new_in_patch: | |
| yield log("β³ Waiting for rebuild (45s)...") | |
| time.sleep(45) | |
| yield log(f"\n⨠Done! Space: {space_mgr.space_url()}") | |
| yield log(f"π Files in space: {', '.join(space_files.keys())}") | |
# ─── Gradio UI ────────────────────────────────────────────────────────────────
# Stylesheet passed to gr.Blocks(css=...) in build_ui. It defines the dark
# colour variables and the "title", "subtitle", "panel", "log-box" and "tip"
# classes referenced by the UI markup.
CSS = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Syne:wght@400;700;800&display=swap');
:root {
--bg: #0a0a0f;
--surface: #111118;
--border: #1e1e2e;
--accent: #7c3aed;
--accent2: #06b6d4;
--text: #e2e8f0;
--muted: #64748b;
--success: #10b981;
--danger: #ef4444;
}
body, .gradio-container {
background: var(--bg) !important;
font-family: 'Syne', sans-serif !important;
color: var(--text) !important;
}
.gradio-container { max-width: 1100px !important; margin: 0 auto !important; }
h1.title {
font-family: 'Syne', sans-serif;
font-size: 2.8rem;
font-weight: 800;
background: linear-gradient(135deg, #7c3aed, #06b6d4);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin: 0;
letter-spacing: -0.04em;
}
.subtitle { color: var(--muted); font-size: 0.95rem; margin-top: 4px; font-family: 'JetBrains Mono', monospace; }
.panel {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 12px;
padding: 20px;
}
label { color: var(--muted) !important; font-size: 0.82rem !important; font-family: 'JetBrains Mono', monospace !important; }
input, textarea {
background: var(--bg) !important;
border: 1px solid var(--border) !important;
color: var(--text) !important;
font-family: 'JetBrains Mono', monospace !important;
border-radius: 8px !important;
}
input:focus, textarea:focus {
border-color: var(--accent) !important;
box-shadow: 0 0 0 3px rgba(124,58,237,0.15) !important;
}
button.primary {
background: linear-gradient(135deg, var(--accent), var(--accent2)) !important;
border: none !important;
border-radius: 10px !important;
font-family: 'Syne', sans-serif !important;
font-weight: 700 !important;
font-size: 1rem !important;
letter-spacing: 0.02em !important;
padding: 14px 28px !important;
color: white !important;
cursor: pointer !important;
transition: opacity 0.2s !important;
}
button.primary:hover { opacity: 0.88 !important; }
.log-box textarea {
font-family: 'JetBrains Mono', monospace !important;
font-size: 0.78rem !important;
background: #050508 !important;
color: #a0f0b0 !important;
border: 1px solid #1a2a1a !important;
}
.tip {
font-family: 'JetBrains Mono', monospace;
font-size: 0.75rem;
color: var(--muted);
border-left: 2px solid var(--accent);
padding-left: 10px;
margin-top: 8px;
}
"""
def build_ui():
    """Build and return the Gradio Blocks app.

    Layout: a header, a left column with the credential / source / goal
    inputs, a right column with the streaming log box, and a collapsed
    "how it works" section. The launch button streams ``run_agent`` output
    into the log box.

    The two columns carry the "panel" CSS class themselves. Separate
    ``gr.HTML('<div class="panel">')`` / ``gr.HTML('</div>')`` components
    cannot enclose other components, because each HTML component is
    rendered as its own element.
    """
    with gr.Blocks(css=CSS, title="RepoForge") as demo:
        gr.HTML("""
<div style="padding: 32px 0 8px 0;">
<h1 class="title">RepoForge</h1>
<p class="subtitle">// agentic repo conversion Β· surgical patches Β· rag-powered context</p>
</div>
""")
        with gr.Row():
            # -- Left column: config --
            with gr.Column(scale=1, elem_classes=["panel"]):
                gr.HTML('<p style="font-size:0.85rem;color:#7c3aed;font-weight:700;margin:0 0 12px;">π CREDENTIALS</p>')
                hf_token = gr.Textbox(
                    label="HuggingFace Access Token",
                    placeholder="hf_...",
                    type="password",
                    lines=1,
                )
                gemini_keys = gr.Textbox(
                    label="Gemini API Keys (one per line or comma-separated)",
                    placeholder="AIzaSy...\nAIzaSy...",
                    lines=4,
                    type="password",
                )
                gr.HTML('<p class="tip">Multiple keys β auto-rotates on quota exhaustion</p>')
                gr.HTML('<p style="font-size:0.85rem;color:#06b6d4;font-weight:700;margin:16px 0 12px;">π¦ SOURCE</p>')
                source_input = gr.Textbox(
                    label="Source (GitHub URL / HF Space URL / Local Folder Path)",
                    placeholder="https://github.com/owner/repo",
                    lines=1,
                )
                space_name = gr.Textbox(
                    label="New HF Space Name",
                    placeholder="my-converted-app",
                    lines=1,
                )
                gr.HTML('<p style="font-size:0.85rem;color:#10b981;font-weight:700;margin:16px 0 12px;">π― GOAL</p>')
                goal = gr.Textbox(
                    label="Conversion Goal",
                    placeholder="Convert this Python Flask app to a Rust Axum web server with identical API endpoints",
                    lines=4,
                )
                max_iters = gr.Slider(
                    label="Max patch iterations",
                    minimum=2, maximum=20, step=1, value=8
                )
                chunk_lines_slider = gr.Slider(
                    label="RAG chunk size (lines per chunk, 300β1200)",
                    minimum=300, maximum=1200, step=100, value=600,
                    info="Larger = more context per RAG hit, more tokens used"
                )
                run_btn = gr.Button("β‘ Launch RepoForge", variant="primary", elem_classes=["primary"])
            # -- Right column: logs --
            with gr.Column(scale=1, elem_classes=["panel"]):
                gr.HTML('<p style="font-size:0.85rem;color:#7c3aed;font-weight:700;margin:0 0 12px;">π AGENT LOG</p>')
                log_output = gr.Textbox(
                    label="",
                    lines=30,
                    max_lines=60,
                    interactive=False,
                    elem_classes=["log-box"],
                    show_copy_button=True,
                )
        # -- How it works --
        with gr.Accordion("βΉοΈ How RepoForge works", open=False):
            gr.Markdown("""
**RepoForge** is a fully agentic repo-conversion loop:
1. **Clones** your source repo (GitHub, HF Space, or local folder)
2. **Indexes** it with a lightweight RAG system β only relevant chunks are sent to the LLM, never the whole repo
3. **Creates** a HuggingFace Space with the Docker preset
4. **Generates** `Dockerfile`, `vite.config.js`, `README.md`, and initial source files
5. **Iterates**: fetches build logs β asks Gemini to output surgical `unified diff` patches β applies them β waits for rebuild
6. **Never** fully regenerates a file after initial creation β only patches
7. **Rotates** Gemini API keys automatically when quota is hit
**Source input formats:**
- `https://github.com/owner/repo` β cloned via git
- `https://huggingface.co/spaces/owner/name` β cloned from HF
- `/home/user/myproject` β local folder (must be accessible)
""")
        # -- Wire up --
        # Log text is kept in session state, so a new run appends to the
        # output of earlier runs in the same session.
        accumulated_logs = gr.State("")

        def stream_wrapper(hf_tok, gem_keys, goal_txt, src, sname, iters, chunk_sz, prev_logs):
            """Relay run_agent output, yielding the full log text after each line."""
            all_logs = prev_logs or ""
            for chunk in run_agent(
                hf_token=hf_tok,
                gemini_keys_raw=gem_keys,
                goal=goal_txt,
                source_input=src,
                space_name=sname,
                max_iterations=int(iters),
                chunk_lines=int(chunk_sz),
            ):
                all_logs += chunk
                yield all_logs, all_logs

        run_btn.click(
            fn=stream_wrapper,
            inputs=[hf_token, gemini_keys, goal, source_input, space_name, max_iters, chunk_lines_slider, accumulated_logs],
            outputs=[log_output, accumulated_logs],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the interface and serve it on port 7860
    # without a public share link.
    build_ui().launch(server_port=7860, share=False)