| import asyncio |
| from typing import TypedDict |
| import gradio as gr |
| import uuid |
| import logging |
|
|
| from langgraph.checkpoint.memory import InMemorySaver |
| from langgraph.graph.state import END, START, StateGraph |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| class SimpleState(TypedDict): |
| messages : list[str] |
|
|
| |
| async def test_node(state: SimpleState): |
| for i in range(5): |
| msg = f"Step {i+1}/5: processing..." |
| logger.info(msg) |
| state["messages"].append(msg) |
| yield state |
| await asyncio.sleep(1) |
| state["messages"].append("All steps complete!") |
| yield state |
| return |
|
|
|
|
| def create_graph(thread_id : str): |
| logger.info("%s| Creating Graph : started", thread_id) |
| memory = InMemorySaver() |
| workflow = StateGraph(SimpleState) |
| workflow.add_node("test", test_node) |
| workflow.add_edge(START, "test") |
| workflow.add_edge("test", END) |
|
|
| logger.info("%s| Creating Graph : finished", thread_id) |
| return workflow.compile(checkpointer=memory) |
|
|
| |
| async def process_reports_stream(files): |
| if not files: |
| yield "Please upload at least one PDF file.", None |
| return |
|
|
| yield f"Initiating processing of {len(files)} files...", None |
| thread_id = str(uuid.uuid4()) |
| logger.info(f"Starting workflow with thread_id={thread_id}") |
|
|
| workflow = create_graph(thread_id) |
| state = SimpleState(messages=[]) |
| for file in files: |
| state["messages"].append(f"Uploaded file: {file.name}") |
|
|
| config = {"configurable": {"thread_id": thread_id}} |
| |
| async for state_update in workflow.astream(state,config=config, stream_mode="values"): |
| |
| print("state_update=",state_update) |
| buffer = "\n".join(state_update["messages"]) |
| yield buffer, None |
| await asyncio.sleep(1) |
|
|
| await asyncio.sleep(5) |
| |
| yield "✅ Processing complete! (No file generated in test)", None |
|
|
| |
| with gr.Blocks() as demo: |
| gr.Markdown("## Async Streaming Progress Test") |
|
|
| file_input = gr.File(file_types=[".pdf"], file_count="multiple", label="Upload PDFs") |
| output_box = gr.Markdown(label="Progress Output") |
| pdf_output = gr.File(label="Generated Report") |
|
|
| run_btn = gr.Button("Start Processing") |
| run_btn.click( |
| process_reports_stream, |
| inputs=file_input, |
| outputs=[output_box, pdf_output], |
| queue=True, |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|