cx_ai_agent_v1 / mcp /servers /store_server.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
# file: mcp/servers/store_server.py
#!/usr/bin/env python3
import json
import os
from pathlib import Path
from datetime import datetime
from aiohttp import web
import asyncio
class StoreServer:
    """Store MCP server with JSON persistence.

    Prospects, companies, facts, contacts and handoffs are held in memory as
    JSON-compatible records and the matching file under the ``data``
    directory is rewritten after every mutating RPC.  Suppressions are read
    from ``suppression.json`` at start-up; this class never writes them back.
    """

    def __init__(self):
        """Resolve the data directory, create it if missing, and load every store."""
        # Three levels above this file, then "data".
        self.data_dir = Path(__file__).parent.parent.parent / "data"
        self.data_dir.mkdir(exist_ok=True)
        self.prospects_file = self.data_dir / "prospects.json"
        self.companies_file = self.data_dir / "companies_store.json"
        self.facts_file = self.data_dir / "facts.json"
        self.contacts_file = self.data_dir / "contacts.json"
        self.handoffs_file = self.data_dir / "handoffs.json"
        # Held for every RPC except "health" (see handle_rpc).
        self.lock = asyncio.Lock()
        self._load_data()

    def _load_data(self):
        """Load data from files, falling back to empty lists."""
        self.prospects = self._load_json(self.prospects_file, [])
        self.companies = self._load_json(self.companies_file, [])
        self.facts = self._load_json(self.facts_file, [])
        self.contacts = self._load_json(self.contacts_file, [])
        self.handoffs = self._load_json(self.handoffs_file, [])
        # Suppressions live in their own file and are only read.
        supp_file = self.data_dir / "suppression.json"
        self.suppressions = self._load_json(supp_file, [])

    def _load_json(self, path, default):
        """Load JSON file safely.

        Args:
            path: ``pathlib.Path`` of the file to read.
            default: Value returned when nothing usable can be loaded.

        Returns:
            The decoded JSON document, or ``default`` when the file does not
            exist, cannot be read, is not valid JSON, or holds a top-level
            ``null``.
        """
        if not path.exists():
            return default
        try:
            # JSON text is UTF-8; do not depend on the platform locale.
            with open(path, encoding="utf-8") as f:
                content = json.load(f)
        except (json.JSONDecodeError, IOError):
            return default
        return default if content is None else content

    def _save_json(self, path, data):
        """Save ``data`` to ``path`` as indented JSON.

        Values the encoder cannot serialise are written as ``str(value)``
        (``default=str``).
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)

    @staticmethod
    def _upsert_by_id(records, record):
        """Replace the first entry whose ``"id"`` equals ``record["id"]``, else append."""
        for index, existing in enumerate(records):
            if existing["id"] == record["id"]:
                records[index] = record
                return
        records.append(record)

    @staticmethod
    def _find_by_id(records, record_id):
        """Return the first entry whose ``"id"`` equals ``record_id``, or ``None``."""
        for existing in records:
            if existing["id"] == record_id:
                return existing
        return None

    @staticmethod
    def _append_if_new(records, record):
        """Append ``record`` unless an entry with the same truthy ``"id"`` exists.

        A record without an id (or with a falsy one) is always appended,
        because falsy ids are left out of the set of known ids.
        """
        known_ids = {r.get("id") for r in records if r.get("id")}
        if record.get("id") not in known_ids:
            records.append(record)

    def _contacts_for_domain(self, domain):
        """Return the contacts whose ``"email"`` ends with ``@<domain>``.

        Entries that are not dicts, or whose email is missing or not a
        string, are skipped.  A non-list ``self.contacts`` is reset to ``[]``.
        """
        if not isinstance(self.contacts, list):
            self.contacts = []
        suffix = f"@{domain}"
        return [
            c
            for c in self.contacts
            if isinstance(c, dict)
            and isinstance(c.get("email"), str)
            and c["email"].endswith(suffix)
        ]

    @staticmethod
    def _has_expired(expires_at):
        """Tell whether an ISO-8601 ``expires_at`` value lies in the past.

        A trailing ``Z`` is accepted as UTC.  A timestamp without an offset
        is interpreted as UTC.  A missing or unparseable value counts as
        "not expired", so a malformed entry keeps suppressing rather than
        silently lapsing.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timezone

        if not expires_at:
            return False
        try:
            expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            return False
        if expires.tzinfo is None:
            expires = expires.replace(tzinfo=timezone.utc)
        # Both sides are timezone-aware, so the comparison cannot raise the
        # TypeError that mixing naive and aware datetimes produces.
        return expires < datetime.now(timezone.utc)

    def _is_suppressed(self, supp_type, value):
        """Return ``True`` if an unexpired suppression matches ``supp_type``/``value``.

        Non-dict entries are ignored.  A non-list ``self.suppressions`` is
        reset to ``[]``.
        """
        if not isinstance(self.suppressions, list):
            self.suppressions = []
        for supp in self.suppressions:
            if not isinstance(supp, dict):
                continue
            if supp.get("type") != supp_type or supp.get("value") != value:
                continue
            if self._has_expired(supp.get("expires_at")):
                continue
            return True
        return False

    async def handle_rpc(self, request):
        """Dispatch one JSON request of the form ``{"method": ..., "params": {...}}``.

        Supported methods: ``health``, ``store.save_prospect``,
        ``store.get_prospect``, ``store.list_prospects``,
        ``store.save_company``, ``store.get_company``, ``store.save_fact``,
        ``store.save_contact``, ``store.list_contacts_by_domain``,
        ``store.check_suppression``, ``store.save_handoff`` and
        ``store.clear_all``.

        Returns:
            A JSON response ``{"result": ...}``, or ``{"error": ...}`` with
            status 400 for an unknown method.

        Raises:
            KeyError: If a parameter required by the method is absent.
        """
        data = await request.json()
        method = data.get("method")
        params = data.get("params", {})

        # Health checks are answered without taking the lock.
        if method == "health":
            return web.json_response({"result": "ok"})

        async with self.lock:
            if method == "store.save_prospect":
                self._upsert_by_id(self.prospects, params["prospect"])
                self._save_json(self.prospects_file, self.prospects)
                return web.json_response({"result": "saved"})

            elif method == "store.get_prospect":
                found = self._find_by_id(self.prospects, params["id"])
                return web.json_response({"result": found})

            elif method == "store.list_prospects":
                return web.json_response({"result": self.prospects})

            elif method == "store.save_company":
                self._upsert_by_id(self.companies, params["company"])
                self._save_json(self.companies_file, self.companies)
                return web.json_response({"result": "saved"})

            elif method == "store.get_company":
                company_id = params["id"]
                found = self._find_by_id(self.companies, company_id)
                if found is None:
                    # Fall back to the seed file, loaded with the same safe
                    # reader as every other store.
                    seeds = self._load_json(self.data_dir / "companies.json", [])
                    found = self._find_by_id(seeds, company_id)
                return web.json_response({"result": found})

            elif method == "store.save_fact":
                self._append_if_new(self.facts, params["fact"])
                self._save_json(self.facts_file, self.facts)
                return web.json_response({"result": "saved"})

            elif method == "store.save_contact":
                self._append_if_new(self.contacts, params["contact"])
                self._save_json(self.contacts_file, self.contacts)
                return web.json_response({"result": "saved"})

            elif method == "store.list_contacts_by_domain":
                results = self._contacts_for_domain(params["domain"])
                return web.json_response({"result": results})

            elif method == "store.check_suppression":
                supp_type = params["type"]
                value = params["value"]
                return web.json_response(
                    {"result": self._is_suppressed(supp_type, value)}
                )

            elif method == "store.save_handoff":
                self.handoffs.append(params["packet"])
                self._save_json(self.handoffs_file, self.handoffs)
                return web.json_response({"result": "saved"})

            elif method == "store.clear_all":
                # Suppressions are not touched by a clear.
                self.prospects = []
                self.companies = []
                self.facts = []
                self.contacts = []
                self.handoffs = []
                self._save_json(self.prospects_file, [])
                self._save_json(self.companies_file, [])
                self._save_json(self.facts_file, [])
                self._save_json(self.contacts_file, [])
                self._save_json(self.handoffs_file, [])
                return web.json_response({"result": "cleared"})

        return web.json_response({"error": f"Unknown method: {method}"}, status=400)
# Build the store first (this loads the JSON files), then the aiohttp
# application that exposes it.
server = StoreServer()
app = web.Application()
# One POST endpoint; the method name in the request body selects the action.
app.add_routes([web.post("/rpc", server.handle_rpc)])

if __name__ == "__main__":
    # Serve on port 9004 when run as a script.
    web.run_app(app, port=9004)