Spaces:
Sleeping
Sleeping
File size: 5,019 Bytes
4b6fc9f e45306a 1cea405 4b6fc9f c8ae01d 4b6fc9f c8ae01d 4b6fc9f b6c2e6e ff5132a 227c1cb ff5132a 66a8403 ff5132a d3f6e1e 86a1b2d 227c1cb 7062e3e 86a1b2d ff5132a 4b6fc9f 1cea405 66a8403 1cea405 c8ae01d e45306a 1cea405 b17ec4a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import os
import sys
import logging
import json
from importlib.metadata import metadata
from typing import List, Optional
from dotenv import load_dotenv
import chromadb
from sklearn.manifold import TSNE
import numpy as np
### Internal classes
from agents.deterministic_planning_agent import DeterministicPlanningAgent
from agents.deals import Opportunity
load_dotenv(override=True)
# Colors for logging
BG_BLUE = "\033[44m"
WHITE = "\033[37m"
RESET = "\033[0m"
# Colors for plot
CATEGORIES = [
'Appliances',
'Automotive',
'Cell_Phones_and_Accessories',
'Electronics','Musical_Instruments',
'Office_Products',
'Tools_and_Home_Improvement',
'Toys_and_Games',
"Industrial_and_Scientific",
"Arts_Crafts_and_Sewing",
"Handmade_Products",
"All_Beauty",
"Gift_Cards"
]
COLORS = ['red', 'blue', 'brown', 'orange', 'yellow', 'green' , 'purple', 'cyan', "purple", "black", "gray", "pink", "olive"]
def init_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"[%(asctime)s] [Agents] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
root.addHandler(handler)
class DealAgentFramework:
DB = "products_vectordb_production"
MEMORY_FILENAME = "memory.json"
def __init__(self):
init_logging()
client = chromadb.PersistentClient(self.DB)
self.memory: List[Opportunity] = self.read_memory()
self.collection = client.get_or_create_collection("products")
self.planner = None # lazy initialization
def init_agent_as_needed(self):
if not self.planner:
self.log("Initializing Agent Framework...")
self.planner = DeterministicPlanningAgent(self.collection)
self.log("Agent Framework is ready!")
def read_memory(self) -> List[Opportunity]:
"""
Read the memory.json file and convert it into Opportunity pydantic model
:return: A list of Opportunity models, or an empty list if no memory file is found.
"""
if os.path.exists(self.MEMORY_FILENAME):
with open(self.MEMORY_FILENAME, "r") as f:
data: List[dict] = json.load(f)
result = [Opportunity(**opp) for opp in data]
return result
return []
def write_memory(self) -> None:
"""
Write opportunities into the memory.json file
"""
if self.memory:
data: List[dict] = [opp.model_dump() for opp in self.memory]
with open(self.MEMORY_FILENAME, "w") as f:
json.dump(data, f, indent=2)
@classmethod
def reset_memory(cls) -> None:
"""
Reset data in the memory.json file back to the default state
"""
data = []
if os.path.exists(cls.MEMORY_FILENAME):
with open(cls.MEMORY_FILENAME, "r") as f:
data = json.load(f)
truncated = data[:2]
with open(cls.MEMORY_FILENAME, "w") as f:
json.dump(truncated, f, indent=2)
def log(self, message: str):
text = BG_BLUE + WHITE + "[Agent Framework] " + message + RESET
logging.info(text)
def run(self) -> List[Opportunity]:
"""
Initialize and start to run the planner agent that manages the entire agentic workflow
Process:
1. Init the planner agent
2. Fetch result (Opportunity pydantic model -- single Opportunity model with the best deal picked out)
3. If result is fetched successfully, append it to the memory and write it into memory.json
4. Return the Opportunity models
:return: A list of Opportunity models
"""
self.log("Deal Agent Framework is initializing Planning Agent...")
if not self.planner:
self.init_agent_as_needed()
result: Opportunity = self.planner.plan(memory=self.memory)
self.log(f"Planning Agent has completed and returned {result}")
if result:
self.memory.append(result)
self.write_memory()
return self.memory
@classmethod
def get_plot_data(cls, max_datapoints=2000):
client = chromadb.PersistentClient(path=cls.DB)
collection = client.get_or_create_collection("products")
result = collection.get(
include=["embeddings", "documents", "metadatas"], limit=max_datapoints
)
vectors = np.array(result["embeddings"])
documents = result["documents"]
categories = [metadata["category"] for metadata in result["metadatas"]]
colors = [COLORS[CATEGORIES.index(c)] for c in categories]
tsne = TSNE(n_components=3, random_state=42, n_jobs=1)
reduced_vectors = tsne.fit_transform(vectors)
return documents, reduced_vectors, colors
if __name__ == "__main__":
DealAgentFramework().run()
|