# object-memory / core / storage.py
# russ4stall — "fresh history" (commit 24f3fb6)
import uuid
from io import BytesIO
from typing import Optional

import numpy as np
from PIL import Image
from qdrant_client.http.models import Filter, FieldCondition, MatchValue, ScrollRequest
from qdrant_client.models import SearchParams

from .clients import get_s3, get_qdrant, get_neo4j, get_s3_session
from .config import S3_BUCKET, QDRANT_COLLECTION, AWS_REGION
from .processing import embed_image_dino_large
from .image_processing import encode_image_to_base64
def upload_image_to_s3(image_np: np.ndarray, key: str) -> str:
    """Encode an image array as PNG, upload it to S3 and return its HTTPS URL.

    :param image_np: Pixel array accepted by ``PIL.Image.fromarray``.
    :param key: Object key to store the PNG under inside ``S3_BUCKET``.
    :return: ``https://<bucket>.s3.<region>.amazonaws.com/<key>`` for the
        uploaded object.
    """
    png_bytes = BytesIO()
    Image.fromarray(image_np).save(png_bytes, format="PNG")
    # Rewind so upload_fileobj streams the PNG from its first byte.
    png_bytes.seek(0)

    s3_client = get_s3()
    s3_client.upload_fileobj(
        png_bytes,
        S3_BUCKET,
        key,
        ExtraArgs={"ContentType": "image/png"},
    )

    return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{key}"
def download_image_from_s3(key: str) -> np.ndarray:
    """Download an image object from S3 and return it as an RGB numpy array.

    :param key: Object key inside ``S3_BUCKET`` (passed straight to
        ``download_fileobj``, so a full URL will not resolve).
    :return: ``np.array`` of the image after conversion to PIL "RGB" mode.
    """
    downloaded = BytesIO()
    get_s3().download_fileobj(S3_BUCKET, key, downloaded)
    # download_fileobj leaves the cursor at the end; PIL must read from 0.
    downloaded.seek(0)
    rgb_image = Image.open(downloaded).convert("RGB")
    return np.array(rgb_image)
from qdrant_client.http.models import PointStruct
import uuid
import numpy as np
def add_vector_to_qdrant(vectors: dict, payload: dict, view_id: str = None):
    """Upsert a single Qdrant point carrying one or more named vectors.

    :param vectors: Mapping of vector name -> embedding, e.g.
        ``{"text_embedding": np.ndarray, "image_embedding": np.ndarray}``.
        numpy arrays are converted to lists; other values pass through as-is.
    :param payload: Metadata dictionary stored with the point.
    :param view_id: Point ID to write; a random UUID4 string is used when None.
    :return: The point ID that was written.
    """
    point_id = view_id if view_id is not None else str(uuid.uuid4())

    # Qdrant's client expects plain Python lists, not numpy arrays.
    named_vectors = {}
    for vector_name, vector in vectors.items():
        if isinstance(vector, np.ndarray):
            named_vectors[vector_name] = vector.tolist()
        else:
            named_vectors[vector_name] = vector

    point = PointStruct(id=point_id, vector=named_vectors, payload=payload)
    get_qdrant().upsert(collection_name=QDRANT_COLLECTION, points=[point])
    return point_id
def query_vector_db_by_mask(query_image: np.ndarray, k: int = 5):
    """Search Qdrant for points whose "dinov2_embedding" is closest to an image.

    :param query_image: Image to embed with ``embed_image_dino_large``.
    :param k: Maximum number of hits to return.
    :return: Whatever ``client.search`` returns for the named-vector query.
    """
    named_query = ("dinov2_embedding", embed_image_dino_large(query_image).tolist())
    return get_qdrant().search(QDRANT_COLLECTION, query_vector=named_query, limit=k)
def query_vector_db_by_image_embedding(embeddings: np.ndarray, k: int = 5, house_id: str = None):
    """Exact search over the "dinov2_embedding" named vectors.

    :param embeddings: Query embedding; converted with ``.tolist()``.
    :param k: Maximum number of hits.
    :param house_id: When truthy, only points whose payload ``house_id``
        equals this value are considered.
    :return: Whatever ``client.search`` returns.
    """
    client = get_qdrant()

    house_filter = (
        Filter(must=[FieldCondition(key="house_id", match=MatchValue(value=house_id))])
        if house_id
        else None
    )

    return client.search(
        QDRANT_COLLECTION,
        query_vector=("dinov2_embedding", embeddings.tolist()),
        limit=k,
        query_filter=house_filter,
        # exact=True asks Qdrant for non-approximate (brute-force) results.
        search_params=SearchParams(exact=True),
    )
def query_vector_db_by_text_embedding(embeddings: np.ndarray, k: int = 5, house_id: str = None):
    """Search the "clip_text_embedding" named vectors with default search params.

    :param embeddings: Query embedding; converted with ``.tolist()``.
    :param k: Maximum number of hits.
    :param house_id: When truthy, only points whose payload ``house_id``
        equals this value are considered.
    :return: Whatever ``client.search`` returns.
    """
    client = get_qdrant()

    house_filter = None
    if house_id:
        house_filter = Filter(
            must=[FieldCondition(key="house_id", match=MatchValue(value=house_id))]
        )

    return client.search(
        QDRANT_COLLECTION,
        query_vector=("clip_text_embedding", embeddings.tolist()),
        limit=k,
        query_filter=house_filter,
    )
def add_object_to_neo4j(object_id, house_id, description, qdrant_object_id) -> None:
    """Upsert a House and an Object node, set the object's properties, and link them.

    Both nodes and the House-[:CONTAINS]->Object edge are MERGEd, so repeated
    calls do not duplicate them; ``description`` and ``qdrant_object_id`` are
    overwritten on every call.
    """
    cypher = (
        "MERGE (h:House {house_id:$house_id}) "
        "MERGE (o:Object {object_id:$object_id}) "
        "SET o.description=$desc, o.qdrant_object_id=$qdrant_object_id "
        "MERGE (h)-[:CONTAINS]->(o)"
    )
    params = {
        "house_id": house_id,
        "object_id": object_id,
        "desc": description,
        "qdrant_object_id": qdrant_object_id,
    }
    with get_neo4j().session() as session:
        session.run(cypher, params)
def get_object_info_from_graph(house_id: str, object_id: str) -> Optional[str]:
    """Return the description of an object that the given house CONTAINS.

    :param house_id: ``house_id`` of the House node.
    :param object_id: ``object_id`` of the Object node.
    :return: ``o.description``, or None when the house has no direct CONTAINS
        edge to that object (or the property is unset).
    """
    with get_neo4j().session() as s:
        record = s.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS]->(o:Object {object_id: $object_id})
            RETURN o.description AS description
            """,
            {"house_id": house_id, "object_id": object_id},
        ).single()
        return record.get("description") if record else None
def set_object_primary_location_hierarchy(
    object_id: str,
    house_id: str,
    location_hierarchy: list  # Example: ["Kitchen", "Left Upper Cabinet", "Middle Shelf"]
) -> None:
    """Store a nested location path for a house and make its leaf the object's primary location.

    Each name in ``location_hierarchy`` is MERGEd *beneath its parent*: under
    the House for the first entry, and under the previous Location otherwise.
    Identically named places therefore stay separate nodes, whether they are
    in different houses or under different parents (e.g. "Middle Shelf" in two
    cabinets). The object's existing PRIMARY_LOCATION edges are replaced by a
    single edge to the leaf. Everything runs in one query, so the update is
    applied atomically.

    :param object_id: ``object_id`` of an Object node. If no such node exists,
        the locations are still created but no edge is written.
    :param house_id: House that owns the hierarchy (MERGEd if missing).
    :param location_hierarchy: Location names ordered outermost -> innermost.
        An empty list (or None) only ensures the House node exists.
    """
    house_clause = "MERGE (h:House {house_id: $house_id})"
    params = {"house_id": house_id, "object_id": object_id}
    names = list(location_hierarchy or [])

    if not names:
        with get_neo4j().session() as s:
            s.run(house_clause, params)
        return

    clauses = [
        house_clause,
        # Drop the object's current primary-location edges before re-linking.
        "WITH h",
        "OPTIONAL MATCH (:Object {object_id: $object_id})-[old:PRIMARY_LOCATION]->(:Location)",
        "DELETE old",
        "WITH DISTINCT h",
    ]
    parent_var = "h"
    for depth, name in enumerate(names):
        child_var = f"l{depth}"
        param = f"location_name_{depth}"
        params[param] = name
        # A pattern MERGE anchored on the bound parent reuses a child only if it
        # already hangs off *this* parent; otherwise it creates a new node.
        # Variable names come from the loop index and each location name is a
        # query parameter, so no caller text is spliced into the Cypher.
        clauses.append(
            f"MERGE ({parent_var})-[:CONTAINS]->({child_var}:Location {{name: ${param}}})"
        )
        parent_var = child_var

    clauses += [
        f"WITH DISTINCT {parent_var} AS leaf",
        "MATCH (o:Object {object_id: $object_id})",
        "MERGE (o)-[:PRIMARY_LOCATION]->(leaf)",
    ]

    with get_neo4j().session() as s:
        s.run("\n".join(clauses), params)
def get_object_location_chain(house_id: str, object_id: str, include_images: bool = False):
    """Return the object's primary-location path inside a house, outermost first.

    Finds the Location the object points to via PRIMARY_LOCATION. It then
    returns every Location on a CONTAINS path from the House down to that
    Location, using a single query. The stored path is followed instead of
    looking parents up by name, which has two benefits. The walk stays inside
    this house even when other houses reuse location names. It also
    terminates even if the location graph contains a cycle. When several
    paths exist, the shortest one is used.

    :param house_id: ``house_id`` of the House node.
    :param object_id: ``object_id`` of the Object node.
    :param include_images: When True, download each location's ``image_uri``
        from S3 and add it base64-encoded under ``image_base64``. The value is
        None if the download fails.
    :return: List of dicts with name, image_uri, location_x/y/z, shape, radius,
        height, width and depth (plus image_base64 when requested). The list
        is empty when the object has no primary location reachable from the
        house.
    """
    with get_neo4j().session() as session:
        record = session.run(
            """
            MATCH (o:Object {object_id: $object_id})-[:PRIMARY_LOCATION]->(leaf:Location)
            MATCH path = (h:House {house_id: $house_id})-[:CONTAINS*]->(leaf)
            WHERE all(n IN tail(nodes(path)) WHERE n:Location)
            RETURN tail(nodes(path)) AS chain
            ORDER BY length(path)
            LIMIT 1
            """,
            {"house_id": house_id, "object_id": object_id},
        ).single()

    if not record:
        return []

    # image_uri may be stored as the full URL produced by upload_image_to_s3;
    # strip it back to the bucket key (no-op when it already is a key).
    url_prefix = f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/"

    locations = []
    for loc_node in record["chain"]:
        loc_info = {
            "name": loc_node["name"],
            "image_uri": loc_node.get("image_uri"),
            "location_x": loc_node.get("location_x"),
            "location_y": loc_node.get("location_y"),
            "location_z": loc_node.get("location_z"),
            "shape": loc_node.get("shape"),
            "radius": loc_node.get("radius"),
            "height": loc_node.get("height"),
            "width": loc_node.get("width"),
            "depth": loc_node.get("depth"),
        }
        if include_images and loc_node.get("image_uri"):
            try:
                s3_key = loc_node["image_uri"].replace(url_prefix, "")
                img = download_image_from_s3(s3_key)
                loc_info["image_base64"] = encode_image_to_base64(img)
            except Exception as e:
                # Best effort: one missing image must not fail the whole chain.
                loc_info["image_base64"] = None
                print(f"Warning: Failed to load image from S3 for {loc_node['name']}: {e}")
        locations.append(loc_info)
    return locations
def get_all_locations_for_house(house_id: str, include_images: bool = False):
    """List every Location reachable from a house through CONTAINS edges.

    Each location appears once. ``parents`` collects the names of *all*
    Location nodes that CONTAIN it. The previous per-row query could return
    the same location several times, each copy with a one-element list.

    :param house_id: ``house_id`` of the House node.
    :param include_images: When True, download each location's ``image_uri``
        from S3 and add it base64-encoded under ``image_base64``. The value is
        None if the download fails.
    :return: List of dicts with name, parents, image_uri and
        location_x/y/z (plus image_base64 when requested).
    """
    with get_neo4j().session() as session:
        records = list(session.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(loc:Location)
            WITH DISTINCT loc
            OPTIONAL MATCH (parent:Location)-[:CONTAINS]->(loc)
            RETURN loc, collect(DISTINCT parent.name) AS parent_names
            """,
            {"house_id": house_id},
        ))

    # image_uri may be the full URL produced by upload_image_to_s3; strip it
    # back to the bucket key (no-op when it already is a key).
    url_prefix = f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/"

    locations = []
    for record in records:
        loc_node = record["loc"]
        loc_info = {
            "name": loc_node["name"],
            "parents": [name for name in record["parent_names"] if name],
            "image_uri": loc_node.get("image_uri"),
            "location_x": loc_node.get("location_x"),
            "location_y": loc_node.get("location_y"),
            "location_z": loc_node.get("location_z"),
        }
        if include_images and loc_node.get("image_uri"):
            try:
                s3_key = loc_node["image_uri"].replace(url_prefix, "")
                img = download_image_from_s3(s3_key)
                loc_info["image_base64"] = encode_image_to_base64(img)
            except Exception as e:
                # Best effort: one missing image must not fail the listing.
                print(f"Warning: Failed to load image for {loc_node['name']}: {e}")
                loc_info["image_base64"] = None
        locations.append(loc_info)
    return locations
def get_object_owners(house_id: str, object_id: str):
    """List the Person nodes that an object reachable from the house is OWNED_BY.

    :return: One dict per result row with ``person_id``, ``name``, ``type``
        (``"person"`` when the property is missing) and ``image_uri``.
    """
    query = """
    MATCH (h:House {house_id: $house_id})
    -[:CONTAINS*]->(o:Object {object_id: $object_id})
    -[:OWNED_BY]->(p:Person)
    RETURN p
    """
    with get_neo4j().session() as session:
        rows = session.run(query, {"house_id": house_id, "object_id": object_id})
        return [
            {
                "person_id": person.get("person_id"),
                "name": person.get("name"),
                "type": person.get("type", "person"),
                "image_uri": person.get("image_uri"),
            }
            for person in (row["p"] for row in rows)
        ]
def add_owner_by_person_id(house_id: str, object_id: str, person_id: str):
    """Link an existing Person to an object in the house with an OWNED_BY edge.

    The edge is MERGEd, so calling twice does not duplicate it.

    :return: The Person node, or None when either the object (reachable from
        the house) or the person does not exist.
    """
    params = {"house_id": house_id, "object_id": object_id, "person_id": person_id}
    with get_neo4j().session() as session:
        row = session.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id}),
            (p:Person {person_id: $person_id})
            MERGE (o)-[:OWNED_BY]->(p)
            RETURN p
            """,
            params,
        ).single()
        if not row:
            return None
        return row["p"]
def add_owner_by_person_name(house_id: str, object_id: str, name: str, type: str = "person"):
    """Create a new Person with a fresh UUID4 id and make it an owner of the object.

    A new Person node is CREATEd on every call (no de-duplication by name).
    Nothing is created when the object is not reachable from the house.

    :param type: Stored on the Person as ``type``; defaults to ``"person"``.
    :return: The created Person node, or None if the object was not found.
    """
    new_person_id = str(uuid.uuid4())
    params = {
        "house_id": house_id,
        "object_id": object_id,
        "person_id": new_person_id,
        "name": name,
        "type": type,
    }
    with get_neo4j().session() as session:
        row = session.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id})
            CREATE (p:Person {person_id: $person_id, name: $name, type: $type})
            MERGE (o)-[:OWNED_BY]->(p)
            RETURN p
            """,
            params,
        ).single()
        if not row:
            return None
        return row["p"]
def get_object_details(house_id: str, object_id: str):
    """Gather everything stored about one object across Neo4j, Qdrant and S3.

    The returned dict contains:
      - "description": the object's description from Neo4j (may be None)
      - "images": ``{"image": ndarray, "url": str}`` for each Qdrant point whose
        payload has type "image" and an "image_url", downloaded from S3
      - "texts": payload "description" of each Qdrant point with type "text"
      - "locations": the primary-location chain, fetched without embedded images
      - "location_images": arrays downloaded for each location's image_uri
      - "owners": the result of ``get_object_owners``
    Individual download failures are printed and skipped, not raised.
    """
    url_prefix = f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/"

    description = get_object_info_from_graph(house_id, object_id)

    # Page through every Qdrant point tagged with this object_id.
    qdrant = get_qdrant()
    object_filter = Filter(
        must=[FieldCondition(key="object_id", match=MatchValue(value=object_id))]
    )
    collected_points = []
    next_offset = None
    while True:
        page, next_offset = qdrant.scroll(
            collection_name=QDRANT_COLLECTION,
            scroll_filter=object_filter,
            limit=100,
            offset=next_offset,
        )
        collected_points.extend(page)
        if next_offset is None:
            break

    images = []
    texts = []
    for point in collected_points:
        payload = point.payload
        kind = payload.get("type")
        if kind == "image" and payload.get("image_url"):
            try:
                pixels = download_image_from_s3(payload["image_url"].replace(url_prefix, ""))
                images.append({"image": pixels, "url": payload["image_url"]})
            except Exception as e:
                print(f"Failed to load image: {e}")
        elif kind == "text" and payload.get("description"):
            texts.append(payload["description"])

    # Location chain is fetched without images; they are loaded separately below.
    locations = get_object_location_chain(house_id, object_id, include_images=False)
    location_images = []
    for loc in locations:
        uri = loc.get("image_uri")
        if not uri:
            continue
        try:
            location_images.append(download_image_from_s3(uri.replace(url_prefix, "")))
        except Exception as e:
            print(f"Failed to load location image {uri}: {e}")

    owners = get_object_owners(house_id, object_id)

    return {
        "description": description,
        "images": images,
        "texts": texts,
        "locations": locations,
        "location_images": location_images,
        "owners": owners,
    }