Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Literal, TypedDict | |
| import asyncio | |
| import os | |
| import streamlit as st | |
| import json | |
| import datetime | |
| import logfire | |
| from supabase import Client | |
| from pydantic_ai.models.gemini import GeminiModel | |
| from supabase import create_client | |
| # Import all the message part classes | |
| from pydantic_ai.messages import ( | |
| ModelMessage, | |
| ModelRequest, | |
| ModelResponse, | |
| SystemPromptPart, | |
| UserPromptPart, | |
| TextPart, | |
| ToolCallPart, | |
| ToolReturnPart, | |
| RetryPromptPart, | |
| ModelMessagesTypeAdapter | |
| ) | |
| from pydantic_ai_agent import PineScript_expert, PydanticAIDeps | |
| # Load environment variables | |
| from dotenv import load_dotenv | |
# Load environment variables (python-dotenv) before any of them are read below.
load_dotenv()

# API key later handed to the Gemini model; None when the variable is unset.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# Shared Supabase client, built from the project URL and the service key.
_supabase_url = os.environ.get("SUPABASE_URL1")
_supabase_key = os.environ.get("SUPABASE_SERVICE_KEY1")
supabase: Client = create_client(_supabase_url, _supabase_key)

# Configure logfire once at import time.
# NOTE(review): 'always' is passed verbatim; confirm against the logfire docs
# that it is a supported value for `send_to_logfire` and that exporting
# telemetry (rather than suppressing it) is what is intended here.
logfire.configure(send_to_logfire='always')
class ChatMessage(TypedDict):
    """Shape of a single chat message as sent to the browser/API."""

    # Who produced the message: the human user or the model.
    role: Literal["user", "model"]
    # Time of the message, carried as a string.
    timestamp: str
    # The message text itself.
    content: str
def display_message_part(part):
    """
    Render one message part as a chat bubble in the Streamlit UI.

    Only three part kinds are rendered: 'system-prompt' (shown with a
    "**System**:" prefix), 'user-prompt' and 'text'. Any other kind
    (tool calls, tool returns, ...) is silently skipped.
    """
    # Map each renderable part kind to the chat role it is shown under.
    role_by_kind = {
        'system-prompt': "system",
        'user-prompt': "user",
        'text': "assistant",
    }
    chat_role = role_by_kind.get(part.part_kind)
    if chat_role is None:
        # Not a kind we display.
        return

    with st.chat_message(chat_role):
        if chat_role == "system":
            st.markdown(f"**System**: {part.content}")
        else:
            st.markdown(part.content)
async def run_agent_with_streaming(user_input: str):
    """
    Stream the agent's answer to `user_input` into the Streamlit UI and
    record the outcome in `st.session_state.messages`.

    Args:
        user_input: The prompt the user just submitted. The caller is
            expected to have appended it to `st.session_state.messages`
            already, which is why the last history entry is left out of
            the `message_history` passed to the agent.

    Returns:
        None. On success the run's new messages (minus user prompts) plus
        a final text response are appended to the session history. On
        failure a fixed friendly message is shown and appended instead,
        and the underlying exception is logged.

    Note:
        Building the dependencies happens outside the `try`, so errors
        raised there propagate to the caller.
    """
    # Prepare dependencies for the agent
    deps = PydanticAIDeps(
        supabase=supabase,
        gemini=GeminiModel('gemini-1.5-flash', api_key=GOOGLE_API_KEY),
    )

    # Placeholder that is overwritten with the growing answer as chunks arrive.
    message_placeholder = st.empty()
    partial_text = ""

    try:
        async with PineScript_expert.run_stream(
            user_input,
            deps=deps,
            # History up to, but not including, the prompt being answered.
            message_history=st.session_state.messages[:-1],
        ) as result:
            # Render partial text as it arrives
            async for chunk in result.stream_text(delta=True):
                partial_text += chunk
                message_placeholder.markdown(partial_text)
    except Exception:
        # Show a friendly message instead of the raw error ...
        message_placeholder.markdown("**Could you please ask again?**")
        st.error("Could you please ask again?")
        # ... keep the conversation history consistent with what was shown ...
        st.session_state.messages.append(
            ModelResponse(parts=[TextPart(content="Could you please ask again?")])
        )
        # ... and record the real failure (with traceback) so it can be
        # diagnosed; previously the exception was dropped without a trace.
        logfire.exception("Agent streaming run failed")
        return

    # The stream finished: keep the run's new messages, except those that
    # carry a user prompt (the caller already stored the user's request).
    filtered_messages = [
        msg for msg in result.new_messages()
        if not (hasattr(msg, 'parts') and any(part.part_kind == 'user-prompt' for part in msg.parts))
    ]
    st.session_state.messages.extend(filtered_messages)

    # NOTE(review): the final text is appended by hand, presumably because
    # new_messages() does not contain it when streaming with delta=True;
    # confirm against the installed pydantic_ai version to avoid duplicates.
    st.session_state.messages.append(
        ModelResponse(parts=[TextPart(content=partial_text)])
    )
def store_user_query(user_query: str):
    """
    Store the user's query in the 'user_queries' table.

    This is best-effort: a failure is reported on stdout and swallowed so
    that a storage problem never interrupts the chat.

    Args:
        user_query: The raw text the user submitted.

    Returns:
        None.
    """
    row = {
        'query': user_query,
        # Timezone-aware UTC timestamp: a naive local time is ambiguous once
        # the row is read on a machine in a different timezone.
        'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    try:
        # Only the remote call can reasonably fail, so only it is guarded.
        supabase.table('user_queries').insert(row).execute()
    except Exception as e:
        print(f"Error storing user query: {e}")
    else:
        print("User query stored successfully.")
async def main():
    """
    Render the chat page: title, conversation so far, and the input box.
    When the user submits a prompt, store it, show it, and stream the
    agent's reply.
    """
    st.title("PineScript Agentic RAG")
    st.write("Ask any question about PineScript, the hidden truths of its beauty lie within.")

    # First run of a session starts with an empty conversation.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the stored conversation; only request/response messages are
    # rendered, one chat bubble per displayable part.
    renderable_types = (ModelRequest, ModelResponse)
    for message in st.session_state.messages:
        if not isinstance(message, renderable_types):
            continue
        for piece in message.parts:
            display_message_part(piece)

    # Chat input for the user; nothing more to do until something is typed.
    user_input = st.chat_input("What questions do you have about PineScript?")
    if not user_input:
        return

    # Persist the query, then add it to the conversation explicitly.
    store_user_query(user_input)
    new_request = ModelRequest(parts=[UserPromptPart(content=user_input)])
    st.session_state.messages.append(new_request)

    # Echo the prompt in the UI.
    with st.chat_message("user"):
        st.markdown(user_input)

    # Run the agent and stream its answer inside the assistant bubble.
    with st.chat_message("assistant"):
        await run_agent_with_streaming(user_input)
# Script entry point: run the async main() coroutine when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    asyncio.run(main())