mohsin / main.py
MuhammadSaad16's picture
Add application file
cb3f557
# # example_usage.py
# import asyncio
# import traceback
# from agents import Runner, RunContextWrapper
# from agents.exceptions import InputGuardrailTripwireTriggered
# from openai.types.responses import ResponseTextDeltaEvent
# from chatbot.chatbot_agent import innscribe_assistant
# async def query_innscribe_bot(user_message: str, stream: bool = True):
# """
# Query the Innoscribe bot with optional streaming (ChatGPT-style chunk-by-chunk output).
# Args:
# user_message: The user's message/query
# stream: If True, stream responses chunk by chunk like ChatGPT. If False, wait for complete response.
# Returns:
# The final output from the agent
# """
# try:
# ctx = RunContextWrapper(context={})
# if stream:
# # ChatGPT-style streaming: clean output, text appears chunk by chunk
# result = Runner.run_streamed(
# innscribe_assistant,
# input=user_message,
# context=ctx.context
# )
# # Stream text chunk by chunk in real-time (like ChatGPT)
# async for event in result.stream_events():
# if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
# delta = event.data.delta
# if delta:
# # Print each chunk immediately as it arrives (ChatGPT-style)
# print(delta, end="", flush=True)
# print("\n") # New line after streaming completes
# return result.final_output
# else:
# # Non-streaming mode: wait for complete response
# response = await Runner.run(
# innscribe_assistant,
# input=user_message,
# context=ctx.context
# )
# return response.final_output
# except InputGuardrailTripwireTriggered as e:
# print(f"\n⚠️ Guardrail blocked the query: {e}")
# if hasattr(e, 'result') and hasattr(e.result, 'output_info'):
# print(f"Guardrail reason: {e.result.output_info}")
# print("The query was determined to be unrelated to Innoscribe services.")
# return None
# except Exception as e:
# print(f"\n❌ Error: {e}")
# print(traceback.format_exc())
# raise
# async def interactive_chat():
# """
# Interactive ChatGPT-style conversation loop.
# Type 'exit', 'quit', or 'bye' to end the conversation.
# """
# print("=" * 60)
# print("🤖 Innoscribe Assistant - ChatGPT-style Chat")
# print("Type 'exit', 'quit', or 'bye' to end the conversation")
# print("=" * 60)
# print()
# while True:
# try:
# user_message = input("👤 You: ").strip()
# # Check for exit commands
# if user_message.lower() in ['exit', 'quit', 'bye', '']:
# print("\n👋 Goodbye! Have a great day!")
# break
# # Display assistant prefix and stream response
# print("🤖 Assistant: ", end="", flush=True)
# # Stream response chunk by chunk (ChatGPT-style)
# response = await query_innscribe_bot(user_message, stream=True)
# print() # Empty line between messages
# except KeyboardInterrupt:
# print("\n\n👋 Conversation interrupted. Goodbye!")
# break
# except Exception as e:
# print(f"\n❌ Error: {e}")
# print("Please try again or type 'exit' to quit.\n")
# async def main():
# try:
# # Option 1: Single message example (ChatGPT-style streaming)
# user_message = "Hello, how can I help you?"
# print(f"👤 You: {user_message}\n")
# print("🤖 Assistant: ", end="", flush=True)
# # Stream response chunk by chunk (ChatGPT-style)
# response = await query_innscribe_bot(user_message, stream=True)
# # Option 2: Uncomment below to use interactive chat mode instead
# # await interactive_chat()
# except Exception as e:
# print(f"\n❌ Error: {e}")
# print(traceback.format_exc())
# if __name__ == "__main__":
# try:
# asyncio.run(main())
# except Exception as e:
# print(f"Fatal error: {e}")
# print(traceback.format_exc())
# example_usage.py
import asyncio
import traceback
from agents import Runner, RunContextWrapper
from agents.exceptions import InputGuardrailTripwireTriggered
from openai.types.responses import ResponseTextDeltaEvent
from chatbot.chatbot_agent import jobobike_assistant
async def query_jobobike_bot(user_message: str, stream: bool = True):
    """Send one message to the JOBObike assistant and return its final output.

    Args:
        user_message: The user's message/query.
        stream: If True, print each text delta to stdout as it arrives and
            then return the final output. If False, wait for the complete
            response without printing anything.

    Returns:
        The run's ``final_output``, or ``None`` when an input guardrail
        blocked the query.

    Raises:
        Exception: Any failure other than a guardrail trip is printed
            together with its traceback and then re-raised to the caller.
    """
    try:
        ctx = RunContextWrapper(context={})

        if not stream:
            # Non-streaming mode: wait for the complete response.
            response = await Runner.run(
                jobobike_assistant,
                input=user_message,
                context=ctx.context,
            )
            return response.final_output

        # Streaming mode: echo text deltas as soon as they are received.
        result = Runner.run_streamed(
            jobobike_assistant,
            input=user_message,
            context=ctx.context,
        )
        async for event in result.stream_events():
            if event.type != "raw_response_event":
                continue
            if not isinstance(event.data, ResponseTextDeltaEvent):
                continue
            delta = event.data.delta
            if delta:
                print(delta, end="", flush=True)
        # Terminates the streamed line and leaves one blank line after it.
        print("\n")
        return result.final_output
    except InputGuardrailTripwireTriggered as e:
        print(f"\n⚠️ Guardrail blocked the query: {e}")
        # The Agents SDK documents the triggering result on
        # ``guardrail_result`` with the guardrail payload under
        # ``.output.output_info``. The previous ``e.result`` probe never
        # matched that shape, so the reason was silently dropped.
        guardrail_result = getattr(e, "guardrail_result", None)
        guardrail_output = getattr(guardrail_result, "output", None)
        if hasattr(guardrail_output, "output_info"):
            print(f"Guardrail reason: {guardrail_output.output_info}")
        elif hasattr(e, "result") and hasattr(e.result, "output_info"):
            # Legacy attribute layout, kept as a backward-compatible fallback.
            print(f"Guardrail reason: {e.result.output_info}")
        print("The query was determined to be unrelated to JOBObike services.")
        return None
    except Exception as e:
        print(f"\n❌ Error: {e}")
        print(traceback.format_exc())
        raise
async def interactive_chat():
    """Run an interactive chat loop on stdin/stdout.

    Every line the user enters is sent to ``query_jobobike_bot`` in streaming
    mode. The loop ends when the user types 'exit', 'quit' or 'bye', submits
    an empty line, presses Ctrl+C, or when stdin reaches end-of-file.
    Any other error is reported and the loop continues with the next prompt.
    """
    print("=" * 60)
    print("🤖 JOBObike Assistant - ChatGPT-style Chat")
    print("Type 'exit', 'quit', or 'bye' to end the conversation")
    print("=" * 60)
    print()

    exit_commands = ('exit', 'quit', 'bye', '')
    while True:
        try:
            user_message = input("👤 You: ").strip()
            if user_message.lower() in exit_commands:
                print("\n👋 Goodbye! Have a great day!")
                break

            # Print the assistant prefix first; the streamed reply follows
            # on the same line.
            print("🤖 Assistant: ", end="", flush=True)
            await query_jobobike_bot(user_message, stream=True)
            print()  # Empty line between messages
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin was closed (Ctrl+D or exhausted piped
            # input). Retrying would fail the same way forever, so it is
            # treated like an interrupt instead of a recoverable error.
            print("\n\n👋 Conversation interrupted. Goodbye!")
            break
        except Exception as e:
            print(f"\n❌ Error: {e}")
            print("Please try again or type 'exit' to quit.\n")
async def main():
    """Run a single demo query against the JOBObike assistant.

    Prints the demo user message and an assistant prefix, then streams the
    reply via ``query_jobobike_bot``. Any exception is caught here, printed
    with its traceback, and not re-raised.
    """
    try:
        # Option 1: Single message example (ChatGPT-style streaming)
        user_message = "Hello, tell me about your services."
        print(f"👤 You: {user_message}\n")
        print("🤖 Assistant: ", end="", flush=True)
        # The reply is printed while it streams, so the return value is
        # not needed here.
        await query_jobobike_bot(user_message, stream=True)
        # Option 2: Uncomment below to use interactive chat mode instead
        # await interactive_chat()
    except Exception as e:
        print(f"\n❌ Error: {e}")
        print(traceback.format_exc())
if __name__ == "__main__":
    # Script entry point: drive the async demo and report anything that
    # still escapes it, message first and traceback second.
    try:
        asyncio.run(main())
    except Exception as fatal:
        report = (f"Fatal error: {fatal}", traceback.format_exc())
        print(*report, sep="\n")