yashpinjarkar10's picture
Rename streamlit_ui.py to app.py
3c9b226 verified
from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os
import streamlit as st
import json
import datetime
import logfire
from supabase import Client
from pydantic_ai.models.gemini import GeminiModel
from supabase import create_client
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from pydantic_ai_agent import PineScript_expert, PydanticAIDeps
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Initialize Supabase client
supabase: Client = create_client(
os.getenv("SUPABASE_URL1"),
os.getenv("SUPABASE_SERVICE_KEY1")
)
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='always')
class ChatMessage(TypedDict):
"""Format of messages sent to the browser/API."""
role: Literal['user', 'model']
timestamp: str
content: str
def display_message_part(part):
"""
Display a single part of a message in the Streamlit UI.
Customize how you display system prompts, user prompts,
tool calls, tool returns, etc.
"""
# system-prompt
if part.part_kind == 'system-prompt':
with st.chat_message("system"):
st.markdown(f"**System**: {part.content}")
# user-prompt
elif part.part_kind == 'user-prompt':
with st.chat_message("user"):
st.markdown(part.content)
# text
elif part.part_kind == 'text':
with st.chat_message("assistant"):
st.markdown(part.content)
async def run_agent_with_streaming(user_input: str):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
# Prepare dependencies for the agent
deps = PydanticAIDeps(
supabase=supabase,
gemini=GeminiModel('gemini-1.5-flash', api_key=GOOGLE_API_KEY)
)
message_placeholder = st.empty()
partial_text = ""
try:
# Run the agent in a stream
async with PineScript_expert.run_stream(
user_input,
deps=deps,
message_history=st.session_state.messages[:-1], # pass entire conversation so far
) as result:
# Render partial text as it arrives
async for chunk in result.stream_text(delta=True):
partial_text += chunk
message_placeholder.markdown(partial_text)
except Exception as e:
# Instead of displaying the raw error, display a friendly message.
message_placeholder.markdown("**Could you please ask again?**")
st.error("Could you please ask again?")
# Also, add the friendly message to the conversation history.
st.session_state.messages.append(
ModelResponse(parts=[TextPart(content="Could you please ask again?")])
)
return
# Now that the stream is finished, add new messages from this run
filtered_messages = [
msg for msg in result.new_messages()
if not (hasattr(msg, 'parts') and any(part.part_kind == 'user-prompt' for part in msg.parts))
]
st.session_state.messages.extend(filtered_messages)
st.session_state.messages.append(
ModelResponse(parts=[TextPart(content=partial_text)])
)
def store_user_query(user_query: str):
"""
Stores the user's query in the 'user_queries' table.
"""
try:
result = supabase.table('user_queries').insert({
'query': user_query,
'timestamp': datetime.datetime.now().isoformat()
}).execute()
print("User query stored successfully.")
except Exception as e:
print(f"Error storing user query: {e}")
async def main():
st.title("PineScript Agentic RAG")
st.write("Ask any question about PineScript, the hidden truths of its beauty lie within.")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display all messages from the conversation so far
# Each message is either a ModelRequest or ModelResponse.
# We iterate over their parts to decide how to display them.
for msg in st.session_state.messages:
if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse):
for part in msg.parts:
display_message_part(part)
# Chat input for the user
user_input = st.chat_input("What questions do you have about PineScript?")
if user_input:
# We append a new request to the conversation explicitly
store_user_query(user_input)
st.session_state.messages.append(
ModelRequest(parts=[UserPromptPart(content=user_input)])
)
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display the assistant's partial response while streaming
with st.chat_message("assistant"):
# Actually run the agent now, streaming the text
await run_agent_with_streaming(user_input)
if __name__ == "__main__":
asyncio.run(main())