sheami / study /app_stream_demo.py
vikramvasudevan's picture
Upload folder using huggingface_hub
10882ae verified
import asyncio
from typing import TypedDict
import gradio as gr
import uuid
import logging
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.state import END, START, StateGraph
# Configure root logging at INFO level and create the logger used by this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SimpleState(TypedDict):
    """Minimal workflow state used by this demo in place of SheamiState."""

    # Progress lines collected so far, in the order they were appended.
    messages: list[str]
# Simulated workflow node: an async generator that reports progress step by step
async def test_node(state: SimpleState):
    """Append five progress messages to ``state["messages"]`` and yield after each.

    The incoming state dict is mutated in place and the same object is yielded
    after every append. A one-second pause follows each step, and a last yield
    happens once the completion message has been added.
    """
    for step in range(1, 6):
        progress = f"Step {step}/5: processing..."
        logger.info(progress)
        state["messages"].append(progress)
        yield state
        # Pause between steps to imitate slow work.
        await asyncio.sleep(1)

    state["messages"].append("All steps complete!")
    yield state
def create_graph(thread_id : str):
    """Build and compile the single-node demo workflow.

    The graph is START -> "test" -> END, with ``test_node`` registered as the
    "test" node, and it is compiled with a fresh ``InMemorySaver`` checkpointer.

    Args:
        thread_id: Identifier used only as a prefix in the log lines emitted
            while the graph is being built.

    Returns:
        The object returned by ``StateGraph.compile``.
    """
    logger.info("%s| Creating Graph : started", thread_id)

    builder = StateGraph(SimpleState)
    builder.add_node("test", test_node)
    # Wire the linear path through the only node.
    for source, target in ((START, "test"), ("test", END)):
        builder.add_edge(source, target)

    logger.info("%s| Creating Graph : finished", thread_id)
    return builder.compile(checkpointer=InMemorySaver())
# Async Gradio handler that streams workflow progress updates to the UI
async def process_reports_stream(files):
    """Stream progress for the uploaded files to the two Gradio outputs.

    This is an async generator: every item it yields is a
    ``(markdown_text, file_value)`` pair. The file value is always ``None``
    because this test workflow does not generate a report.

    Args:
        files: Uploaded files as delivered by the file input. Entries may be
            objects exposing a ``name`` attribute or plain path strings. A
            falsy value (``None`` or an empty list) means nothing was uploaded.

    Yields:
        A tuple of the progress text to display and ``None``.
    """
    if not files:
        yield "Please upload at least one PDF file.", None
        return

    yield f"Initiating processing of {len(files)} files...", None

    thread_id = str(uuid.uuid4())
    logger.info("Starting workflow with thread_id=%s", thread_id)
    workflow = create_graph(thread_id)

    # Seed the state with one line per upload. Fall back to the entry itself
    # when it has no ``name`` attribute (e.g. a plain path string) instead of
    # raising AttributeError.
    state = SimpleState(
        messages=[f"Uploaded file: {getattr(file, 'name', file)}" for file in files]
    )
    config = {"configurable": {"thread_id": thread_id}}

    # Forward every state snapshot emitted by the workflow to the UI; the file
    # output stays None while streaming.
    async for state_update in workflow.astream(
        state, config=config, stream_mode="values"
    ):
        logger.info("state_update=%s", state_update)
        yield "\n".join(state_update["messages"]), None
        await asyncio.sleep(1)

    await asyncio.sleep(5)
    # After streaming completes, emit the final message (no file in this test).
    yield "✅ Processing complete! (No file generated in test)", None
# Gradio UI: upload widget, progress panel, report slot and a trigger button
with gr.Blocks() as demo:
    gr.Markdown("## Async Streaming Progress Test")

    # Input: any number of PDF uploads.
    file_input = gr.File(
        label="Upload PDFs",
        file_count="multiple",
        file_types=[".pdf"],
    )

    # Outputs: progress text and the report file slot.
    output_box = gr.Markdown(label="Progress Output")
    pdf_output = gr.File(label="Generated Report")

    run_btn = gr.Button("Start Processing")
    # The handler is an async generator; each yielded pair maps onto the two outputs.
    run_btn.click(
        fn=process_reports_stream,
        inputs=file_input,
        outputs=[output_box, pdf_output],
        queue=True,
    )
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()