from __future__ import annotations import os from typing import List, Dict from dotenv import load_dotenv try: # langchain>=1.0 moved globals helpers under langchain_core from langchain.globals import set_verbose, set_debug except ImportError: from langchain_core.globals import set_verbose, set_debug from langchain_google_genai import ChatGoogleGenerativeAI from langchain_groq import ChatGroq from langchain_openai import ChatOpenAI from langgraph.constants import END from langgraph.graph import StateGraph from langgraph.prebuilt import create_react_agent from agent.prompts import cli_system_prompt from agent.states import AgentConfig, AgentGraphState, ModelBackend from agent.tools import ( write_file, read_file, get_current_directory, list_files, print_tree, search_files, summarize_project, edit_file, delete_file, run_cmd, init_project_root, ) load_dotenv() DEBUG_FLAG = os.getenv("AGENT_DEBUG", "false").lower() in {"1", "true", "yes", "on"} set_debug(DEBUG_FLAG) set_verbose(DEBUG_FLAG) def _build_llm(config: AgentConfig): """Instantiate the requested backend with sensible defaults.""" if config.backend == ModelBackend.GEMINI: model = config.model or os.getenv("GEMINI_MODEL", "gemini-2.0-flash") return ChatGoogleGenerativeAI(model=model, temperature=config.temperature) if config.backend == ModelBackend.OPENROUTER: model = config.model or os.getenv("OPENROUTER_MODEL", "openrouter/meta-llama/llama-3.1-8b-instruct") base_url = os.getenv("OPENROUTER_API_URL", "https://openrouter.ai/api/v1") api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: raise ValueError("OPENROUTER_API_KEY is not set in the environment.") return ChatOpenAI(model=model, base_url=base_url, api_key=api_key, temperature=config.temperature) # Default to Groq backend model = config.model or os.getenv("GROQ_MODEL", "openai/gpt-oss-120b") api_key = os.getenv("GROQ_API_KEY") if not api_key: raise ValueError("GROQ_API_KEY is not set in the environment.") return ChatGroq(model=model, temperature=config.temperature) class AgentRunner: """Gemini/Qwen-style CLI agent that keeps conversation state across turns.""" def __init__(self, config: AgentConfig): self.config = config self.project_directory = init_project_root(config.project_directory) self.project_summary: str | None = None self.messages: List[Dict[str, str]] = [] self.llm = _build_llm(config) self.tools = [ read_file, write_file, edit_file, delete_file, list_files, print_tree, search_files, summarize_project, get_current_directory, run_cmd, ] self.react_agent = create_react_agent(self.llm, self.tools) self.graph = self._build_graph() def _build_graph(self): workflow = StateGraph(AgentGraphState) workflow.add_node("bootstrap", self._bootstrap_node) workflow.add_node("react", self._react_node) workflow.set_entry_point("bootstrap") workflow.add_edge("bootstrap", "react") workflow.add_edge("react", END) return workflow.compile() def _bootstrap_node(self, state: AgentGraphState) -> AgentGraphState: """ Ensures project root + summary exist, injects system prompt, and appends the current user prompt to the rolling conversation. """ project_root = init_project_root(state["project_directory"]) refresh_requested = state.get("refresh_context", False) summary = state.get("project_summary") if self.config.auto_context and (refresh_requested or not summary): summary = summarize_project.run(".") messages = list(state.get("messages") or []) system_prompt = cli_system_prompt(summary) if not messages or messages[0].get("role") != "system": messages.insert(0, {"role": "system", "content": system_prompt}) else: messages[0] = {"role": "system", "content": system_prompt} pending = state.get("pending_user_message") if pending: messages.append({"role": "user", "content": pending}) return { "project_directory": project_root, "project_summary": summary, "messages": messages, "pending_user_message": None, "refresh_context": False, } def _react_node(self, state: AgentGraphState) -> AgentGraphState: """Delegates to the LangGraph ReAct agent to decide tool calls.""" result = self.react_agent.invoke({"messages": state["messages"]}) return { "messages": result["messages"], "project_summary": state.get("project_summary"), "project_directory": state.get("project_directory"), } def invoke(self, user_prompt: str, *, refresh_context: bool = False, clear_history: bool = False) -> AgentGraphState: """ Runs a single user turn through the agent. Returns the updated LangGraph state (including messages for streaming / history). """ if clear_history: self.reset_history() initial_state: AgentGraphState = { "project_directory": self.project_directory, "project_summary": None if refresh_context else self.project_summary, "messages": self.messages, "pending_user_message": user_prompt, "refresh_context": refresh_context, } final_state = self.graph.invoke(initial_state, config={"recursion_limit": self.config.recursion_limit}) self.project_summary = final_state.get("project_summary", self.project_summary) self.messages = final_state.get("messages", []) return final_state def reset_history(self): """Drops the in-memory conversation.""" self.messages = [] def refresh_summary(self) -> str: """Forces a fresh project summary and returns it.""" self.project_summary = summarize_project.run(".") return self.project_summary def conversation_history(self) -> List[Dict[str, str]]: """Returns an immutable copy of the chat history.""" return list(self.messages) # Convenience helper for legacy imports def agent_factory(config: AgentConfig) -> AgentRunner: return AgentRunner(config)