codeconvert / app.py
wuhp's picture
Create app.py
a623051 verified
"""
RepoForge – Agentic Repo Conversion App
========================================
• Multi-key Gemini rotation (auto-switches when quota hits)
• RAG over source repo (chunked, on-demand, never fully loaded)
• Creates a HF Space (Docker preset), generates Dockerfile + vite.config.js
• Surgical patch loop: read build logs → apply unified diffs → never full rewrites
• Streams progress back to Gradio UI in real-time
"""
import gradio as gr
import google.generativeai as genai
from huggingface_hub import HfApi, SpaceHardware
import os, re, json, time, math, hashlib, tempfile, subprocess, textwrap
from pathlib import Path
from dataclasses import dataclass, field
from typing import Generator, Optional
import threading
# ─── Gemini Key Rotator ───────────────────────────────────────────────────────
class GeminiRotator:
    """Rotate across several Gemini API keys, switching when one hits its quota.

    Keys are tried in order starting from the currently selected one. A key is
    abandoned (and the next one selected) only when the raised error text
    contains one of ``QUOTA_ERRORS``; any other error is re-raised unchanged.
    """

    # Lower-cased substrings that mark an exception as a quota / rate-limit failure.
    QUOTA_ERRORS = ("429", "quota", "rate", "exhausted", "resource_exhausted")

    def __init__(self, keys: list[str]):
        """Store the non-blank keys (whitespace-stripped).

        Raises:
            ValueError: if no usable key remains after stripping.
        """
        self.keys = [k.strip() for k in keys if k.strip()]
        self.idx = 0
        self.lock = threading.Lock()
        if not self.keys:
            raise ValueError("At least one Gemini API key is required.")

    def current_key(self) -> str:
        """Return the key that the next request will use."""
        with self.lock:
            return self.keys[self.idx % len(self.keys)]

    def rotate(self, failed_key: Optional[str] = None):
        """Advance to the next key.

        Args:
            failed_key: when given, rotate only if this key is still the
                selected one. This keeps two threads that failed on the same
                key from advancing twice and skipping a healthy key.
        """
        with self.lock:
            active = self.keys[self.idx % len(self.keys)]
            if failed_key is not None and active != failed_key:
                return  # somebody else already rotated away from it
            self.idx = (self.idx + 1) % len(self.keys)

    def generate(self, system: str, prompt: str, max_tokens: int = 8192) -> str:
        """Generate text, trying each key at most once.

        Args:
            system: system instruction passed to the model.
            prompt: user prompt.
            max_tokens: cap on output tokens.

        Returns:
            The response text.

        Raises:
            RuntimeError: every key failed with a quota-style error; the last
                such error is attached as ``__cause__``.
            Exception: any non-quota error is propagated as raised.
        """
        last_quota_error: Optional[Exception] = None
        for _ in range(len(self.keys)):
            key = self.current_key()
            try:
                # NOTE(review): genai.configure sets the key on the genai module,
                # so concurrent generate() calls may interleave - confirm callers
                # are single-threaded.
                genai.configure(api_key=key)
                model = genai.GenerativeModel(
                    model_name="gemini-1.5-pro",
                    system_instruction=system,
                )
                resp = model.generate_content(
                    prompt,
                    generation_config=genai.types.GenerationConfig(
                        max_output_tokens=max_tokens,
                        temperature=0.2,
                    ),
                )
                return resp.text
            except Exception as e:
                message = str(e).lower()
                if not any(marker in message for marker in self.QUOTA_ERRORS):
                    raise
                last_quota_error = e
                self.rotate(failed_key=key)
        raise RuntimeError(
            "All Gemini API keys exhausted or errored."
        ) from last_quota_error
# ─── Repo RAG Index ───────────────────────────────────────────────────────────
@dataclass
class FileChunk:
    """A contiguous slice of one source file, as stored in the RAG index."""

    path: str
    start_line: int
    end_line: int
    content: str
    tokens: int  # rough estimate

    def header(self) -> str:
        """Return the Markdown heading that labels this chunk."""
        line_span = f"{self.start_line}-{self.end_line}"
        return f"### {self.path} (lines {line_span})"
class RepoRAG:
    """
    Lightweight keyword-based RAG over a local repo directory.

    Text files are split into chunks of ``chunk_lines`` lines (clamped to
    300-1200) where adjacent chunks share ``OVERLAP_LINES`` lines, so context
    is not lost at chunk boundaries. ``query`` returns only the best-matching
    chunks, never the whole repo.
    """
    MIN_CHUNK_LINES = 300
    MAX_CHUNK_LINES = 1200
    OVERLAP_LINES = 30  # lines shared between adjacent chunks
    IGNORE_DIRS = {".git", "node_modules", "__pycache__", ".venv", "dist", "build"}
    TEXT_EXTS = {
        ".py", ".js", ".ts", ".jsx", ".tsx", ".rs", ".go", ".java", ".c", ".cpp",
        ".h", ".hpp", ".cs", ".rb", ".php", ".swift", ".kt", ".toml", ".yaml",
        ".yml", ".json", ".md", ".txt", ".sh", ".bash", ".dockerfile", ".env",
        ".html", ".css", ".scss", ".sql", ".graphql",
    }

    def __init__(self, repo_path: str, chunk_lines: int = 600):
        """Index ``repo_path`` immediately.

        Args:
            repo_path: directory to index.
            chunk_lines: requested chunk size; clamped to
                ``[MIN_CHUNK_LINES, MAX_CHUNK_LINES]``.
        """
        self.repo_path = Path(repo_path)
        self.chunk_lines = max(self.MIN_CHUNK_LINES, min(self.MAX_CHUNK_LINES, chunk_lines))
        self.chunks: list[FileChunk] = []
        self.index: dict[str, list[int]] = {}  # word -> chunk indices (one entry per occurrence)
        self._build()

    def _build(self):
        """Walk the repo, chunk every text file and fill the word index."""
        step = self.chunk_lines - self.OVERLAP_LINES  # stride that leaves the overlap
        word_re = re.compile(r"[a-zA-Z_]\w{2,}")
        # Sorted so chunk numbering (and the padding order in query) is
        # reproducible across filesystems.
        for fpath in sorted(self.repo_path.rglob("*")):
            if fpath.is_dir():
                continue
            rel = fpath.relative_to(self.repo_path)
            # Test only directories *inside* the repo: checking the absolute
            # path would skip everything when the repo itself lives under a
            # directory such as ".../build/".
            if any(part in self.IGNORE_DIRS for part in rel.parent.parts):
                continue
            if fpath.suffix.lower() not in self.TEXT_EXTS:
                continue
            try:
                lines = fpath.read_text(encoding="utf-8", errors="replace").splitlines()
            except OSError:
                continue  # unreadable file: best-effort, skip it
            total = max(len(lines), 1)  # an empty file still yields one (empty) chunk
            for start in range(0, total, step):
                # The previous chunk already reached line start + OVERLAP_LINES;
                # if the file ends before that, this chunk would add nothing.
                if start and start + self.OVERLAP_LINES >= total:
                    break
                window = lines[start:start + self.chunk_lines]
                content = "\n".join(window)
                self.chunks.append(FileChunk(
                    path=str(rel),
                    start_line=start + 1,
                    end_line=start + len(window),
                    content=content,
                    tokens=len(content) // 4,  # rough chars/4 token estimate
                ))
                chunk_idx = len(self.chunks) - 1
                # One index entry per occurrence, so a chunk's score grows with
                # how often a query word appears in it.
                for word in word_re.findall(content.lower()):
                    self.index.setdefault(word, []).append(chunk_idx)

    def query(self, query: str, top_k: int = 6, budget_tokens: int = 12000) -> str:
        """Return up to ``top_k`` relevant chunks as one formatted string.

        Chunks are ranked by how many times the query's words occur in them.
        If fewer than ``top_k`` chunks match, the list is padded with the first
        chunks of the index. A chunk that would exceed ``budget_tokens`` is
        skipped, so lower-ranked chunks that still fit are kept.

        Returns:
            Chunks joined by blank lines, or a placeholder sentence when
            nothing fits or the index is empty.
        """
        scores: dict[int, int] = {}
        for word in re.findall(r"[a-zA-Z_]\w{2,}", query.lower()):
            for idx in self.index.get(word, ()):
                scores[idx] = scores.get(idx, 0) + 1
        ranked = sorted(scores, key=lambda i: -scores[i])[:top_k]
        # Pad with the first chunks when too few matched.
        chosen = set(ranked)
        for i in range(len(self.chunks)):
            if len(ranked) >= top_k:
                break
            if i not in chosen:
                ranked.append(i)
                chosen.add(i)
        out, used = [], 0
        for idx in ranked:
            c = self.chunks[idx]
            if used + c.tokens > budget_tokens:
                continue  # too big for what is left; a smaller chunk may still fit
            out.append(f"{c.header()}\n```\n{c.content}\n```")
            used += c.tokens
        return "\n\n".join(out) if out else "(no relevant source chunks found)"

    def file_tree(self, max_lines: int = 200) -> str:
        """Return the sorted indexed file paths, one per line, capped at ``max_lines``."""
        paths = sorted(set(c.path for c in self.chunks))
        lines = paths[:max_lines]
        if len(paths) > max_lines:
            lines.append(f"... and {len(paths) - max_lines} more files")
        return "\n".join(lines)

    def get_file(self, rel_path: str) -> Optional[str]:
        """Return the full text of one file inside the repo.

        Returns None when the path does not exist, is not a regular file,
        cannot be read, or resolves to a location outside the repo (``../``
        segments, absolute paths, symlinks pointing elsewhere).
        """
        root = self.repo_path.resolve()
        try:
            full = (root / rel_path).resolve()
            full.relative_to(root)  # raises ValueError when outside the repo
        except (OSError, RuntimeError, ValueError):
            return None
        if not full.is_file():
            return None
        try:
            return full.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None
# ─── HuggingFace Space Manager ────────────────────────────────────────────────
class SpaceManager:
    """Thin wrapper around ``HfApi`` for one target Space: create, upload, read logs."""

    def __init__(self, hf_token: str):
        """Create the API client; no Space is selected until ``create_space``."""
        self.api = HfApi(token=hf_token)
        self.token = hf_token
        self.space_id: Optional[str] = None

    def create_space(self, namespace: str, name: str) -> str:
        """Create (or reuse) a public Docker-SDK Space and remember it.

        Returns:
            The repo id ``"<namespace>/<name>"``.
        """
        repo_id = f"{namespace}/{name}"
        self.api.create_repo(
            repo_id=repo_id,
            repo_type="space",
            space_sdk="docker",
            exist_ok=True,
            private=False,
        )
        self.space_id = repo_id
        return repo_id

    def upload_file(self, local_path: str, repo_path: str):
        """Upload a local file to ``repo_path`` in the current Space.

        Raises:
            RuntimeError: if ``create_space`` has not been called yet.
        """
        if not self.space_id:
            raise RuntimeError("No space created yet.")
        self.api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=repo_path,
            repo_id=self.space_id,
            repo_type="space",
        )

    def upload_content(self, content: str, repo_path: str):
        """Upload string content to ``repo_path`` via a temporary file.

        The text is written as UTF-8 regardless of the host locale, and the
        temporary file is removed even when the upload raises.
        """
        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=Path(repo_path).suffix,
            delete=False,
            encoding="utf-8",
        ) as f:
            f.write(content)
            tmp = f.name
        try:
            self.upload_file(tmp, repo_path)
        finally:
            os.unlink(tmp)

    def get_logs(self, lines: int = 80) -> str:
        """Fetch recent log text for the Space.

        Returns:
            Log lines joined by newlines; ``""`` when no Space is selected; or
            a parenthesised ``(could not fetch logs: ...)`` message when any
            API call fails.
        """
        if not self.space_id:
            return ""
        try:
            # Result is not used; a failure here is reported like a log failure.
            self.api.get_space_runtime(self.space_id)
            # NOTE(review): confirm that the installed huggingface_hub exposes
            # HfApi.get_space_logs and that entries are dicts with a "text"
            # key; if not, this always takes the except branch below.
            log_text = [
                entry.get("text", "")
                for entry in self.api.get_space_logs(self.space_id, lines=lines)
            ]
            return "\n".join(log_text)
        except Exception as e:
            return f"(could not fetch logs: {e})"

    def space_url(self) -> str:
        """Return the Space's web URL, or ``""`` when none is selected."""
        if not self.space_id:
            return ""
        return f"https://huggingface.co/spaces/{self.space_id}"
# ─── Patch Utilities ──────────────────────────────────────────────────────────
def apply_unified_diff(original: str, patch: str) -> str:
    """Apply a unified diff to ``original`` using the system ``patch`` tool.

    Args:
        original: current file content.
        patch: unified diff text targeting that content.

    Returns:
        The patched content on success. ``original`` is returned unchanged when
        the diff is empty, does not apply, or the ``patch`` executable cannot
        be run - callers detect failure by comparing the result with the input.
    """
    if not patch.strip():
        return original
    if not patch.endswith("\n"):
        # Diffs pulled out of fenced blocks can lose the final newline.
        patch += "\n"
    with tempfile.TemporaryDirectory() as td:
        orig_file = Path(td) / "original"
        patch_file = Path(td) / "patch.diff"
        orig_file.write_text(original, encoding="utf-8")
        patch_file.write_text(patch, encoding="utf-8")
        try:
            result = subprocess.run(
                # -N: skip a diff that looks reversed or already applied
                # instead of asking whether to reverse it.
                ["patch", "-u", "-N", str(orig_file), str(patch_file)],
                capture_output=True,
                text=True,
                stdin=subprocess.DEVNULL,
            )
        except OSError:
            # `patch` is missing or not executable: report "no change".
            return original
        if result.returncode == 0:
            return orig_file.read_text(encoding="utf-8")
    return original  # patch failed, return unchanged
def extract_files_from_response(text: str) -> dict[str, str]:
    """
    Parse an LLM response for fenced code blocks that name a file.

    Two layouts are recognised:
      1. the path on the fence line itself:  ```lang path/to/file
      2. a heading line ``### FILE: path/to/file`` followed by a fenced block

    Returns:
        Mapping of path -> block content (text between the fence lines).
        If a path appears more than once the later block wins, and layout 2
        overrides layout 1.
    """
    files: dict[str, str] = {}
    # The path must be on the SAME line as the opening fence. Only spaces/tabs
    # are allowed between the language tag and the path; allowing newlines
    # there would turn a bare block whose first line is a single token (for
    # example README front matter "---") into a file named after that token.
    inline_path = re.compile(
        r"```[\w]*[ \t]+([\w./\-]+)[ \t]*\n(.*?)```",
        re.DOTALL,
    )
    headed_path = re.compile(
        r"###\s+(?:FILE|file):\s*([\w./\-]+)\s*\n```[\w]*\n(.*?)```",
        re.DOTALL,
    )
    for pattern in (inline_path, headed_path):
        for m in pattern.finditer(text):
            files[m.group(1).strip()] = m.group(2)
    return files
def extract_patches(text: str) -> dict[str, str]:
    """Collect ``### PATCH: <path>`` sections and their ```diff bodies.

    Returns a mapping of path -> diff text; a later section for the same path
    replaces an earlier one.
    """
    section_re = r"###\s+PATCH:\s*([\w./\-]+)\s*\n```diff\n(.*?)```"
    return {
        hit.group(1).strip(): hit.group(2)
        for hit in re.finditer(section_re, text, re.DOTALL)
    }
# ─── File Continuation System ─────────────────────────────────────────────────
# Marker the model is told to emit when it hits its output limit mid-file.
INCOMPLETE_SENTINEL = "##INCOMPLETE##"


@dataclass
class IncompleteFile:
    """A generated file that was cut off and still needs a continuation."""

    path: str
    content_so_far: str  # text produced before the sentinel
    last_line: int       # number of lines written so far (1-indexed last line)
    resume_hint: str     # trailing lines shown to the model as resume context


def detect_incomplete(files: dict[str, str]) -> list[IncompleteFile]:
    """
    Find parsed files that contain the INCOMPLETE sentinel.

    For each such file, everything from the sentinel onward is dropped, trailing
    whitespace is stripped, and ``files`` is updated in place with the trimmed
    text. Returns one IncompleteFile per truncated file.
    """
    unfinished: list[IncompleteFile] = []
    for name, body in files.items():
        kept, marker, _ = body.partition(INCOMPLETE_SENTINEL)
        if not marker:
            continue  # sentinel absent: file is complete
        kept = kept.rstrip()
        kept_lines = kept.splitlines()
        if len(kept_lines) >= 20:
            hint = "\n".join(kept_lines[-20:])
        else:
            hint = kept
        unfinished.append(
            IncompleteFile(
                path=name,
                content_so_far=kept,
                last_line=len(kept_lines),
                resume_hint=hint,
            )
        )
        files[name] = kept  # keep the partial text under the same key
    return unfinished
def extract_continuation(text: str, inc: IncompleteFile) -> Optional[str]:
    """
    Extract the continuation text for ``inc.path`` from a CONTINUE response.

    Preferred layout: a ``### CONTINUE: <path>`` heading followed by a fenced
    block. Fallback layout: a fenced block whose opening fence carries the
    path. Returns the fenced content, or None when neither layout is present.
    """
    escaped_path = re.escape(inc.path)
    layouts = (
        rf"###\s+CONTINUE:\s*{escaped_path}\s*\n```[\w]*\n(.*?)```",
        # Fallback: bare fenced block tagged with the path
        rf"```[\w]*\s+{escaped_path}\n(.*?)```",
    )
    for layout in layouts:
        found = re.search(layout, text, re.DOTALL)
        if found:
            return found.group(1)
    return None
# ─── Prompts ──────────────────────────────────────────────────────────────────
# System instruction sent with every Gemini request. It defines the output
# formats that extract_files_from_response, extract_patches and
# extract_continuation parse, plus the ##INCOMPLETE## continuation protocol.
SYSTEM_ARCHITECT = """You are RepoForge, an expert software architect and full-stack engineer.
You help users convert, rebuild, or transform code repositories by:
1. Analyzing the source repo structure (provided via RAG chunks)
2. Creating a HuggingFace Space with Docker preset
3. Generating a production-grade Dockerfile and vite.config.js
4. Iteratively patching code based on build logs — NEVER fully regenerating files after initial creation
5. Making SURGICAL unified-diff patches to fix errors
FILE FORMAT RULES (strictly follow):
- New files: fenced block with path on the opening line:
```typescript src/main.ts
<content>
```
- Patches: ### PATCH: src/main.ts then ```diff with unified diff syntax
- Always prefer patches over full file rewrites after initial generation
- Keep responses focused; do NOT repeat unchanged file contents
- Files may be up to 1200 lines. Split larger logic into sub-modules.
CONTINUATION PROTOCOL (critical):
- If you cannot finish a file in one response, write as much as you can, then end the
fenced block with the literal token ##INCOMPLETE## on its own line before the closing ```.
- Example:
```rust src/handlers.rs
// ... all content you managed to write ...
##INCOMPLETE##
```
- In the very next response (a CONTINUE prompt), resume from exactly where you left off.
- For continuations output:
### CONTINUE: src/handlers.rs
```rust
<rest of file, picking up at the exact next line>
```
- Never re-emit lines already written. Overlap by at most 2 lines for context.
- A file is only considered complete when its fenced block closes WITHOUT ##INCOMPLETE##.
MEMORY RULES:
- Source repo context is provided as RAG chunks (not the full repo)
- Ask for more context by outputting NEED_CONTEXT: <query> on its own line
- Be precise about line numbers in diffs
"""
# First-round prompt template. str.format fields: goal, file_tree, rag_chunks,
# agents_md.
PROMPT_INITIAL = """
## Goal
{goal}
## Source Repo File Tree
{file_tree}
## Relevant Source Chunks (RAG)
{rag_chunks}
## CodeConvert Agent Instructions (from HF Space)
{agents_md}
## Task
1. Generate a `Dockerfile` (Docker preset for HF Spaces — port 7860, non-root user)
2. Generate a `vite.config.js` (or `vite.config.ts`) appropriate for this project
3. Generate a `README.md` for the HF Space (title, description, sdk: docker, app_port: 7860)
4. Begin converting/scaffolding the main entrypoint file(s) toward the goal
5. Files may be up to 1200 lines; split larger logic into sub-modules
6. If you run out of space mid-file, end that block with ##INCOMPLETE## — do NOT truncate silently
Output each file using the fenced block format. Use ##INCOMPLETE## if needed.
"""
# Prompt template used to resume a file that ended with ##INCOMPLETE##.
# str.format fields: path, last_line, resume_hint, goal, next_line.
PROMPT_CONTINUE = """
## Continuation Request
You previously started writing `{path}` but ran out of space at line {last_line}.
## Last lines written (for context):
```
{resume_hint}
```
## Goal (keep in mind)
{goal}
## Task
Continue writing `{path}` from exactly where you left off (line {next_line}).
Output ONLY the continuation using:
### CONTINUE: {path}
```<lang>
<rest of file from line {next_line} onward>
```
If you STILL cannot finish in this response, end again with ##INCOMPLETE##.
Do NOT re-emit lines already written (overlap max 2 lines for context is fine).
"""
# Prompt template for each patch iteration.
# str.format fields: goal, logs, file_list, rag_chunks.
# The GOAL_COMPLETE and NEED_CONTEXT markers named below are the ones
# run_agent searches for in the model's reply.
PROMPT_PATCH = """
## Current Goal
{goal}
## Build Logs (last 80 lines)
```
{logs}
```
## Files Currently in Space
{file_list}
## Relevant Source Chunks (RAG for context)
{rag_chunks}
## Task
Analyze the build logs. Identify errors. Output ONLY surgical patches (unified diff) to fix them.
- Use ### PATCH: <filepath> then ```diff blocks
- Do NOT rewrite whole files
- If you need more source context, output NEED_CONTEXT: <specific query> on its own line
- If the build succeeded and goal is met, output: GOAL_COMPLETE
- If the build succeeded but goal isn't fully met, output next round of patches
- If a new file is needed (not previously created), use the normal fenced block format
- Use ##INCOMPLETE## if a new file can't fit in one response
"""
# ─── Main Agent Loop ──────────────────────────────────────────────────────────
# Built-in agent description; run_agent passes it as the `agents_md` field of
# PROMPT_INITIAL.
AGENTS_MD_FALLBACK = """
CodeConvert is an AI agent that converts codebases between programming languages.
It uses a RAG approach to reference the original repo, generates idiomatic target-language
code, and iteratively patches until the build passes. It focuses on:
- Preserving logic and architecture
- Generating idiomatic target-language code
- Surgical patches based on compiler/runtime errors
- Modular file structure
"""
# Upper bound on continuation requests per file, so a model that keeps ending
# with the sentinel cannot loop forever and burn API quota.
_MAX_CONTINUATIONS_PER_FILE = 10


def _git_clone(url: str, dest: str) -> Optional[str]:
    """Shallow-clone ``url`` into ``dest``; return None on success, else the error text."""
    try:
        result = subprocess.run(
            ["git", "clone", "--depth=1", url, dest],
            capture_output=True, text=True,
        )
    except OSError as e:  # e.g. git is not installed
        return str(e)
    return None if result.returncode == 0 else result.stderr


def _finish_incomplete_files(
    rotator: "GeminiRotator",
    goal: str,
    files: dict,
    pending: list,
    indent: str = " ",
) -> Generator[str, None, None]:
    """Ask the model to continue every truncated file; yield log lines.

    ``files`` is updated in place with whatever content was obtained. A file is
    left with its partial content when the request fails, the reply cannot be
    parsed, or the per-file continuation limit is reached.
    """
    queue = list(pending)
    rounds: dict[str, int] = {}
    while queue:
        inc = queue.pop(0)
        rounds[inc.path] = rounds.get(inc.path, 0) + 1
        if rounds[inc.path] > _MAX_CONTINUATIONS_PER_FILE:
            yield (
                f"{indent}⚠️ {inc.path} still incomplete after "
                f"{_MAX_CONTINUATIONS_PER_FILE} continuations — using partial content\n"
            )
            continue
        yield f"{indent}↩️ Continuing {inc.path} from line {inc.last_line}...\n"
        cont_prompt = PROMPT_CONTINUE.format(
            path=inc.path,
            last_line=inc.last_line,
            next_line=inc.last_line + 1,
            resume_hint=inc.resume_hint,
            goal=goal,
        )
        try:
            reply = rotator.generate(SYSTEM_ARCHITECT, cont_prompt, max_tokens=8192)
        except Exception as e:
            yield f"{indent}❌ Continuation error for {inc.path}: {e}\n"
            continue
        continuation = extract_continuation(reply, inc)
        if not continuation:
            yield f"{indent}⚠️ Could not parse continuation for {inc.path} — using partial content\n"
            continue
        if INCOMPLETE_SENTINEL in continuation:
            # Still truncated: keep what we got and queue another round.
            piece = continuation[: continuation.index(INCOMPLETE_SENTINEL)].rstrip()
            merged = inc.content_so_far + "\n" + piece
            merged_lines = merged.splitlines()
            queue.append(IncompleteFile(
                path=inc.path,
                content_so_far=merged,
                last_line=len(merged_lines),
                resume_hint="\n".join(merged_lines[-20:]),
            ))
            files[inc.path] = merged
            yield (
                f"{indent}↩️ {inc.path} still incomplete at line {len(merged_lines)}"
                " — queuing another continuation\n"
            )
        else:
            files[inc.path] = inc.content_so_far + "\n" + continuation
            yield f"{indent}✅ {inc.path} completed ({len(files[inc.path].splitlines())} lines total)\n"


def run_agent(
    hf_token: str,
    gemini_keys_raw: str,
    goal: str,
    source_input: str,  # github url, hf space url, or local folder path
    space_name: str,
    max_iterations: int = 8,
    chunk_lines: int = 600,
) -> Generator[str, None, None]:
    """Main agentic loop. Yields newline-terminated log strings for Gradio streaming.

    Steps: validate credentials, obtain the source repo (git clone or local
    folder), build the RAG index, create the target Space, generate and upload
    the initial files, then run up to ``max_iterations`` rounds of
    "fetch logs -> ask for patches -> apply -> wait for rebuild".

    Args:
        hf_token: Hugging Face access token.
        gemini_keys_raw: Gemini keys separated by commas and/or newlines.
        goal: natural-language conversion goal.
        source_input: ``https://github.com/...`` URL, a
            ``huggingface.co/spaces/...`` URL, or a local directory.
        space_name: requested Space name; sanitised to ``[A-Za-z0-9-]``, max 50 chars.
        max_iterations: maximum number of patch rounds.
        chunk_lines: RAG chunk size passed to RepoRAG.

    Failures are reported as log lines and end the generator early.
    """
    def log(msg: str):
        return msg + "\n"

    yield log("🔧 Initializing RepoForge...")

    # Parse Gemini keys
    gemini_keys = [k.strip() for k in re.split(r"[,\n]+", gemini_keys_raw or "") if k.strip()]
    if not gemini_keys:
        yield log("❌ No Gemini API keys provided.")
        return
    try:
        rotator = GeminiRotator(gemini_keys)
    except Exception as e:
        yield log(f"❌ Gemini init error: {e}")
        return

    # Validate HF token
    try:
        api = HfApi(token=hf_token)
        user = api.whoami()
        namespace = user["name"]
        yield log(f"✅ HuggingFace authenticated as: {namespace}")
    except Exception as e:
        yield log(f"❌ HF token error: {e}")
        return

    source_input = (source_input or "").strip()

    # Clone / prepare source repo. The temp dir must outlive the whole run
    # because the RAG index refers to files inside it.
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = os.path.join(tmpdir, "source_repo")
        # Trailing slash matters: a bare prefix check would also accept hosts
        # such as "https://github.com.example.org".
        if source_input.startswith("https://github.com/"):
            yield log(f"📦 Cloning GitHub repo: {source_input}")
            clone_error = _git_clone(source_input, repo_dir)
            if clone_error is not None:
                yield log(f"❌ Git clone failed:\n{clone_error}")
                return
            yield log("✅ Repo cloned.")
        elif "huggingface.co/spaces" in source_input:
            # Extract the "<owner>/<name>" id; dots are permitted in both parts.
            m = re.search(r"huggingface\.co/spaces/([\w.\-]+/[\w.\-]+)", source_input)
            if not m:
                yield log("❌ Could not parse HF Space URL.")
                return
            space_id = m.group(1)
            yield log(f"📦 Cloning HF Space: {space_id}")
            clone_error = _git_clone(f"https://huggingface.co/spaces/{space_id}", repo_dir)
            if clone_error is not None:
                yield log(f"❌ HF Space clone failed:\n{clone_error}")
                return
            yield log("✅ HF Space cloned.")
        elif os.path.isdir(source_input):
            repo_dir = source_input
            yield log(f"📁 Using local folder: {repo_dir}")
        else:
            yield log("❌ Source must be a GitHub URL, HF Space URL, or local folder path.")
            return

        # Build RAG index
        yield log("🔍 Building RAG index over source repo...")
        rag = RepoRAG(repo_dir, chunk_lines=chunk_lines)
        yield log(f"✅ Indexed {len(rag.chunks)} chunks from {len(set(c.path for c in rag.chunks))} files (chunk size: {rag.chunk_lines} lines).")
        file_tree = rag.file_tree()

        # Create HF Space
        space_mgr = SpaceManager(hf_token)
        safe_name = re.sub(r"[^a-zA-Z0-9\-]", "-", space_name.strip())[:50] or "repoforge-app"
        yield log(f"🚀 Creating HF Space: {namespace}/{safe_name}")
        try:
            space_mgr.create_space(namespace, safe_name)
            yield log(f"✅ Space created: {space_mgr.space_url()}")
        except Exception as e:
            yield log(f"❌ Failed to create space: {e}")
            return

        # Files known to be in the Space: path -> content
        space_files: dict[str, str] = {}

        # Initial generation
        yield log("\n🤖 Asking Gemini to generate initial files...")
        initial_rag = rag.query(goal, top_k=8, budget_tokens=14000)
        initial_prompt = PROMPT_INITIAL.format(
            goal=goal,
            file_tree=file_tree[:3000],
            rag_chunks=initial_rag,
            agents_md=AGENTS_MD_FALLBACK,
        )
        try:
            response = rotator.generate(SYSTEM_ARCHITECT, initial_prompt, max_tokens=8192)
        except Exception as e:
            yield log(f"❌ Gemini error: {e}")
            return

        # Parse files from response
        new_files = extract_files_from_response(response)
        if not new_files:
            yield log("⚠️ Gemini didn't output any files. Showing raw response:")
            yield log(response[:2000])
            return

        # Handle incomplete files from initial generation
        pending_continuations = detect_incomplete(new_files)
        if pending_continuations:
            yield log(f"📝 {len(pending_continuations)} file(s) incomplete — requesting continuations...")
            yield from _finish_incomplete_files(rotator, goal, new_files, pending_continuations)

        # Ensure we have required files
        if "Dockerfile" not in new_files:
            yield log("⚠️ No Dockerfile generated — adding minimal one.")
            new_files["Dockerfile"] = textwrap.dedent("""
                FROM node:20-slim
                WORKDIR /app
                COPY package*.json ./
                RUN npm ci
                COPY . .
                RUN npm run build
                EXPOSE 7860
                CMD ["npm", "run", "preview", "--", "--port", "7860", "--host"]
            """).strip()
        if "README.md" not in new_files:
            new_files["README.md"] = textwrap.dedent(f"""
                ---
                title: {safe_name}
                emoji: 🔧
                colorFrom: blue
                colorTo: purple
                sdk: docker
                app_port: 7860
                pinned: false
                ---
                # {safe_name}
                Built with RepoForge.
            """).strip()

        # Upload all initial files
        yield log(f"\n📤 Uploading {len(new_files)} initial files to HF Space...")
        for fpath, content in new_files.items():
            try:
                space_mgr.upload_content(content, fpath)
                space_files[fpath] = content
                yield log(f" ✅ {fpath} ({len(content):,} chars)")
            except Exception as e:
                yield log(f" ❌ {fpath}: {e}")
        yield log(f"\n🔗 Space URL: {space_mgr.space_url()}")
        yield log("⏳ Waiting for initial build (60s)...")
        time.sleep(60)

        # Patch loop
        for iteration in range(1, max_iterations + 1):
            yield log(f"\n{'='*50}")
            yield log(f"🔁 Patch iteration {iteration}/{max_iterations}")

            # Fetch logs
            yield log("📋 Fetching build logs...")
            logs = space_mgr.get_logs(lines=80)
            if not logs:
                logs = "(no logs available yet — space may still be building)"
            yield log(f"Logs preview:\n{logs[:500]}...")

            # If the previous reply asked for more context, use that as the
            # retrieval query for this round.
            need_ctx_matches = re.findall(r"NEED_CONTEXT:\s*(.+)", response)
            if need_ctx_matches:
                extra_query = " ".join(need_ctx_matches)
                yield log(f"🔍 Agent needs more context: '{extra_query[:100]}'")
                patch_rag = rag.query(extra_query, top_k=6, budget_tokens=10000)
            else:
                patch_rag = rag.query(goal + " " + logs, top_k=6, budget_tokens=10000)

            patch_prompt = PROMPT_PATCH.format(
                goal=goal,
                logs=logs[:3000],
                file_list="\n".join(space_files),
                rag_chunks=patch_rag,
            )
            try:
                response = rotator.generate(SYSTEM_ARCHITECT, patch_prompt, max_tokens=8192)
            except Exception as e:
                yield log(f"❌ Gemini error: {e}")
                break
            if "GOAL_COMPLETE" in response:
                yield log("\n🎉 GOAL COMPLETE! Agent confirmed success.")
                break

            # Apply patches
            patches = extract_patches(response)
            new_in_patch = extract_files_from_response(response)
            if not patches and not new_in_patch:
                yield log("⚠️ No patches or new files found in response.")
                yield log(response[:1000])
            for fpath, patch_str in patches.items():
                original = space_files.get(fpath, "")
                patched = apply_unified_diff(original, patch_str)
                if patched != original:
                    try:
                        space_mgr.upload_content(patched, fpath)
                        space_files[fpath] = patched
                        yield log(f" 🩹 Patched: {fpath}")
                    except Exception as e:
                        yield log(f" ❌ Upload failed for {fpath}: {e}")
                else:
                    yield log(f" ⚠️ Patch didn't apply cleanly for {fpath}")

            # New files from patch response (shouldn't be many after init)
            patch_new_incomplete = detect_incomplete(new_in_patch)
            if patch_new_incomplete:
                yield log(f" 📝 {len(patch_new_incomplete)} new file(s) incomplete in patch round — continuing...")
                yield from _finish_incomplete_files(rotator, goal, new_in_patch, patch_new_incomplete)
            for fpath, content in new_in_patch.items():
                # Files already in the Space may only change through patches.
                if fpath not in space_files:
                    try:
                        space_mgr.upload_content(content, fpath)
                        space_files[fpath] = content
                        yield log(f" ➕ New file: {fpath}")
                    except Exception as e:
                        yield log(f" ❌ {fpath}: {e}")
            if patches or new_in_patch:
                yield log("⏳ Waiting for rebuild (45s)...")
                time.sleep(45)

        yield log(f"\n✨ Done! Space: {space_mgr.space_url()}")
        yield log(f"📊 Files in space: {', '.join(space_files)}")
# ─── Gradio UI ────────────────────────────────────────────────────────────────
# Custom stylesheet passed to gr.Blocks(css=...) in build_ui: dark theme
# variables, web fonts, and the .title / .subtitle / .panel / .log-box / .tip
# classes referenced by the HTML snippets and elem_classes used there.
CSS = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Syne:wght@400;700;800&display=swap');
:root {
--bg: #0a0a0f;
--surface: #111118;
--border: #1e1e2e;
--accent: #7c3aed;
--accent2: #06b6d4;
--text: #e2e8f0;
--muted: #64748b;
--success: #10b981;
--danger: #ef4444;
}
body, .gradio-container {
background: var(--bg) !important;
font-family: 'Syne', sans-serif !important;
color: var(--text) !important;
}
.gradio-container { max-width: 1100px !important; margin: 0 auto !important; }
h1.title {
font-family: 'Syne', sans-serif;
font-size: 2.8rem;
font-weight: 800;
background: linear-gradient(135deg, #7c3aed, #06b6d4);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin: 0;
letter-spacing: -0.04em;
}
.subtitle { color: var(--muted); font-size: 0.95rem; margin-top: 4px; font-family: 'JetBrains Mono', monospace; }
.panel {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 12px;
padding: 20px;
}
label { color: var(--muted) !important; font-size: 0.82rem !important; font-family: 'JetBrains Mono', monospace !important; }
input, textarea {
background: var(--bg) !important;
border: 1px solid var(--border) !important;
color: var(--text) !important;
font-family: 'JetBrains Mono', monospace !important;
border-radius: 8px !important;
}
input:focus, textarea:focus {
border-color: var(--accent) !important;
box-shadow: 0 0 0 3px rgba(124,58,237,0.15) !important;
}
button.primary {
background: linear-gradient(135deg, var(--accent), var(--accent2)) !important;
border: none !important;
border-radius: 10px !important;
font-family: 'Syne', sans-serif !important;
font-weight: 700 !important;
font-size: 1rem !important;
letter-spacing: 0.02em !important;
padding: 14px 28px !important;
color: white !important;
cursor: pointer !important;
transition: opacity 0.2s !important;
}
button.primary:hover { opacity: 0.88 !important; }
.log-box textarea {
font-family: 'JetBrains Mono', monospace !important;
font-size: 0.78rem !important;
background: #050508 !important;
color: #a0f0b0 !important;
border: 1px solid #1a2a1a !important;
}
.tip {
font-family: 'JetBrains Mono', monospace;
font-size: 0.75rem;
color: var(--muted);
border-left: 2px solid var(--accent);
padding-left: 10px;
margin-top: 8px;
}
"""
# Page header markup (uses the .title / .subtitle classes from CSS).
_HEADER_HTML = """
<div style="padding: 32px 0 8px 0;">
<h1 class="title">RepoForge</h1>
<p class="subtitle">// agentic repo conversion · surgical patches · rag-powered context</p>
</div>
"""

# Body of the "How RepoForge works" accordion.
_HOW_IT_WORKS_MD = """
**RepoForge** is a fully agentic repo-conversion loop:
1. **Clones** your source repo (GitHub, HF Space, or local folder)
2. **Indexes** it with a lightweight RAG system — only relevant chunks are sent to the LLM, never the whole repo
3. **Creates** a HuggingFace Space with the Docker preset
4. **Generates** `Dockerfile`, `vite.config.js`, `README.md`, and initial source files
5. **Iterates**: fetches build logs → asks Gemini to output surgical `unified diff` patches → applies them → waits for rebuild
6. **Never** fully regenerates a file after initial creation — only patches
7. **Rotates** Gemini API keys automatically when quota is hit
**Source input formats:**
- `https://github.com/owner/repo` — cloned via git
- `https://huggingface.co/spaces/owner/name` — cloned from HF
- `/home/user/myproject` — local folder (must be accessible)
"""


def build_ui():
    """Build and return the Gradio Blocks app (not launched).

    Layout: a configuration column (credentials, source, goal, tuning sliders,
    launch button) beside a read-only log column, with a help accordion below.
    The launch button streams ``run_agent`` output into the log box.
    """
    with gr.Blocks(css=CSS, title="RepoForge") as demo:
        gr.HTML(_HEADER_HTML)
        with gr.Row():
            # Left column: config. The "panel" class is attached to the column
            # itself: an opening <div> in one gr.HTML and its </div> in another
            # cannot wrap the components between them, because every gr.HTML
            # renders inside its own container.
            with gr.Column(scale=1, elem_classes=["panel"]):
                gr.HTML('<p style="font-size:0.85rem;color:#7c3aed;font-weight:700;margin:0 0 12px;">🔑 CREDENTIALS</p>')
                hf_token = gr.Textbox(
                    label="HuggingFace Access Token",
                    placeholder="hf_...",
                    type="password",
                    lines=1,
                )
                gemini_keys = gr.Textbox(
                    label="Gemini API Keys (one per line or comma-separated)",
                    placeholder="AIzaSy...\nAIzaSy...",
                    lines=4,
                    type="password",
                )
                gr.HTML('<p class="tip">Multiple keys → auto-rotates on quota exhaustion</p>')
                gr.HTML('<p style="font-size:0.85rem;color:#06b6d4;font-weight:700;margin:16px 0 12px;">📦 SOURCE</p>')
                source_input = gr.Textbox(
                    label="Source (GitHub URL / HF Space URL / Local Folder Path)",
                    placeholder="https://github.com/owner/repo",
                    lines=1,
                )
                space_name = gr.Textbox(
                    label="New HF Space Name",
                    placeholder="my-converted-app",
                    lines=1,
                )
                gr.HTML('<p style="font-size:0.85rem;color:#10b981;font-weight:700;margin:16px 0 12px;">🎯 GOAL</p>')
                goal = gr.Textbox(
                    label="Conversion Goal",
                    placeholder="Convert this Python Flask app to a Rust Axum web server with identical API endpoints",
                    lines=4,
                )
                max_iters = gr.Slider(
                    label="Max patch iterations",
                    minimum=2, maximum=20, step=1, value=8,
                )
                chunk_lines_slider = gr.Slider(
                    label="RAG chunk size (lines per chunk, 300–1200)",
                    minimum=300, maximum=1200, step=100, value=600,
                    info="Larger = more context per RAG hit, more tokens used",
                )
                run_btn = gr.Button("⚡ Launch RepoForge", variant="primary", elem_classes=["primary"])

            # Right column: logs
            with gr.Column(scale=1, elem_classes=["panel"]):
                gr.HTML('<p style="font-size:0.85rem;color:#7c3aed;font-weight:700;margin:0 0 12px;">📟 AGENT LOG</p>')
                log_output = gr.Textbox(
                    label="",
                    lines=30,
                    max_lines=60,
                    interactive=False,
                    elem_classes=["log-box"],
                    show_copy_button=True,
                )

        # How it works
        with gr.Accordion("ℹ️ How RepoForge works", open=False):
            gr.Markdown(_HOW_IT_WORKS_MD)

        # Wire up. The State keeps the log text between clicks, so a new run
        # appends to the previous run's output.
        accumulated_logs = gr.State("")

        def stream_wrapper(hf_tok, gem_keys, goal_txt, src, sname, iters, chunk_sz, prev_logs):
            """Run the agent and yield the growing log for the textbox and the State."""
            all_logs = prev_logs or ""
            for chunk in run_agent(
                hf_token=hf_tok,
                gemini_keys_raw=gem_keys,
                goal=goal_txt,
                source_input=src,
                space_name=sname,
                max_iterations=int(iters),
                chunk_lines=int(chunk_sz),
            ):
                all_logs += chunk
                yield all_logs, all_logs

        run_btn.click(
            fn=stream_wrapper,
            inputs=[hf_token, gemini_keys, goal, source_input, space_name, max_iters, chunk_lines_slider, accumulated_logs],
            outputs=[log_output, accumulated_logs],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the Blocks app, then serve it on port 7860
    # without creating a public share link.
    app = build_ui()
    app.launch(share=False, server_port=7860)