Spaces:
Sleeping
Sleeping
| # # example_usage.py | |
| # import asyncio | |
| # import traceback | |
| # from agents import Runner, RunContextWrapper | |
| # from agents.exceptions import InputGuardrailTripwireTriggered | |
| # from openai.types.responses import ResponseTextDeltaEvent | |
| # from chatbot.chatbot_agent import innscribe_assistant | |
| # async def query_innscribe_bot(user_message: str, stream: bool = True): | |
| # """ | |
| # Query the Innoscribe bot with optional streaming (ChatGPT-style chunk-by-chunk output). | |
| # Args: | |
| # user_message: The user's message/query | |
| # stream: If True, stream responses chunk by chunk like ChatGPT. If False, wait for complete response. | |
| # Returns: | |
| # The final output from the agent | |
| # """ | |
| # try: | |
| # ctx = RunContextWrapper(context={}) | |
| # if stream: | |
| # # ChatGPT-style streaming: clean output, text appears chunk by chunk | |
| # result = Runner.run_streamed( | |
| # innscribe_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # # Stream text chunk by chunk in real-time (like ChatGPT) | |
| # async for event in result.stream_events(): | |
| # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| # delta = event.data.delta | |
| # if delta: | |
| # # Print each chunk immediately as it arrives (ChatGPT-style) | |
| # print(delta, end="", flush=True) | |
| # print("\n") # New line after streaming completes | |
| # return result.final_output | |
| # else: | |
| # # Non-streaming mode: wait for complete response | |
| # response = await Runner.run( | |
| # innscribe_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # return response.final_output | |
| # except InputGuardrailTripwireTriggered as e: | |
| # print(f"\nβ οΈ Guardrail blocked the query: {e}") | |
| # if hasattr(e, 'result') and hasattr(e.result, 'output_info'): | |
| # print(f"Guardrail reason: {e.result.output_info}") | |
| # print("The query was determined to be unrelated to Innoscribe services.") | |
| # return None | |
| # except Exception as e: | |
| # print(f"\nβ Error: {e}") | |
| # print(traceback.format_exc()) | |
| # raise | |
| # async def interactive_chat(): | |
| # """ | |
| # Interactive ChatGPT-style conversation loop. | |
| # Type 'exit', 'quit', or 'bye' to end the conversation. | |
| # """ | |
| # print("=" * 60) | |
| # print("π€ Innoscribe Assistant - ChatGPT-style Chat") | |
| # print("Type 'exit', 'quit', or 'bye' to end the conversation") | |
| # print("=" * 60) | |
| # print() | |
| # while True: | |
| # try: | |
| # user_message = input("π€ You: ").strip() | |
| # # Check for exit commands | |
| # if user_message.lower() in ['exit', 'quit', 'bye', '']: | |
| # print("\nπ Goodbye! Have a great day!") | |
| # break | |
| # # Display assistant prefix and stream response | |
| # print("π€ Assistant: ", end="", flush=True) | |
| # # Stream response chunk by chunk (ChatGPT-style) | |
| # response = await query_innscribe_bot(user_message, stream=True) | |
| # print() # Empty line between messages | |
| # except KeyboardInterrupt: | |
| # print("\n\nπ Conversation interrupted. Goodbye!") | |
| # break | |
| # except Exception as e: | |
| # print(f"\nβ Error: {e}") | |
| # print("Please try again or type 'exit' to quit.\n") | |
| # async def main(): | |
| # try: | |
| # # Option 1: Single message example (ChatGPT-style streaming) | |
| # user_message = "Hello, how can I help you?" | |
| # print(f"π€ You: {user_message}\n") | |
| # print("π€ Assistant: ", end="", flush=True) | |
| # # Stream response chunk by chunk (ChatGPT-style) | |
| # response = await query_innscribe_bot(user_message, stream=True) | |
| # # Option 2: Uncomment below to use interactive chat mode instead | |
| # # await interactive_chat() | |
| # except Exception as e: | |
| # print(f"\nβ Error: {e}") | |
| # print(traceback.format_exc()) | |
| # if __name__ == "__main__": | |
| # try: | |
| # asyncio.run(main()) | |
| # except Exception as e: | |
| # print(f"Fatal error: {e}") | |
| # print(traceback.format_exc()) | |
| # example_usage.py | |
| import asyncio | |
| import traceback | |
| from agents import Runner, RunContextWrapper | |
| from agents.exceptions import InputGuardrailTripwireTriggered | |
| from openai.types.responses import ResponseTextDeltaEvent | |
| from chatbot.chatbot_agent import jobobike_assistant | |
async def query_jobobike_bot(user_message: str, stream: bool = True):
    """
    Send one query to the JOBObike assistant.

    Args:
        user_message: The user's message/query
        stream: If True, stream responses chunk by chunk like ChatGPT. If False, wait for complete response.

    Returns:
        The final output from the agent, or None when an input guardrail
        blocks the query.
    """
    try:
        wrapper = RunContextWrapper(context={})
        if not stream:
            # Blocking mode: run the agent to completion and hand back the result.
            run_result = await Runner.run(
                jobobike_assistant,
                input=user_message,
                context=wrapper.context,
            )
            return run_result.final_output
        # Streaming mode: echo each text delta to stdout the moment it arrives.
        streamed = Runner.run_streamed(
            jobobike_assistant,
            input=user_message,
            context=wrapper.context,
        )
        async for ev in streamed.stream_events():
            # Only raw text-delta events carry printable chunks; skip the rest.
            if ev.type != "raw_response_event":
                continue
            if not isinstance(ev.data, ResponseTextDeltaEvent):
                continue
            chunk = ev.data.delta
            if chunk:
                print(chunk, end="", flush=True)
        print("\n")  # New line after streaming completes
        return streamed.final_output
    except InputGuardrailTripwireTriggered as e:
        # Guardrail rejections are expected: report and return None instead of raising.
        print(f"\nβ οΈ Guardrail blocked the query: {e}")
        if hasattr(e, 'result') and hasattr(e.result, 'output_info'):
            print(f"Guardrail reason: {e.result.output_info}")
        print("The query was determined to be unrelated to JOBObike services.")
        return None
    except Exception as e:
        # Unexpected failures: log with traceback, then propagate to the caller.
        print(f"\nβ Error: {e}")
        print(traceback.format_exc())
        raise
async def interactive_chat():
    """
    Interactive ChatGPT-style conversation loop.

    Reads user input from stdin, streams each assistant reply inline, and
    exits on 'exit', 'quit', 'bye', an empty line, or Ctrl-C.
    """
    print("=" * 60)
    print("π€ JOBObike Assistant - ChatGPT-style Chat")
    print("Type 'exit', 'quit', or 'bye' to end the conversation")
    print("=" * 60)
    print()
    while True:
        try:
            user_message = input("π€ You: ").strip()
            # Empty input is treated the same as an explicit exit command.
            if user_message.lower() in ['exit', 'quit', 'bye', '']:
                print("\nπ Goodbye! Have a great day!")
                break
            # Display assistant prefix, then stream the reply chunk by chunk.
            print("π€ Assistant: ", end="", flush=True)
            # Fix: the return value was previously bound to an unused local.
            await query_jobobike_bot(user_message, stream=True)
            print()  # Empty line between messages
        except KeyboardInterrupt:
            print("\n\nπ Conversation interrupted. Goodbye!")
            break
        except Exception as e:
            # Keep the loop alive on per-message failures; user can retry.
            print(f"\nβ Error: {e}")
            print("Please try again or type 'exit' to quit.\n")
async def main():
    """
    Demo entry point: runs a single streamed query against the assistant.

    Swap in `interactive_chat()` (commented below) for a REPL-style session.
    """
    try:
        # Option 1: Single message example (ChatGPT-style streaming)
        user_message = "Hello, tell me about your services."
        print(f"π€ You: {user_message}\n")
        print("π€ Assistant: ", end="", flush=True)
        # Fix: the return value was previously bound to an unused local.
        await query_jobobike_bot(user_message, stream=True)
        # Option 2: Uncomment below to use interactive chat mode instead
        # await interactive_chat()
    except Exception as e:
        print(f"\nβ Error: {e}")
        print(traceback.format_exc())
| if __name__ == "__main__": | |
| try: | |
| asyncio.run(main()) | |
| except Exception as e: | |
| print(f"Fatal error: {e}") | |
| print(traceback.format_exc()) |