Multi-Agent-App / app /utils.py
AlexFoxalt's picture
Init commit session 6
01e95d0
import os
import uuid
from pathlib import Path
import tiktoken
from langchain_core.messages import HumanMessage
from states import State
os.makedirs("./data", exist_ok=True)
def tiktoken_len(text):
tokens = tiktoken.encoding_for_model("gpt-4o-mini").encode(
text,
)
return len(tokens)
def agent_node(state, agent, name):
result = agent.invoke(state)
return {"messages": [HumanMessage(content=result["output"], name=name)]}
def enter_chain(message: str) -> dict:
results = {
"messages": [HumanMessage(content=message)],
}
return results
def get_last_message(state: State) -> str:
return state["messages"][-1].content
def join_graph(response: dict):
return {"messages": [response["messages"][-1]]}
def create_random_subdirectory():
random_id = str(uuid.uuid4())[:8] # Use first 8 characters of a UUID
subdirectory_path = os.path.join("./data", random_id)
os.makedirs(subdirectory_path, exist_ok=True)
return subdirectory_path
def prelude(state):
written_files = []
if not WORKING_DIRECTORY.exists():
WORKING_DIRECTORY.mkdir()
try:
written_files = [
f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
]
except Exception:
pass
if not written_files:
return {**state, "current_files": "No files written."}
return {
**state,
"current_files": "\nBelow are files your team has written to the directory:\n"
+ "\n".join([f" - {f}" for f in written_files]),
}
WORKING_DIRECTORY = Path(create_random_subdirectory())