# neuravend / neuravend.py
# (Hugging Face page header: uploaded by Joshnotfound — "Update neuravend.py", commit 03e2321, verified)
# neuravend.py
# NEURAVEND - Core multi-agent vendor decision system (importable)
# Paste this file as neuravend.py (lowercase)
import os
import json
import time
import logging
import random
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Tuple
# CRITICAL FIX: Ensure all required third-party libraries are imported
import numpy as np
import pandas as pd
import requests
# --- Logging and Metrics ---
LOG_FN = "neuravend.log"

# Log to both a fresh file (truncated each run) and the console.
_log_handlers = [logging.FileHandler(LOG_FN, mode="w"), logging.StreamHandler()]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    handlers=_log_handlers,
)

# Module-wide logger used by the top-level helpers below.
logger = logging.getLogger("NEURAVEND")
# Metrics
# Simple run counters, flushed to metrics.json via save_metrics().
metrics = {"runs": 0, "gemini_calls": 0, "search_calls": 0, "iterations_total": 0}


def save_metrics():
    """Overwrite metrics.json with the current counter values."""
    with open("metrics.json", "w") as fh:
        json.dump(metrics, fh, indent=2)
# Session persistence
SESSION_FN = "session.json"


@dataclass
class Session:
    """All mutable state for one NEURAVEND run, serializable to SESSION_FN."""

    # Unique id derived from the wall clock at creation time.
    session_id: str = field(default_factory=lambda: f"session-{int(time.time())}")
    # Vendor table serialized via DataFrame.to_json(orient="records").
    vendors_df_json: str = ""
    # Per-scenario TOPSIS outputs, keyed by scenario name.
    topsis_results: Dict[str, Any] = field(default_factory=dict)
    # Chronological agent event log.
    history: List[Dict[str, Any]] = field(default_factory=list)
    # Free-form shared memory between agents.
    mem: Dict[str, Any] = field(default_factory=dict)
    # Optional reasoning traces.
    reasoning_traces: List[Dict[str, Any]] = field(default_factory=list)

    def persist(self) -> None:
        """Write the whole session to SESSION_FN (overwrites any prior file)."""
        with open(SESSION_FN, "w") as fh:
            json.dump(asdict(self), fh, indent=2)
        logger.info("Session persisted")

    @classmethod
    def load(cls) -> "Optional[Session]":
        """Return the session stored on disk, or None when no file exists."""
        if not os.path.exists(SESSION_FN):
            return None
        with open(SESSION_FN, "r") as fh:
            data = json.load(fh)
        logger.info("Loaded existing session from disk")
        return cls(**data)
# --- Gemini Configuration (Fallback Safe) ---
def load_gemini():
    """Try to configure the optional Gemini SDK.

    Returns:
        (env, enabled): env is the (genai_module, model) pair when both the
        SDK and the model load successfully, otherwise None; enabled is the
        matching bool. A missing API key, missing SDK, or model-load failure
        all fall back to offline mode instead of raising.
    """
    key = os.environ.get("GEMINI_API_KEY") or os.environ.get("AI_API_KEY")
    if not key:
        logger.info("No Gemini key found. Running in offline/fallback mode.")
        return None, False
    try:
        import google.generativeai as genai
        genai.configure(api_key=key)
        try:
            model = genai.GenerativeModel("gemini-1.5-flash")
        except Exception as e:
            logger.warning(f"Could not load gemini-1.5-flash model: {e}")
            model = None
        # BUGFIX: the original returned ((genai, None), True) when the model
        # failed to load, so USE_GEMINI was True with an unusable None model.
        # Report offline mode instead.
        if model is None:
            return None, False
        logger.info("Gemini configured (optional).")
        return (genai, model), True
    except Exception as e:
        logger.warning(f"Gemini SDK init failed: {e}; continuing offline.")
        return None, False


GENAI_ENV, USE_GEMINI = load_gemini()
# Seed
def _set_seed() -> None:
    """Seed both the stdlib and NumPy RNGs from the wall clock.

    Runs are deliberately non-reproducible: the seed is derived from the
    current time at import.
    """
    clock_seed = int((time.time() * 1000) % (2**31 - 1))
    random.seed(clock_seed)
    np.random.seed(clock_seed)
    logger.info("Internal seed set.")


_set_seed()
# --- Vendor Synthesis ---
NAME_WORDS = ["Astra", "Blue", "Nova", "Prime", "Eco", "Vertex", "Luma", "Grid", "Core", "Pioneer", "Green"]
SUFFIX = ["Ltd", "Pvt Ltd", "Inc", "LLC", "Corp"]


def gen_name(i: int) -> str:
    """Return a synthetic vendor name such as 'AstraNova-1003 Ltd' for index i."""
    first = random.choice(NAME_WORDS)
    second = random.choice(NAME_WORDS)
    legal_form = random.choice(SUFFIX)
    return f"{first}{second}-{1000 + i} {legal_form}"
# Per-scenario Gaussian parameters: criterion -> (mu, sigma, lo, hi).
# hi=None means the draw is clamped only from below (Cost, DeliveryTime).
_SCENARIO_PARAMS: Dict[str, Dict[str, Tuple[float, float, float, Optional[float]]]] = {
    "Disruption": {
        "Cost": (11000, 3000, 1000, None),
        "Quality": (72, 10, 40, 100),
        "DeliveryTime": (12, 6, 1, None),
        "Risk": (55, 18, 5, 100),
        "Ethics": (65, 14, 25, 100),
    },
    "Regulation": {
        "Cost": (10500, 2200, 1000, None),
        "Quality": (78, 7, 60, 100),
        "DeliveryTime": (8, 3, 1, None),
        "Risk": (35, 10, 5, 100),
        "Ethics": (80, 8, 40, 100),
    },
    "LowBudget": {
        "Cost": (8000, 1500, 1000, None),
        "Quality": (65, 12, 30, 100),
        "DeliveryTime": (9, 4, 1, None),
        "Risk": (45, 15, 10, 100),
        "Ethics": (60, 15, 20, 100),
    },
    "Normal": {
        "Cost": (10000, 2000, 1000, None),
        "Quality": (75, 8, 45, 100),
        "DeliveryTime": (7, 2, 1, None),
        "Risk": (40, 12, 5, 100),
        "Ethics": (70, 12, 35, 100),
    },
}


def _clamped_gauss(mu: float, sigma: float, lo: float, hi: Optional[float]) -> int:
    """Draw one Gaussian sample, clamp it to [lo, hi] (hi may be None) and truncate to int."""
    val = max(lo, random.gauss(mu, sigma))
    if hi is not None:
        val = min(hi, val)
    return int(val)


def synthesize_vendors(n: int = 10, scenario: str = "Normal") -> pd.DataFrame:
    """Generate n synthetic vendors for a named market scenario.

    Args:
        n: number of vendor rows to synthesize.
        scenario: one of "Disruption", "Regulation", "LowBudget", "Normal";
            any unknown name falls back to "Normal" (matching the original
            else-branch behavior).

    Returns:
        DataFrame with columns VendorID, Cost, Quality, DeliveryTime, Risk,
        Ethics; all numeric columns are ints.

    Improvement over original: the four nearly identical per-scenario
    branches are collapsed into the _SCENARIO_PARAMS table + _clamped_gauss
    helper; sampling order per row (Cost, Quality, DeliveryTime, Risk,
    Ethics) is preserved so RNG consumption is unchanged.
    """
    params = _SCENARIO_PARAMS.get(scenario, _SCENARIO_PARAMS["Normal"])
    criteria = ["Cost", "Quality", "DeliveryTime", "Risk", "Ethics"]
    rows = []
    for i in range(n):
        # Sampling order matters: one gauss draw per criterion, in column order.
        rows.append([gen_name(i)] + [_clamped_gauss(*params[c]) for c in criteria])
    df = pd.DataFrame(rows, columns=["VendorID"] + criteria)
    # Values are already ints; a single astype keeps the dtypes explicit.
    df[criteria] = df[criteria].astype(int)
    return df
# --- LLM and Offline Fallback ---
# Gemini safe-call
def gemini_safe_call(prompt: str, max_output_tokens: int = 200) -> Tuple[bool, str]:
    """Call Gemini once and return (ok, text); never raises.

    On any failure the global USE_GEMINI flag is cleared so subsequent calls
    short-circuit to offline mode. Each successful call bumps the
    gemini_calls metric and flushes metrics to disk.
    """
    global USE_GEMINI
    if not USE_GEMINI or not GENAI_ENV:
        return False, ""
    try:
        genai, model = GENAI_ENV
        # BUGFIX: the legacy google-generativeai SDK exposes
        # genai.types.GenerationConfig; GenerateContentConfig belongs to the
        # newer google-genai package, so the original raised AttributeError
        # on every call and silently disabled Gemini.
        resp = model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                max_output_tokens=max_output_tokens
            ),
        )
        metrics["gemini_calls"] += 1
        save_metrics()
        text = getattr(resp, "text", str(resp))
        return True, text.strip()
    except Exception as e:
        logger.warning(f"Gemini call failed: {e}")
        USE_GEMINI = False
        return False, str(e)
    # NOTE: the original had an unreachable trailing `return False, ""` after
    # the try/except (both paths return); it has been removed.
# Offline enrichment
def offline_profile_str(row: pd.Series) -> str:
    """Render a one-line procurement profile for a vendor row.

    Used as the deterministic fallback when Gemini is unavailable.
    Ethics defaults to 0 when the column is missing.
    """
    fields = [
        f"{row['VendorID']}: cost {row['Cost']}",
        f"quality {row['Quality']}/100",
        f"delivery {row['DeliveryTime']} days",
        f"risk {row['Risk']}/100",
        f"ethics {row.get('Ethics', 0)}/100.",
    ]
    return ", ".join(fields)
# --- TOPSIS MCDA ---
def topsis_scores(df: pd.DataFrame, criteria: List[str], weights: List[float], criteria_type: Dict[str, str]) -> pd.DataFrame:
    """Rank alternatives with the TOPSIS multi-criteria method.

    Args:
        df: table containing (at least) the criterion columns.
        criteria: ordered list of criterion column names.
        weights: one weight per criterion; normalized internally to sum to 1.
        criteria_type: maps each criterion to 'benefit' (higher is better);
            any other value is treated as cost (lower is better).

    Returns:
        A copy of df with TOPSIS_Score and Rank columns, sorted by Rank.

    Raises:
        ValueError: when the weights sum to zero.
    """
    matrix = df[criteria].astype(float).values
    w = np.array(weights, dtype=float)
    if w.sum() == 0:
        raise ValueError("Weights sum to zero")
    w = w / w.sum()
    # Vector normalization; epsilon guards all-zero columns.
    col_norms = np.sqrt((matrix ** 2).sum(axis=0))
    col_norms[col_norms == 0] = 1e-12
    weighted = (matrix / col_norms) * w
    # Ideal best/worst per column: max/min for benefit criteria, flipped for cost.
    benefit_mask = np.array([criteria_type[c] == 'benefit' for c in criteria])
    col_max = weighted.max(axis=0)
    col_min = weighted.min(axis=0)
    ideal_best = np.where(benefit_mask, col_max, col_min)
    ideal_worst = np.where(benefit_mask, col_min, col_max)
    # Euclidean separation from each ideal point.
    d_best = np.sqrt(((weighted - ideal_best) ** 2).sum(axis=1))
    d_worst = np.sqrt(((weighted - ideal_worst) ** 2).sum(axis=1))
    # Relative closeness; epsilon avoids 0/0 for degenerate rows.
    total = d_best + d_worst
    total[total == 0] = 1e-12
    closeness = d_worst / total
    out = df.copy().reset_index(drop=True)
    out["TOPSIS_Score"] = closeness
    out["Rank"] = out["TOPSIS_Score"].rank(ascending=False, method="min").astype(int)
    return out.sort_values("Rank").reset_index(drop=True)
# --- Agent System ---
class Agent:
    """Base class for all agents: a named actor writing audit events to a shared Session."""

    def __init__(self, name: str, session: Session):
        self.name = name
        self.session = session
        self.log = logging.getLogger(name)

    def log_event(self, tag: str, details: Dict[str, Any]):
        """Append a timestamped event to the session history and persist it to disk."""
        event = {
            "time": time.time(),
            "agent": self.name,
            "tag": tag,
            "details": details,
        }
        self.session.history.append(event)
        self.session.persist()
        self.log.info(f"{self.name}:{tag}")
class DataRetrievalAgent(Agent):
    """Attaches a short text profile to each vendor (Gemini when available, offline otherwise)."""

    def __init__(self, name: str, session: Session, vendors_df: pd.DataFrame):
        super().__init__(name, session)
        self.vendors_df = vendors_df

    def run(self, query: str = "general market scan") -> pd.DataFrame:
        """Add a 'Description' column to the vendor table and return it."""
        descriptions = []
        for _, row in self.vendors_df.iterrows():
            prompt = f"Provide a 1-2 sentence procurement profile for: {row.to_dict()}"
            if USE_GEMINI:
                ok, text = gemini_safe_call(prompt, max_output_tokens=120)
            else:
                ok, text = False, ""
            # Any miss (offline, call failure, empty reply) falls back to the
            # deterministic offline profile string.
            descriptions.append(text if (ok and text) else offline_profile_str(row))
        self.vendors_df["Description"] = descriptions
        self.log_event("data_enriched", {"count": len(descriptions)})
        return self.vendors_df
class EvaluationAgent(Agent):
    """Scores vendors with TOPSIS under one or more named weight scenarios."""

    def __init__(self, name: str, session: Session):
        super().__init__(name, session)

    def _maybe_perturb(self, weights: List[float]) -> List[float]:
        """Jitter the weights with small Gaussian noise and renormalize to sum 1."""
        noise = np.random.normal(0, 0.02, size=len(weights))
        jittered = np.clip(np.array(weights) + noise, 0, None)
        if jittered.sum() == 0:
            # Degenerate jitter: fall back to the original vector.
            jittered = np.array(weights)
        return list(jittered / jittered.sum())

    def run(self, vendors_df: pd.DataFrame, scenarios: Dict[str, List[float]], criteria: List[str], criteria_type: Dict[str, str], perturb: bool = True) -> Dict[str, Any]:
        """Score vendors per scenario; store and return {scenario: {meta, result_table}}."""
        results = {}
        for scen_name, weights in scenarios.items():
            w = self._maybe_perturb(weights) if perturb else weights
            # Guard: a weight vector that doesn't match the criteria is skipped.
            if len(w) != len(criteria):
                self.log.error(f"Weights length ({len(w)}) does not match criteria length ({len(criteria)}) for scenario {scen_name}. Skipping.")
                continue
            table = topsis_scores(vendors_df, criteria, w, criteria_type)
            results[scen_name] = {"meta": {"weights": [float(x) for x in w]}, "result_table": table.to_dict(orient="list")}
            self.log_event("scenario_scored", {"scenario": scen_name, "top1": table.iloc[0]["VendorID"]})
        self.session.topsis_results.update(results)  # Use update to preserve compliance iterations
        self.session.persist()
        return results
class EthicsAgent(Agent):
    """Derives an EthicsPenalty column from each vendor's Ethics score."""

    def __init__(self, name: str, session: Session):
        super().__init__(name, session)

    def run(self, vendors_df: pd.DataFrame) -> pd.DataFrame:
        """Add EthicsPenalty in [0, 1]: linear penalty below a score of 70, zero at or above."""
        penalties = [
            round(max(0, (70 - row.get("Ethics", 50)) / 70), 3)
            for _, row in vendors_df.iterrows()
        ]
        vendors_df["EthicsPenalty"] = penalties
        self.log_event("ethics_evaluated", {"avg_penalty": float(np.mean(penalties))})
        return vendors_df
class ComplianceAgent(Agent):
    """Selects the highest-ranked vendor that passes hard compliance rules.

    Phase 1 scans every already-scored scenario in the session; if none
    yields a compliant vendor, phase 2 re-runs the evaluation up to
    `max_iters` times with alternative hard-coded weight profiles.
    """

    def __init__(self, name: str, session: Session, risk_threshold: float = 50, max_iters: int = 5):
        super().__init__(name, session)
        self.risk_threshold = risk_threshold  # maximum acceptable Risk score
        self.max_iters = max_iters  # cap on re-evaluation rounds

    def _assess(self, row: Dict[str, Any]) -> List[str]:
        """Return the compliance issues for one vendor row; empty list means compliant."""
        issues = []
        # Check risk threshold and quality compliance
        # Missing Risk defaults to 100 (fails) and missing Quality to 0 (fails):
        # incomplete rows are treated as non-compliant.
        if row.get("Risk", 100) > self.risk_threshold:
            issues.append("HighRisk")
        if row.get("Quality", 0) < 60:
            issues.append("LowQuality")
        return issues

    def find_compliant(self, topsis_df: pd.DataFrame) -> Optional[Dict[str, Any]]:
        """Return the first (best-ranked) compliant vendor in a TOPSIS table, or None."""
        for _, r in topsis_df.iterrows():
            if not self._assess(r.to_dict()):
                return {"vendor": r["VendorID"], "row": r.to_dict()}
        return None

    def run(self, session: Session, scenarios: Dict[str, List[float]], criteria: List[str], criteria_type: Dict[str, str]) -> Dict[str, Any]:
        """Run the two-phase compliance search.

        Returns {"iterations", "found", "chosen", "chosen_scenario"}.

        NOTE(review): the `scenarios` parameter is accepted but never read
        here, and the rerun weight vectors below are hard-coded 4-element
        lists — they implicitly assume exactly four criteria. Confirm against
        the caller's criteria list.
        """
        iterations = 0
        chosen = None
        chosen_scenario = None
        # 1. Check existing results for compliance
        for name, out in session.topsis_results.items():
            if "result_table" not in out:
                continue
            try:
                df = pd.DataFrame(out["result_table"])
            except Exception as e:
                self.log.error(f"Failed to create DataFrame for scenario {name}: {e}")
                continue
            candidate = self.find_compliant(df)
            if candidate:
                chosen = candidate
                chosen_scenario = name
                break
        # 2. Iteratively re-run if no compliant vendor is found
        while chosen is None and iterations < self.max_iters:
            iterations += 1
            # Define new weights to try finding a compliant vendor
            # (round 1 emphasizes Risk, round 2 Quality, later rounds balanced).
            if iterations == 1:
                new = {"W_risk_strong": [0.1, 0.2, 0.2, 0.5]}
            elif iterations == 2:
                new = {"W_quality_strong": [0.15, 0.6, 0.15, 0.1]}
            else:
                new = {"W_balanced_recheck": [0.25, 0.35, 0.2, 0.2]}
            eval_agent = EvaluationAgent("EvalInner", session)
            # Recreate DataFrame from JSON string for inner loop consistency
            vendors_df_from_json = pd.read_json(session.vendors_df_json, orient="records")
            # Run evaluation with new weights
            new_results = eval_agent.run(vendors_df_from_json, new, criteria, criteria_type, perturb=True)
            session.topsis_results.update(new_results)
            session.persist()
            # Check new results
            for name, out in new_results.items():
                if "result_table" not in out:
                    continue
                try:
                    df = pd.DataFrame(out["result_table"])
                except Exception:
                    continue
                candidate = self.find_compliant(df)
                if candidate:
                    chosen = candidate
                    chosen_scenario = name
                    break
        metrics["iterations_total"] += iterations
        save_metrics()
        self.log_event("compliance_completed", {"iterations": iterations, "chosen": bool(chosen)})
        return {"iterations": iterations, "found": bool(chosen), "chosen": chosen, "chosen_scenario": chosen_scenario}
class DecisionAgent(Agent):
    """Produces a human-readable justification for the selected vendor."""

    def __init__(self, name: str, session: Session):
        super().__init__(name, session)

    def run(self, chosen: Optional[Dict[str, Any]], initial_top: Optional[Dict[str, Any]], scenario_name: str, risk_threshold: float) -> str:
        """Return an explanation string (Gemini when available, template otherwise)
        and store it in session memory as 'last_explanation'."""
        winner = chosen.get('vendor') if chosen else 'None'
        runner_up = initial_top.get('VendorID') if initial_top else 'None'
        prompt = (f"You are a procurement analyst. Explain why '{winner}' is selected over '{runner_up}'. "
                  f"Include key tradeoffs and next steps. Scenario: {scenario_name}, risk threshold: {risk_threshold}.")
        if USE_GEMINI:
            ok, text = gemini_safe_call(prompt, max_output_tokens=250)
        else:
            ok, text = False, ""
        # Canned template covers the offline / failed-call case.
        fallback = (f"Selected **{winner}**: meets compliance checks (Risk < {risk_threshold}, Quality > 60) and offers acceptable tradeoffs. "
                    f"Initial top candidate (before compliance check): **{runner_up}**. Next steps: conduct due diligence and reference checks.")
        report = text if (ok and text) else fallback
        self.session.mem["last_explanation"] = report
        self.session.persist()
        self.log_event("decision_explained", {"chosen": chosen})
        return report
class ReportAgent(Agent):
    """Summarizes a finished session: id plus the scenario names scored."""

    def __init__(self, name: str, session: Session):
        super().__init__(name, session)

    def run(self, session: Session) -> Dict[str, Any]:
        """Return {"session_id", "scenarios"} for the given session."""
        scenario_names = list(session.topsis_results.keys())
        summary = {"session_id": session.session_id, "scenarios": scenario_names}
        self.log_event("report_generated", {"scenarios": len(scenario_names)})
        return summary
# --- Orchestrator ---
def run_full_pipeline(n_vendors: int = 10, profile_weights: Optional[Dict[str, List[float]]] = None, scenario: str = "Normal", risk_threshold: float = 50) -> Tuple[Session, Dict[str, Any]]:
    """End-to-end NEURAVEND run: synthesize vendors, enrich, score with
    TOPSIS, enforce compliance, and explain the final pick.

    Args:
        n_vendors: number of synthetic vendors to generate.
        profile_weights: optional {scenario_name: weights} map over the four
            criteria below; defaults to two built-in profiles.
        scenario: market scenario passed to the synthesizer.
        risk_threshold: maximum acceptable Risk score for compliance.

    Returns:
        (session, summary): the persisted Session and the report summary.

    NOTE(review): Session.load() may resurrect topsis_results from a previous
    run's session.json on disk, and the compliance scan then considers those
    stale scenarios alongside the fresh ones — confirm this persistence
    behavior is intended.
    """
    session = Session.load() or Session()
    # --- 1. Synthesis & Setup ---
    vendors_df = synthesize_vendors(n_vendors, scenario)
    session.vendors_df_json = vendors_df.to_json(orient="records")
    session.persist()
    # --- 2. Data & Ethics Processing ---
    data_agent = DataRetrievalAgent("DataAgent", session, vendors_df)
    vendors_df = data_agent.run(query="market scan")
    ethics_agent = EthicsAgent("EthicsAgent", session)
    vendors_df = ethics_agent.run(vendors_df)
    # Decision criteria; Ethics contributes only via the EthicsPenalty column.
    criteria = ["Cost", "Quality", "DeliveryTime", "Risk"]
    criteria_type = {"Cost": "cost", "Quality": "benefit", "DeliveryTime": "cost", "Risk": "cost"}
    scenarios = profile_weights or {"ProfileBase": [0.25, 0.35, 0.2, 0.2], "Equal": [0.25, 0.25, 0.25, 0.25]}
    # --- 3. Initial Evaluation ---
    eval_agent = EvaluationAgent("EvalAgent", session)
    eval_agent.run(vendors_df, scenarios, criteria, criteria_type, perturb=True)
    # --- 4. Compliance Check & Rerun Loop ---
    comp_agent = ComplianceAgent("CompAgent", session, risk_threshold=risk_threshold, max_iters=4)
    comp_out = comp_agent.run(session, scenarios, criteria, criteria_type)
    chosen = comp_out.get("chosen")
    # Fall back to the first declared scenario when compliance found nothing.
    chosen_scenario = comp_out.get("chosen_scenario") or list(scenarios.keys())[0]
    # --- 5. Decision & Reporting ---
    # initial_top is the pre-compliance rank-1 vendor of the chosen scenario.
    initial_top = None
    if chosen_scenario and session.topsis_results.get(chosen_scenario):
        rt = session.topsis_results[chosen_scenario]
        if "result_table" in rt:
            df = pd.DataFrame(rt["result_table"])
            initial_top = df.iloc[0].to_dict()
    decision_agent = DecisionAgent("DecisionAgent", session)
    # Explanation is stored in session.mem["last_explanation"]; the local
    # return value is not used further here.
    report_text = decision_agent.run(chosen or {"vendor": "None"}, initial_top, chosen_scenario, risk_threshold)
    report_agent = ReportAgent("ReportAgent", session)
    summary = report_agent.run(session)
    metrics["runs"] += 1
    save_metrics()
    print(f"NEURAVEND run: session={session.session_id} scenario={scenario} vendors={n_vendors}")
    return session, summary