File size: 19,923 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
import asyncio
import json
import sys
from typing import Dict
from pydantic import Field
from ..core.logging import logger
from ..core import BaseModule
from .hitl import HITLRequest, HITLResponse, HITLDecision, HITLContext, HITLInteractionType, HITLMode

class HITLManager(BaseModule):
    """
    HITL Manager - Manages Human-in-the-Loop interactions
    It must be instancialized and add as a parameter to the WorkFlow instance like: workflow = WorkFlow(graph=graph, llm=llm, agent_manager=AgentManager(agents=agents), hitl_manager=hitl_manager)
    """
    
    # Pydantic fields
    active: bool = Field(default=False, description="Whether HITL is currently active")
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self.hitl_input_output_mapping = {}
    
    def init_module(self):
        """Module initialization"""
        self._pending_requests: Dict[str, asyncio.Future] = {}
    
    def activate(self):
        """activate HITL feature"""
        self.active = True
        logger.info("HITL feature activated")
    
    def deactivate(self):
        """deactivate HITL feature"""
        self.active = False
        logger.info("HITL feature deactivated")
    
    @property
    def is_active(self) -> bool:
        return self.active

    async def request_approval(
        self,
        task_name: str,
        agent_name: str, 
        action_name: str,
        interaction_type: HITLInteractionType,
        mode: HITLMode,
        action_inputs_data: dict = None,
        execution_result = None,
        workflow_goal: str = None,
        display_context: Dict = None,
        timeout: float = 1800.0
    ) -> HITLResponse:
        """Request human approval"""
        
        if not self.active:
            # HITL is not active, auto-approved
            return HITLResponse(
                request_id="auto_approved",
                decision=HITLDecision.APPROVE,
                feedback="HITL not active, auto-approved"
            )
        
        # create HITL context
        context = HITLContext(
            task_name=task_name,
            agent_name=agent_name,
            action_name=action_name,
            workflow_goal=workflow_goal,
            action_inputs=action_inputs_data or {},
            execution_result=execution_result,
            display_context=display_context or {}
        )
        
        # generate prompt message
        prompt_message = self._generate_prompt_message(interaction_type, mode, context)
        
        # create request
        request = HITLRequest(
            interaction_type=interaction_type,
            mode=mode,
            context=context,
            prompt_message=prompt_message
        )
        
        # create Future to wait for response
        future = asyncio.Future()
        self._pending_requests[request.request_id] = future
        
        # display request and wait for response in CLI
        try:
            response = await self._handle_cli_interaction(request, timeout)
            future.set_result(response)
            return response
        except asyncio.TimeoutError:
            response = HITLResponse(
                request_id=request.request_id,
                decision=HITLDecision.REJECT,
                feedback="Timeout: No human response received"
            )
            future.set_result(response)
            return response
        finally:
            # clean up
            self._pending_requests.pop(request.request_id, None)
    
    async def _handle_cli_interaction(self, request: HITLRequest, timeout: float) -> HITLResponse:
        """handle cli interaction"""
        
        print("\n" + "="*80)
        print("🔔 Human-in-the-Loop approval request")
        print("="*80)
        print(request.prompt_message)
        print("="*80)
        
        try:
            if request.interaction_type == HITLInteractionType.APPROVE_REJECT:
                return await self._handle_approve_reject(request)
            elif request.interaction_type == HITLInteractionType.REVIEW_EDIT_STATE:
                return await self._handle_review_edit(request)
            elif request.interaction_type == HITLInteractionType.REVIEW_TOOL_CALLS:
                return await self._handle_tool_calls(request)
            elif request.interaction_type == HITLInteractionType.MULTI_TURN_CONVERSATION:
                return await self._handle_conversation(request)
            else:
                return HITLResponse(
                    request_id=request.request_id,
                    decision=HITLDecision.REJECT,
                    feedback="Unknown interaction type"
                )
        except Exception as e:
            logger.error(f"CLI interaction error: {e}")
            return HITLResponse(
                request_id=request.request_id,
                decision=HITLDecision.REJECT,
                feedback=f"Error: {str(e)}"
            )
    
    async def _handle_approve_reject(self, request: HITLRequest) -> HITLResponse:
        """handle approve/reject"""
        
        def get_user_input():
            while True:
                choice = input("\nPlease select [a]pprove / [r]eject: ").lower().strip()
                if choice in ['a', 'approve']:
                    return HITLDecision.APPROVE
                elif choice in ['r', 'reject']:
                    return HITLDecision.REJECT
                print("Invalid input, please input 'a' or 'r'")
        
        # run blocking input in event loop
        loop = asyncio.get_event_loop()
        decision = await loop.run_in_executor(None, get_user_input)
        
        feedback = ""
        if decision == HITLDecision.REJECT:
            def get_feedback():
                return input("Please provide the reason for rejection (optional): ").strip()
            feedback = await loop.run_in_executor(None, get_feedback)
        
        return HITLResponse(
            request_id=request.request_id,
            decision=decision,
            feedback=feedback if feedback else None
        )
    
    async def _handle_review_edit(self, request: HITLRequest) -> HITLResponse:
        """handle review edit"""
        # TODO: implement review edit
        raise NotImplementedError("Not implemented HITL type: HITLInteractionType.REVIEW_EDIT_STATE")
        # def get_user_input():
        #     print(f"\nCurrent execution result:")
        #     result = request.context.execution_result
        #     if isinstance(result, dict):
        #         print(json.dumps(result, ensure_ascii=False, indent=2))
        #     else:
        #         print(str(result))
            
        #     while True:
        #         choice = input("\nPlease select [a]pprove / [m]odify / [r]eject: ").lower().strip()
        #         if choice in ['a', 'approve']:
        #             return HITLDecision.APPROVE, None
        #         elif choice in ['r', 'reject']:
        #             return HITLDecision.REJECT, None
        #         elif choice in ['m', 'modify']:
        #             new_content = input("Please input the modified content (JSON format): ").strip()
        #             try:
        #                 if new_content:
        #                     modified = json.loads(new_content)
        #                     return HITLDecision.MODIFY, modified
        #                 else:
        #                     return HITLDecision.APPROVE, None
        #             except json.JSONDecodeError:
        #                 print("JSON format error, please input again")
        #                 continue
        #         print("Invalid input, please input 'a', 'm' or 'r'")
        
        # loop = asyncio.get_event_loop()
        # decision, modified_content = await loop.run_in_executor(None, get_user_input)
        
        # return HITLResponse(
        #     request_id=request.request_id,
        #     decision=decision,
        #     modified_content=modified_content
        # )
    
    async def _handle_tool_calls(self, request: HITLRequest) -> HITLResponse:
        """handle tool calls review"""
        # TODO: implement tool calls review
        raise NotImplementedError("Not implemented HITL type: HITLInteractionType.REVIEW_TOOL_CALLS")
        # def get_user_input():
        #     tool_calls = request.context.action_inputs.get('tool_calls', [])
        #     print(f"\nTool calls:")
        #     print(json.dumps(tool_calls, ensure_ascii=False, indent=2))
            
        #     while True:
        #         choice = input("\nPlease select [a]pprove / [m]odify / [r]eject: ").lower().strip()
        #         if choice in ['a', 'approve']:
        #             return HITLDecision.APPROVE, None
        #         elif choice in ['r', 'reject']:
        #             return HITLDecision.REJECT, None
        #         elif choice in ['m', 'modify']:
        #             new_calls = input("Please input the modified tool calls (JSON format): ").strip()
        #             try:
        #                 if new_calls:
        #                     modified = json.loads(new_calls)
        #                     return HITLDecision.MODIFY, modified
        #                 else:
        #                     return HITLDecision.APPROVE, None
        #             except json.JSONDecodeError:
        #                 print("JSON format
        
        # loop = asyncio.get_event_loop()
        # decision, modified_content = await loop.run_in_executor(None, get_user_input)
        
        # return HITLResponse(
        #     request_id=request.request_id,
        #     decision=decision,
        #     modified_content=modified_content
        # )
    
    async def _handle_conversation(self, request: HITLRequest) -> HITLResponse:
        """handle multi-turn conversation"""
        # TODO: implement multi-turn conversation
        raise NotImplementedError("Not implemented HITL type: HITLInteractionType.MULTI_TURN_CONVERSATION")
        # def get_user_input():
        #     user_input = input("\nPlease input the guidance content (or 'continue' to continue): ").strip()
        #     if user_input.lower() == 'continue':
        #         return HITLDecision.CONTINUE, None
        #     elif user_input.lower() in ['reject', 'stop']:
        #         return HITLDecision.REJECT, None
        #     else:
        #         return HITLDecision.MODIFY, user_input
        
        # loop = asyncio.get_event_loop()
        # decision, content = await loop.run_in_executor(None, get_user_input)
        
        # return HITLResponse(
        #     request_id=request.request_id,
        #     decision=decision,
        #     modified_content=content,
        #     feedback=content
        # )
    
    def _generate_prompt_message(
        self, 
        interaction_type: HITLInteractionType, 
        mode: HITLMode, 
        context: HITLContext
    ) -> str:
        """generate prompt message"""
        
        base_info = f"""
Task: {context.task_name}
Agent: {context.agent_name}
Action: {context.action_name}
Workflow Goal: {context.workflow_goal or 'N/A'}
Mode: {'Pre-Execution Approval' if mode == HITLMode.PRE_EXECUTION else 'Post-Execution Review'}
"""
        
        if mode == HITLMode.PRE_EXECUTION:
            base_info += f"\nparameters to be executed:\n{json.dumps(context.action_inputs, ensure_ascii=False, indent=2)}"
        else:
            base_info += f"\nexecution_result:\n{json.dumps(context.execution_result, ensure_ascii=False, indent=2) if context.execution_result else 'None'}"
        
        return base_info


    async def request_user_input(
        self,
        task_name: str,
        agent_name: str,
        action_name: str,
        input_fields: dict,
        workflow_goal: str = None,
        display_context: dict = None,
        timeout: float = 3600.0
    ) -> HITLResponse:
        """Request user input based on predefined fields"""
        
        if not self.active:
            # HITL is not active, return empty input
            return HITLResponse(
                request_id="auto_approved",
                decision=HITLDecision.CONTINUE,
                modified_content={},
                feedback="HITL not active, returning empty input"
            )
        
        # create HITL context
        context = HITLContext(
            task_name=task_name,
            agent_name=agent_name,
            action_name=action_name,
            workflow_goal=workflow_goal,
            action_inputs={"input_fields": input_fields},
            execution_result=None,
            display_context=display_context or {}
        )
        
        # generate prompt message for user input collection
        prompt_message = self._generate_user_input_prompt_message(context, input_fields)
        
        # create request
        request = HITLRequest(
            interaction_type=HITLInteractionType.COLLECT_USER_INPUT,
            mode=HITLMode.PRE_EXECUTION,
            context=context,
            prompt_message=prompt_message
        )
        
        # create Future to wait for response
        future = asyncio.Future()
        self._pending_requests[request.request_id] = future
        
        # display request and wait for response in CLI
        try:
            response = await self._handle_user_input_collection(request, input_fields, timeout)
            future.set_result(response)
            return response
        except asyncio.TimeoutError:
            response = HITLResponse(
                request_id=request.request_id,
                decision=HITLDecision.REJECT,
                feedback="Timeout: No user input received"
            )
            future.set_result(response)
            return response
        finally:
            # clean up
            self._pending_requests.pop(request.request_id, None)

    async def _handle_user_input_collection(self, request: HITLRequest, input_fields: dict, timeout: float) -> HITLResponse:
        """Handle user input collection"""
        
        print("\n" + "="*80)
        print("📝 User input collection request")
        print("="*80)
        print(request.prompt_message)
        print("="*80)
        
        try:
            def get_user_inputs():
                collected_inputs = {}
                
                print("\nPlease provide the following inputs:")
                for field_name, field_info in input_fields.items():
                    field_type = field_info.get('type', 'string')
                    description = field_info.get('description', '')
                    required = field_info.get('required', True)
                    default_value = field_info.get('default', None)
                    
                    while True:
                        prompt_text = f"\n{field_name}"
                        if description:
                            prompt_text += f" ({description})"
                        if not required:
                            prompt_text += " [optional]"
                        if default_value is not None:
                            prompt_text += f" [default: {default_value}]"
                        prompt_text += ": "
                        
                        user_input = input(prompt_text).strip()
                        
                        # Handle empty input
                        if not user_input:
                            if not required and default_value is not None:
                                user_input = str(default_value)
                            elif not required:
                                user_input = ""
                            else:
                                print(f"Field '{field_name}' is required, please provide input.")
                                continue
                        
                        # Type conversion and validation
                        try:
                            if field_type == 'int':
                                # collected_inputs[field_name] = int(user_input) if user_input else None
                                collected_inputs[field_name] = str(user_input) if user_input else None  # prepare_action_prompt do not accept int
                            elif field_type == 'float':
                                # collected_inputs[field_name] = float(user_input) if user_input else None
                                collected_inputs[field_name] = str(user_input) if user_input else None
                            elif field_type == 'bool':
                                collected_inputs[field_name] = user_input.lower() in ['true', '1', 'yes', 'y'] if user_input else None
                            else:  # string or others
                                collected_inputs[field_name] = user_input
                            break
                        except ValueError:
                            print(f"Input format error, field '{field_name}' needs {field_type} type value.")
                            continue
                
                # Confirmation
                print("\nCollected inputs:")
                for field_name, value in collected_inputs.items():
                    print(f"  {field_name}: {value}")
                
                while True:
                    confirm = input("\nConfirm these inputs? [y]es / [n]o / [r]etry: ").lower().strip()
                    if confirm in ['y', 'yes']:
                        return collected_inputs
                    elif confirm in ['n', 'no']:
                        sys.exit()
                    elif confirm in ['r', 'retry']:
                        return get_user_inputs()  # Retry input collection
                    else:
                        print("Invalid input, please input 'y', 'n' or 'r'")
            
            # Run blocking input in event loop
            loop = asyncio.get_event_loop()
            collected_data = await loop.run_in_executor(None, get_user_inputs)
            
            if collected_data is not None:
                return HITLResponse(
                    request_id=request.request_id,
                    decision=HITLDecision.CONTINUE,
                    modified_content=collected_data,
                    feedback="User input collection completed"
                )
            else:
                return HITLResponse(
                    request_id=request.request_id,
                    decision=HITLDecision.REJECT,
                    feedback="User cancelled input"
                )
                
        except Exception as e:
            logger.error(f"User input collection error: {e}")
            return HITLResponse(
                request_id=request.request_id,
                decision=HITLDecision.REJECT,
                feedback=f"Error: {str(e)}"
            )

    def _generate_user_input_prompt_message(self, context: HITLContext, input_fields: dict) -> str:
        """Generate prompt message for user input collection"""
        
        base_info = f"""
Task: {context.task_name}
Agent: {context.agent_name}
Action: {context.action_name}
Workflow Goal: {context.workflow_goal or 'N/A'}

User input fields to be collected:
"""
        
        for field_name, field_info in input_fields.items():
            field_type = field_info.get('type', 'string')
            description = field_info.get('description', '')
            required = field_info.get('required', True)
            default_value = field_info.get('default', None)
            
            base_info += f"\n- {field_name} ({field_type})"
            if description:
                base_info += f": {description}"
            if not required:
                base_info += " [optional]"
            if default_value is not None:
                base_info += f" [default: {default_value}]"
        
        return base_info