File size: 5,019 Bytes
4b6fc9f
 
 
 
e45306a
1cea405
4b6fc9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8ae01d
 
 
 
 
 
 
 
 
 
 
 
4b6fc9f
c8ae01d
 
 
4b6fc9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b6c2e6e
ff5132a
 
 
 
 
227c1cb
ff5132a
66a8403
ff5132a
 
 
 
 
 
 
d3f6e1e
 
 
 
 
 
 
 
 
 
 
 
 
86a1b2d
 
 
 
227c1cb
 
 
 
 
7062e3e
 
 
 
 
 
 
 
 
 
 
 
 
 
86a1b2d
ff5132a
 
 
4b6fc9f
1cea405
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66a8403
1cea405
 
 
 
 
 
 
 
c8ae01d
 
e45306a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1cea405
 
b17ec4a
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import sys
import logging
import json
from importlib.metadata import metadata
from typing import List, Optional
from dotenv import load_dotenv
import chromadb
from sklearn.manifold import TSNE
import numpy as np
### Internal classes
from agents.deterministic_planning_agent import DeterministicPlanningAgent
from agents.deals import Opportunity

# Load settings from a local .env file at import time. override=True is passed so
# that values from the file take precedence over variables already present in the
# process environment.
load_dotenv(override=True)

# ANSI escape sequences used to colorize this module's log lines
BG_BLUE = "\033[44m"
WHITE = "\033[37m"
RESET = "\033[0m"

# Product categories used when plotting the vector store. Order matters: the
# category at index i is drawn with COLORS[i].
CATEGORIES = [
    "Appliances",
    "Automotive",
    "Cell_Phones_and_Accessories",
    "Electronics",
    "Musical_Instruments",
    "Office_Products",
    "Tools_and_Home_Improvement",
    "Toys_and_Games",
    "Industrial_and_Scientific",
    "Arts_Crafts_and_Sewing",
    "Handmade_Products",
    "All_Beauty",
    "Gift_Cards",
]

# One color per category, positionally aligned with CATEGORIES. Every entry must
# be unique, otherwise two categories become indistinguishable in the plot
# ("purple" used to appear twice; the second occurrence is now "magenta").
COLORS = [
    "red",
    "blue",
    "brown",
    "orange",
    "yellow",
    "green",
    "purple",
    "cyan",
    "magenta",
    "black",
    "gray",
    "pink",
    "olive",
]



def init_logging():
    """
    Configure the root logger to emit INFO-level records to stdout.

    The function is idempotent: the handler it installs is tagged with a name,
    and it is only attached when no handler with that name is already present
    on the root logger. Without this guard every call (one per
    DealAgentFramework instance) would add another handler and each log line
    would be printed multiple times.
    """
    handler_name = "agents_stdout"

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # Already configured by a previous call: nothing more to do.
    if any(existing.get_name() == handler_name for existing in root.handlers):
        return

    handler = logging.StreamHandler(sys.stdout)
    handler.set_name(handler_name)
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "[%(asctime)s] [Agents] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %z",
    )
    handler.setFormatter(formatter)
    root.addHandler(handler)


class DealAgentFramework:
    """
    Ties together a persistent Chroma "products" collection, a lazily created
    DeterministicPlanningAgent and a JSON file that remembers the Opportunity
    objects surfaced by earlier runs.
    """

    DB = "products_vectordb_production"
    MEMORY_FILENAME = "memory.json"

    # Plot color for a data point whose category is not listed in CATEGORIES
    _UNKNOWN_CATEGORY_COLOR = "lightgray"
    # scikit-learn's default t-SNE perplexity; capped for small collections
    _DEFAULT_PERPLEXITY = 30.0

    def __init__(self):
        init_logging()
        client = chromadb.PersistentClient(path=self.DB)
        self.memory: List[Opportunity] = self.read_memory()
        self.collection = client.get_or_create_collection("products")
        self.planner = None  # created lazily by init_agent_as_needed()

    def init_agent_as_needed(self):
        """
        Create the planning agent on first use; do nothing if it already exists.
        """
        if not self.planner:
            self.log("Initializing Agent Framework...")
            self.planner = DeterministicPlanningAgent(self.collection)
            self.log("Agent Framework is ready!")

    def read_memory(self) -> List[Opportunity]:
        """
        Read the memory.json file and convert it into Opportunity pydantic model
        :return: A list of Opportunity models, or an empty list if no memory file is found.
        """
        if not os.path.exists(self.MEMORY_FILENAME):
            return []

        with open(self.MEMORY_FILENAME, "r", encoding="utf-8") as f:
            data: List[dict] = json.load(f)
        return [Opportunity(**opp) for opp in data]

    def write_memory(self) -> None:
        """
        Write opportunities into the memory.json file

        Nothing is written while the in-memory list is empty, so an existing
        file is left untouched in that case.
        """
        if self.memory:
            data: List[dict] = [opp.model_dump() for opp in self.memory]
            with open(self.MEMORY_FILENAME, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)

    @classmethod
    def reset_memory(cls) -> None:
        """
        Reset data in the memory.json file back to the default state

        The default state is the first two stored entries; when no file exists
        yet, a file containing an empty list is created.
        """
        data = []

        if os.path.exists(cls.MEMORY_FILENAME):
            with open(cls.MEMORY_FILENAME, "r", encoding="utf-8") as f:
                data = json.load(f)
        truncated = data[:2]

        with open(cls.MEMORY_FILENAME, "w", encoding="utf-8") as f:
            json.dump(truncated, f, indent=2)

    def log(self, message: str):
        """
        Log a message at INFO level, colorized and prefixed with "[Agent Framework] ".
        """
        text = BG_BLUE + WHITE + "[Agent Framework] " + message + RESET
        logging.info(text)

    def run(self) -> List[Opportunity]:
        """
        Initialize and start to run the planner agent that manages the entire agentic workflow

        Process:
        1. Init the planner agent
        2. Fetch result (Opportunity pydantic model -- single Opportunity model with the best deal picked out)
        3. If result is fetched successfully, append it to the memory and write it into memory.json
        4. Return the Opportunity models

        :return: A list of Opportunity models
        """
        self.log("Deal Agent Framework is initializing Planning Agent...")
        # init_agent_as_needed() already checks whether the planner exists
        self.init_agent_as_needed()

        result: Optional[Opportunity] = self.planner.plan(memory=self.memory)
        self.log(f"Planning Agent has completed and returned {result}")
        if result:
            self.memory.append(result)
            self.write_memory()

        return self.memory

    @classmethod
    def get_plot_data(cls, max_datapoints=2000):
        """
        Fetch up to max_datapoints items from the "products" collection and
        project their embeddings to three dimensions with t-SNE.

        :param max_datapoints: Maximum number of items requested from the collection.
        :return: A tuple (documents, reduced_vectors, colors) where colors holds one
                 entry per item, chosen from COLORS by the item's "category" metadata.
                 Categories missing from CATEGORIES get a neutral fallback color.
        """
        client = chromadb.PersistentClient(path=cls.DB)
        collection = client.get_or_create_collection("products")

        result = collection.get(
            include=["embeddings", "documents", "metadatas"], limit=max_datapoints
        )
        vectors = np.array(result["embeddings"])
        documents = result["documents"]

        # Build the category -> color table once instead of scanning CATEGORIES
        # for every point; unknown categories fall back instead of raising.
        color_by_category = dict(zip(CATEGORIES, COLORS))
        colors = [
            color_by_category.get(meta["category"], cls._UNKNOWN_CATEGORY_COLOR)
            for meta in result["metadatas"]
        ]

        n_samples = len(vectors)
        if n_samples < 2:
            # t-SNE needs at least two points; place what little there is at the origin.
            return documents, np.zeros((n_samples, 3)), colors

        # Perplexity must stay below the number of samples, so cap the default
        # for small collections; larger collections keep the default value.
        perplexity = min(cls._DEFAULT_PERPLEXITY, n_samples - 1)
        tsne = TSNE(n_components=3, random_state=42, n_jobs=1, perplexity=perplexity)
        reduced_vectors = tsne.fit_transform(vectors)

        return documents, reduced_vectors, colors


# Script entry point: build the framework and perform a single planning run.
if __name__ == "__main__":
    framework = DealAgentFramework()
    framework.run()