# app.py
import os
import io
import time
import contextlib
import traceback
from typing import Annotated, List

import streamlit as st
from dotenv import load_dotenv
from typing_extensions import TypedDict

from langchain_groq import ChatGroq
from langchain_together import Together
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_core.messages import AIMessage, HumanMessage

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
# from langgraph.prebuilt import ToolNode, tools_condition

from tools import execute_python_code, web_search, deep_think

# Load environment
load_dotenv()
# os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY")
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "")  # default avoids a TypeError when unset
together_api_key = os.getenv("TOGETHER_API_KEY")
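
# Assumed .env keys: GROQ_API_KEY and TOGETHER_API_KEY (plus TAVILY_API_KEY
# only if the web_search tool requires it).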

# LangGraph state shared by all nodes
class State(TypedDict):
    messages: Annotated[list, add_messages]
    input: str
    questions: List[str]
    answers: List[str]
    code: str
    explanation: str
    subtasks: List[str]
    follow_up_questions: List[str]
    execution_result: str  # written by execute_code below
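
# add_messages is a reducer: when a node returns {"messages": [msg]}, LangGraph
# appends msg to the existing list instead of replacing it. Nodes below
# therefore return only the messages they produce.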

# LLM used for code generation (Together completions API)
code_generator = Together(
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
    temperature=0.2,
    max_tokens=1500,
    api_key=together_api_key,  # note: the parameter is api_key, not together_api_key
)
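
# Together is a plain-completion LLM, so .invoke() returns a raw string; wrap
# it in an AIMessage (or str()) wherever it feeds back into the message list.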

# Memory
memory = MemorySaver()
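
# MemorySaver checkpoints graph state in memory, keyed by the thread_id passed
# in config; state is lost when the process restarts.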

def generate_questions(state: State):
    user_input = state["messages"][-1].content
    result = subtask_chain.invoke({"user_goal": user_input})
    subtasks, questions = parse_subtasks_and_questions(result["text"])
    follow_up = "\n".join(f"Q{i+1}: {q}" for i, q in enumerate(questions))
    # Return only the new message; the add_messages reducer appends it.
    return {
        "messages": [AIMessage(content="To proceed, please answer these questions:\n" + follow_up)],
        "questions": questions,
        "answers": state.get("answers", []),  # preserve answers already supplied by the user
    }

def wait_for_answers(state: State):
    # Just pass through until answers are submitted
    return state

def handle_answers(state: State):
    # Fold the user's answers back into the original request
    full_input = state["input"] + "\n\n" + "\n".join(state["answers"])
    return {**state, "input": full_input}

# Assistant node backed by the code_generator LLM (currently unwired; see the
# commented AI_Assistance node in the graph below)
def ai_assistance(state: State):
    result = code_generator.invoke(state["messages"])
    # Together returns a plain string, so wrap it as a message
    return {"messages": [AIMessage(content=result)]}

# def agent_node(state: State):
#     # Use your LLM here (e.g., Together, OpenAI, etc.)
#     model = ChatGoogleGenerativeAI(model="gemini-2.0-flash-001").bind_tools(tools)
#     follow_up_prompt = "Break down this task into subtasks and ask follow-up questions if needed:\n\n"
#     last_user_msg = state["messages"][-1].content
#     full_prompt = follow_up_prompt + last_user_msg
#     response = model.invoke(full_prompt)
#     return {"messages": state["messages"] + [AIMessage(content=response)]}

llm = ChatGroq(model="qwen/qwen3-32b", temperature=0.6)

# Template to extract subtasks from the user's input
subtask_prompt = PromptTemplate.from_template(
    """You are an expert AI agent designer.

Given the user's goal:
"{user_goal}"

1. Break this goal into a clear list of subtasks (in bullet points).
2. If any clarification is needed, ask relevant follow-up questions.

Respond in this format:
---
Subtasks:
- ...
- ...

Follow-Up Questions (if any):
- ...
---"""
)
subtask_chain = LLMChain(llm=llm, prompt=subtask_prompt)
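
# LLMChain is deprecated in recent langchain releases; the LCEL equivalent
# would be `subtask_prompt | llm`, which returns a message object rather than
# a {"text": ...} dict, so the .invoke call sites would change with it.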

def agent_node(state: State):
    user_input = state["messages"][-1].content
    # Get subtasks and possible follow-up questions
    result = subtask_chain.invoke({"user_goal": user_input})
    response_text = result["text"]
    # Parse subtasks and follow-up questions
    subtasks, questions = parse_subtasks_and_questions(response_text)
    # Return only the new message; add_messages appends it to state
    return {
        "messages": [AIMessage(content=response_text)],
        "subtasks": subtasks,
        "follow_up_questions": questions,
    }

# ⚙️ Helper function to parse bullet points
def parse_subtasks_and_questions(text: str):
    subtasks = []
    questions = []
    collecting = None
    for line in text.strip().splitlines():
        line = line.strip()
        if line.lower().startswith("subtasks:"):
            collecting = "subtasks"
        elif line.lower().startswith("follow-up questions"):
            collecting = "questions"
        elif line.startswith("-"):
            if collecting == "subtasks":
                subtasks.append(line[1:].strip())
            elif collecting == "questions":
                questions.append(line[1:].strip())
    return subtasks, questions
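
# Illustrative example: given model output
#   Subtasks:
#   - Load the dataset
#   - Summarize each record
#   Follow-Up Questions (if any):
#   - Which dataset should be used?
# this returns (["Load the dataset", "Summarize each record"],
#               ["Which dataset should be used?"])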

def generate_code(state: State):
    user_prompt = state["input"]
    system_prompt = """You are an expert Python coding assistant specializing in LangGraph applications.
Generate clean, working Python code for the user's request with these requirements:
1. The code MUST use the LangGraph framework (langgraph library).
2. Implement a proper flow graph using StateGraph.
3. Include all necessary imports and make sure the code is complete.
4. Include code to visualize the flow graph (e.g., graph.get_graph().draw_mermaid_png()).
5. Structure the code with proper node functions, state definitions, and graph compilation.

Your code must include the following:
1. **LangGraph architecture**: Use StateGraph, add_node, add_edge, set_entry_point, etc.
2. **Subtask breakdown**: Translate user requirements into multiple graph nodes that represent subtasks.
3. **LLM Agent**: At least one node should be powered by an LLM (e.g., via langchain or similar).
4. **Terminal Output**: Include a node that prints or returns the final output.
5. **Execution Ready**: All necessary imports, type definitions (e.g., TypedDict for state), and execution commands (`graph = builder.compile()` + `graph.invoke()`).

STRICT RULES:
- DO NOT explain anything.
- DO NOT wrap code in markdown.
- DO NOT add comments.

IMPORTANT: Output ONLY the final Python code.
DO NOT include any explanations, comments, or text before, inside, or after the code.
Start the output with the necessary import statements (e.g., "from langgraph.graph import StateGraph, END").
No additional text, no markdown fences, just the pure code.

User request:"""
    instruction = f"""
You are an expert LangGraph developer.
Your task is to generate working Python code using the LangGraph library based on the user's request.

Guidelines:
- Identify the high-level steps from the user's prompt.
- Break the task into individual LangGraph nodes (functions).
- Define a TypedDict for the shared state.
- Build a `StateGraph` using `add_node`, `add_edge`, and `set_entry_point`.
- Ensure the graph compiles and ends at the `END` node.
- Avoid external libraries unless clearly specified.
- Print final output using a terminal node if needed.
- Keep it clean, minimal, and executable.

Now, generate the code for this task:
{user_prompt}
"""
    full_prompt = system_prompt + instruction
    # Retry on transient 503s from the Together API
    for attempt in range(3):
        try:
            code_response = code_generator.invoke(full_prompt)
            return {**state, "code": str(code_response)}
        except Exception as e:
            if "503" in str(e):
                print(f"[Retry {attempt + 1}/3] Together API unavailable (503). Retrying...")
                time.sleep(2)
            else:
                raise
    raise Exception("Together API failed after 3 retries.")

def explain_code(state: State):
    code = state["code"]
    user_prompt = state["input"]
    system_prompt = """You are a LangGraph expert who explains code clearly. Provide a detailed explanation of the code in four parts:
1. LANGGRAPH FLOW: Explain the flow graph architecture, including nodes, edges, and how data flows through the graph. Describe what would appear in the flow visualization.
2. CODE FLOW: Explain the high-level flow of the code, its architecture, and how different components interact.
3. CODE EXPLANATION: Break down the code step-by-step so a beginner can understand what each part does.
4. VISUALIZATION INSTRUCTIONS: Provide clear instructions on how to run the code to see the flow visualization.

Make your explanation clear, concise, and educational. Include ASCII art to represent the flow graph if possible.
"""
    prompt = f"""User requested: {user_prompt}

Here's the generated LangGraph code:
```python
{code}
```

Explain the LangGraph flow, code architecture, and provide detailed instructions for visualization."""
    full_prompt = system_prompt + prompt
    explanation = code_generator.invoke(full_prompt)
    return {**state, "explanation": explanation}

# from langchain.chat_models import ChatOpenAI
# llm = ChatOpenAI(model_name="gpt-4", temperature=0)
# def agent_node(state):
#     input_text = state["input"]
#     result = llm.predict(input_text)
#     return {"response": result}

def execute_code(state: State) -> State:
    code = state.get("code", "")
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, {})
        output = buffer.getvalue() or "✅ Code executed successfully with no output."
    except Exception:
        output = "❌ Execution Error:\n" + traceback.format_exc()
    return {**state, "execution_result": output}
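
# NOTE: exec() runs model-generated code with full interpreter privileges. For
# anything beyond a local demo, sandbox it (a subprocess with a timeout, a
# container, or a restricted runtime).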

def subtask_splitter(state: State):
    input_text = state["input"]
    # .predict() is deprecated on chat models; use .invoke() and read .content
    response = llm.invoke(f"Split this task into clear LangGraph subtasks:\n{input_text}").content
    return {"subtasks": response}

def get_all_tools():
    return [
        # ... other tools
        execute_python_code,
    ]

def router(state):
    user_input = state["input"].lower()
    if "generate" in user_input:
        return "Generate_Code"
    else:
        return "AI_Assistance"

# Define the graph builder with the state schema
builder = StateGraph(State)

# Add nodes
builder.add_node("LLM_Agent", agent_node)
# builder.add_node("AI_Assistance", ai_assistance)
builder.add_node("Generate_Questions", generate_questions)
builder.add_node("Wait_For_Answers", wait_for_answers)
builder.add_node("Handle_Answers", handle_answers)
builder.add_node("Generate_Code", generate_code)
builder.add_node("Code_Explainer", explain_code)

# Set entry point
builder.set_entry_point("LLM_Agent")

# Conditional routing: proceed once the user has answered
def check_if_answered(state: State) -> str:
    if state.get("answers") and any(state["answers"]):
        return "answered"
    return "not_answered"

# Define flow
builder.add_edge("LLM_Agent", "Generate_Questions")
builder.add_conditional_edges(
    "Generate_Questions",
    check_if_answered,
    {
        "answered": "Handle_Answers",
        "not_answered": "Wait_For_Answers",
    },
)
# End after waiting so invoke() returns and the UI can collect answers; an edge
# straight back to Generate_Questions would spin until the recursion limit.
builder.add_edge("Wait_For_Answers", END)
builder.add_edge("Handle_Answers", "Generate_Code")
builder.add_edge("Generate_Code", "Code_Explainer")
builder.add_edge("Code_Explainer", END)

graph = builder.compile(checkpointer=memory)
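
# With a checkpointer, invoking with the same thread_id resumes the saved
# state, e.g.:
#   graph.invoke(state_dict, config={"configurable": {"thread_id": "1"}})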

# Streamlit UI setup
st.set_page_config(page_title="MitraVerse", layout="wide")

st.markdown("""
<style>
.stChatMessage {
    padding: 12px;
    margin-bottom: 12px;
    border-radius: 12px;
    max-width: 90%;
}
.user {
    background-color: #dcf8c6;
    align-self: flex-end;
}
.bot {
    background-color: #f1f0f0;
    align-self: flex-start;
}
.input-box {
    display: flex;
    align-items: center;
    gap: 0.5rem;
}
#floating-container {
    display: flex;
    align-items: center;
    justify-content: space-between;
    padding: 0.25rem 0.75rem;
    background-color: #f9f9f9;
    border-radius: 0.75rem;
    margin-top: 1rem;
    border: 1px solid #ccc;
}
.floating-popup {
    margin-top: 0.5rem;
    padding: 0.5rem;
    border-radius: 0.5rem;
    border: 1px solid #ccc;
    background-color: white;
}
</style>
""", unsafe_allow_html=True)

st.title("🧠 MitraVerse")

# Columns for button layout
col1, col2, col3 = st.columns(3)

# Initialize session
if "thread_id" not in st.session_state:
    st.session_state.thread_id = "1"
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

# Show chat
for msg in st.session_state.chat_history:
    role = "user" if isinstance(msg, HumanMessage) else "bot"
    st.markdown(f"<div class='stChatMessage {role}'>{msg.content}</div>", unsafe_allow_html=True)

with st.container():
    with st.form("chat_form", clear_on_submit=True):
        st.markdown('<div id="floating-container"></div>', unsafe_allow_html=True)
        user_input = st.text_input("Ask me", label_visibility="collapsed", placeholder="Ask me Anything")
        submitted = st.form_submit_button(label="Send")

if submitted and user_input:
    st.session_state.chat_history.append(HumanMessage(content=user_input))
    config = {"configurable": {"thread_id": st.session_state.thread_id}, "recursion_limit": 50}
    state_input = {
        "messages": st.session_state.chat_history,
        "input": user_input,
        "answers": [],
    }
    # First round: check whether follow-up questions are still pending
    result = graph.invoke(state_input, config=config)
    if result.get("questions") and not result.get("answers"):
        st.session_state.pending_questions = result["questions"]
        st.session_state.latest_state = result  # save intermediate state
        st.rerun()
    else:
        st.session_state.chat_history = result.get("messages", st.session_state.chat_history)
        if result.get("code"):
            st.session_state.latest_code = result["code"]
            st.session_state.chat_history.append(
                AIMessage(content="**💻 Generated Code:**\n\n```python\n" + result["code"] + "\n```")
            )
        if result.get("explanation"):
            st.session_state.latest_explanation = result["explanation"]
            st.session_state.chat_history.append(
                AIMessage(content="**📘 Code Explanation:**\n\n```\n" + result["explanation"] + "\n```")
            )
        st.rerun()
| elif "pending_questions" in st.session_state and st.session_state.pending_questions: | |
| st.markdown("### π Please answer the following questions:") | |
| answers = [] | |
| with st.form("answer_form", clear_on_submit=True): | |
| for i, question in enumerate(st.session_state.pending_questions): | |
| answers.append(st.text_input(f"{question}", key=f"answer_{i}")) | |
| submit_answers = st.form_submit_button("Submit Answers") | |
| if submit_answers: | |
| latest_state = st.session_state.latest_state | |
| latest_state["answers"] = answers | |
| config = {"configurable": {"thread_id": st.session_state.thread_id}} | |
| result = graph.invoke(latest_state, config=config) | |
| st.session_state.pending_questions = [] # Clear | |
| st.session_state.chat_history = result.get("messages", st.session_state.chat_history) | |
| if result.get("code"): | |
| st.session_state.latest_code = result["code"] | |
| st.session_state.chat_history.append( | |
| AIMessage(content="**π» Generated Code:**\n\n```python\n" + result["code"] + "\n```") | |
| ) | |
| if result.get("explanation"): | |
| st.session_state.latest_explanation = result["explanation"] | |
| st.session_state.chat_history.append( | |
| AIMessage(content="**π Code Explanation:**\n\n```\n" + result["explanation"] + "\n```") | |
| ) | |
| st.rerun() | |
| st.markdown("<script>window.scrollTo(0, document.body.scrollHeight);</script>", unsafe_allow_html=True) | |

# ================================
# TOOL BUTTONS SECTION
# ================================
# col1, col2, col3 = st.columns(3)
# user_prompt = st.session_state.get("latest_code", "") or user_input  # fall back to user_input if needed
# with st.container():
#     if col1.button("⚙️ Run Python Code"):
#         if user_prompt:
#             with st.spinner("Executing your Python code..."):
#                 result = execute_python_code.invoke({"code": user_prompt})
#             st.success("✅ Output:")
#             st.code(result, language="python")
#         else:
#             st.warning("Please enter Python code in the input box.")
#     if col2.button("🔍 Web Search"):
#         if user_prompt:
#             with st.spinner("Searching the web..."):
#                 result = web_search.invoke({"query": user_prompt})
#             st.success("🔍 Search Result:")
#             st.write(result)
#         else:
#             st.warning("Please enter a search query.")
#     if col3.button("🧠 Deep Think"):
#         if user_prompt:
#             with st.spinner("Thinking deeply..."):
#                 result = deep_think.invoke({"prompt": user_prompt})
#             st.success("🧠 Reasoned Output:")
#             st.write(result)
#         else:
#             st.warning("Please enter a prompt.")