File size: 8,570 Bytes
8bab08d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# file: mcp/servers/store_server.py
#!/usr/bin/env python3
import json
import os
from pathlib import Path
from datetime import datetime
from aiohttp import web
import asyncio
class StoreServer:
"""Store MCP server with JSON persistence"""
def __init__(self):
self.data_dir = Path(__file__).parent.parent.parent / "data"
self.data_dir.mkdir(exist_ok=True)
self.prospects_file = self.data_dir / "prospects.json"
self.companies_file = self.data_dir / "companies_store.json"
self.facts_file = self.data_dir / "facts.json"
self.contacts_file = self.data_dir / "contacts.json"
self.handoffs_file = self.data_dir / "handoffs.json"
self.lock = asyncio.Lock()
self._load_data()
def _load_data(self):
"""Load data from files"""
self.prospects = self._load_json(self.prospects_file, [])
self.companies = self._load_json(self.companies_file, [])
self.facts = self._load_json(self.facts_file, [])
self.contacts = self._load_json(self.contacts_file, [])
self.handoffs = self._load_json(self.handoffs_file, [])
# Load suppressions
supp_file = self.data_dir / "suppression.json"
self.suppressions = self._load_json(supp_file, [])
def _load_json(self, path, default):
"""Load JSON file safely"""
if path.exists():
try:
with open(path) as f:
content = json.load(f)
# Return empty list if content is None or not a list/dict
if content is None:
return default
return content
except (json.JSONDecodeError, IOError):
return default
return default
def _save_json(self, path, data):
"""Save JSON file"""
with open(path, "w") as f:
json.dump(data, f, indent=2, default=str)
async def handle_rpc(self, request):
data = await request.json()
method = data.get("method")
params = data.get("params", {})
if method == "health":
return web.json_response({"result": "ok"})
async with self.lock:
if method == "store.save_prospect":
prospect = params["prospect"]
# Update or add
found = False
for i, p in enumerate(self.prospects):
if p["id"] == prospect["id"]:
self.prospects[i] = prospect
found = True
break
if not found:
self.prospects.append(prospect)
self._save_json(self.prospects_file, self.prospects)
return web.json_response({"result": "saved"})
elif method == "store.get_prospect":
prospect_id = params["id"]
for p in self.prospects:
if p["id"] == prospect_id:
return web.json_response({"result": p})
return web.json_response({"result": None})
elif method == "store.list_prospects":
return web.json_response({"result": self.prospects})
elif method == "store.save_company":
company = params["company"]
found = False
for i, c in enumerate(self.companies):
if c["id"] == company["id"]:
self.companies[i] = company
found = True
break
if not found:
self.companies.append(company)
self._save_json(self.companies_file, self.companies)
return web.json_response({"result": "saved"})
elif method == "store.get_company":
company_id = params["id"]
for c in self.companies:
if c["id"] == company_id:
return web.json_response({"result": c})
# Check seed file
seed_file = self.data_dir / "companies.json"
if seed_file.exists():
with open(seed_file) as f:
seeds = json.load(f)
for c in seeds:
if c["id"] == company_id:
return web.json_response({"result": c})
return web.json_response({"result": None})
elif method == "store.save_fact":
fact = params["fact"]
# Check if fact already exists by ID
existing_ids = {f.get("id") for f in self.facts if f.get("id")}
if fact.get("id") not in existing_ids:
self.facts.append(fact)
self._save_json(self.facts_file, self.facts)
return web.json_response({"result": "saved"})
elif method == "store.save_contact":
contact = params["contact"]
# Check if contact already exists by ID
existing_ids = {c.get("id") for c in self.contacts if c.get("id")}
if contact.get("id") not in existing_ids:
self.contacts.append(contact)
self._save_json(self.contacts_file, self.contacts)
return web.json_response({"result": "saved"})
elif method == "store.list_contacts_by_domain":
domain = params["domain"]
# Ensure contacts is a list
if not isinstance(self.contacts, list):
self.contacts = []
results = []
for c in self.contacts:
# Ensure contact has email field
if isinstance(c, dict) and "email" in c:
email = c["email"]
# Check if email ends with the domain
if email.endswith(f"@{domain}"):
results.append(c)
return web.json_response({"result": results})
elif method == "store.check_suppression":
supp_type = params["type"]
value = params["value"]
# Ensure suppressions is a list
if not isinstance(self.suppressions, list):
self.suppressions = []
for supp in self.suppressions:
if isinstance(supp, dict):
if supp.get("type") == supp_type and supp.get("value") == value:
# Check expiry
if supp.get("expires_at"):
try:
expires = datetime.fromisoformat(supp["expires_at"].replace("Z", "+00:00"))
if expires < datetime.utcnow():
continue
except:
pass
return web.json_response({"result": True})
return web.json_response({"result": False})
elif method == "store.save_handoff":
packet = params["packet"]
self.handoffs.append(packet)
self._save_json(self.handoffs_file, self.handoffs)
return web.json_response({"result": "saved"})
elif method == "store.clear_all":
self.prospects = []
self.companies = []
self.facts = []
self.contacts = []
self.handoffs = []
self._save_json(self.prospects_file, [])
self._save_json(self.companies_file, [])
self._save_json(self.facts_file, [])
self._save_json(self.contacts_file, [])
self._save_json(self.handoffs_file, [])
return web.json_response({"result": "cleared"})
return web.json_response({"error": f"Unknown method: {method}"}, status=400)
app = web.Application()
server = StoreServer()
app.router.add_post("/rpc", server.handle_rpc)
if __name__ == "__main__":
web.run_app(app, port=9004) |