| import os |
| import shutil |
| import tempfile |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import FileResponse, HTMLResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from starlette.background import BackgroundTask |
| from pydantic import BaseModel, HttpUrl |
| from git import Repo |
| import logging |
|
|
| |
# Process-wide logging at INFO level, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The ASGI application object that the route decorators below attach to.
app = FastAPI(title="Repo2TXT API")

# Absolute directory containing this source file; used to locate index.html.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
| |
# CORS: allow browser front-ends on any origin to call the API.
#
# Credentialed requests are disabled on purpose. The CORS spec forbids
# combining a wildcard origin with credentials, and FastAPI's documentation
# requires explicit origins when allow_credentials=True. None of the endpoints
# visible in this file read cookies or auth headers, so nothing is lost.
# NOTE(review): if a cookie/auth-based endpoint is added later, replace "*"
# with an explicit origin list before re-enabling credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
# Per-file limits applied while merging: files larger than this many
# megabytes, or with more than this many lines, are left out of the output.
MAX_FILE_SIZE_MB = 2
MAX_LINES = 10000

# File extensions (lower-case, with the leading dot) that are never merged.
SKIP_EXTENSIONS = {
    # compiled / native artefacts
    ".pyc", ".exe", ".dll", ".so", ".bin",
    # archives
    ".zip", ".tar", ".gz",
    # images and icons
    ".jpg", ".jpeg", ".png", ".gif", ".ico", ".svg",
    # audio / video
    ".mp4", ".mp3", ".wav",
    # documents and data
    ".pdf", ".json",
    # serialized objects, model weights and 3D assets
    ".pkl", ".pt", ".ckpt", ".onnx", ".glb",
}

# Directory names that are pruned from the walk entirely.
SKIP_DIRS = {
    # Python caches and virtual environments
    "__pycache__", "venv", ".venv", "env",
    # third-party dependencies
    "node_modules", "vendor",
    # generated / build output
    "generated", "build", "dist", ".next",
    # VCS, CI and editor metadata
    ".git", ".github", ".idea", ".vscode",
}
|
|
# Request body for POST /download.
# (Documented with comments rather than a docstring so the generated
# schema description stays exactly as it was.)
class RepoRequest(BaseModel):
    # URL of the Git repository to clone and merge.
    url: str
|
|
def merge_repo(repo_url: str):
    """Clone a repository and concatenate its text files into one file.

    The repository is shallow-cloned (depth 1) into a fresh temporary
    directory. Every file that passes the filters (``SKIP_DIRS``,
    ``SKIP_EXTENSIONS``, dot-files, symlinks, ``MAX_FILE_SIZE_MB``,
    ``MAX_LINES``) is appended to ``project.txt`` under a
    ``===== FILE: <relative path> =====`` banner.

    Args:
        repo_url: HTTP(S) URL of the repository to clone.

    Returns:
        A ``(output_file, temp_dir)`` pair: the path of the merged text file
        and the temporary directory that contains it. The caller owns
        ``temp_dir`` and is responsible for deleting it.

    Raises:
        HTTPException: status 400 if ``repo_url`` is not an HTTP(S) URL, or
            if cloning/merging fails (the temporary directory is removed
            before the exception is raised).
    """
    from urllib.parse import urlparse

    # The URL comes from the request body. Refuse anything that is not
    # plain HTTP(S) so a caller cannot point git at local paths, file://
    # URLs or other transports on the server.
    try:
        parsed = urlparse(repo_url)
        is_http_url = parsed.scheme in ("http", "https") and bool(parsed.netloc)
    except ValueError:
        is_http_url = False
    if not is_http_url:
        raise HTTPException(
            status_code=400,
            detail="Failed to process repository: only http(s) repository URLs are supported",
        )

    temp_dir = tempfile.mkdtemp()
    repo_dir = os.path.join(temp_dir, "repo")
    output_file = os.path.join(temp_dir, "project.txt")
    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024

    try:
        logger.info("Cloning repo: %s", repo_url)
        Repo.clone_from(repo_url, repo_dir, depth=1)

        with open(output_file, "w", encoding="utf-8") as out:
            out.write(f"Source Repository: {repo_url}\n")
            out.write("=" * 50 + "\n\n")

            for dirpath, dirs, files in os.walk(repo_dir):
                # Prune in place so os.walk never descends into skipped dirs.
                dirs[:] = [d for d in dirs if d not in SKIP_DIRS]

                for file in files:
                    ext = os.path.splitext(file)[1].lower()
                    if ext in SKIP_EXTENSIONS or file.startswith("."):
                        continue

                    file_path = os.path.join(dirpath, file)

                    # A repository can commit symlinks that point outside the
                    # clone (e.g. to /etc/passwd); never follow them.
                    if os.path.islink(file_path):
                        continue

                    # Best-effort per file: one unreadable file must not
                    # abort the whole merge, so log it and move on.
                    try:
                        if os.path.getsize(file_path) > max_bytes:
                            continue

                        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                            lines = f.readlines()

                        if len(lines) > MAX_LINES:
                            logger.info("Skipping %s due to line count limit", file)
                            continue

                        rel_path = os.path.relpath(file_path, repo_dir)
                        out.write(f"\n\n===== FILE: {rel_path} =====\n\n")
                        out.writelines(lines)
                    except Exception as e:
                        logger.warning("Could not process file %s: %s", file_path, e)

        return output_file, temp_dir

    except Exception as e:
        logger.error("Error processing repo: %s", e)
        # Nothing will be returned to the caller, so clean up here.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(
            status_code=400,
            detail=f"Failed to process repository: {str(e)}",
        ) from e
|
|
@app.post("/download")
async def download_repo(req: RepoRequest):
    """Clone the requested repository and return it as one text file.

    Args:
        req: Request body carrying the repository URL.

    Returns:
        A ``FileResponse`` that serves the merged ``project.txt``. The
        temporary working directory is deleted by a background task once
        the response has been sent.

    Raises:
        HTTPException: 400 if the URL is missing or blank, 500 if the merged
            file is unexpectedly absent. Errors raised by ``merge_repo``
            propagate unchanged.
    """
    # Function-scope import: starlette is already a dependency of this file.
    from starlette.concurrency import run_in_threadpool

    url = req.url.strip() if req.url else ""
    if not url:
        raise HTTPException(status_code=400, detail="Repository URL is required")

    # merge_repo does a blocking git clone and file walk; run it in the
    # thread pool so the event loop keeps serving other requests meanwhile.
    file_path, temp_dir = await run_in_threadpool(merge_repo, url)

    if not os.path.exists(file_path):
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail="Generated file not found")

    return FileResponse(
        file_path,
        media_type="text/plain",
        filename="project.txt",
        # Remove the clone and the merged file after the body has been sent.
        background=BackgroundTask(shutil.rmtree, temp_dir, ignore_errors=True),
    )
|
|
# Landing page: serve index.html from this file's directory when it exists,
# otherwise answer with a small JSON status message.
@app.get("/")
async def root():
    index_path = os.path.join(BASE_DIR, "index.html")
    if not os.path.exists(index_path):
        return {"message": "Repo2TXT API is running. index.html not found."}
    return FileResponse(index_path)
|
|