Spaces:
Running
Running
| import shutil | |
| import json | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from sqlalchemy.orm import Session | |
| from models.agent import Agent | |
| from models.user import User | |
| from core.config import settings | |
| class AgentService: | |
| def __init__(self): | |
| self.storage_path = Path(settings.STORAGE_PATH) | |
| self.storage_path.mkdir(parents=True, exist_ok=True) | |
| def create_agent(self, db: Session, user: User, name: str, system_prompt: str) -> Agent: | |
| """Create a new agent entry in database.""" | |
| # Sanitize name | |
| clean_name = name.strip().replace(" ", "_").lower() | |
| # Check if agent already exists for this user | |
| existing = db.query(Agent).filter( | |
| Agent.user_id == user.id, | |
| Agent.name == clean_name | |
| ).first() | |
| if existing: | |
| raise ValueError(f"You already have an agent named '{clean_name}'") | |
| # Create agent storage directory | |
| agent_dir = self.storage_path / str(user.id) / clean_name | |
| agent_dir.mkdir(parents=True, exist_ok=True) | |
| # Create DB record | |
| new_agent = Agent( | |
| user_id=user.id, | |
| name=clean_name, | |
| system_prompt=system_prompt, | |
| storage_path=str(agent_dir), | |
| status="initializing" | |
| ) | |
| db.add(new_agent) | |
| db.commit() | |
| db.refresh(new_agent) | |
| return new_agent | |
| def get_agent(self, db: Session, user: User, agent_name: str) -> Optional[Agent]: | |
| """Get a specific agent owned by the user.""" | |
| return db.query(Agent).filter( | |
| Agent.user_id == user.id, | |
| Agent.name == agent_name | |
| ).first() | |
| def get_agent_by_id(self, db: Session, agent_id: int, user_id: int) -> Optional[Agent]: | |
| """Get agent by ID with ownership check.""" | |
| return db.query(Agent).filter( | |
| Agent.id == agent_id, | |
| Agent.user_id == user_id | |
| ).first() | |
| def list_agents(self, db: Session, user: User) -> List[Agent]: | |
| """List all agents owned by the user.""" | |
| return db.query(Agent).filter(Agent.user_id == user.id).all() | |
| def delete_agent(self, db: Session, user: User, agent_name: str): | |
| """Delete an agent and its files.""" | |
| agent = self.get_agent(db, user, agent_name) | |
| if not agent: | |
| raise ValueError("Agent not found") | |
| # Delete files | |
| try: | |
| if agent.storage_path and Path(agent.storage_path).exists(): | |
| shutil.rmtree(agent.storage_path) | |
| except Exception as e: | |
| print(f"Error deleting files for agent {agent.name}: {e}") | |
| # Continue to delete DB record even if file deletion fails | |
| db.delete(agent) | |
| db.commit() | |
| agent_service = AgentService() | |