# # example_usage.py # import asyncio # import traceback # from agents import Runner, RunContextWrapper # from agents.exceptions import InputGuardrailTripwireTriggered # from openai.types.responses import ResponseTextDeltaEvent # from chatbot.chatbot_agent import innscribe_assistant # async def query_innscribe_bot(user_message: str, stream: bool = True): # """ # Query the Innoscribe bot with optional streaming (ChatGPT-style chunk-by-chunk output). # Args: # user_message: The user's message/query # stream: If True, stream responses chunk by chunk like ChatGPT. If False, wait for complete response. # Returns: # The final output from the agent # """ # try: # ctx = RunContextWrapper(context={}) # if stream: # # ChatGPT-style streaming: clean output, text appears chunk by chunk # result = Runner.run_streamed( # innscribe_assistant, # input=user_message, # context=ctx.context # ) # # Stream text chunk by chunk in real-time (like ChatGPT) # async for event in result.stream_events(): # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): # delta = event.data.delta # if delta: # # Print each chunk immediately as it arrives (ChatGPT-style) # print(delta, end="", flush=True) # print("\n") # New line after streaming completes # return result.final_output # else: # # Non-streaming mode: wait for complete response # response = await Runner.run( # innscribe_assistant, # input=user_message, # context=ctx.context # ) # return response.final_output # except InputGuardrailTripwireTriggered as e: # print(f"\nāš ļø Guardrail blocked the query: {e}") # if hasattr(e, 'result') and hasattr(e.result, 'output_info'): # print(f"Guardrail reason: {e.result.output_info}") # print("The query was determined to be unrelated to Innoscribe services.") # return None # except Exception as e: # print(f"\nāŒ Error: {e}") # print(traceback.format_exc()) # raise # async def interactive_chat(): # """ # Interactive ChatGPT-style conversation loop. # Type 'exit', 'quit', or 'bye' to end the conversation. # """ # print("=" * 60) # print("šŸ¤– Innoscribe Assistant - ChatGPT-style Chat") # print("Type 'exit', 'quit', or 'bye' to end the conversation") # print("=" * 60) # print() # while True: # try: # user_message = input("šŸ‘¤ You: ").strip() # # Check for exit commands # if user_message.lower() in ['exit', 'quit', 'bye', '']: # print("\nšŸ‘‹ Goodbye! Have a great day!") # break # # Display assistant prefix and stream response # print("šŸ¤– Assistant: ", end="", flush=True) # # Stream response chunk by chunk (ChatGPT-style) # response = await query_innscribe_bot(user_message, stream=True) # print() # Empty line between messages # except KeyboardInterrupt: # print("\n\nšŸ‘‹ Conversation interrupted. Goodbye!") # break # except Exception as e: # print(f"\nāŒ Error: {e}") # print("Please try again or type 'exit' to quit.\n") # async def main(): # try: # # Option 1: Single message example (ChatGPT-style streaming) # user_message = "Hello, how can I help you?" # print(f"šŸ‘¤ You: {user_message}\n") # print("šŸ¤– Assistant: ", end="", flush=True) # # Stream response chunk by chunk (ChatGPT-style) # response = await query_innscribe_bot(user_message, stream=True) # # Option 2: Uncomment below to use interactive chat mode instead # # await interactive_chat() # except Exception as e: # print(f"\nāŒ Error: {e}") # print(traceback.format_exc()) # if __name__ == "__main__": # try: # asyncio.run(main()) # except Exception as e: # print(f"Fatal error: {e}") # print(traceback.format_exc()) # example_usage.py import asyncio import traceback from agents import Runner, RunContextWrapper from agents.exceptions import InputGuardrailTripwireTriggered from openai.types.responses import ResponseTextDeltaEvent from chatbot.chatbot_agent import jobobike_assistant async def query_jobobike_bot(user_message: str, stream: bool = True): """ Query the JOBObike bot with optional streaming (ChatGPT-style chunk-by-chunk output). Args: user_message: The user's message/query stream: If True, stream responses chunk by chunk like ChatGPT. If False, wait for complete response. Returns: The final output from the agent """ try: ctx = RunContextWrapper(context={}) if stream: # ChatGPT-style streaming: clean output, text appears chunk by chunk result = Runner.run_streamed( jobobike_assistant, input=user_message, context=ctx.context ) # Stream text chunk by chunk in real-time (like ChatGPT) async for event in result.stream_events(): if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): delta = event.data.delta if delta: # Print each chunk immediately as it arrives (ChatGPT-style) print(delta, end="", flush=True) print("\n") # New line after streaming completes return result.final_output else: # Non-streaming mode: wait for complete response response = await Runner.run( jobobike_assistant, input=user_message, context=ctx.context ) return response.final_output except InputGuardrailTripwireTriggered as e: print(f"\nāš ļø Guardrail blocked the query: {e}") if hasattr(e, 'result') and hasattr(e.result, 'output_info'): print(f"Guardrail reason: {e.result.output_info}") print("The query was determined to be unrelated to JOBObike services.") return None except Exception as e: print(f"\nāŒ Error: {e}") print(traceback.format_exc()) raise async def interactive_chat(): """ Interactive ChatGPT-style conversation loop. Type 'exit', 'quit', or 'bye' to end the conversation. """ print("=" * 60) print("šŸ¤– JOBObike Assistant - ChatGPT-style Chat") print("Type 'exit', 'quit', or 'bye' to end the conversation") print("=" * 60) print() while True: try: user_message = input("šŸ‘¤ You: ").strip() # Check for exit commands if user_message.lower() in ['exit', 'quit', 'bye', '']: print("\nšŸ‘‹ Goodbye! Have a great day!") break # Display assistant prefix and stream response print("šŸ¤– Assistant: ", end="", flush=True) # Stream response chunk by chunk (ChatGPT-style) response = await query_jobobike_bot(user_message, stream=True) print() # Empty line between messages except KeyboardInterrupt: print("\n\nšŸ‘‹ Conversation interrupted. Goodbye!") break except Exception as e: print(f"\nāŒ Error: {e}") print("Please try again or type 'exit' to quit.\n") async def main(): try: # Option 1: Single message example (ChatGPT-style streaming) user_message = "Hello, tell me about your services." print(f"šŸ‘¤ You: {user_message}\n") print("šŸ¤– Assistant: ", end="", flush=True) # Stream response chunk by chunk (ChatGPT-style) response = await query_jobobike_bot(user_message, stream=True) # Option 2: Uncomment below to use interactive chat mode instead # await interactive_chat() except Exception as e: print(f"\nāŒ Error: {e}") print(traceback.format_exc()) if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: print(f"Fatal error: {e}") print(traceback.format_exc())