# (Hugging Face Spaces page artifact — "Spaces: Sleeping / Sleeping" status text, not code)
# app.py (ZeroGPU Version)
# A production-quality, local, and uncensored text-editing agent
# designed specifically for the Hugging Face ZeroGPU platform.

import pathlib
import re
import textwrap

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import snapshot_download
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from transformers.agents import Agent, Tool

# --- Hugging Face Spaces GPU Decorator ---
# This is the key to making the app work on ZeroGPU.
from spaces import GPU as spaces_GPU

# --- Configuration ---
# Quantized Llama-3 8B Instruct (GPTQ) checkpoint used as the agent's LLM.
MODEL_ID = "NousResearch/Meta-Llama-3-8B-Instruct-GPTQ"
# Every tool operation is sandboxed inside this directory.
ROOT = pathlib.Path("workspace")
ROOT.mkdir(exist_ok=True)

# --- 1. MCP Text-Editing Server (The "Tools" Backend) ---
server = FastMCP("DocTools")
def list_files(relative_path: str = ".") -> list[str]:
    """Lists all files and directories within a given subdirectory of the workspace."""
    try:
        target = (ROOT / relative_path).resolve()
        # Refuse any path that escapes the workspace sandbox.
        if not target.is_relative_to(ROOT.resolve()):
            return ["Error: Access denied."]
        if not target.exists():
            return [f"Error: Directory '{relative_path}' not found."]
        return [entry.name for entry in target.iterdir()]
    except Exception as e:
        # Tool boundary: report the failure as data rather than raising.
        return [f"An error occurred: {str(e)}"]
def search_in_file(file_path: str, pattern: str, max_hits: int = 40) -> list[str]:
    """Searches for a regex pattern within a specified file in the workspace."""
    try:
        target = (ROOT / file_path).resolve()
        # Refuse any path that escapes the workspace sandbox.
        if not target.is_relative_to(ROOT.resolve()):
            return ["Error: Access denied."]
        if not target.is_file():
            return [f"Error: File '{file_path}' not found."]
        regex = re.compile(pattern, re.IGNORECASE)  # matching is case-insensitive
        hits: list[str] = []
        with open(target, 'r', encoding='utf-8') as handle:
            for line_no, text in enumerate(handle, start=1):
                if regex.search(text):
                    hits.append(f"{line_no}: {text.rstrip()}")
                    # Cap the result size so huge files can't flood the agent.
                    if len(hits) >= max_hits:
                        break
        return hits if hits else ["No matches found."]
    except Exception as e:
        # Tool boundary: report the failure as data rather than raising.
        return [f"An error occurred: {str(e)}"]
def read_lines(file_path: str, start_line: int, end_line: int) -> str:
    """Reads and returns a specific range of lines from a file.

    Args:
        file_path: Path relative to the workspace root.
        start_line: First line to return (1-based; values below 1 are clamped).
        end_line: Last line to return (inclusive; clamped to the file length).

    Returns:
        The requested lines joined as one string, or an error message.
    """
    try:
        safe_path = (ROOT / file_path).resolve()
        if not safe_path.is_relative_to(ROOT.resolve()):
            return "Error: Access denied."
        if not safe_path.is_file():
            return f"Error: File '{file_path}' not found."
        with open(safe_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # Clamp the 1-based inclusive range into valid list-slice bounds.
        return "".join(lines[max(0, start_line - 1):min(len(lines), end_line)])
    except Exception as e:
        # Fix: the original had a stray ']' after this return (a SyntaxError).
        return f"An error occurred: {str(e)}"
def patch_file(file_path: str, start_line: int, end_line: int, new_content: str) -> str:
    """Replaces a range of lines in a file with new content.

    Args:
        file_path: Path relative to the workspace root.
        start_line: First line to replace (1-based; values below 1 are clamped).
        end_line: Last line to replace (inclusive).
        new_content: Replacement text; each line gets a trailing newline.
            An empty string deletes the range outright.

    Returns:
        A success message, or an error message on failure.
    """
    try:
        safe_path = (ROOT / file_path).resolve()
        if not safe_path.is_relative_to(ROOT.resolve()):
            return "Error: Access denied."
        if not safe_path.is_file():
            return f"Error: File '{file_path}' not found."
        with open(safe_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # Normalize the replacement so every inserted line is newline-terminated.
        replacement = [line + '\n' for line in new_content.splitlines()]
        new_lines = lines[:max(0, start_line - 1)] + replacement + lines[end_line:]
        with open(safe_path, 'w', encoding='utf-8') as f:
            f.writelines(new_lines)
        return f"Success: Patched lines {start_line}-{end_line} in '{file_path}'."
    except Exception as e:
        # Fix: the original had a stray ']' after this return (a SyntaxError).
        return f"An error occurred: {str(e)}"
| # --- 2. Agent and Model Loading (ZeroGPU compatible) --- | |
| # We initialize the agent as None. It will be created on the first user request. | |
| agent = None | |
| # This is our GPU-accelerated function. | |
| # It will load the model on the first run and cache it for subsequent calls. | |
| # Request GPU for 120 seconds per call | |
| def get_agent(): | |
| """ | |
| Loads and caches the LLM agent. This function runs on a GPU. | |
| """ | |
| global agent | |
| if agent is None: | |
| print("--- Loading model and agent for the first time ---") | |
| # Download the model to a persistent cache | |
| model_path = snapshot_download(MODEL_ID) | |
| tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_path, | |
| device_map="auto", | |
| torch_dtype="auto" # Recommended for modern GPUs | |
| ) | |
| llm_pipeline = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| return_full_text=False, | |
| max_new_tokens=1024, | |
| ) | |
| tools = [Tool(name=t, description=server.get_schema(t)["description"], inputs=server.get_schema(t)["parameters"], function=lambda **kwargs, t=t: server.invoke(t, **kwargs)) for t in server.list_tools()] | |
| SYSTEM_PROMPT = textwrap.dedent(""" | |
| You are an expert technical editor. You must use your tools to answer the user's request. | |
| All file paths are relative to the '/workspace' directory. | |
| Always verify file contents with `read_lines` or `search_in_file` before patching. | |
| """) | |
| agent = Agent( | |
| llm_pipeline=llm_pipeline, | |
| tools=tools, | |
| system_prompt=SYSTEM_PROMPT, | |
| max_steps=10, | |
| memory=True | |
| ) | |
| print("--- Agent loaded successfully ---") | |
| return agent | |
# --- 3. Gradio Chat Application ---
# Using gr.ChatInterface for a cleaner UI setup.
with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as demo:
    gr.Markdown("# ZeroGPU Text-Editing Agent 📝")
    gr.Markdown(
        """
        Chat with this AI agent to perform complex edits on text documents in the workspace.
        **Note:** The first request will have a delay as the model is loaded onto the GPU.
        """
    )
    chatbot = gr.Chatbot(height=600, label="Agent Chat")

    async def chat_interaction(message, history):
        """Streams the agent's reply for one user message.

        Fix: a gr.ChatInterface fn receives (message, history) and must
        yield successive versions of the assistant reply (a string). The
        original yielded ("", history) tuples and mutated history by hand,
        duplicating state that ChatInterface manages itself and producing
        output the component cannot render.
        """
        # Loading the agent on first use is what triggers the GPU request.
        current_agent = get_agent()
        response = ""
        async for step in current_agent.astream(message):
            if isinstance(step, str):
                response = step
            # Yield the latest partial reply so the UI streams it live.
            yield response

    gr.ChatInterface(
        fn=chat_interaction,
        chatbot=chatbot,
        fill_height=False,
    )

# Seed the workspace with a sample file for easy testing.
with open(ROOT / "sample.txt", "w") as f:
    f.write("This is a sample file for testing the ZeroGPU agent.")

demo.queue().launch()