CodeScribe / server /tasks.py
Rahul-Samedavar's picture
allset
3e802a5
# server/tasks.py
import os
import shutil
import tempfile
import zipfile
import json
import asyncio
from pathlib import Path
from typing import AsyncGenerator, List
from git import Repo, GitCommandError
from codescribe.config import load_config
from codescribe.llm_handler import LLMHandler
from codescribe.orchestrator import DocstringOrchestrator
from codescribe.readme_generator import ReadmeGenerator
async def process_project(
    project_path: Path,
    description: str,
    readme_note: str,
    is_temp: bool,
    exclude_list: List[str],
    new_branch_name: str = None,
    repo_full_name: str = None,
    github_token: str = None
) -> AsyncGenerator[str, None]:
    """Document a project in a worker thread and stream progress as JSON lines.

    The blocking work (docstring generation, README generation, then either a
    Git push or a ZIP export) runs in the event loop's default executor. Every
    event the worker emits is yielded as one newline-terminated JSON object of
    the form ``{"type": <event>, "payload": <data>}``.

    Args:
        project_path: Root directory of the project to document.
        description: Project description forwarded to both generators.
        readme_note: Extra user note forwarded to the README generator.
        is_temp: When true, ``project_path`` is deleted once the worker has
            finished, even if the consumer stops reading the stream early.
        exclude_list: Exclusion entries forwarded to both generators.
        new_branch_name: When set, the changes are committed and force-pushed
            to this branch on ``origin``; otherwise a ZIP archive is written
            to the system temp directory.
        repo_full_name: GitHub ``owner/name`` used to build result URLs and
            forwarded to both generators.
        github_token: Not read by this function; kept so existing callers
            that pass it keep working.

    Yields:
        Newline-terminated JSON strings, one per emitted event.
    """
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def emit_event(event: str, data: dict):
        """A thread-safe way to send events from the worker thread to the async generator."""
        loop.call_soon_threadsafe(queue.put_nowait, {"event": event, "data": data})

    def _blocking_process():
        """The main, synchronous processing logic that runs in a separate thread."""
        try:
            # --- Setup ---
            config = load_config()
            llm_handler = LLMHandler(
                config.api_keys,
                progress_callback=lambda msg: emit_event("log", {"message": msg}),
            )

            # --- 1. Docstrings Phase (using the orchestrator) ---
            doc_orchestrator = DocstringOrchestrator(
                path_or_url=str(project_path),
                description=description,
                exclude=exclude_list,
                llm_handler=llm_handler,
                progress_callback=emit_event,
                repo_full_name=repo_full_name,
            )
            # The path and temp flag are overridden after construction;
            # removal of the directory is handled at the end of this function.
            doc_orchestrator.project_path = project_path
            doc_orchestrator.is_temp_dir = False
            doc_orchestrator.run()

            # --- 2. READMEs Phase (using the generator) ---
            readme_gen = ReadmeGenerator(
                path_or_url=str(project_path),
                description=description,
                exclude=exclude_list,
                llm_handler=llm_handler,
                user_note=readme_note,
                progress_callback=emit_event,
                repo_full_name=repo_full_name,
            )
            readme_gen.project_path = project_path
            readme_gen.is_temp_dir = False
            readme_gen.run()

            # --- 3. Output Phase ---
            emit_event("phase", {"id": "output", "status": "in-progress"})
            if new_branch_name:
                emit_event("subtask", {"parentId": "output", "listId": "output-step-list", "id": "git-check", "name": "Checking for changes...", "status": "in-progress"})
                repo = Repo(project_path)
                # Read the untracked list once; each access re-queries Git.
                untracked = repo.untracked_files
                if untracked:
                    repo.git.add(untracked)
                if not repo.is_dirty(untracked_files=True):
                    emit_event("subtask", {"parentId": "output", "id": "git-check", "status": "success"})
                    emit_event("log", {"message": "No changes were generated by the AI."})
                    emit_event("done", {"type": "github", "url": f"https://github.com/{repo_full_name}", "message": "✅ No changes needed. Your repository is up to date."})
                    return
                emit_event("subtask", {"parentId": "output", "id": "git-check", "status": "success"})
                emit_event("subtask", {"parentId": "output", "listId": "output-step-list", "id": "git-push", "name": f"Pushing to branch '{new_branch_name}'...", "status": "in-progress"})
                try:
                    if new_branch_name in repo.heads:
                        repo.heads[new_branch_name].checkout()
                    else:
                        repo.create_head(new_branch_name).checkout()
                    repo.git.add(A=True)
                    repo.index.commit("docs: Add AI-generated documentation by CodeScribe")
                    origin = repo.remote(name='origin')
                    push_results = origin.push(new_branch_name, force=True)
                    # GitPython reports rejected refs through the ERROR flag
                    # (or an empty result list) instead of always raising, so
                    # inspect the result before announcing success.
                    failed = [info for info in push_results if info.flags & info.ERROR]
                    if failed or not push_results:
                        details = "; ".join(info.summary.strip() for info in failed) or "no refs were pushed"
                        emit_event("log", {"message": f"Git Error: {details}"})
                        raise RuntimeError(f"Failed to push to GitHub: {details}")
                    pr_url = f"https://github.com/{repo_full_name}/pull/new/{new_branch_name}"
                    emit_event("subtask", {"parentId": "output", "id": "git-push", "status": "success"})
                    emit_event("done", {"type": "github", "url": pr_url, "message": "✅ Successfully pushed changes!"})
                except GitCommandError as e:
                    emit_event("log", {"message": f"Git Error: {e}"})
                    raise RuntimeError(f"Failed to push to GitHub: {e}") from e
            else:
                emit_event("subtask", {"parentId": "output", "listId": "output-step-list", "id": "zip-create", "name": "Creating downloadable ZIP file...", "status": "in-progress"})
                temp_dir = tempfile.gettempdir()
                try:
                    # Name the archive after the first entry found in the project root.
                    contained_dir = next(project_path.iterdir())
                    project_name = contained_dir.name
                except StopIteration:
                    project_name = "documented-project"
                zip_filename_base = f"codescribe-docs-{project_name}"
                zip_path_base = Path(temp_dir) / zip_filename_base
                zip_full_path = shutil.make_archive(str(zip_path_base), 'zip', project_path)
                # Only the file name is reported, not the full path.
                zip_file_name = Path(zip_full_path).name
                emit_event("subtask", {"parentId": "output", "id": "zip-create", "status": "success"})
                emit_event("done", {"type": "zip", "download_path": zip_file_name, "message": "✅ Your documented project is ready for download."})
            emit_event("phase", {"id": "output", "status": "success"})
        except Exception as e:
            # NOTE: unlike the other events, the "error" payload is a bare
            # string rather than a dict; kept as-is for existing consumers.
            emit_event("error", str(e))
        finally:
            # A None sentinel tells the async side that the worker is finished.
            loop.call_soon_threadsafe(queue.put_nowait, None)

    def _remove_temp_project(_finished=None):
        """Delete the project directory when it was flagged as temporary."""
        if is_temp and project_path and project_path.exists():
            shutil.rmtree(project_path, ignore_errors=True)

    main_task = loop.run_in_executor(None, _blocking_process)
    try:
        while True:
            message = await queue.get()
            if message is None:
                break
            json_line = json.dumps({"type": message["event"], "payload": message["data"]})
            yield f"{json_line}\n"
        await main_task
    finally:
        # Runs on normal completion and also when the consumer closes or
        # cancels the stream early. If the worker thread is still busy, defer
        # the deletion until it finishes so the tree is not removed under it.
        if main_task.done():
            _remove_temp_project()
        else:
            main_task.add_done_callback(_remove_temp_project)