Spaces:
Sleeping
Sleeping
| """ | |
| Skill Agent Router - API Endpoints | |
| """ | |
| from fastapi import APIRouter, HTTPException | |
| from typing import Dict, Any | |
| from src.models.agent_models import SkillRequest, SkillResponse, TaskStatus | |
| from src.api.agents.skill.orchestrator import SkillOrchestrator | |
| from src.api.core.logger import logger | |
| import time | |
| router = APIRouter() | |
| orchestrator = SkillOrchestrator() | |
| async def execute_skill(request: SkillRequest) -> SkillResponse: | |
| """ | |
| Execute a skill agent task | |
| - **prompt**: The task description | |
| - **skill_type**: Type of skill (code_review, bug_detection, documentation, etc.) | |
| - **context**: Optional context data | |
| - **tools**: Optional list of available tools | |
| """ | |
| start_time = time.time() | |
| try: | |
| logger.info(f"Executing skill: {request.skill_type}") | |
| result = await orchestrator.execute( | |
| prompt=request.prompt, | |
| skill_type=request.skill_type, | |
| context=request.context, | |
| tools=request.tools, | |
| ) | |
| execution_time = time.time() - start_time | |
| return SkillResponse( | |
| status=TaskStatus.COMPLETED, | |
| result=result, | |
| execution_time=execution_time, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Skill execution error: {str(e)}") | |
| execution_time = time.time() - start_time | |
| return SkillResponse( | |
| status=TaskStatus.FAILED, | |
| error=str(e), | |
| execution_time=execution_time, | |
| ) | |
| async def get_skill_types() -> Dict[str, Any]: | |
| """Get available skill types""" | |
| return { | |
| "skills": [ | |
| {"id": "code_review", "name": "Code Review", "description": "Review code for issues"}, | |
| {"id": "bug_detection", "name": "Bug Detection", "description": "Find bugs in code"}, | |
| {"id": "documentation", "name": "Documentation", "description": "Generate documentation"}, | |
| {"id": "refactoring", "name": "Refactoring", "description": "Refactor code for better quality"}, | |
| ] | |
| } | |
| async def get_available_tools() -> Dict[str, Any]: | |
| """Get available tools for skill execution""" | |
| return { | |
| "tools": [ | |
| {"id": "file_reader", "name": "File Reader", "description": "Read files from workspace"}, | |
| {"id": "file_writer", "name": "File Writer", "description": "Write files to workspace"}, | |
| {"id": "search", "name": "Search", "description": "Search in files"}, | |
| {"id": "git", "name": "Git", "description": "Git operations"}, | |
| ] | |
| } | |