|
|
# Standard library
import datetime
import json
import logging
import os
import threading
import time
import traceback
import uuid
from queue import Queue

# Third-party
import torch  # needed before the config block below (DEVICE uses torch.cuda)
from dotenv import load_dotenv
from langchain import hub  # used by the agent setup (hub.pull)
from langchain_core.messages import HumanMessage, SystemMessage  # used by agent_worker
|
|
|
|
|
|
|
|
# Load environment overrides from a local .env file, if present.
load_dotenv()


# Credentials / endpoints; all overridable via environment variables.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


POSTGRES_DSN = os.getenv("POSTGRES_DSN", "postgresql://user:password@localhost:5432/agentdb")


REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")


# Base chat model used for both the agent LLM and the PPO fine-tuning target.
BASE_MODEL_NAME = os.getenv("BASE_MODEL_NAME", "gpt-4o-mini")


# How often the background fine-tuning job fires (hours).
LEARNING_INTERVAL_HOURS = int(os.getenv("LEARNING_INTERVAL_HOURS", "6"))


# Snapshot of GPU availability at import time; used for logging and gating the
# learning cycle.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
|
|
from langchain.agents import AgentExecutor, create_react_agent, Tool |
|
|
|
|
|
|
|
|
|
|
|
from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, DateTime, Text, MetaData, Index |
|
|
from sqlalchemy.dialects.postgresql import UUID, JSONB |
|
|
|
|
|
|
|
|
from sqlalchemy.orm import sessionmaker, declarative_base |
|
|
import sqlalchemy |
|
|
|
|
|
|
|
|
import redis |
|
|
|
|
|
|
|
|
from sentence_transformers import SentenceTransformer |
|
|
|
|
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
|
from apscheduler.triggers.interval import IntervalTrigger |
|
|
|
|
|
|
|
|
import torch |
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
|
|
from peft import LoraConfig |
|
|
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead, create_reference_model |
|
|
from trl.core import LengthSampler |
|
|
|
|
|
|
|
|
# SQLAlchemy plumbing shared module-wide: declarative base for the ORM models,
# the engine bound to POSTGRES_DSN, and a session factory (one short-lived
# session per operation; see add_experience_db / retrieve_relevant_experiences_db).
Base = declarative_base()


engine = create_engine(POSTGRES_DSN)


SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
|
|
|
|
class Experience(Base):
    """One completed agent task attempt, recorded for later PPO fine-tuning.

    Rows are written by ``add_experience_db`` after each task and consumed in
    bulk by ``run_learning_cycle``.
    """

    __tablename__ = "experiences"

    # Surrogate key, generated client-side.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Naive UTC timestamp of insertion (datetime.utcnow is naive by design here).
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)

    # Long-term goal and the concrete task text given to the agent.
    goal = Column(Text)

    task = Column(Text)

    # JSON blob with the agent's chosen action and its input.
    action_info = Column(JSONB)

    # First 500 chars of the agent's final output (truncated by the writer).
    observation_summary = Column(Text)

    # Heuristic success flag computed by the worker.
    success = Column(Boolean)

    # External feedback signal; 0.0 when no feedback was provided.
    feedback_score = Column(Float, default=0.0)

    # Wall-clock task duration in seconds.
    execution_time = Column(Float)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Task(Base):
    """Persistent record of a queued task and its lifecycle status.

    NOTE(review): no code in this file reads or writes this table — the queue
    lives in Redis. Presumably used by an external dashboard/API; confirm.
    """

    __tablename__ = "tasks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    goal = Column(Text)

    task_description = Column(Text)

    # Lifecycle state; starts as "pending".
    status = Column(String, default="pending")

    created_at = Column(DateTime, default=datetime.datetime.utcnow)

    # Refreshed automatically on every UPDATE via onupdate.
    updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)

    # Final output text once the task completes; NULL until then.
    result = Column(Text, nullable=True)
|
|
|
|
|
|
|
|
# Create tables for all declarative models above if they don't already exist.
Base.metadata.create_all(bind=engine)


# Redis connection used as the task message queue. decode_responses=True makes
# the client return str instead of bytes, so json.loads works directly.
redis_client = redis.from_url(REDIS_URL, decode_responses=True)


# Redis list key holding pending task payloads (JSON strings).
TASK_QUEUE_KEY = "agent_task_queue"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Local embedding model for experience vectors. Loaded once at import time and
# pinned to CPU so it never competes with the fine-tuning job for GPU memory.
embedding_model_name = 'all-MiniLM-L6-v2'


logging.info(f"Loading sentence transformer model: {embedding_model_name}...")


sentence_model = SentenceTransformer(embedding_model_name, device='cpu')


logging.info("Sentence transformer model loaded.")
|
|
|
|
|
def get_vector(text: str):
    """Encode *text* into a dense embedding.

    Returns the embedding as a plain Python list of floats, or ``None``
    when *text* is falsy (empty string or ``None``).
    """
    if text:
        embedding = sentence_model.encode(text, convert_to_numpy=True)
        return embedding.tolist()
    return None
|
|
|
|
|
|
|
|
def add_experience_db(task_info: dict, agent_output: dict, success: bool, feedback: float = 0.0, exec_time: float = 0.0):
    """Persist a single agent experience to PostgreSQL.

    Args:
        task_info: Queue payload; keys "goal" and "task" are read (both may
            be missing or None).
        agent_output: Agent result dict; "output", "action" and
            "action_input" are read (all optional).
        success: Whether the task was judged successful.
        feedback: External feedback score (default 0.0).
        exec_time: Wall-clock execution time in seconds.

    Failures are logged and rolled back; this function never raises.
    """
    db = SessionLocal()
    try:
        # Truncate the raw output so the summary column stays bounded.
        # `or ""` guards against an explicit None stored under "output".
        obs_summary = (agent_output.get("output") or "")[:500]

        # NOTE: a previous revision also embedded the task/observation text
        # here (get_vector) but never stored the vectors; dropped pending
        # pgvector support in the schema.

        action_info = {
            "action": agent_output.get("action", "unknown"),
            "input": agent_output.get("action_input", "unknown"),
        }

        exp = Experience(
            goal=task_info.get("goal"),
            task=task_info.get("task"),
            action_info=action_info,
            observation_summary=obs_summary,
            success=success,
            feedback_score=feedback,
            execution_time=exec_time,
        )
        db.add(exp)
        db.commit()
        # Guard the slice: "task" may legitimately be missing/None, and the
        # old code crashed on None[:50] here.
        task_preview = (task_info.get("task") or "")[:50]
        logging.debug(f"Experience added to DB: Success={success}, Task={task_preview}")
    except Exception as e:
        db.rollback()
        logging.error(f"Failed to add experience to DB: {e}", exc_info=True)
    finally:
        db.close()
|
|
|
|
|
def retrieve_relevant_experiences_db(query: str, k: int = 3) -> list[Experience]:
    """Look up past experiences similar to *query*.

    Placeholder implementation: real similarity search requires the pgvector
    extension, so every path currently yields an empty list. *k* is the
    intended result cap once search is implemented.
    """
    db = SessionLocal()
    try:
        if get_vector(query) is None:
            # Empty/None query — nothing to search for.
            return []
        logging.warning("Vector search in DB requested but not implemented (requires pgvector). Returning empty list.")
        return []
    except Exception as e:
        logging.error(f"Failed to retrieve experiences from DB: {e}", exc_info=True)
        return []
    finally:
        db.close()
|
|
|
|
|
|
|
|
|
|
|
# Tools exposed to the ReAct agent.
# NOTE(review): `search` and `python_repl` are not defined anywhere in this
# file — presumably a web-search wrapper and a Python REPL utility built
# elsewhere; confirm they are in scope before this line runs, otherwise this
# raises NameError at import time.
tools = [

    Tool(name="Search", func=search.run, description="..."),

    Tool(name="PythonREPL", func=python_repl.run, description="..."),

]


# LLM backing the agent; low temperature for more deterministic tool use.
agent_llm = ChatOpenAI(model=BASE_MODEL_NAME, temperature=0.3, api_key=OPENAI_API_KEY)

# Standard ReAct chat prompt pulled from the LangChain hub.
prompt_template = hub.pull("hwchase17/react-chat")

agent = create_react_agent(agent_llm, tools, prompt_template)

# handle_parsing_errors=True retries on malformed LLM output instead of
# raising; max_iterations bounds runaway tool loops.
agent_executor = AgentExecutor(

    agent=agent, tools=tools, verbose=False, handle_parsing_errors=True, max_iterations=10,

)
|
|
|
|
|
|
|
|
|
|
|
# Serializes learning cycles so overlapping scheduler triggers can't run two at once.
learning_lock = threading.Lock()

# Lazily-initialized PPO trainer handle (never assigned in the visible code;
# see run_learning_cycle).
ppo_trainer = None

# Directory where LoRA adapter weights are intended to be written.
fine_tuned_model_path = "./fine_tuned_model"
|
|
|
|
|
def calculate_reward(experience_data: dict) -> float:
    """Compute a scalar reward for one experience record.

    Components:
      * +1.0 for success, -1.0 for failure.
      * Sub-linear time penalty of ``-0.1 * sqrt(exec_time)`` for runs longer
        than one second, with exec_time clamped to [0, 300].
      * External feedback score, half-weighted.

    Args:
        experience_data: Mapping with optional keys "success",
            "execution_time" and "feedback_score".

    Returns:
        The combined reward. Missing or explicitly-None fields fall back to
        neutral defaults, so this never raises on partial records (the old
        version crashed on stored None values, since dict.get's default only
        applies when the key is absent).
    """
    reward = 1.0 if experience_data.get("success") else -1.0

    exec_time = experience_data.get("execution_time")
    if exec_time is None:
        exec_time = 1.0
    if exec_time > 1.0:
        # Clamp to [0, 300] so one pathological run can't dominate the reward.
        reward -= 0.1 * min(max(0, exec_time), 300) ** 0.5

    feedback = experience_data.get("feedback_score")
    if feedback is None:
        feedback = 0.0
    reward += feedback * 0.5

    return reward
|
|
|
|
|
def prepare_ppo_data(experiences: list[Experience]) -> list[dict]:
    """Convert Experience rows into PPO training samples.

    Each sample dict holds "query" (goal + task prompt), "response" (the
    stored observation summary) and "reward" (a 1-element float32 tensor).
    Rows missing either text are skipped.
    """
    ppo_data = []
    for exp in experiences:
        query_text = f"Goal: {exp.goal}\nTask: {exp.task}"

        response_text = exp.observation_summary

        # BUG FIX: the old code passed `exp.metadata` — on a declarative model
        # that is SQLAlchemy's table MetaData object, not this row's data.
        # Build the reward inputs from the row's own columns instead.
        reward_score = calculate_reward({
            "success": exp.success,
            "execution_time": exp.execution_time,
            "feedback_score": exp.feedback_score,
        })

        if query_text and response_text:
            ppo_data.append({
                "query": query_text,
                "response": response_text,
                # BUG FIX: `torch.float3_tensors` does not exist; float32 was intended.
                "reward": torch.tensor([reward_score], dtype=torch.float32),
            })
    return ppo_data
|
|
|
|
|
|
|
|
def run_learning_cycle():
    """Periodic fine-tuning pass driven by recent experiences (TRL/PPO).

    Steps: fetch the most recent experiences from PostgreSQL, convert them to
    PPO samples, build a LoRA-wrapped value-head model plus a frozen reference
    model, then (placeholder) run the PPO loop and save adapters.

    Requires CUDA; a no-op otherwise. Serialized via ``learning_lock`` so
    overlapping scheduler triggers cannot run two cycles concurrently.
    """
    global ppo_trainer
    if not torch.cuda.is_available():
        logging.warning("CUDA not available. Skipping fine-tuning cycle.")
        return

    with learning_lock:
        logging.info(f"[Learning Cycle Triggered] - Device: {DEVICE}")
        start_time = time.time()

        logging.info("Fetching recent experiences from PostgreSQL...")
        db = SessionLocal()
        try:
            recent_experiences = db.query(Experience).order_by(Experience.timestamp.desc()).limit(500).all()
        finally:
            db.close()

        # Require a minimum batch of history before spending GPU time.
        if not recent_experiences or len(recent_experiences) < 50:
            logging.info(f"Not enough new experiences ({len(recent_experiences)}). Skipping fine-tuning.")
            return
        logging.info(f"Fetched {len(recent_experiences)} experiences for learning.")

        logging.info("Preparing data for PPO...")
        ppo_data = prepare_ppo_data(recent_experiences)
        if not ppo_data:
            logging.warning("No valid data points after preparation. Skipping fine-tuning.")
            return

        logging.info("Setting up TRL PPOTrainer...")
        # Pre-bind so the cleanup in `finally` is safe even when setup fails
        # early. BUG FIX: the old code ran `del model, ref_model, ppo_trainer`
        # unconditionally after the except block, raising NameError when an
        # exception fired before those names were bound — and deleting the
        # module-global `ppo_trainer` would break every subsequent cycle.
        model = None
        ref_model = None
        try:
            ppo_config = PPOConfig(
                model_name=BASE_MODEL_NAME,
                learning_rate=1.41e-5,
                batch_size=16,
                mini_batch_size=4,
                gradient_accumulation_steps=1,
                optimize_cuda_cache=True,
                ppo_epochs=4,
                seed=42,
                # NOTE(review): `use_lora` is not a standard PPOConfig field in
                # recent TRL releases — confirm against the pinned trl version.
                use_lora=True,
            )

            lora_config = LoraConfig(
                r=16, lora_alpha=32, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM"
            )
            tokenizer = AutoTokenizer.from_pretrained(ppo_config.model_name)
            # Causal LMs often ship without a pad token; PPO batching needs one.
            if getattr(tokenizer, "pad_token", None) is None:
                tokenizer.pad_token = tokenizer.eos_token

            model = AutoModelForCausalLMWithValueHead.from_pretrained(
                ppo_config.model_name,
                peft_config=lora_config,
                torch_dtype=torch.float16,
                device_map="auto"
            )

            # Frozen copy used by PPO as the KL-divergence reference.
            ref_model = create_reference_model(model)

            logging.info("Starting PPO Training Loop (Simulation - Actual requires dataset)...")
            # Placeholder for the real ppo_trainer.step(...) loop over ppo_data.
            time.sleep(10)

            logging.info("Saving fine-tuned LoRA adapters...")
            # Placeholder: actual adapter saving not implemented yet.
            logging.info(f"Fine-tuned adapters saved to {fine_tuned_model_path}")

        except Exception as e:
            logging.error(f"Error during TRL setup or training: {e}", exc_info=True)
        finally:
            # Release GPU memory held by the (possibly partially built) models.
            if model is not None:
                del model
            if ref_model is not None:
                del ref_model
            torch.cuda.empty_cache()

        logging.info(f"Learning cycle finished. Duration: {time.time() - start_time:.2f}s")
|
|
|
|
|
|
|
|
def add_task_mq(task: str, goal: str):
    """Enqueue one task for the worker pool via the shared Redis list.

    The payload is a JSON object with a fresh UUID under "id" plus the task
    text and its long-term goal. Redis failures are logged, not raised.
    """
    task_id = str(uuid.uuid4())
    payload = {"id": task_id, "task": task, "goal": goal}
    task_data = json.dumps(payload)
    try:
        redis_client.lpush(TASK_QUEUE_KEY, task_data)
        logging.info(f"Task {task_id} added to Redis queue: {task[:50]}...")
    except Exception as e:
        logging.error(f"Failed to add task to Redis: {e}")
|
|
|
|
|
|
|
|
def agent_worker(worker_id: int):
    """Processes tasks from the Redis queue.

    Runs forever: blocks on the queue, invokes the agent executor for each
    task, records the outcome as an Experience row, and keeps going on any
    error. Intended to run as a daemon thread (see __main__).
    """
    logging.info(f"Agent Worker-{worker_id} started.")
    while True:
        try:
            # Blocking right-pop; lpush + brpop gives FIFO ordering.
            _, task_data_json = redis_client.brpop(TASK_QUEUE_KEY)
            task_info = json.loads(task_data_json)
            task_id = task_info["id"]
            task_desc = task_info["task"]
            goal = task_info["goal"]

            logging.info(f"Worker-{worker_id} processing Task {task_id}: {task_desc[:50]}...")
            start_time = time.time()
            success = False
            final_output = None
            agent_result = {}

            # Goal goes in as a system message; the concrete task as the human turn.
            input_messages = [
                SystemMessage(content=f"Your long term goal is: {goal}. Think step-by-step."),
                HumanMessage(content=f"Current task: {task_desc}")
            ]

            try:
                agent_result = agent_executor.invoke({"input": input_messages})
                final_output = agent_result.get("output", "No output.")
                # Heuristic success check: keyword scan of the output text.
                # NOTE(review): brittle — a correct answer *about* errors
                # (e.g. "how to handle errors") is marked as failure.
                success = not any(err in final_output.lower() for err in ["error", "fail", "unable"])
            except Exception as e:
                logging.error(f"Worker-{worker_id} Task {task_id} failed during execution: {e}", exc_info=True)
                final_output = f"Agent execution failed: {e}"
                success = False
                agent_result = {"output": final_output, "action": "error"}

            exec_time = time.time() - start_time

            # No human feedback channel yet; neutral score.
            feedback_score = 0.0
            add_experience_db(task_info, agent_result, success, feedback_score, exec_time)

            logging.info(f"Worker-{worker_id} finished Task {task_id}. Success: {success}. Time: {exec_time:.2f}s")

        except redis.exceptions.ConnectionError as e:
            # Back off and retry the queue connection rather than dying.
            logging.error(f"Worker-{worker_id} Redis connection error: {e}. Retrying in 10s...")
            time.sleep(10)
        except Exception as e:
            # Catch-all keeps the worker thread alive; the error is logged.
            logging.error(f"Worker-{worker_id} encountered an unexpected error: {e}", exc_info=True)
            time.sleep(5)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: start the learning scheduler, spin up worker threads,
    # seed the queue with demo tasks, then idle until Ctrl+C.
    logging.info("Initializing Agent System...")

    # Background scheduler fires the fine-tuning cycle on a fixed interval.
    # daemon=True so it won't block interpreter shutdown.
    scheduler = BackgroundScheduler(daemon=True)

    scheduler.add_job(

        run_learning_cycle,

        trigger=IntervalTrigger(hours=LEARNING_INTERVAL_HOURS),

        id="learning_job",

        name="Fine-tuning Learning Cycle",

        replace_existing=True

    )

    scheduler.start()

    logging.info(f"Background learning scheduler started. Interval: {LEARNING_INTERVAL_HOURS} hours.")

    # Worker pool: daemon threads consuming the Redis queue; 1-based ids.
    num_workers = int(os.getenv("NUM_WORKERS", "2"))

    worker_threads = []

    for i in range(num_workers):

        thread = threading.Thread(target=agent_worker, args=(i+1,), daemon=True)

        thread.start()

        worker_threads.append(thread)

    logging.info(f"{num_workers} Agent worker threads started.")

    # Seed the queue with two demo tasks so the system does something on boot.
    add_task_mq("Explain the difference between LoRA and full fine-tuning for LLMs.",
                "Understand AI model optimization techniques.")

    add_task_mq("Write a Python script using pandas to read a CSV file named 'data.csv' and print the first 5 rows.",
                "Develop data processing scripts.")

    logging.info("Agent system is running. Workers processing tasks from Redis.")

    logging.info("Press Ctrl+C to stop.")

    # Keep the main thread alive; all real work happens in daemon threads.
    try:

        while True:

            time.sleep(60)

            logging.debug("Main thread alive...")

    except KeyboardInterrupt:

        logging.info("Shutdown signal received...")

        # Stop the scheduler cleanly; daemon worker threads die with the process.
        scheduler.shutdown()

    logging.info("Agent system stopped.")