Mexar / backend /services /agent_service.py
Devrajsinh bharatsinh gohil
Initial commit of MEXAR Ultimate - Phase 2 cleanup complete
b0b150b
import shutil
import json
from pathlib import Path
from typing import List, Optional
from sqlalchemy.orm import Session
from models.agent import Agent
from models.user import User
from core.config import settings
class AgentService:
def __init__(self):
self.storage_path = Path(settings.STORAGE_PATH)
self.storage_path.mkdir(parents=True, exist_ok=True)
def create_agent(self, db: Session, user: User, name: str, system_prompt: str) -> Agent:
"""Create a new agent entry in database."""
# Sanitize name
clean_name = name.strip().replace(" ", "_").lower()
# Check if agent already exists for this user
existing = db.query(Agent).filter(
Agent.user_id == user.id,
Agent.name == clean_name
).first()
if existing:
raise ValueError(f"You already have an agent named '{clean_name}'")
# Create agent storage directory
agent_dir = self.storage_path / str(user.id) / clean_name
agent_dir.mkdir(parents=True, exist_ok=True)
# Create DB record
new_agent = Agent(
user_id=user.id,
name=clean_name,
system_prompt=system_prompt,
storage_path=str(agent_dir),
status="initializing"
)
db.add(new_agent)
db.commit()
db.refresh(new_agent)
return new_agent
def get_agent(self, db: Session, user: User, agent_name: str) -> Optional[Agent]:
"""Get a specific agent owned by the user."""
return db.query(Agent).filter(
Agent.user_id == user.id,
Agent.name == agent_name
).first()
def get_agent_by_id(self, db: Session, agent_id: int, user_id: int) -> Optional[Agent]:
"""Get agent by ID with ownership check."""
return db.query(Agent).filter(
Agent.id == agent_id,
Agent.user_id == user_id
).first()
def list_agents(self, db: Session, user: User) -> List[Agent]:
"""List all agents owned by the user."""
return db.query(Agent).filter(Agent.user_id == user.id).all()
def delete_agent(self, db: Session, user: User, agent_name: str):
"""Delete an agent and its files."""
agent = self.get_agent(db, user, agent_name)
if not agent:
raise ValueError("Agent not found")
# Delete files
try:
if agent.storage_path and Path(agent.storage_path).exists():
shutil.rmtree(agent.storage_path)
except Exception as e:
print(f"Error deleting files for agent {agent.name}: {e}")
# Continue to delete DB record even if file deletion fails
db.delete(agent)
db.commit()
agent_service = AgentService()