Humanlearning's picture
feat: Implement interactive query and expiry sweep agents with a Gradio UI, add a Hugging Face pre-push hook, and refine .gitignore.
e29da2b
import asyncio
import os
import logging
from typing import Dict, Any, List
import gradio as gr
from dotenv import load_dotenv
load_dotenv(".env.local")
load_dotenv()
from langchain_core.messages import HumanMessage, AIMessage
import uuid
from langgraph.checkpoint.memory import InMemorySaver
# Configure logging for main
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("credentialwatch_agent")
# In-memory checkpointer to preserve tool call context within a session
checkpointer = InMemorySaver()
from credentialwatch_agent.mcp_client import mcp_client
from credentialwatch_agent.agents.expiry_sweep import expiry_sweep_graph
from credentialwatch_agent.agents.interactive_query import get_interactive_query_graph
async def run_expiry_sweep(window_days: int = 90) -> Dict[str, Any]:
"""
Runs the expiry sweep workflow.
"""
logger.info(f"Starting expiry sweep for {window_days} days...")
await mcp_client.connect()
print(f"Starting expiry sweep for {window_days} days...")
# Initialize state
initial_state = {
"providers": [],
"alerts_created": 0,
"errors": [],
"summary": "",
"window_days": window_days
}
# Run the graph
# Note: The graph expects to fetch data itself, so initial state can be minimal.
# We might want to pass window_days if the graph supported dynamic config in state,
# but for now the graph hardcodes 90 or uses tool defaults.
# To make it dynamic, we'd need to update the graph to read from state.
# For this hackathon, we'll assume the graph handles it or we pass it via a modified state if needed.
# The current implementation of fetch_expiring_credentials uses a hardcoded 90 or tool default.
logger.info("Invoking expiry_sweep_graph...")
final_state = await expiry_sweep_graph.ainvoke(initial_state)
logger.info("Expiry sweep graph completed.")
return {
"summary": final_state.get("summary"),
"alerts_created": final_state.get("alerts_created"),
"errors": final_state.get("errors")
}
async def run_chat_turn(message: str, history: List[List[str]], thread_id: str) -> str:
"""
Runs a turn of the interactive query agent.
Uses checkpointer with thread_id to preserve tool call context within a session.
"""
logger.info(f"Starting chat turn with message: {message} (thread_id: {thread_id})")
await mcp_client.connect()
# Only pass the new message - checkpointer handles full history including tool calls
initial_state = {"messages": [HumanMessage(content=message)]}
# Run the graph with checkpointer
logger.info("Invoking interactive_query_graph...")
interactive_query_graph = get_interactive_query_graph(checkpointer=checkpointer)
config = {"configurable": {"thread_id": thread_id}}
final_state = await interactive_query_graph.ainvoke(initial_state, config=config)
logger.info("Interactive query graph completed.")
# Extract the last message
last_message = final_state["messages"][-1]
return last_message.content
# --- Gradio UI ---
async def start_app():
"""Initializes the app and connects to MCP servers."""
print("Connecting to MCP servers...")
logger.info("Initializing app and connecting to MCP servers...")
await mcp_client.connect()
async def stop_app():
"""Closes connections."""
print("Closing MCP connections...")
logger.info("Stopping app and closing MCP connections...")
await mcp_client.close()
with gr.Blocks(title="CredentialWatch") as demo:
gr.Markdown("# CredentialWatch Agent System")
with gr.Tab("Interactive Query"):
gr.Markdown("Ask questions about provider credentials, e.g., 'Who has expiring licenses?'")
thread_id_state = gr.State(lambda: str(uuid.uuid4()))
chat_interface = gr.ChatInterface(
fn=run_chat_turn,
additional_inputs=[thread_id_state]
)
with gr.Tab("Expiry Sweep"):
gr.Markdown("Run a batch sweep to check for expiring credentials and create alerts.")
with gr.Row():
sweep_btn = gr.Button("Run Sweep", variant="primary")
sweep_output = gr.JSON(label="Sweep Results")
sweep_btn.click(fn=run_expiry_sweep, inputs=[], outputs=[sweep_output])
# Startup/Shutdown hooks
# Gradio doesn't have native async startup hooks easily exposed in Blocks without mounting to FastAPI.
# But we can run the connect logic when the script starts if we run it via `uv run`.
# For a proper app, we'd use lifespan events in FastAPI.
# Here, we will just connect globally on import or first use if possible,
# or use a startup event if we were using `gr.mount_gradio_app`.
# For simplicity in this script, we'll rely on the global mcp_client.connect() being called
# or we can wrap the demo launch.
if __name__ == "__main__":
# Launch the demo.
# Note: We rely on lazy connection in run_chat_turn/run_expiry_sweep to connect mcp_client.
# This avoids creating a conflicting event loop before Gradio starts.
demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)