Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python | |
| # coding=utf-8 | |
| """ | |
| app.py — BRIANNA, a conversational PyTorch/ML code agent built with smolagents. | |
| BRIANNA = Brilliantly Responsive Intelligent Assistant for Neural Network | |
| Applications. She can: | |
| • talk and listen (voice handled in Gradio_UI.py + voice.py), | |
| • research the web in depth (search + read full pages), | |
| • read / write / run / design PyTorch code, | |
| • search the HuggingFace Hub for models, | |
| • generate images, | |
| • extend her own toolset by writing new tools into this file, | |
| • connect to any URL or local IP address. | |
| Everything here runs on a free HuggingFace Space with no paid API keys. | |
| """ | |
| from smolagents import ( | |
| CodeAgent, | |
| DuckDuckGoSearchTool, | |
| HfApiModel, | |
| VisitWebpageTool, | |
| load_tool, | |
| tool, | |
| ) | |
| import datetime | |
| import subprocess | |
| import textwrap | |
| from pathlib import Path | |
| from typing import Any | |
| import pytz | |
| import requests | |
| import yaml | |
| from smolagents.tools import Tool | |
| from Gradio_UI import GradioUI | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # FINAL ANSWER TOOL (inlined here so no separate tools/ package is needed) | |
| # Required — must stay in the agent's tool list. | |
| # ────────────────────────────────────────────────────────────────────────────── | |
class FinalAnswerTool(Tool):
    """Tool the agent calls to hand back its final result.

    The class attributes carry the tool's metadata (name, description, input
    schema and output type); ``forward`` returns its argument unchanged.
    """

    name = "final_answer"
    description = "Provides a final answer to the given problem."
    inputs = {
        "answer": {
            "type": "any",
            "description": "The final answer to the problem",
        },
    }
    output_type = "any"

    def __init__(self, *args, **kwargs):
        # Accepts and ignores any arguments; the base initializer is not
        # called here, only the "not initialised yet" flag is set.
        self.is_initialized = False

    def forward(self, answer: Any) -> Any:
        """Return ``answer`` exactly as it was received."""
        return answer
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 1 — Current time in any timezone | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def get_current_time_in_timezone(timezone: str) -> str:
    """Reports the current local date and time for a named timezone.

    Args:
        timezone: A valid timezone string, e.g. 'America/New_York' or 'Europe/London'.

    Returns:
        A sentence containing the time formatted as ``YYYY-MM-DD HH:MM:SS``, or
        an error message when the timezone lookup or formatting fails.
    """
    try:
        zone = pytz.timezone(timezone)
        now_in_zone = datetime.datetime.now(zone)
        stamp = now_in_zone.strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {stamp}"
    except Exception as e:
        # Failures are reported as text so the caller always gets a string.
        return f"Error fetching time for timezone '{timezone}': {e}"
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 2 — Fetch a webpage and return its readable text | |
| # Used by BRIANNA for deep research after a search finds a useful URL. | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def fetch_webpage_text(url: str) -> str:
    """Fetches the readable plain-text content of any public webpage.

    Use this after a web search to read the full content of a promising result URL.

    Args:
        url: The full URL of the page to fetch, e.g. 'https://pytorch.org/docs/stable/nn.html'.

    Returns:
        At most 4000 characters of whitespace-normalised page text, or an
        ``Error fetching <url>: ...`` message when the request or parsing fails.
    """
    # Function-scope imports: only this tool needs them.
    import html
    import re

    try:
        headers = {"User-Agent": "Mozilla/5.0 (BRIANNA-Agent/1.0)"}
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
        markup = resp.text
        # Remove <script>/<style> elements and HTML comments *with their
        # contents*; stripping only the tags would leave JavaScript and CSS
        # in the output and waste the 4000-character budget.
        markup = re.sub(r"(?is)<(script|style)\b.*?</\1\s*>", " ", markup)
        markup = re.sub(r"(?s)<!--.*?-->", " ", markup)
        # Strip the remaining tags with a simple approach (no extra deps needed).
        text = re.sub(r"<[^>]+>", " ", markup)
        # Decode entities such as &amp; / &nbsp; after tag removal, so that an
        # escaped "&lt;b&gt;" stays visible as literal text.
        text = html.unescape(text)
        text = re.sub(r"\s+", " ", text).strip()
        return text[:4000]  # Keep within the context window.
    except Exception as e:
        return f"Error fetching {url}: {e}"
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 3 — PyTorch / ML code template generator | |
| # Gives BRIANNA instant boilerplate for common ML architectures. | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def get_pytorch_template(task: str) -> str:
    """Returns a ready-to-run PyTorch code template for a given ML task or architecture.

    Supported tasks: classification, cnn, transformer, rnn, fine_tune, autoencoder.
    The lookup ignores case and surrounding whitespace, and treats spaces and
    hyphens as underscores (so 'Fine-Tune' and ' fine tune ' both match).

    Args:
        task: The ML task or architecture name (see supported list above).

    Returns:
        The template source prefixed with a ``# PyTorch template: <task>``
        comment line, or a message listing the available templates when the
        task is not recognised.
    """
    # Template bodies are indented to match this function; textwrap.dedent()
    # below removes the common margin before the text is returned.
    templates = {
        "classification": '''
            import torch
            import torch.nn as nn
            from torch.utils.data import DataLoader, TensorDataset
            class MLP(nn.Module):
                def __init__(self, in_dim=128, hidden=256, out_dim=10):
                    super().__init__()
                    self.net = nn.Sequential(
                        nn.Linear(in_dim, hidden),
                        nn.ReLU(),
                        nn.Dropout(0.3),
                        nn.Linear(hidden, out_dim),
                    )
                def forward(self, x):
                    return self.net(x)
            model = MLP()
            opt = torch.optim.Adam(model.parameters(), lr=1e-3)
            loss_fn = nn.CrossEntropyLoss()
            # Dummy data — replace with your DataLoader
            X = torch.randn(128, 128)
            y = torch.randint(0, 10, (128,))
            loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)
            for epoch in range(5):
                for xb, yb in loader:
                    opt.zero_grad()
                    loss_fn(model(xb), yb).backward()
                    opt.step()
                print(f"Epoch {epoch + 1} complete")
        ''',
        "cnn": '''
            import torch
            import torch.nn as nn
            class CNN(nn.Module):
                def __init__(self, num_classes=10):
                    super().__init__()
                    self.features = nn.Sequential(
                        nn.Conv2d(3, 32, kernel_size=3, padding=1),
                        nn.BatchNorm2d(32),
                        nn.ReLU(),
                        nn.MaxPool2d(2),
                        nn.Conv2d(32, 64, kernel_size=3, padding=1),
                        nn.BatchNorm2d(64),
                        nn.ReLU(),
                        nn.MaxPool2d(2),
                    )
                    self.classifier = nn.Sequential(
                        nn.Flatten(),
                        nn.Linear(64 * 8 * 8, 256),
                        nn.ReLU(),
                        nn.Dropout(0.5),
                        nn.Linear(256, num_classes),
                    )
                def forward(self, x):
                    return self.classifier(self.features(x))
            model = CNN(num_classes=10)
            x = torch.randn(4, 3, 32, 32)  # batch of 4 CIFAR-sized images
            print(model(x).shape)  # torch.Size([4, 10])
        ''',
        "transformer": '''
            import torch
            import torch.nn as nn
            class TransformerClassifier(nn.Module):
                def __init__(self, vocab_size=10000, d_model=256, nhead=8, num_layers=3, num_classes=2):
                    super().__init__()
                    self.embed = nn.Embedding(vocab_size, d_model)
                    self.pos_enc = nn.Embedding(512, d_model)  # learned positional encoding
                    enc_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
                    self.encoder = nn.TransformerEncoder(enc_layer, num_layers)
                    self.head = nn.Linear(d_model, num_classes)
                def forward(self, x):
                    positions = torch.arange(x.size(1), device=x.device).unsqueeze(0)
                    out = self.embed(x) + self.pos_enc(positions)
                    out = self.encoder(out)
                    return self.head(out.mean(dim=1))  # mean pooling
            model = TransformerClassifier()
            tokens = torch.randint(0, 10000, (2, 64))  # batch=2, seq_len=64
            print(model(tokens).shape)  # torch.Size([2, 2])
        ''',
        "rnn": '''
            import torch
            import torch.nn as nn
            class LSTMModel(nn.Module):
                def __init__(self, input_size=64, hidden_size=128, num_layers=2, output_size=10):
                    super().__init__()
                    self.lstm = nn.LSTM(
                        input_size, hidden_size, num_layers, batch_first=True, dropout=0.3
                    )
                    self.head = nn.Linear(hidden_size, output_size)
                def forward(self, x):
                    out, _ = self.lstm(x)
                    return self.head(out[:, -1, :])  # last time-step
            model = LSTMModel()
            x = torch.randn(8, 20, 64)  # batch=8, seq_len=20, features=64
            print(model(x).shape)  # torch.Size([8, 10])
        ''',
        "fine_tune": '''
            import torch
            import torch.nn as nn
            from torchvision import models
            # Load pretrained ResNet50 (ImageNet weights)
            model = models.resnet50(weights="IMAGENET1K_V2")
            # Freeze everything first
            for param in model.parameters():
                param.requires_grad = False
            # Replace classifier head for your task
            num_classes = 5
            model.fc = nn.Linear(model.fc.in_features, num_classes)
            # Unfreeze last residual block for fine-tuning
            for param in model.layer4.parameters():
                param.requires_grad = True
            optimizer = torch.optim.Adam(
                filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4
            )
            print(
                "Trainable params:",
                sum(p.numel() for p in model.parameters() if p.requires_grad),
            )
        ''',
        "autoencoder": '''
            import torch
            import torch.nn as nn
            class Autoencoder(nn.Module):
                def __init__(self, input_dim=784, latent_dim=32):
                    super().__init__()
                    self.encoder = nn.Sequential(
                        nn.Linear(input_dim, 256), nn.ReLU(), nn.Linear(256, latent_dim)
                    )
                    self.decoder = nn.Sequential(
                        nn.Linear(latent_dim, 256), nn.ReLU(), nn.Linear(256, input_dim), nn.Sigmoid()
                    )
                def forward(self, x):
                    z = self.encoder(x)
                    recon = self.decoder(z)
                    return recon, z
            model = Autoencoder()
            loss_fn = nn.MSELoss()
            opt = torch.optim.Adam(model.parameters(), lr=1e-3)
            x = torch.randn(16, 784)
            recon, z = model(x)
            loss = loss_fn(recon, x)
            print(f"Reconstruction loss: {loss.item():.4f}, latent shape: {z.shape}")
        ''',
    }
    # Trim first: without it a stray space or newline around the name
    # (e.g. " cnn") turns into a leading "_" or survives as-is, and the
    # lookup misses.
    label = task.strip()
    key = label.lower().replace(" ", "_").replace("-", "_")
    if key in templates:
        return f"# PyTorch template: {label}\n" + textwrap.dedent(templates[key])
    available = ", ".join(templates)
    return (
        f"No template found for '{task}'. "
        f"Available templates: {available}. "
        f"Ask BRIANNA to write custom code for anything not listed."
    )
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOLS 4a & 4b — Inspect the Space codebase | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def list_space_files(directory: str = ".") -> str:
    """Lists the Python (.py) and YAML config files found under a directory.

    Use this when you need to inspect the codebase before adding a new tool.
    The search is recursive. Matches are grouped by pattern ('*.py', then
    '*.yaml', then '*.yml') and sorted within each group.

    Args:
        directory: Directory to scan (default '.' = Space root).

    Returns:
        One path per line, or a fixed notice when nothing matches.
    """
    base = Path(directory)
    found = []
    for pattern in ("*.py", "*.yaml", "*.yml"):
        found.extend(sorted(base.rglob(pattern)))
    if found:
        return "\n".join(map(str, found))
    return "No Python or config files found."
def read_space_file(file_path: str) -> str:
    """Reads a file as UTF-8 text and returns its full contents.

    Use this to inspect existing tools before writing new ones.

    Args:
        file_path: Relative or absolute path to the file, e.g. 'app.py'.

    Returns:
        The file's text, or an ``Error reading '<path>': ...`` message when
        the file cannot be read or decoded.
    """
    target = Path(file_path)
    try:
        contents = target.read_text(encoding="utf-8")
    except Exception as e:
        return f"Error reading '{file_path}': {e}"
    return contents
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 5 — Write a new @tool function into app.py (self-extension) | |
| # Pipeline: list_space_files → read_space_file → (research) → write_new_tool → test | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def write_new_tool_to_file(tool_name: str, tool_code: str) -> str:
    """Appends a new @tool function to app.py so BRIANNA can use it from now on.

    The agent calls this as the final step of the self-extension pipeline:
    1. inspect codebase, 2. research a library, 3. write the tool, 4. test it.
    Note: on a HuggingFace Space the filesystem is ephemeral, so commit the change
    or restart the Space to make the new tool permanently available to the agent.

    The source is dedented, prefixed with ``@tool`` if that decorator is
    missing, and checked for syntax errors before anything is written.

    Args:
        tool_name: Short name of the new tool (used for a comment header).
        tool_code: Complete Python source of the new @tool function, starting with
            the @tool decorator.

    Returns:
        A confirmation message on success, or a ``Failed to write tool ...``
        message when the source does not parse or the file cannot be written.
    """
    import ast

    code = textwrap.dedent(tool_code).strip()
    if not code.startswith("@tool"):
        code = "@tool\n" + code

    # Appending unparsable source would stop app.py from importing on the
    # next restart, so reject it up front. This also catches empty input,
    # which would otherwise leave a dangling "@tool" decorator in the file.
    try:
        ast.parse(code)
    except (SyntaxError, ValueError) as e:
        return f"Failed to write tool '{tool_name}': invalid Python source ({e})"

    # Collapse whitespace so a multi-line name cannot break out of the
    # one-line comment header and inject statements into app.py.
    header_name = " ".join(tool_name.split())

    try:
        with open("app.py", "a", encoding="utf-8") as f:
            f.write(f"\n\n# ── Auto-generated tool: {header_name} ──\n")
            f.write(code)
            f.write("\n")
    except Exception as e:
        return f"Failed to write tool '{tool_name}': {e}"
    return (
        f"Tool '{tool_name}' successfully appended to app.py. "
        f"Restart the Space (or commit + reload) to make it available to the agent."
    )
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 6 — Test any Python snippet safely (verify new tools before use) | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def test_python_code(code: str) -> str:
    """Runs a Python code snippet in a subprocess and returns its output.

    Use this to test a new tool before integrating it. ``import torch`` and
    ``import torch.nn as nn`` are prepended to the snippet, so PyTorch and the
    standard library are available to it.

    Args:
        code: Valid Python code to execute (as a plain string, not a file path).

    Returns:
        The snippet's stdout followed by its stderr (stripped), a fixed notice
        when it printed nothing, or a message describing a timeout or a failure
        to launch the interpreter.
    """
    import sys

    script = "import torch\nimport torch.nn as nn\n" + code
    # Run the snippet with the interpreter that is running this app, so it sees
    # the same installed packages. A bare "python3" is resolved through PATH
    # and may be a different installation. Fall back to it only when the
    # current interpreter path is unknown.
    interpreter = sys.executable or "python3"
    try:
        result = subprocess.run(
            [interpreter, "-c", script],
            capture_output=True,
            text=True,
            timeout=60,
        )
        output = (result.stdout + result.stderr).strip()
        return output if output else "Code ran successfully with no output."
    except subprocess.TimeoutExpired:
        return "Code timed out after 60 seconds."
    except Exception as e:
        return f"Error running code: {e}"
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 7 — HuggingFace model search (no API key — uses the public HF Hub API) | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def search_huggingface_models(task: str, limit: int = 5) -> str:
    """Searches the HuggingFace Hub for the most-downloaded PyTorch models for a task.

    No API key required.

    Args:
        task: ML task / pipeline tag, e.g. 'text-classification', 'image-segmentation',
            'object-detection', 'text-generation', 'translation'.
        limit: Maximum number of models to return (default 5, max 10). Values
            outside 1..10 are clamped into that range.

    Returns:
        A bulleted list of model ids with download counts, a notice when the
        Hub returns no models, or an ``Error searching HF Hub: ...`` message.
    """
    try:
        # Clamp on both sides: the documented maximum is 10, and a zero or
        # negative limit would otherwise be sent to the API as-is.
        capped = max(1, min(int(limit), 10))
        # Pass the query via `params` so requests URL-encodes the task; a raw
        # f-string lets characters like '&' or spaces corrupt the query string.
        resp = requests.get(
            "https://huggingface.co/api/models",
            params={
                "filter": task,
                "library": "pytorch",
                "sort": "downloads",
                "limit": capped,
            },
            timeout=10,
        )
        resp.raise_for_status()
        models = resp.json()
        if not models:
            return f"No PyTorch models found for task '{task}'."
        lines = [
            f"• {m.get('modelId', m.get('id', '?'))} "
            f"({m.get('downloads', '?')} downloads)"
            for m in models
        ]
        return f"Top PyTorch models for '{task}':\n" + "\n".join(lines)
    except Exception as e:
        return f"Error searching HF Hub: {e}"
| # ────────────────────────────────────────────────────────────────────────────── | |
| # TOOL 8 — Connect to any URL / IP address (local APIs, services, IoT devices) | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def call_custom_ip(url: str, method: str = "GET", body: str = "") -> str:
    """Sends an HTTP request to any URL or local IP address and returns the response.

    Useful for connecting to custom APIs, local servers, or devices.

    Args:
        url: Full URL including scheme, e.g. 'http://192.168.1.10:8080/api/data'
            or 'https://myapi.example.com/status'.
        method: HTTP method to use: GET, POST, PUT, or DELETE (default GET).
        body: Optional JSON body string for POST or PUT requests.

    Returns:
        ``HTTP <status>`` followed by up to 2000 characters of the response
        body, or a ``Request to '<url>' failed: ...`` message on any error.
    """
    try:
        request_headers = {
            "Content-Type": "application/json",
            "User-Agent": "BRIANNA/1.0",
        }
        # An empty body is sent as "no payload" rather than an empty string.
        payload = body if body else None
        response = requests.request(
            method.upper(),
            url,
            headers=request_headers,
            data=payload,
            timeout=10,
        )
        excerpt = response.text[:2000]
        return f"HTTP {response.status_code}\n{excerpt}"
    except Exception as e:
        return f"Request to '{url}' failed: {e}"
| # ────────────────────────────────────────────────────────────────────────────── | |
| # AGENT SETUP | |
| # ────────────────────────────────────────────────────────────────────────────── | |
final_answer = FinalAnswerTool()

# The LLM engine — served by the free HF Inference API (no key needed on a Space).
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id="Qwen/Qwen2.5-Coder-32B-Instruct",  # may occasionally be overloaded
    custom_role_conversions=None,
)

# Image-generation tool loaded from the Hub. Wrapped so a network/Hub hiccup at
# startup can never stop the Space from launching.
extra_tools = []
try:
    image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
    extra_tools.append(image_generation_tool)
except Exception as e:  # noqa: BLE001
    print(f"[BRIANNA] Image generation tool unavailable, continuing without it: {e}")

# Web tools, also wrapped — if DuckDuckGo's package/endpoint misbehaves the Space
# still comes up with the rest of BRIANNA's abilities.
web_tools = []
try:
    web_tools.append(DuckDuckGoSearchTool())
except Exception as e:  # noqa: BLE001
    print(f"[BRIANNA] DuckDuckGo search unavailable: {e}")
try:
    web_tools.append(VisitWebpageTool())
except Exception as e:  # noqa: BLE001
    print(f"[BRIANNA] VisitWebpage tool unavailable: {e}")

# Read the prompt templates as UTF-8 explicitly, matching the other file I/O
# in this module; the platform default encoding could otherwise mis-decode
# any non-ASCII characters in the prompts.
with open("prompts.yaml", "r", encoding="utf-8") as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[
        # ── Keep this — required ──────────────────────────────────────────
        final_answer,
        # ── Web research ──────────────────────────────────────────────────
        *web_tools,  # DuckDuckGoSearchTool + VisitWebpageTool
        fetch_webpage_text,  # alternative deep-read for any URL
        # ── Time ──────────────────────────────────────────────────────────
        get_current_time_in_timezone,
        # ── ML / PyTorch ──────────────────────────────────────────────────
        get_pytorch_template,
        test_python_code,
        search_huggingface_models,
        # ── Self-extension pipeline ───────────────────────────────────────
        list_space_files,
        read_space_file,
        write_new_tool_to_file,
        # ── Networking ────────────────────────────────────────────────────
        call_custom_ip,
        # ── Image generation (if it loaded) ───────────────────────────────
        *extra_tools,
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name="BRIANNA",
    description=(
        "BRIANNA is a conversational ML code agent specialising in PyTorch. "
        "She can search the web, read pages, generate images, write and test "
        "PyTorch code, find HuggingFace models, connect to custom IPs, and "
        "extend her own toolset by writing new tools into the codebase."
    ),
    prompt_templates=prompt_templates,
)
| # ────────────────────────────────────────────────────────────────────────────── | |
| # OPTIONAL — Add an MCP server's tools to BRIANNA (suggested: arXiv MCP server) | |
| # ────────────────────────────────────────────────────────────────────────────── | |
| # smolagents speaks the Model Context Protocol natively via `ToolCollection`. | |
| # A great pairing for an ML research agent is the **arXiv MCP server** | |
| # (https://github.com/blazickjp/arxiv-mcp-server): it lets BRIANNA search and read | |
| # full ML papers from arXiv. To enable it, add `mcp[cli]` (and the server) to | |
| # requirements.txt and uncomment the block below — then pass `*arxiv_tools` into | |
| # the `tools=[...]` list above. | |
| # | |
| # from smolagents import ToolCollection | |
| # from mcp import StdioServerParameters | |
| # | |
| # arxiv_params = StdioServerParameters( | |
| # command="uvx", | |
| # args=["arxiv-mcp-server"], | |
| # ) | |
| # with ToolCollection.from_mcp(arxiv_params, trust_remote_code=True) as arxiv: | |
| # arxiv_tools = arxiv.tools | |
| # # build the CodeAgent *inside* this `with` block so the MCP stays connected. | |
# Wrap the configured agent in the Gradio UI and launch it. This runs at
# module top level, so it executes whenever this file is run or imported.
GradioUI(agent).launch()