File size: 16,175 Bytes
3de42e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
#!/usr/bin/env python3
"""Comprehensive audit script for Stack 2.9 tools and skills.

This script:
1. Imports all tools and skills
2. Tests each tool with appropriate test input
3. Measures execution time
4. Reports pass/fail status
"""

import asyncio
import json
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

# Ensure proper imports
sys.path.insert(0, '/Users/walidsobhi/stack-2.9/src')

# Import tools module (triggers registration)
import tools
from tools.base import BaseTool, ToolResult
from tools.registry import get_registry


# Canned test inputs, keyed by registered tool name.
#
# get_test_input() looks a tool up here by name; the matching dict is then
# handed to the tool either as a single argument to ``tool.call(...)`` or as
# keyword arguments to ``tool.execute(**...)``. A tool with no entry is
# audited with an empty dict.
#
# NOTE(review): several entries use absolute paths under
# /Users/walidsobhi/stack-2.9 and will only resolve on that machine.
TOOL_TEST_INPUTS: Dict[str, Dict[str, Any]] = {
    # File tools
    "file_read": {"path": "/Users/walidsobhi/stack-2.9/audit_tools.py"},
    "file_exists": {"path": "/Users/walidsobhi/stack-2.9/audit_tools.py"},
    "file_write": {"path": "/tmp/audit_test.txt", "content": "Test content from audit"},
    # NOTE(review): nothing in this script creates /tmp/audit_test_edit.txt
    # (file_write targets a different path) — confirm whether this entry is
    # expected to fail unless the file already exists.
    "file_edit": {
        "file_path": "/tmp/audit_test_edit.txt",
        "old_string": "old content",
        "new_string": "new content",
        "replace_all": False
    },
    "glob": {"pattern": "**/*.py", "path": "/Users/walidsobhi/stack-2.9/src"},
    "grep": {"pattern": "def ", "path": "/Users/walidsobhi/stack-2.9/src/tools"},
    # Web tools
    "WebSearch": {"query": "Python testing"},
    "WebFetch": {"url": "https://example.com", "prompt": "Extract the main heading"},
    # Task tools
    "task_create": {"subject": "Test task", "description": "Test description", "activeForm": "Testing"},
    "task_list": {},
    "task_update": {"taskId": "nonexistent", "status": "completed"},
    "task_get": {"taskId": "test"},
    # Todo tools
    "todo_list": {},
    "todo_add": {"content": "Test todo item"},
    "todo_complete": {"item_id": "test"},
    "todo_delete": {"item_id": "test"},
    # Config tools
    "config_get": {"key": "test.key"},
    "config_set": {"key": "test.key", "value": "test_value"},
    # Team tools
    "team_list": {},
    "team_create": {"team_name": "test_team", "members": ["user1"]},
    "team_delete": {"team_name": "test_team"},
    # Skill tools
    "skill_list": {},
    "skill_search": {"query": "test"},
    "skill_info": {"skill_name": "nonexistent"},
    "skill_execute": {"skill_name": "nonexistent"},
    "skill_chain": {"skills": []},
    # Scheduling
    "schedule_list": {},
    "schedule_add": {"title": "Test event", "time": "2025-01-01T10:00:00"},
    "schedule_delete": {"event_id": "test"},
    # Messaging
    "message_send": {"recipient": "test_user", "message": "Test message"},
    "message_list": {},
    # Brief tool
    "brief_generate": {"content": "This is test content for the brief tool."},
    # Ask question
    "ask_question": {"question": "What is 2+2?"},
    # Sleep tool
    "sleep": {"seconds": 0.1},
    # Plan mode
    "plan_create": {"prompt": "Create a test plan"},
    "plan_execute": {"plan_id": "test"},
    # MCP tool
    "mcp_list": {},
    "mcp_invoke": {"server": "test", "method": "test"},
    # Worktree tool
    "worktree_list": {},
    "worktree_create": {"name": "test-branch", "base": "main"},
    "worktree_remove": {"name": "test-branch"},
    # Remote trigger
    "remote_trigger_execute": {"target": "test-target", "action": "ping"},
    "remote_trigger_status": {"job_id": "test"},
    # Agent tool
    "agent_execute": {"task": "Test task", "context": {}},
    "agent_status": {"job_id": "test"},
    # Synthetic output
    "synthetic_generate": {"prompt": "Generate test data", "format": "json"},
    # Tool discovery
    "tool_discover": {"query": "file"},
    "tool_search": {"pattern": "file"},
    # Config (additional entries; config_get/config_set are listed in the
    # "Config tools" group earlier in this dict)
    "config_list": {},
    "config_delete": {"key": "test.key"},
}


def get_test_input(tool_name: str) -> Optional[Dict[str, Any]]:
    """Look up the canned audit arguments registered for ``tool_name``.

    Returns:
        The matching entry of ``TOOL_TEST_INPUTS``, or ``None`` when the
        tool has no entry.
    """
    try:
        return TOOL_TEST_INPUTS[tool_name]
    except KeyError:
        return None


class AuditResult:
    """Outcome record for one audited tool.

    Attributes are plain and mutable so the test helpers can fill them in
    step by step. ``timestamp`` is the ISO-8601 local time at which the
    record was created.
    """

    # Attribute names in the order they are emitted by ``to_dict``.
    _FIELDS = (
        "tool_name",
        "load_success",
        "execution_success",
        "response_time",
        "error",
        "data",
        "timestamp",
    )

    def __init__(
        self,
        tool_name: str,
        load_success: bool = False,
        execution_success: bool = False,
        response_time: float = 0.0,
        error: str = "",
        data: Any = None,
    ):
        (
            self.tool_name,
            self.load_success,
            self.execution_success,
            self.response_time,
            self.error,
            self.data,
        ) = (tool_name, load_success, execution_success, response_time, error, data)
        self.timestamp = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict keyed by attribute name."""
        return {field: getattr(self, field) for field in self._FIELDS}


async def test_tool_async(tool: BaseTool, test_input: Dict[str, Any]) -> AuditResult:
    """Run ``tool.execute(**test_input)`` and record the outcome.

    The value returned by ``execute`` is awaited whenever it is awaitable.
    Deciding by the returned value (rather than by inspecting the function)
    also covers ``execute`` implementations that are wrapped by a plain
    function or partial yet still hand back a coroutine; those would
    otherwise be recorded as a pass with an un-awaited coroutine as data.

    Args:
        tool: The tool instance to exercise; ``tool.name`` and
            ``tool.execute`` are used.
        test_input: Keyword arguments forwarded to ``tool.execute``.

    Returns:
        An ``AuditResult`` with ``load_success=True``. A ``ToolResult``
        return value supplies ``success``/``error``/``data``; any other
        return value counts as success. Any ``Exception`` is captured as a
        failure with the formatted traceback stored in ``data``.
    """
    # Local import keeps this function self-contained; the module does not
    # import ``inspect`` at file level.
    import inspect

    result = AuditResult(tool_name=tool.name, load_success=True)
    # Taken before the ``try`` so the ``except`` branch can always compute
    # an elapsed time.
    start_time = time.perf_counter()

    try:
        result_data = tool.execute(**test_input)
        if inspect.isawaitable(result_data):
            result_data = await result_data

        result.response_time = time.perf_counter() - start_time

        if isinstance(result_data, ToolResult):
            result.execution_success = result_data.success
            result.error = result_data.error or ""
            result.data = result_data.data
        else:
            # Non-ToolResult returns are treated as a successful run.
            result.execution_success = True
            result.data = result_data

    except Exception as e:
        result.response_time = time.perf_counter() - start_time
        result.execution_success = False
        result.error = f"{type(e).__name__}: {str(e)}"
        result.data = traceback.format_exc()

    return result


def test_tool_sync(tool: BaseTool, test_input: Dict[str, Any]) -> AuditResult:
    """Call ``tool.execute(**test_input)`` synchronously and record the outcome.

    A ``ToolResult`` return value supplies success, error and data; any
    other return value is recorded as a success. Any ``Exception`` is
    recorded as a failure with the formatted traceback stored in ``data``.
    """
    audit = AuditResult(tool_name=tool.name, load_success=True)

    try:
        began = time.perf_counter()
        outcome = tool.execute(**test_input)
        audit.response_time = time.perf_counter() - began

        if not isinstance(outcome, ToolResult):
            # Plain return values count as a successful run.
            audit.execution_success = True
            audit.data = outcome
        else:
            audit.execution_success = outcome.success
            audit.error = outcome.error or ""
            audit.data = outcome.data

    except Exception as exc:
        audit.response_time = time.perf_counter() - began
        audit.execution_success = False
        audit.error = f"{type(exc).__name__}: {exc!s}"
        audit.data = traceback.format_exc()

    return audit


def test_tool_call_method(tool: BaseTool, test_input: Dict[str, Any]) -> AuditResult:
    """Invoke ``tool.call(test_input)`` and record the outcome.

    The input dict is passed as a single positional argument. A
    ``ToolResult`` return value supplies success, error and data; anything
    else is recorded as a success. Any ``Exception`` becomes a failure with
    the formatted traceback stored in ``data``.
    """
    record = AuditResult(tool_name=tool.name, load_success=True)

    try:
        t0 = time.perf_counter()
        returned = tool.call(test_input)
        record.response_time = time.perf_counter() - t0

        wrapped = isinstance(returned, ToolResult)
        record.execution_success = returned.success if wrapped else True
        if wrapped:
            record.error = returned.error or ""
        record.data = returned.data if wrapped else returned

    except Exception as failure:
        record.response_time = time.perf_counter() - t0
        record.execution_success = False
        record.error = f"{type(failure).__name__}: {failure!s}"
        record.data = traceback.format_exc()

    return record


async def audit_tool(tool: BaseTool) -> AuditResult:
    """Audit a single tool using the best available entry point.

    The tool's ``call`` method is preferred. When the tool exposes no
    callable ``call`` attribute, ``execute`` is used instead: awaited via
    ``test_tool_async`` for coroutine functions, otherwise run through
    ``test_tool_sync``.

    The dispatch is decided up front because the ``test_tool_*`` helpers
    capture every ``Exception`` themselves; a ``try``/``except`` fallback
    wrapped around ``test_tool_call_method`` would never trigger.

    Args:
        tool: The tool instance to audit.

    Returns:
        The ``AuditResult`` produced by the chosen helper, or a failure
        result if the ``execute`` fallback itself raises.
    """
    tool_name = tool.name

    # Tools without a canned input are exercised with no arguments.
    test_input = get_test_input(tool_name) or {}

    if callable(getattr(tool, "call", None)):
        return test_tool_call_method(tool, test_input)

    # Fallback: drive ``execute`` directly. A missing ``execute`` attribute
    # is reported by the sync helper as an AttributeError failure.
    is_async = asyncio.iscoroutinefunction(getattr(tool, "execute", None))
    try:
        if is_async:
            return await test_tool_async(tool, test_input)
        return test_tool_sync(tool, test_input)
    except Exception as exc:
        label = "Async" if is_async else "Sync"
        return AuditResult(
            tool_name=tool_name,
            load_success=True,
            execution_success=False,
            error=f"{label} execute failed: {type(exc).__name__}: {str(exc)}"
        )


async def audit_tools() -> List[AuditResult]:
    """Audit every tool listed by the registry, printing progress as it goes.

    Tools are audited one at a time in the order returned by
    ``registry.list()``. A name that the registry lists but cannot resolve
    (``registry.get`` returns ``None``) is recorded as a result with
    ``load_success=False`` so that it shows up in the report and counts as
    a failure, instead of being dropped after a console message.

    Returns:
        One ``AuditResult`` per listed tool name.
    """
    registry = get_registry()
    tool_names = registry.list()

    print(f"\n{'='*60}")
    print("STACK 2.9 TOOLS AUDIT")
    print(f"{'='*60}")
    print(f"Found {len(tool_names)} registered tools:")
    for name in sorted(tool_names):
        print(f"  - {name}")

    results: List[AuditResult] = []

    for tool_name in tool_names:
        tool = registry.get(tool_name)
        if tool is None:
            print(f"\n[ERROR] Tool '{tool_name}' not found in registry")
            results.append(
                AuditResult(
                    tool_name=tool_name,
                    load_success=False,
                    execution_success=False,
                    error="Tool not found in registry",
                )
            )
            continue

        print(f"\n[TESTING] {tool_name}...", end=" ", flush=True)

        result = await audit_tool(tool)
        results.append(result)

        if result.execution_success:
            print(f"PASS ({result.response_time:.4f}s)")
        else:
            print(f"FAIL ({result.response_time:.4f}s)")
            if result.error:
                # Slicing already leaves shorter strings untouched.
                print(f"       Error: {result.error[:100]}")

    return results


def check_skills() -> Dict[str, Any]:
    """Collect and print information about the skills known to the skill tool.

    Returns:
        A dict with the skills file path and whether it exists, the skill
        directories and whether each exists, and the list returned by
        ``_discover_skills``. If discovery (or printing its entries)
        raises, the exception text is stored under ``"error"``.
    """
    from tools.skill_tool import _discover_skills, SKILLS_FILE, SKILL_DIRS

    banner = '=' * 60
    print(f"\n{banner}")
    print("SKILLS CHECK")
    print(f"{banner}")

    summary: Dict[str, Any] = {}
    summary["skills_file"] = str(SKILLS_FILE)
    summary["skills_file_exists"] = SKILLS_FILE.exists()
    summary["skill_dirs"] = [str(folder) for folder in SKILL_DIRS]
    summary["skill_dirs_exist"] = [folder.exists() for folder in SKILL_DIRS]
    summary["discovered_skills"] = []

    try:
        found = _discover_skills()
        summary["discovered_skills"] = found
        print(f"Discovered {len(found)} skills from directories")
        for entry in found:
            label = entry['name']
            blurb = entry.get('description', 'No description')[:50]
            print(f"  - {label}: {blurb}")
    except Exception as problem:
        print(f"Error discovering skills: {problem}")
        summary["error"] = str(problem)

    return summary


def generate_report(results: "List[AuditResult]", skills_info: Dict[str, Any]) -> str:
    """Render the audit outcome as a plain-text report.

    Args:
        results: Per-tool audit results. Each item must expose
            ``tool_name``, ``load_success``, ``execution_success``,
            ``response_time`` and ``error``. May be empty.
        skills_info: Skills summary mapping. The keys ``skills_file``,
            ``skills_file_exists``, ``skill_dirs``, ``skill_dirs_exist``,
            ``discovered_skills`` and ``error`` are read; all are optional.

    Returns:
        The complete report text: tool summary, response-time statistics,
        per-tool details sorted by tool name, and the skills summary.
    """
    total_tools = len(results)
    passed = sum(1 for r in results if r.execution_success)
    failed = total_tools - passed
    # With no results there is nothing to divide by; report 0% rather than
    # raising ZeroDivisionError.
    pass_rate = passed / total_tools * 100 if total_tools else 0.0

    # Results without a positive timing are left out of the statistics.
    response_times = [r.response_time for r in results if r.response_time > 0]
    avg_response_time = sum(response_times) / len(response_times) if response_times else 0
    min_response_time = min(response_times) if response_times else 0
    max_response_time = max(response_times) if response_times else 0

    # Sections are collected and joined once at the end.
    sections = [f"""
================================================================================
                    STACK 2.9 COMPREHENSIVE AUDIT REPORT
================================================================================

Generated: {datetime.now().isoformat()}

--------------------------------------------------------------------------------
                              TOOLS SUMMARY
--------------------------------------------------------------------------------

Total Tools Tested:     {total_tools}
Passed:                {passed}
Failed:                {failed}
Pass Rate:              {pass_rate:.1f}%

--------------------------------------------------------------------------------
                           RESPONSE TIME STATISTICS
--------------------------------------------------------------------------------

Average Response Time:  {avg_response_time:.4f}s
Minimum Response Time: {min_response_time:.4f}s
Maximum Response Time: {max_response_time:.4f}s

--------------------------------------------------------------------------------
                           DETAILED RESULTS
--------------------------------------------------------------------------------
"""]

    for result in sorted(results, key=lambda x: x.tool_name):
        status = "PASS" if result.execution_success else "FAIL"
        sections.append(f"""

Tool: {result.tool_name}
  Status:          {status}
  Load Success:    {result.load_success}
  Response Time:   {result.response_time:.4f}s
""")
        if result.error:
            # Only the first line of a multi-line error is shown.
            error_lines = result.error.split('\n')
            sections.append(f"  Error:          {error_lines[0]}\n")

    # Skills section
    sections.append(f"""

--------------------------------------------------------------------------------
                             SKILLS SUMMARY
--------------------------------------------------------------------------------

Skills File:       {skills_info.get('skills_file', 'N/A')}
Skills File Exists: {skills_info.get('skills_file_exists', False)}

Skill Directories:
""")
    dir_flags = skills_info.get('skill_dirs_exist', [])
    dir_paths = skills_info.get('skill_dirs', [])
    for dir_exists, dir_path in zip(dir_flags, dir_paths):
        status = "EXISTS" if dir_exists else "MISSING"
        sections.append(f"  [{status}] {dir_path}\n")

    discovered = skills_info.get('discovered_skills', [])
    sections.append(f"""
Discovered Skills: {len(discovered)}
""")
    for skill in discovered:
        sections.append(f"  - {skill['name']}: {skill.get('description', 'N/A')[:50]}\n")

    if skills_info.get('error'):
        sections.append(f"""
Skills Error: {skills_info['error']}
""")

    # Final summary
    sections.append("""

================================================================================
                           END OF AUDIT REPORT
================================================================================
""")

    return "".join(sections)


async def main() -> int:
    """Run the full audit, print and save the report, and save JSON results.

    Writes ``audit_report.txt`` and ``audit_results.json`` under
    ``/Users/walidsobhi/stack-2.9``.

    Returns:
        The number of tools whose audit did not succeed (0 means all passed).
    """
    print("\nStarting Stack 2.9 Comprehensive Audit...")
    print("Working directory: /Users/walidsobhi/stack-2.9")

    # Audit all tools
    results = await audit_tools()

    # Check skills
    skills_info = check_skills()

    # Generate and print report
    report = generate_report(results, skills_info)
    print(report)

    # Save report to file. The encoding is explicit so that error text with
    # non-ASCII characters does not depend on the platform default.
    report_path = "/Users/walidsobhi/stack-2.9/audit_report.txt"
    with open(report_path, 'w', encoding="utf-8") as f:
        f.write(report)
    print(f"\nReport saved to: {report_path}")

    failed_count = sum(1 for r in results if not r.execution_success)

    # Save JSON results
    json_results = {
        "timestamp": datetime.now().isoformat(),
        "total_tools": len(results),
        "passed": len(results) - failed_count,
        "failed": failed_count,
        "tools": [r.to_dict() for r in results],
        "skills": skills_info,
    }
    json_path = "/Users/walidsobhi/stack-2.9/audit_results.json"
    with open(json_path, 'w', encoding="utf-8") as f:
        # Tool data and discovered-skill entries are arbitrary objects.
        # Stringifying whatever is not JSON-native is a deliberate choice
        # here: a readable dump matters more than a lossless one, and one
        # odd value must not abort the write of all results.
        json.dump(json_results, f, indent=2, default=str)
    print(f"JSON results saved to: {json_path}")

    # The caller turns this count into the process exit code.
    return failed_count


if __name__ == "__main__":
    # Exit status is 0 only when main() reports zero failed tools.
    failure_total = asyncio.run(main())
    exit_status = 1 if failure_total != 0 else 0
    sys.exit(exit_status)