Makhfi_AI / workflow /utils.py
Aasher's picture
Add core functionality for Makhfi AI, including agent logic, memory management, and vectorstore integration
f45999f
from datetime import datetime
from langchain_core.documents import Document
def _format_created_at(created_at: object) -> str:
    """Render a memory's ISO-8601 creation timestamp in the local timezone.

    Returns "N/A" when no timestamp is present and an "Invalid date format"
    marker (echoing the raw value) when it cannot be parsed or converted.
    """
    if not created_at:
        return "N/A"

    iso_text = created_at
    # datetime.fromisoformat() only accepts a trailing "Z" (UTC designator)
    # from Python 3.11 onwards; rewrite it as an explicit offset so older
    # interpreters parse the same timestamps.
    if isinstance(iso_text, str) and iso_text.endswith("Z"):
        iso_text = iso_text[:-1] + "+00:00"

    try:
        # astimezone() with no argument converts to the system timezone
        # (a naive value is treated as already being local time).
        local_dt = datetime.fromisoformat(iso_text).astimezone()
        return local_dt.strftime("%B %d, %Y at %I:%M %p")
    except (ValueError, TypeError, OverflowError, OSError):
        # ValueError/TypeError: malformed or non-string input.
        # OverflowError/OSError: dates the platform cannot convert to local time.
        return f"Invalid date format ({created_at})"


def format_memories(memories: list[dict]) -> str:
    """
    Formats a list of memory dictionaries into a human-readable string.

    Each dictionary is read through three optional keys:
      - 'memory': the text of the memory (shown as "N/A" when missing),
      - 'created_at': an ISO-8601 timestamp string,
      - 'categories': a list of category labels.

    Args:
        memories: The memory dictionaries to render, in display order.

    Returns:
        One numbered entry per memory, separated by a dashed line, or a fixed
        "no memories" message when the list is empty or None.
    """
    if not memories:
        return "No memories found for this user."

    output_parts = []
    for number, memory in enumerate(memories, start=1):
        memory_text = memory.get('memory', 'N/A')
        categories = memory.get('categories', [])

        formatted_date = _format_created_at(memory.get('created_at'))

        # Anything that is not a non-empty list is reported as "None".
        # str() keeps a stray non-string label from aborting the whole listing.
        if categories and isinstance(categories, list):
            formatted_categories = ', '.join(
                str(cat).capitalize() for cat in categories
            )
        else:
            formatted_categories = "None"

        output_parts.append(
            f"Memory #{number}\n"
            f" Created: {formatted_date}\n"
            f" Categories: {formatted_categories}\n"
            f" Details: {memory_text}"
        )

    return "\n------------\n".join(output_parts)
def _xml_attr(value: object) -> str:
    """Render *value* as text that can sit inside a double-quoted attribute."""
    # Only the delimiter itself is escaped: a raw '"' would close the attribute
    # early, while other characters (e.g. '&' in URLs) are left as written.
    return str(value).replace('"', "&quot;")


def format_docs(docs: list[Document]) -> str:
    """
    Converts a list of documents into XML-formatted string.

    Each document becomes a <Document href="..." title="..."> element whose
    body is the document's page_content; elements are separated by a
    "---" line surrounded by blank lines.

    Args:
        docs: Documents to render. The 'link' and 'title' metadata entries
            supply the href and title attributes.

    Returns:
        The concatenated elements, or an empty string when docs is empty.
    """
    rendered = []
    for doc in docs:
        metadata = doc.metadata or {}
        # A document without link/title metadata still gets rendered instead
        # of raising KeyError and discarding the whole batch.
        href = _xml_attr(metadata.get("link", ""))
        title = _xml_attr(metadata.get("title", "N/A"))
        rendered.append(
            f'<Document href="{href}" title="{title}">\n{doc.page_content}\n</Document>'
        )
    return "\n\n---\n\n".join(rendered)