File size: 19,194 Bytes
8d1819a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
# noqa: D401 (docstrings) – internal helper
import asyncio
import uuid
import atexit
from typing import Any, List
import contextlib
import threading

from python.helpers import settings
from starlette.requests import Request

# Local imports
from python.helpers.print_style import PrintStyle
from agent import AgentContext, UserMessage, AgentContextType
from initialize import initialize_agent
from python.helpers.persist_chat import remove_chat

# Import FastA2A
try:
    from fasta2a import Worker, FastA2A  # type: ignore
    from fasta2a.broker import InMemoryBroker  # type: ignore
    from fasta2a.storage import InMemoryStorage  # type: ignore
    from fasta2a.schema import Message, Artifact, AgentProvider, Skill  # type: ignore
    FASTA2A_AVAILABLE = True
except ImportError:  # pragma: no cover – library not installed
    FASTA2A_AVAILABLE = False
    # Minimal stubs for type checkers when FastA2A is not available

    class Worker:  # type: ignore
        def __init__(self, **kwargs):
            pass

        async def run_task(self, params):
            pass

        async def cancel_task(self, params):
            pass

        def build_message_history(self, history):
            return []

        def build_artifacts(self, result):
            return []

    class FastA2A:  # type: ignore
        def __init__(self, **kwargs):
            pass

        async def __call__(self, scope, receive, send):
            pass

    class InMemoryBroker:  # type: ignore
        pass

    class InMemoryStorage:  # type: ignore
        async def update_task(self, **kwargs):
            pass

    Message = Artifact = AgentProvider = Skill = Any  # type: ignore

_PRINTER = PrintStyle(italic=True, font_color="purple", padding=False)


class AgentZeroWorker(Worker):  # type: ignore[misc]
    """Agent Zero implementation of FastA2A Worker."""

    def __init__(self, broker, storage):
        super().__init__(broker=broker, storage=storage)
        self.storage = storage

    async def run_task(self, params: Any) -> None:  # params: TaskSendParams
        """Execute a task by processing the message through Agent Zero."""
        context = None
        try:
            task_id = params['id']
            message = params['message']

            _PRINTER.print(f"[A2A] Processing task {task_id} with new temporary context")

            # Convert A2A message to Agent Zero format
            agent_message = self._convert_message(message)

            # Always create new temporary context for this A2A conversation
            cfg = initialize_agent()
            context = AgentContext(cfg, type=AgentContextType.BACKGROUND)

            # Log user message so it appears instantly in UI chat window
            context.log.log(
                type="user",  # type: ignore[arg-type]
                heading="Remote user message",
                content=agent_message.message,
                kvps={"from": "A2A"},
                temp=False,
            )

            # Process message through Agent Zero (includes response)
            task = context.communicate(agent_message)
            result_text = await task.result()

            # Build A2A message from result
            response_message: Message = {  # type: ignore
                'role': 'agent',
                'parts': [{'kind': 'text', 'text': str(result_text)}],
                'kind': 'message',
                'message_id': str(uuid.uuid4())
            }

            await self.storage.update_task(  # type: ignore[attr-defined]
                task_id=task_id,
                state='completed',
                new_messages=[response_message]
            )

            # Clean up context like non-persistent MCP chats
            context.reset()
            AgentContext.remove(context.id)
            remove_chat(context.id)

            _PRINTER.print(f"[A2A] Completed task {task_id} and cleaned up context")

        except Exception as e:
            _PRINTER.print(f"[A2A] Error processing task {params.get('id', 'unknown')}: {e}")
            await self.storage.update_task(
                task_id=params.get('id', 'unknown'),
                state='failed'
            )

            # Clean up context even on failure to prevent resource leaks
            if context:
                context.reset()
                AgentContext.remove(context.id)
                remove_chat(context.id)
                _PRINTER.print(f"[A2A] Cleaned up failed context {context.id}")

    async def cancel_task(self, params: Any) -> None:  # params: TaskIdParams
        """Cancel a running task."""
        task_id = params['id']
        _PRINTER.print(f"[A2A] Cancelling task {task_id}")
        await self.storage.update_task(task_id=task_id, state='canceled')  # type: ignore[attr-defined]

        # Note: No context cleanup needed since contexts are always temporary and cleaned up in run_task

    def build_message_history(self, history: List[Any]) -> List[Message]:  # type: ignore
        # Not used in this simplified implementation
        return []

    def build_artifacts(self, result: Any) -> List[Artifact]:  # type: ignore
        # No artifacts for now
        return []

    def _convert_message(self, a2a_message: Message) -> UserMessage:  # type: ignore
        """Convert A2A message to Agent Zero UserMessage."""
        # Extract text from message parts
        text_parts = [part.get('text', '') for part in a2a_message.get('parts', []) if part.get('kind') == 'text']
        message_text = '\n'.join(text_parts)

        # Extract file attachments
        attachments = []
        for part in a2a_message.get('parts', []):
            if part.get('kind') == 'file':
                file_info = part.get('file', {})
                if 'uri' in file_info:
                    attachments.append(file_info['uri'])

        return UserMessage(
            message=message_text,
            attachments=attachments
        )


class DynamicA2AProxy:
    """Dynamic proxy for FastA2A server that allows reconfiguration."""

    _instance = None

    def __init__(self):
        self.app = None
        self.token = ""
        self._lock = threading.Lock()  # Use threading.Lock instead of asyncio.Lock
        self._startup_done: bool = False
        self._worker_bg_task: asyncio.Task | None = None
        self._reconfigure_needed: bool = False  # Flag for deferred reconfiguration

        if FASTA2A_AVAILABLE:
            # Initialize with default token
            cfg = settings.get_settings()
            self.token = cfg.get("mcp_server_token", "")
            self._configure()
            self._register_shutdown()
        else:
            _PRINTER.print("[A2A] FastA2A not available, server will return 503")

    @staticmethod
    def get_instance():
        if DynamicA2AProxy._instance is None:
            DynamicA2AProxy._instance = DynamicA2AProxy()
        return DynamicA2AProxy._instance

    def reconfigure(self, token: str):
        """Reconfigure the FastA2A server with new token."""
        self.token = token
        if FASTA2A_AVAILABLE:
            with self._lock:
                # Mark that reconfiguration is needed - will be done on next request
                self._reconfigure_needed = True
                self._startup_done = False  # Force restart on next request
                _PRINTER.print("[A2A] Reconfiguration scheduled for next request")

    def _configure(self):
        """Configure the FastA2A application with Agent Zero integration."""
        try:
            storage = InMemoryStorage()  # type: ignore[arg-type]
            broker = InMemoryBroker()  # type: ignore[arg-type]

            # Define Agent Zero's skills
            skills: List[Skill] = [{  # type: ignore
                "id": "general_assistance",
                "name": "General AI Assistant",
                "description": "Provides general AI assistance including code execution, file management, web browsing, and problem solving",
                "tags": ["ai", "assistant", "code", "files", "web", "automation"],
                "examples": [
                    "Write and execute Python code",
                    "Manage files and directories",
                    "Browse the web and extract information",
                    "Solve complex problems step by step",
                    "Install software and manage systems"
                ],
                "input_modes": ["text/plain", "application/octet-stream"],
                "output_modes": ["text/plain", "application/json"]
            }]

            provider: AgentProvider = {  # type: ignore
                "organization": "Agent Zero",
                "url": "https://github.com/frdel/agent-zero"
            }

            # Create new FastA2A app with proper thread safety
            new_app = FastA2A(  # type: ignore
                storage=storage,
                broker=broker,
                name="Agent Zero",
                description=(
                    "A general AI assistant that can execute code, manage files, browse the web, and "
                    "solve complex problems in an isolated Linux environment."
                ),
                version="1.0.0",
                provider=provider,
                skills=skills,
                lifespan=None,  # We manage lifespan manually
                middleware=[],  # No middleware - we handle auth in wrapper
            )

            # Store for later lazy startup (needs active event-loop)
            self._storage = storage  # type: ignore[attr-defined]
            self._broker = broker  # type: ignore[attr-defined]
            self._worker = AgentZeroWorker(broker=broker, storage=storage)  # type: ignore[attr-defined]

            # Atomic update of the app
            self.app = new_app

            # _PRINTER.print("[A2A] FastA2A server configured successfully")

        except Exception as e:
            _PRINTER.print(f"[A2A] Failed to configure FastA2A server: {e}")
            self.app = None
            raise

    # ---------------------------------------------------------------------
    # Shutdown handling
    # ---------------------------------------------------------------------

    def _register_shutdown(self):
        """Register an atexit hook to gracefully stop worker & task manager."""

        def _sync_shutdown():
            try:
                if not self._startup_done or not FASTA2A_AVAILABLE:
                    return
                loop = asyncio.new_event_loop()
                loop.run_until_complete(self._async_shutdown())
                loop.close()
            except Exception:
                pass  # ignore errors during interpreter shutdown

        atexit.register(_sync_shutdown)

    async def _async_shutdown(self):
        """Async shutdown: cancel worker task & close task manager."""
        if self._worker_bg_task and not self._worker_bg_task.done():
            self._worker_bg_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker_bg_task
        try:
            if hasattr(self, 'app') and self.app:
                await self.app.task_manager.__aexit__(None, None, None)  # type: ignore[attr-defined]
        except Exception:
            pass

    async def _async_reconfigure(self):
        """Perform async reconfiguration with proper lifecycle management."""
        _PRINTER.print("[A2A] Starting async reconfiguration")

        # Shutdown existing components
        await self._async_shutdown()

        # Reset startup state
        self._startup_done = False
        self._worker_bg_task = None

        # Reconfigure with new token
        self._configure()

        # Restart components
        await self._startup()

        # Clear reconfiguration flag
        self._reconfigure_needed = False

        _PRINTER.print("[A2A] Async reconfiguration completed")

    async def _startup(self):
        """Ensure TaskManager and Worker are running inside current event-loop."""
        if self._startup_done or not FASTA2A_AVAILABLE:
            return
        self._startup_done = True

        # Start task manager
        await self.app.task_manager.__aenter__()  # type: ignore[attr-defined]

        async def _worker_loop():
            async with self._worker.run():  # type: ignore[attr-defined]
                await asyncio.Event().wait()

        # fire-and-forget background task – keep reference
        self._worker_bg_task = asyncio.create_task(_worker_loop())
        _PRINTER.print("[A2A] Worker & TaskManager started")

    async def __call__(self, scope, receive, send):
        """ASGI application interface with token-based routing."""
        if not FASTA2A_AVAILABLE:
            # FastA2A not available, return 503
            response = b'HTTP/1.1 503 Service Unavailable\r\n\r\nFastA2A not available'
            await send({
                'type': 'http.response.start',
                'status': 503,
                'headers': [[b'content-type', b'text/plain']],
            })
            await send({
                'type': 'http.response.body',
                'body': response,
            })
            return

        from python.helpers import settings
        cfg = settings.get_settings()
        if not cfg["a2a_server_enabled"]:
            response = b'HTTP/1.1 403 Forbidden\r\n\r\nA2A server is disabled'
            await send({
                'type': 'http.response.start',
                'status': 403,
                'headers': [[b'content-type', b'text/plain']],
            })
            await send({
                'type': 'http.response.body',
                'body': response,
            })
            return

        # Check if reconfiguration is needed
        if self._reconfigure_needed:
            try:
                await self._async_reconfigure()
            except Exception as e:
                _PRINTER.print(f"[A2A] Error during reconfiguration: {e}")
                # Return 503 if reconfiguration failed
                await send({
                    'type': 'http.response.start',
                    'status': 503,
                    'headers': [[b'content-type', b'text/plain']],
                })
                await send({
                    'type': 'http.response.body',
                    'body': b'FastA2A reconfiguration failed',
                })
                return

        if self.app is None:
            # FastA2A not configured, return 503
            response = b'HTTP/1.1 503 Service Unavailable\r\n\r\nFastA2A not configured'
            await send({
                'type': 'http.response.start',
                'status': 503,
                'headers': [[b'content-type', b'text/plain']],
            })
            await send({
                'type': 'http.response.body',
                'body': response,
            })
            return

        # Lazy-start background components the first time we get a request
        if not self._startup_done:
            try:
                _PRINTER.print("[A2A] Starting up FastA2A components")
                await self._startup()
            except Exception as e:
                _PRINTER.print(f"[A2A] Error during startup: {e}")
                # Return 503 if startup failed
                await send({
                    'type': 'http.response.start',
                    'status': 503,
                    'headers': [[b'content-type', b'text/plain']],
                })
                await send({
                    'type': 'http.response.body',
                    'body': b'FastA2A startup failed',
                })
                return

        # Handle token-based routing: /a2a/t-{token}/... or /t-{token}/...
        path = scope.get('path', '')

        # Strip /a2a prefix if present (DispatcherMiddleware doesn't always strip it)
        if path.startswith('/a2a'):
            path = path[4:]  # Remove '/a2a' prefix

        # Check if path matches token pattern /t-{token}/
        if path.startswith('/t-'):
            # Extract token from path
            if '/' in path[3:]:
                path_parts = path[3:].split('/', 1)  # Remove '/t-' prefix
                request_token = path_parts[0]
                remaining_path = '/' + path_parts[1] if len(path_parts) > 1 else '/'
            else:
                request_token = path[3:]
                remaining_path = '/'

            # Validate token
            cfg = settings.get_settings()
            expected_token = cfg.get("mcp_server_token")

            if expected_token and request_token != expected_token:
                # Invalid token, return 401
                await send({
                    'type': 'http.response.start',
                    'status': 401,
                    'headers': [[b'content-type', b'text/plain']],
                })
                await send({
                    'type': 'http.response.body',
                    'body': b'Unauthorized',
                })
                return

            # Update scope with cleaned path
            scope = dict(scope)
            scope['path'] = remaining_path
        else:
            # No token in path, check other auth methods
            request = Request(scope, receive=receive)

            cfg = settings.get_settings()
            expected = cfg.get("mcp_server_token")

            if expected:
                auth_header = request.headers.get("Authorization", "")
                api_key = request.headers.get("X-API-KEY") or request.query_params.get("api_key")

                is_authorized = (
                    (auth_header.startswith("Bearer ") and auth_header.split(" ", 1)[1] == expected) or
                    (api_key == expected)
                )

                if not is_authorized:
                    # No valid auth, return 401
                    await send({
                        'type': 'http.response.start',
                        'status': 401,
                        'headers': [[b'content-type', b'text/plain']],
                    })
                    await send({
                        'type': 'http.response.body',
                        'body': b'Unauthorized',
                    })
                    return
            else:
                _PRINTER.print("[A2A] No expected token found in settings")

        # Delegate to FastA2A app with cleaned scope
        with self._lock:
            app = self.app
        if app:
            await app(scope, receive, send)
        else:
            # App not configured, return 503
            await send({
                'type': 'http.response.start',
                'status': 503,
                'headers': [[b'content-type', b'text/plain']],
            })
            await send({
                'type': 'http.response.body',
                'body': b'FastA2A app not configured',
            })
            return


def is_available():
    """Check if FastA2A is available and properly configured."""
    return FASTA2A_AVAILABLE and DynamicA2AProxy.get_instance().app is not None


def get_proxy():
    """Get the FastA2A proxy instance."""
    return DynamicA2AProxy.get_instance()