"""Storage helpers for object data kept in S3, Qdrant and Neo4j.

Images are stored as PNG objects in S3, embeddings as named vectors in a
Qdrant collection, and the house / object / location / owner relationships
as a graph in Neo4j.
"""

import logging
import uuid
from io import BytesIO
from typing import Optional

from PIL import Image
import numpy as np
# ScrollRequest and get_s3_session are not used in this module; they are kept
# because other modules may import them from here.
from qdrant_client.http.models import (
    Filter,
    FieldCondition,
    MatchValue,
    PointStruct,
    ScrollRequest,
)
from qdrant_client.models import SearchParams

from .clients import get_s3, get_qdrant, get_neo4j, get_s3_session
from .config import S3_BUCKET, QDRANT_COLLECTION, AWS_REGION
from .processing import embed_image_dino_large
from .image_processing import encode_image_to_base64

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# S3
# ---------------------------------------------------------------------------

def _s3_url_prefix() -> str:
    """Return the public URL prefix under which objects of S3_BUCKET live."""
    return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/"


def _s3_key_from_uri(uri: str) -> str:
    """Return the S3 key for *uri*.

    Accepts either a full URL as produced by ``upload_image_to_s3`` or a bare
    key; a bare key is returned unchanged.
    """
    prefix = _s3_url_prefix()
    return uri[len(prefix):] if uri.startswith(prefix) else uri


def upload_image_to_s3(image_np: np.ndarray, key: str) -> str:
    """Encode *image_np* as PNG, upload it under *key* and return its URL.

    :param image_np: Image array accepted by ``PIL.Image.fromarray``
    :param key: Object key inside S3_BUCKET
    :return: ``https://<bucket>.s3.<region>.amazonaws.com/<key>``
    """
    buf = BytesIO()
    Image.fromarray(image_np).save(buf, format="PNG")
    buf.seek(0)  # rewind so the upload reads from the start of the buffer
    get_s3().upload_fileobj(
        buf, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"}
    )
    return _s3_url_prefix() + key


def download_image_from_s3(key: str) -> np.ndarray:
    """Download the object stored under *key* and return it as an RGB array."""
    buf = BytesIO()
    get_s3().download_fileobj(S3_BUCKET, key, buf)
    buf.seek(0)
    return np.array(Image.open(buf).convert("RGB"))


def _load_image_base64(image_uri: str, location_name: str) -> Optional[str]:
    """Best-effort load of a location image as base64.

    Returns ``None`` (and logs a warning) when the download or the encoding
    fails, so that one broken image does not abort a whole listing.
    """
    try:
        img = download_image_from_s3(_s3_key_from_uri(image_uri))
        return encode_image_to_base64(img)
    except Exception:  # deliberate best-effort boundary
        logger.warning(
            "Failed to load image from S3 for %s", location_name, exc_info=True
        )
        return None


# ---------------------------------------------------------------------------
# Qdrant
# ---------------------------------------------------------------------------

def add_vector_to_qdrant(vectors: dict, payload: dict, view_id: Optional[str] = None):
    """
    Add one or more named vectors to Qdrant.

    :param vectors: Dict of named vectors, e.g.,
        {"text_embedding": np.ndarray, "image_embedding": np.ndarray}
    :param payload: Metadata dictionary
    :param view_id: Optional specific point ID; a UUID4 string is generated
        when omitted
    :return: view_id used for storage
    """
    if view_id is None:
        view_id = str(uuid.uuid4())

    # Qdrant expects plain lists, so numpy arrays are converted.
    vector_payload = {
        name: vec.tolist() if isinstance(vec, np.ndarray) else vec
        for name, vec in vectors.items()
    }

    point = PointStruct(id=view_id, vector=vector_payload, payload=payload)
    get_qdrant().upsert(collection_name=QDRANT_COLLECTION, points=[point])
    return view_id


def _house_filter(house_id: Optional[str]) -> Optional[Filter]:
    """Return a Qdrant filter on ``house_id``, or ``None`` when it is falsy."""
    if not house_id:
        return None
    return Filter(
        must=[FieldCondition(key="house_id", match=MatchValue(value=house_id))]
    )


def query_vector_db_by_mask(query_image: np.ndarray, k: int = 5):
    """Embed *query_image* and return the *k* nearest points.

    The image is embedded with ``embed_image_dino_large`` and searched against
    the ``dinov2_embedding`` named vector.
    """
    embeddings = embed_image_dino_large(query_image)
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("dinov2_embedding", embeddings.tolist()),
        limit=k,
    )


def query_vector_db_by_image_embedding(
    embeddings: np.ndarray, k: int = 5, house_id: Optional[str] = None
):
    """Search the ``dinov2_embedding`` vector with exact (non-approximate) search.

    :param embeddings: Query embedding
    :param k: Maximum number of results
    :param house_id: When given, restrict results to points with this house_id
    """
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("dinov2_embedding", embeddings.tolist()),
        limit=k,
        query_filter=_house_filter(house_id),
        search_params=SearchParams(exact=True),
    )


def query_vector_db_by_text_embedding(
    embeddings: np.ndarray, k: int = 5, house_id: Optional[str] = None
):
    """Search the ``clip_text_embedding`` vector.

    :param embeddings: Query embedding
    :param k: Maximum number of results
    :param house_id: When given, restrict results to points with this house_id
    """
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("clip_text_embedding", embeddings.tolist()),
        limit=k,
        query_filter=_house_filter(house_id),
    )


# ---------------------------------------------------------------------------
# Neo4j: objects and locations
# ---------------------------------------------------------------------------

def add_object_to_neo4j(object_id, house_id, description, qdrant_object_id) -> None:
    """Upsert a House and an Object node and link them with CONTAINS."""
    with get_neo4j().session() as s:
        s.run(
            "MERGE (h:House {house_id:$house_id}) "
            "MERGE (o:Object {object_id:$object_id}) "
            "SET o.description=$desc, o.qdrant_object_id=$qdrant_object_id "
            "MERGE (h)-[:CONTAINS]->(o)",
            {
                "house_id": house_id,
                "object_id": object_id,
                "desc": description,
                "qdrant_object_id": qdrant_object_id,
            },
        )


def get_object_info_from_graph(house_id: str, object_id: str) -> Optional[str]:
    """Return the description of an object directly contained in a house.

    :return: The ``description`` property, or ``None`` when no such object is
        found.
    """
    with get_neo4j().session() as s:
        result = s.run(
            """
            MATCH (h:House {house_id: $household_id})-[:CONTAINS]->(o:Object {object_id: $object_id})
            RETURN o.description AS description
            """,
            {"household_id": house_id, "object_id": object_id},
        )
        record = result.single()
        if record:
            return record.get("description")
        return None


def set_object_primary_location_hierarchy(
    object_id: str,
    house_id: str,
    location_hierarchy: list,  # Example: ["Kitchen", "Left Upper Cabinet", "Middle Shelf"]
) -> None:
    """Create the nested location chain and point the object at its last entry.

    Each name in *location_hierarchy* is merged as a Location node contained
    in the previous one (the first is contained in the House). Any existing
    PRIMARY_LOCATION edge of the object is removed and replaced by an edge to
    the final location. An empty hierarchy only ensures the House exists.

    NOTE(review): Location nodes are merged on ``name`` alone, so equally
    named locations are shared across houses and hierarchy levels — confirm
    this is intended.
    """
    with get_neo4j().session() as s:
        # Ensure the house node exists
        s.run("MERGE (h:House {house_id: $house_id})", {"house_id": house_id})

        # Build nested location hierarchy, starting from the house.
        # parent_label / parent_key are internal constants, never user input,
        # which is why interpolating them into the query text is safe.
        parent_label = "House"
        parent_key = "house_id"
        parent_value = house_id

        for location_name in location_hierarchy:
            s.run(
                f"""
                MATCH (parent:{parent_label} {{{parent_key}: $parent_value}})
                MERGE (loc:Location {{name: $location_name}})
                MERGE (parent)-[:CONTAINS]->(loc)
                """,
                {"parent_value": parent_value, "location_name": location_name},
            )
            parent_label = "Location"
            parent_key = "name"
            parent_value = location_name

        if location_hierarchy:
            final_location_name = location_hierarchy[-1]

            # Remove existing PRIMARY_LOCATION edges
            s.run(
                """
                MATCH (o:Object {object_id: $object_id})-[r:PRIMARY_LOCATION]->(:Location)
                DELETE r
                """,
                {"object_id": object_id},
            )

            # Add new PRIMARY_LOCATION edge
            s.run(
                """
                MATCH (o:Object {object_id: $object_id})
                MATCH (loc:Location {name: $location_name})
                MERGE (o)-[:PRIMARY_LOCATION]->(loc)
                """,
                {"object_id": object_id, "location_name": final_location_name},
            )


def get_object_location_chain(house_id: str, object_id: str, include_images: bool = False):
    """Return the object's location chain, outermost location first.

    :param house_id: House the locations must be reachable from
    :param object_id: Object whose PRIMARY_LOCATION is the chain's last entry
    :param include_images: When true, locations with an ``image_uri`` get an
        ``image_base64`` entry (``None`` if the image could not be loaded)
    :return: List of location dicts; empty when the object has no primary
        location inside the house
    """
    with get_neo4j().session() as session:
        # Find PRIMARY_LOCATION
        record = session.run(
            """
            MATCH (h:House {house_id: $house_id})
                  -[:CONTAINS*]->(loc:Location)<-[:PRIMARY_LOCATION]-(o:Object {object_id: $object_id})
            RETURN loc
            """,
            {"house_id": house_id, "object_id": object_id},
        ).single()

        if not record:
            return []

        # Build the chain innermost-first, walking up through the parents.
        locations = []
        current_name = record["loc"]["name"]
        # Guard against cycles in the CONTAINS graph, which would otherwise
        # make this loop run forever.
        visited = set()

        while current_name and current_name not in visited:
            visited.add(current_name)

            loc_record = session.run(
                """
                MATCH (h:House {house_id: $house_id})
                      -[:CONTAINS*]->(loc:Location {name: $name})
                RETURN loc
                """,
                {"house_id": house_id, "name": current_name},
            ).single()

            if not loc_record:
                break

            loc_node = loc_record["loc"]
            image_uri = loc_node.get("image_uri")
            loc_info = {
                "name": loc_node["name"],
                "image_uri": image_uri,
                "location_x": loc_node.get("location_x"),
                "location_y": loc_node.get("location_y"),
                "location_z": loc_node.get("location_z"),
                "shape": loc_node.get("shape"),
                "radius": loc_node.get("radius"),
                "height": loc_node.get("height"),
                "width": loc_node.get("width"),
                "depth": loc_node.get("depth"),
            }

            # Optionally include actual image data
            if include_images and image_uri:
                encoded = _load_image_base64(image_uri, loc_node["name"])
                loc_info["image_base64"] = encoded
                if encoded is None:
                    # Older versions reported a failed load under "image";
                    # kept so existing callers keep working.
                    loc_info["image"] = None

            locations.append(loc_info)

            parent_record = session.run(
                """
                MATCH (parent:Location)-[:CONTAINS]->(loc:Location {name: $name})
                RETURN parent.name AS parent_name
                """,
                {"name": current_name},
            ).single()

            current_name = parent_record["parent_name"] if parent_record else None

        locations.reverse()  # outermost location first
        return locations


def get_all_locations_for_house(house_id: str, include_images: bool = False):
    """Return every Location reachable from the house via CONTAINS.

    :param include_images: When true, locations with an ``image_uri`` get an
        ``image_base64`` entry (``None`` if the image could not be loaded)
    :return: List of dicts with ``name``, ``parents``, ``image_uri`` and
        ``location_x/y/z``; one entry per row returned by the query
    """
    with get_neo4j().session() as session:
        # Materialise inside the session; the S3 downloads below then do not
        # keep the session open.
        records = list(
            session.run(
                """
                MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(loc:Location)
                OPTIONAL MATCH (parent:Location)-[:CONTAINS]->(loc)
                RETURN loc, parent.name AS parent_name
                """,
                {"house_id": house_id},
            )
        )

    locations = []
    for record in records:
        loc_node = record["loc"]
        parent_name = record["parent_name"]
        image_uri = loc_node.get("image_uri")

        loc_info = {
            "name": loc_node["name"],
            "parents": [parent_name] if parent_name else [],
            "image_uri": image_uri,
            "location_x": loc_node.get("location_x"),
            "location_y": loc_node.get("location_y"),
            "location_z": loc_node.get("location_z"),
        }

        if include_images and image_uri:
            loc_info["image_base64"] = _load_image_base64(image_uri, loc_node["name"])

        locations.append(loc_info)

    return locations


# ---------------------------------------------------------------------------
# Neo4j: owners
# ---------------------------------------------------------------------------

def get_object_owners(house_id: str, object_id: str):
    """Return the Person nodes the object is OWNED_BY, as plain dicts."""
    with get_neo4j().session() as session:
        result = session.run(
            """
            MATCH (h:House {house_id: $house_id})
                  -[:CONTAINS*]->(o:Object {object_id: $object_id})
                  -[:OWNED_BY]->(p:Person)
            RETURN p
            """,
            {"house_id": house_id, "object_id": object_id},
        )

        owners = []
        for record in result:
            p = record["p"]
            owners.append({
                "person_id": p.get("person_id"),
                "name": p.get("name"),
                "type": p.get("type", "person"),  # Defaults to "person" if missing
                "image_uri": p.get("image_uri"),
            })

        return owners


def add_owner_by_person_id(house_id: str, object_id: str, person_id: str):
    """Link an existing Person to the object with OWNED_BY.

    :return: The Person node, or ``None`` when the object or the person was
        not found.
    """
    with get_neo4j().session() as session:
        result = session.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id}),
                  (p:Person {person_id: $person_id})
            MERGE (o)-[:OWNED_BY]->(p)
            RETURN p
            """,
            {"house_id": house_id, "object_id": object_id, "person_id": person_id},
        )
        record = result.single()
        return record["p"] if record else None


def add_owner_by_person_name(house_id: str, object_id: str, name: str, type: str = "person"):
    """Create a new Person with a fresh UUID and make it an owner of the object.

    :param type: Stored as the Person's ``type`` property (the parameter name
        shadows the builtin but is part of the public signature)
    :return: The created Person node, or ``None`` when the object was not
        found in the house.
    """
    person_id = str(uuid.uuid4())
    with get_neo4j().session() as session:
        # WITH DISTINCT o: CREATE runs once per matched row, so an object
        # reachable through several CONTAINS paths would otherwise get one
        # duplicate Person per path.
        result = session.run(
            """
            MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id})
            WITH DISTINCT o
            CREATE (p:Person {person_id: $person_id, name: $name, type: $type})
            MERGE (o)-[:OWNED_BY]->(p)
            RETURN p
            """,
            {
                "house_id": house_id,
                "object_id": object_id,
                "person_id": person_id,
                "name": name,
                "type": type,
            },
        )
        record = result.single()
        return record["p"] if record else None


# ---------------------------------------------------------------------------
# Aggregate view
# ---------------------------------------------------------------------------

def get_object_details(house_id: str, object_id: str):
    """
    Collects and returns:
    - Description from Neo4j
    - All image views from Qdrant and S3
    - All text descriptions from Qdrant
    - Location hierarchy from Neo4j with S3-loaded images
    - Owners from Neo4j

    Images that fail to download are logged and skipped.
    """
    # Fetch description from Neo4j
    description = get_object_info_from_graph(house_id, object_id)

    # Fetch all vector points (images and texts) from Qdrant, page by page.
    client = get_qdrant()
    object_filter = Filter(
        must=[FieldCondition(key="object_id", match=MatchValue(value=object_id))]
    )
    all_points = []
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=QDRANT_COLLECTION,
            scroll_filter=object_filter,
            limit=100,
            offset=offset,
        )
        all_points.extend(points)
        if offset is None:  # no further page
            break

    # Separate images and texts
    images = []
    texts = []
    for point in all_points:
        payload = point.payload or {}  # a point may carry no payload
        kind = payload.get("type")
        if kind == "image" and payload.get("image_url"):
            image_url = payload["image_url"]
            try:
                img_np = download_image_from_s3(_s3_key_from_uri(image_url))
            except Exception:  # deliberate best-effort: skip broken images
                logger.warning("Failed to load image %s", image_url, exc_info=True)
            else:
                images.append({"image": img_np, "url": image_url})
        elif kind == "text" and payload.get("description"):
            texts.append(payload["description"])

    # Fetch location hierarchy WITHOUT embedded images
    locations = get_object_location_chain(house_id, object_id, include_images=False)

    # Load images for each location if image_uri exists
    location_images = []
    for loc in locations:
        uri = loc.get("image_uri")
        if not uri:
            continue
        try:
            location_images.append(download_image_from_s3(_s3_key_from_uri(uri)))
        except Exception:  # deliberate best-effort: skip broken images
            logger.warning("Failed to load location image %s", uri, exc_info=True)

    # Fetch owners
    owners = get_object_owners(house_id, object_id)

    return {
        "description": description,
        "images": images,
        "texts": texts,
        "locations": locations,
        "location_images": location_images,
        "owners": owners,
    }