smriti-ai / handler.py
luciferai-devil's picture
Deploy Smriti AI Hugging Face handler
6922a90 verified
"""Hugging Face custom inference handler for Smriti AI.
This file is intentionally deployment glue. Core memory, retrieval, graph, and
identity behavior comes from the installed `smriti` package.
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from threading import RLock
from typing import Any, Dict, List, Optional, Tuple
# A ``smriti_vendor`` directory shipped beside this file is placed at the front
# of ``sys.path`` so the ``smriti`` imports below can be satisfied from it.
VENDOR_SRC = Path(__file__).resolve().parent / "smriti_vendor"
if str(VENDOR_SRC) not in sys.path and VENDOR_SRC.exists():
    sys.path.insert(0, str(VENDOR_SRC))
from smriti import IdentityFingerprint, MemPalaceLite, SmritiAILite # noqa: E402
from smriti.backends import ( # noqa: E402
JsonBackend,
MemoryBackend,
MemoryCipher,
PostgresBackend,
RedisBackend,
SqliteBackend,
)
from smriti.production_safety import ( # noqa: E402
GEMMA4_MODEL_ID,
is_production_mode,
validate_model_id_for_environment,
)
# Module logger. Root logging is only configured here when this logger has no
# handlers of its own.
LOGGER = logging.getLogger("smriti.hf_handler")
if not LOGGER.handlers:
    # ``logging.basicConfig`` rejects unknown level names (including lower-case
    # spellings such as "debug") with ValueError, which would abort the import
    # of this module. Normalise the name and fall back to INFO instead.
    _requested_level = logging.getLevelName(os.getenv("SMRITI_LOG_LEVEL", "INFO").strip().upper())
    logging.basicConfig(level=_requested_level if isinstance(_requested_level, int) else logging.INFO)
# Built-in settings; ``_load_config`` copies these and lets ``config.json``
# values override them.
DEFAULT_CONFIG: Dict[str, Any] = {
    # Model and retrieval selection.
    "project": "Smriti AI",
    "base_model": GEMMA4_MODEL_ID,
    "retrieval_mode": "semantic_graph_identity",
    # Storage.
    "memory_backend": "json",
    "public_demo": False,
    "max_memory_entries": 1000,
    # Feature switches.
    "enable_identity": True,
    "enable_graph": True,
    "enable_encryption": True,
}
class EndpointHandler:
    """Hugging Face custom inference endpoint handler.

    One instance serves the ``health``, ``delete_memory`` and ``chat``
    operations. Per-user memory and identity objects are cached on the
    instance and accessed under a re-entrant lock; persistence goes through
    the memory backend chosen in ``_init_backend``. Text is generated by a
    remote endpoint, a locally loaded model, or a memory-only fallback.
    """

    # Identity threshold used while identity checks are enabled, and the value
    # assigned to switch them off (presumably larger than any distance the
    # fingerprint reports -- confirm against the smriti package).
    _IDENTITY_THRESHOLD = 0.35
    _IDENTITY_DISABLED_THRESHOLD = 2.0
    # Sampling defaults used when a request omits a value or sends JSON null.
    _DEFAULT_TEMPERATURE = 0.7
    _DEFAULT_TOP_P = 0.9

    def __init__(self, path: str = ""):
        """Load configuration, select a memory backend and prepare generation.

        Args:
            path: Directory (or a file inside it) that holds ``config.json``.
                An empty string selects the directory of this module.

        Raises:
            RuntimeError: If a local base model is configured but ``torch`` or
                ``transformers`` cannot be imported.
        """
        self.root = _resolve_root(path)
        self.config = _load_config(self.root / "config.json")
        self.lock = RLock()
        self.memories: Dict[str, MemPalaceLite] = {}
        self.identities: Dict[str, IdentityFingerprint] = {}
        self.backend_warning: Optional[str] = None
        self.endpoint_url = os.getenv("HF_ENDPOINT_URL", "").strip()

        # An explicitly set BASE_MODEL_ID (even an empty one) wins over config.json.
        base_model_env = os.getenv("BASE_MODEL_ID")
        base_model_raw = base_model_env if base_model_env is not None else self.config.get("base_model", "")
        self.base_model_id = _clean_model_id(
            base_model_raw,
            allow_empty=bool(self.endpoint_url) or (base_model_env is not None and not is_production_mode()),
        )
        self.hf_token = os.getenv("HF_TOKEN", "").strip()
        self.default_retrieval_mode = os.getenv(
            "SMRITI_RETRIEVAL_MODE",
            str(self.config.get("retrieval_mode", "semantic_graph_identity")),
        )
        self.max_memory_entries = _int_env(
            "SMRITI_MAX_MEMORY_ENTRIES",
            int(self.config.get("max_memory_entries", 1000)),
        )
        self.public_demo = _bool_env("SMRITI_PUBLIC_DEMO", bool(self.config.get("public_demo", False)))
        self.enable_graph_default = bool(self.config.get("enable_graph", True))
        self.enable_identity_default = bool(self.config.get("enable_identity", True))
        self.enable_encryption = bool(self.config.get("enable_encryption", True))
        self.backend, self.backend_name = self._init_backend()

        self.model = None
        self.tokenizer = None
        self.device = "cpu"
        if self.endpoint_url:
            LOGGER.info(
                "Smriti AI handler using remote model endpoint; backend=%s retrieval=%s",
                self.backend_name,
                self.default_retrieval_mode,
            )
        elif self.base_model_id:
            self._load_local_model(self.base_model_id)
        else:
            LOGGER.warning(
                "No BASE_MODEL_ID or HF_ENDPOINT_URL configured; handler will run memory-only."
            )
        LOGGER.info(
            "Smriti AI handler ready: base_model=%s remote_endpoint=%s backend=%s retrieval=%s encryption=%s public_demo=%s",
            self.base_model_id or "memory-only",
            bool(self.endpoint_url),
            self.backend_name,
            self.default_retrieval_mode,
            self.enable_encryption and bool(os.getenv("SMRITI_ENCRYPTION_KEY") or os.getenv("SMRITI_MEMORY_KEY")),
            self.public_demo,
        )

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one request to the matching operation handler.

        Args:
            data: Request body; see ``_normalize_request`` for accepted shapes.

        Returns:
            The operation's result dict, or an ``{"error", "latency_ms"}`` dict.
            Exceptions are logged and converted to an error dict, never raised.
        """
        start = time.perf_counter()
        try:
            inputs, parameters = _normalize_request(data)
            operation = str(inputs.get("operation", "chat")).lower()
            if operation == "health":
                return self._health(start)
            if operation == "delete_memory":
                return self._delete_memory(inputs, start)
            if operation != "chat":
                return _error(f"Unsupported operation: {operation}", start)
            return self._chat(inputs, parameters, start)
        except Exception as exc:  # Defensive boundary for endpoint runtimes.
            LOGGER.exception("Unhandled Smriti AI handler error")
            return _error(f"handler_error:{exc.__class__.__name__}: {exc}", start)

    # ------------------------------------------------------------------
    # Operation handlers
    # ------------------------------------------------------------------
    def _chat(
        self,
        inputs: Dict[str, Any],
        parameters: Dict[str, Any],
        start: float,
    ) -> Dict[str, Any]:
        """Run one chat turn: retrieve memory, generate, post-process, persist.

        Args:
            inputs: Must carry ``user_id`` and ``message``; may carry
                ``topic_id`` (default ``"general"``) and ``retrieval_mode``.
            parameters: Generation parameters forwarded to ``_generate_text``.
            start: ``time.perf_counter()`` value taken when the request began.

        Returns:
            A response dict with the answer, retrieved memories, graph facts,
            identity information, latency, backend name and warnings, or an
            error dict when validation or generation fails.
        """
        user_id = str(inputs.get("user_id") or "").strip()
        message = str(inputs.get("message") or "").strip()
        topic_id = str(inputs.get("topic_id") or "general").strip() or "general"
        if not user_id:
            return _error("user_id is required", start)
        if not message:
            return _error("message is required for chat operation", start)

        retrieval_mode = str(inputs.get("retrieval_mode") or self.default_retrieval_mode)
        base_retrieval = _base_retrieval_mode(retrieval_mode)
        # Match feature flags case-insensitively, as _base_retrieval_mode does.
        mode_flags = retrieval_mode.lower()
        include_graph = self.enable_graph_default and "graph" in mode_flags
        identity_enabled = self.enable_identity_default and "identity" in mode_flags

        # The lock is held for the whole turn, generation included.
        with self.lock:
            memory = self._get_memory(user_id, topic_id, base_retrieval)
            # The context string is rebuilt by _build_prompt at generation
            # time, so only the structured results are kept here.
            _, retrieved_memories, graph_facts, retrieval_warning = self._retrieve_context(
                memory,
                user_id,
                topic_id,
                message,
                include_graph,
            )
            identity = self._get_identity(user_id, identity_enabled)
            agent = SmritiAILite(
                model=self.model,
                tokenizer=self.tokenizer,
                retrieval_mode=base_retrieval,
                session_id=user_id,
                topic_id=topic_id,
                memory=memory,
                identity=identity,
                auto_device=False,
            )
            # Route the agent's prompt building and generation through this
            # handler's own helpers.
            agent.build_prompt = lambda user_input: _build_prompt(
                agent,
                memory,
                user_id,
                topic_id,
                user_input,
                include_graph,
                identity_enabled,
            )
            generation_calls = 0

            def generate(prompt: str, max_tokens: int = 256) -> str:
                nonlocal generation_calls
                generation_calls += 1
                return self._generate_text(prompt, parameters, max_tokens=max_tokens)

            agent._generate = generate  # type: ignore[method-assign]
            try:
                response = agent.chat(message)
            except Exception as exc:
                LOGGER.exception("Model generation failed")
                return _error(f"model_generation_failed:{exc.__class__.__name__}: {exc}", start)

            response = _stabilize_recall_answer(message, response, retrieved_memories, graph_facts)
            _replace_last_assistant_history(memory, response)
            identity_check = agent.identity.evaluate_output(response) if identity_enabled else None
            save_warning = self._save_memory(user_id, memory)
            warnings = [item for item in [self.backend_warning, retrieval_warning, save_warning] if item]
            return {
                "response": response,
                "retrieved_memories": retrieved_memories,
                "graph_facts": graph_facts,
                "identity": {
                    "enabled": identity_enabled,
                    "drift_score": float(identity_check.distance) if identity_check else 0.0,
                    # More than one generation call is reported as a refinement.
                    "refinement_triggered": generation_calls > 1,
                },
                "latency_ms": round((time.perf_counter() - start) * 1000, 3),
                "backend": self.backend_name,
                "retrieval_mode": retrieval_mode,
                "warnings": warnings,
            }

    def _delete_memory(self, inputs: Dict[str, Any], start: float) -> Dict[str, Any]:
        """Drop a user's cached state and delete their record from the backend.

        Returns:
            ``{"deleted", "user_id", "latency_ms", "backend"}``, or an error
            dict when ``user_id`` is missing or the backend delete fails.
        """
        user_id = str(inputs.get("user_id") or "").strip()
        if not user_id:
            return _error("user_id is required for delete_memory operation", start)
        with self.lock:
            existed_cache = self.memories.pop(user_id, None) is not None
            self.identities.pop(user_id, None)
            try:
                deleted_backend = self.backend.delete_user(user_id)
            except Exception as exc:
                LOGGER.exception("Memory backend delete failed")
                return _error(f"backend_delete_failed:{exc.__class__.__name__}: {exc}", start)
        return {
            "deleted": bool(existed_cache or deleted_backend),
            "user_id": user_id,
            "latency_ms": round((time.perf_counter() - start) * 1000, 3),
            "backend": self.backend_name,
        }

    def _health(self, start: float) -> Dict[str, Any]:
        """Return a static status report describing the handler's configuration."""
        return {
            "status": "ok",
            "project": "Smriti AI",
            "base_model": self.base_model_id or ("remote-endpoint" if self.endpoint_url else "memory-only"),
            "backend": self.backend_name,
            "retrieval_mode": self.default_retrieval_mode,
            "latency_ms": round((time.perf_counter() - start) * 1000, 3),
        }

    # ------------------------------------------------------------------
    # Runtime setup
    # ------------------------------------------------------------------
    def _init_backend(self) -> Tuple[MemoryBackend, str]:
        """Build the memory backend and return it with its short name.

        A Redis URL or Postgres DSN found in the environment takes precedence
        over the configured backend name; otherwise ``SMRITI_MEMORY_BACKEND``
        or the ``memory_backend`` config value decides, defaulting to JSON.
        """
        encryption_key = os.getenv("SMRITI_ENCRYPTION_KEY") or os.getenv("SMRITI_MEMORY_KEY")
        if encryption_key:
            # Publish the key under the SMRITI_MEMORY_KEY name as well.
            os.environ["SMRITI_MEMORY_KEY"] = encryption_key
        cipher = MemoryCipher(encryption_key if self.enable_encryption else None)

        redis_url = os.getenv("REDIS_URL") or os.getenv("SMRITI_REDIS_URL")
        postgres_dsn = os.getenv("POSTGRES_DSN") or os.getenv("SMRITI_POSTGRES_DSN")
        selected = str(os.getenv("SMRITI_MEMORY_BACKEND") or self.config.get("memory_backend") or "json").lower()
        memory_path = os.getenv("SMRITI_MEMORY_PATH", "/tmp/smriti_hf_memory")

        if redis_url:
            return RedisBackend(url=redis_url, cipher=cipher), "redis"
        if postgres_dsn:
            return PostgresBackend(dsn=postgres_dsn, cipher=cipher), "postgres"
        # From here on no URL/DSN was supplied, so named selections use defaults.
        if selected == "redis":
            return RedisBackend(url="redis://localhost:6379/0", cipher=cipher), "redis"
        if selected in {"postgres", "postgresql"}:
            return PostgresBackend(dsn="", cipher=cipher), "postgres"
        if selected == "sqlite":
            return SqliteBackend(path=memory_path, cipher=cipher), "sqlite"
        return JsonBackend(root=_json_root(memory_path), cipher=cipher), "json"

    def _load_local_model(self, model_id: str) -> None:
        """Load tokenizer and causal LM for ``model_id`` onto CUDA or CPU.

        Raises:
            RuntimeError: If ``torch`` or ``transformers`` cannot be imported.
        """
        try:
            import torch
            from transformers import AutoModelForCausalLM, AutoTokenizer
        except Exception as exc:
            raise RuntimeError("Install torch and transformers to load a local base model.") from exc

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        dtype = torch.float32
        if self.device == "cuda":
            dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        kwargs = {"token": self.hf_token} if self.hf_token else {}

        self.tokenizer = AutoTokenizer.from_pretrained(model_id, **kwargs)
        if getattr(self.tokenizer, "pad_token_id", None) is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        try:
            self.model = AutoModelForCausalLM.from_pretrained(model_id, dtype=dtype, **kwargs)
        except TypeError:
            # Retry with the ``torch_dtype`` keyword when ``dtype`` is rejected.
            self.model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=dtype, **kwargs)
        self.model.to(self.device)
        self.model.eval()
        LOGGER.info("Loaded local base model %s on %s", model_id, self.device)

    # ------------------------------------------------------------------
    # Memory and generation helpers
    # ------------------------------------------------------------------
    def _get_memory(self, user_id: str, topic_id: str, retrieval_mode: str) -> MemPalaceLite:
        """Return the cached memory for ``user_id``, loading or creating it.

        Resets ``self.backend_warning`` and sets it when the backend load
        fails, in which case an empty memory is used. A cached memory whose
        retrieval mode differs from the requested one is rebuilt from its own
        serialized state.
        """
        self.backend_warning = None
        options = {
            "retrieval_mode": retrieval_mode,
            "session_id": user_id,
            "topic_id": topic_id,
            "max_facts": self.max_memory_entries,
            "max_entries_per_topic": self.max_memory_entries,
        }
        if user_id not in self.memories:
            state = None
            try:
                state = self.backend.load(user_id)
            except Exception as exc:
                LOGGER.exception("Memory backend load failed; starting empty memory")
                self.backend_warning = f"backend_load_failed:{exc.__class__.__name__}"
            if state:
                memory = MemPalaceLite.from_dict(state, **options)
            else:
                memory = MemPalaceLite(**options)
            self.memories[user_id] = memory

        memory = self.memories[user_id]
        if memory.retrieval_mode != retrieval_mode:
            memory = MemPalaceLite.from_dict(memory.to_dict(), **options)
            self.memories[user_id] = memory
        memory.session_id = user_id
        memory.topic_id = topic_id
        return memory

    def _get_identity(self, user_id: str, enabled: bool) -> IdentityFingerprint:
        """Return the cached identity fingerprint for ``user_id``.

        The threshold is set on every call from ``enabled``. Previously the
        disabled value was written onto the cached object and never restored,
        so a single identity-off request kept identity checks off for that
        user on every later request.
        """
        threshold = self._IDENTITY_THRESHOLD if enabled else self._IDENTITY_DISABLED_THRESHOLD
        if user_id not in self.identities:
            self.identities[user_id] = IdentityFingerprint(
                role="helpful AI assistant with persistent memory",
                threshold=threshold,
            )
        identity = self.identities[user_id]
        # NOTE(review): assumes nothing else tunes ``threshold`` between
        # requests -- confirm against IdentityFingerprint.
        identity.threshold = threshold
        return identity

    def _retrieve_context(
        self,
        memory: MemPalaceLite,
        user_id: str,
        topic_id: str,
        message: str,
        include_graph: bool,
    ) -> Tuple[str, List[str], List[str], Optional[str]]:
        """Fetch the context text, up to five facts and graph facts for a message.

        Returns:
            ``(context, retrieved_memories, graph_facts, warning)``. On any
            failure the first three are empty and ``warning`` names the
            exception class.
        """
        try:
            context = memory.get_context(
                query=message,
                session_id=user_id,
                topic_id=topic_id,
                include_graph=include_graph,
            )
            retrieved_memories = memory.retrieve_facts(
                message,
                k=5,
                session_id=user_id,
                topic_id=topic_id,
            )
            graph_facts = _section_bullets(context, "[RELATED GRAPH FACTS]") if include_graph else []
            return context, retrieved_memories, graph_facts, None
        except Exception as exc:
            LOGGER.exception("Memory retrieval failed")
            return "", [], [], f"retrieval_failed:{exc.__class__.__name__}"

    def _save_memory(self, user_id: str, memory: MemPalaceLite) -> Optional[str]:
        """Persist ``memory``; return a warning string on failure, else ``None``."""
        try:
            self.backend.save(user_id, memory.to_dict())
            return None
        except Exception as exc:
            LOGGER.exception("Memory backend save failed")
            return f"backend_save_failed:{exc.__class__.__name__}"

    @staticmethod
    def _sampling_parameters(parameters: Dict[str, Any], max_tokens: int) -> Tuple[int, float, float]:
        """Resolve ``(max_new_tokens, temperature, top_p)`` from request parameters.

        A missing, null or zero ``max_new_tokens`` falls back to ``max_tokens``.
        A missing or null ``temperature``/``top_p`` falls back to the class
        default; an explicit ``0`` temperature is kept.
        """
        max_new_tokens = int(parameters.get("max_new_tokens") or max_tokens)
        temperature = parameters.get("temperature")
        top_p = parameters.get("top_p")
        return (
            max_new_tokens,
            EndpointHandler._DEFAULT_TEMPERATURE if temperature is None else float(temperature),
            EndpointHandler._DEFAULT_TOP_P if top_p is None else float(top_p),
        )

    def _generate_text(self, prompt: str, parameters: Dict[str, Any], max_tokens: int = 256) -> str:
        """Generate text remotely, locally, or from the prompt in memory-only mode."""
        max_new_tokens, temperature, top_p = self._sampling_parameters(parameters, max_tokens)
        if self.endpoint_url:
            return self._generate_remote(prompt, max_new_tokens, temperature, top_p)
        if self.model is not None and self.tokenizer is not None:
            return self._generate_local(prompt, max_new_tokens, temperature, top_p)
        return _memory_only_answer(prompt)

    def _generate_local(
        self,
        prompt: str,
        max_new_tokens: int,
        temperature: float,
        top_p: float,
    ) -> str:
        """Generate with the locally loaded model and return only the new text."""
        import torch

        messages = [{"role": "user", "content": prompt}]
        try:
            formatted = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
            )
        except Exception:
            # Fall back to the raw prompt when the chat template cannot be applied.
            formatted = prompt
        inputs = self.tokenizer(
            formatted,
            return_tensors="pt",
            truncation=True,
            max_length=2048,
        )
        inputs = {key: value.to(self.device) for key, value in inputs.items()}
        generate_kwargs = {
            "max_new_tokens": max_new_tokens,
            "do_sample": temperature > 0,
            "pad_token_id": getattr(self.tokenizer, "eos_token_id", None),
        }
        if temperature > 0:
            # Sampling controls are only passed when sampling is enabled.
            generate_kwargs["temperature"] = temperature
            generate_kwargs["top_p"] = top_p
        with torch.inference_mode():
            output = self.model.generate(**inputs, **generate_kwargs)
        # Decode only the tokens that follow the prompt.
        return self.tokenizer.decode(
            output[0, inputs["input_ids"].shape[1] :].detach().cpu(),
            skip_special_tokens=True,
        ).strip()

    def _generate_remote(
        self,
        prompt: str,
        max_new_tokens: int,
        temperature: float,
        top_p: float,
    ) -> str:
        """POST the prompt to ``self.endpoint_url`` and extract the generated text.

        Raises:
            RuntimeError: On an HTTP error status (message carries the status
                code and the first 300 characters of the body) or when the
                response holds no generated text.
        """
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": max_new_tokens,
                "temperature": temperature,
                "top_p": top_p,
            },
        }
        headers = {"Content-Type": "application/json"}
        if self.hf_token:
            headers["Authorization"] = f"Bearer {self.hf_token}"
        request = urllib.request.Request(
            self.endpoint_url,
            data=json.dumps(payload).encode("utf-8"),
            headers=headers,
            method="POST",
        )
        try:
            with urllib.request.urlopen(request, timeout=120) as response:  # noqa: S310
                raw = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            body = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"remote endpoint HTTP {exc.code}: {body[:300]}") from exc
        parsed = json.loads(raw)
        return _extract_generated_text(parsed)
# ----------------------------------------------------------------------
# Request, context, and formatting helpers
# ----------------------------------------------------------------------
def _resolve_root(path: str) -> Path:
if path:
root = Path(path).resolve()
return root.parent if root.is_file() else root
return Path(__file__).resolve().parent
def _load_config(path: Path) -> Dict[str, Any]:
    """Return a copy of ``DEFAULT_CONFIG`` overlaid with the JSON file at ``path``.

    When ``path`` does not exist the defaults are returned unchanged. The file
    is read as UTF-8; malformed JSON propagates the parser's exception.
    """
    merged = dict(DEFAULT_CONFIG)
    if path.exists():
        overrides = json.loads(path.read_text(encoding="utf-8"))
        merged.update(overrides)
    return merged
def _normalize_request(data: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
if not isinstance(data, dict):
raise ValueError("Request body must be a JSON object.")
if "inputs" in data:
inputs = data.get("inputs") or {}
if isinstance(inputs, str):
inputs = {"message": inputs}
parameters = data.get("parameters") or {}
else:
inputs = data
parameters = data.get("parameters") or {}
if not isinstance(inputs, dict) or not isinstance(parameters, dict):
raise ValueError("inputs and parameters must be JSON objects.")
return inputs, parameters
def _base_retrieval_mode(mode: str) -> str:
return "tfidf" if str(mode).lower().startswith("tfidf") else "semantic"
def _build_prompt(
agent: SmritiAILite,
memory: MemPalaceLite,
user_id: str,
topic_id: str,
user_input: str,
include_graph: bool,
identity_enabled: bool,
) -> str:
identity = agent.identity.get_identity_prompt() if identity_enabled else ""
context = memory.get_context(
query=user_input,
session_id=user_id,
topic_id=topic_id,
include_graph=include_graph,
)
parts = [part for part in [identity.strip(), context.strip(), user_input.strip()] if part]
return "\n\n".join(parts)
def _section_bullets(context: str, heading: str) -> List[str]:
if heading not in context:
return []
after = context.split(heading, 1)[1]
chunks = re.split(r"\n\[[A-Z ]+\]", after, maxsplit=1)
section = chunks[0]
bullets = []
for line in section.splitlines():
cleaned = line.strip()
if cleaned.startswith("*"):
bullets.append(cleaned.lstrip("* ").strip())
return bullets
def _memory_only_answer(prompt: str) -> str:
    """Build a reply straight from the facts embedded in ``prompt``.

    Bullets under ``[REMEMBERED FACTS]`` come first, followed by bullets under
    ``[RELATED GRAPH FACTS]`` that are not already among the remembered ones.
    At most five are quoted; with none, a fixed acknowledgement is returned.
    """
    remembered = _section_bullets(prompt, "[REMEMBERED FACTS]")
    related = _section_bullets(prompt, "[RELATED GRAPH FACTS]")
    merged = list(remembered)
    merged.extend(fact for fact in related if fact not in remembered)
    if not merged:
        return "Memory updated. No prior relevant context was found."
    return "I remember: " + "; ".join(merged[:5])
def _is_recall_query(message: str) -> bool:
lowered = message.lower()
return any(
phrase in lowered
for phrase in [
"remember",
"what do you know about me",
"who am i",
"where do i work",
"what is my name",
"what do i do",
]
)
def _stabilize_recall_answer(
    message: str,
    response: str,
    retrieved_memories: List[str],
    graph_facts: List[str],
) -> str:
    """Replace a recall answer that ignores the known facts with a fact list.

    ``response`` is returned unchanged unless ``message`` is a recall query,
    at least one memory or graph fact exists, and ``response`` shares no term
    with them. In that case up to five facts are quoted instead.
    """
    if not _is_recall_query(message):
        return response
    extra_graph = [fact for fact in graph_facts if fact not in retrieved_memories]
    known = retrieved_memories + extra_graph
    if known and not _mentions_memory_terms(response, known):
        return "I remember: " + "; ".join(known[:5])
    return response
def _mentions_memory_terms(response: str, memories: List[str]) -> bool:
response_terms = set(re.findall(r"[a-z0-9']{4,}", response.lower()))
memory_terms = set()
for memory in memories:
memory_terms.update(re.findall(r"[a-z0-9']{4,}", memory.lower()))
return bool(response_terms & memory_terms)
def _replace_last_assistant_history(memory: MemPalaceLite, response: str) -> None:
if memory.history and memory.history[-1].category == "assistant_output":
memory.history[-1].content = "Assistant: " + response[:200]
def _extract_generated_text(parsed: Any) -> str:
if isinstance(parsed, list) and parsed:
return _extract_generated_text(parsed[0])
if isinstance(parsed, dict):
for key in ["generated_text", "response", "text", "output"]:
value = parsed.get(key)
if isinstance(value, str):
return value.strip()
if "outputs" in parsed:
return _extract_generated_text(parsed["outputs"])
if isinstance(parsed, str):
return parsed.strip()
raise RuntimeError("Remote endpoint did not return generated text.")
def _json_root(memory_path: str) -> Path:
path = Path(memory_path)
if path.suffix.lower() in {".json", ".jsonl"}:
return path.with_suffix("")
return path
def _clean_model_id(value: str, *, allow_empty: bool = False) -> str:
    """Trim ``value`` and pass it through ``validate_model_id_for_environment``.

    A falsy ``value`` is treated as the empty string before validation.
    ``allow_empty`` is forwarded to the validator unchanged.
    """
    model_id = (value or "").strip()
    return validate_model_id_for_environment(
        model_id,
        context="Smriti AI Hugging Face handler",
        allow_empty=allow_empty,
    )
def _bool_env(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _int_env(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except ValueError:
return default
def _error(message: str, start: float) -> Dict[str, Any]:
return {
"error": message,
"latency_ms": round((time.perf_counter() - start) * 1000, 3),
}