Spaces:
Sleeping
Sleeping
import os
import sys

# Make the project's src/ directory importable BEFORE the project imports below
# (model.clip_model and utils resolve only after this sys.path mutation).
src_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "src"))
sys.path.append(src_directory)

from pinecone import Pinecone, ServerlessSpec
import time
from model.clip_model import ClipModel
from dotenv import load_dotenv
from utils import logger
import pandas as pd

# Load environment variables from a local .env so PINECONE_API_KEY is available.
load_dotenv()
# NOTE(review): "pincone" is a typo of "pinecone"; kept as-is because the name
# is referenced by get_index() below.
pincone_api_key = os.environ.get("PINECONE_API_KEY")

# Rebind the imported `logger` module to a configured logger instance.
logger = logger.get_logger()
# Module-level CLIP model used to embed images before upserting.
clip_model = ClipModel()
def create_index(pinecone, index_name):
    """Create a 512-dim cosine-similarity serverless index on AWS us-east-1.

    Args:
        pinecone: Pinecone client instance.
        index_name (str): Name for the new index.
    """
    serverless_spec = ServerlessSpec(cloud="aws", region="us-east-1")
    pinecone.create_index(
        name=index_name,
        dimension=512,
        metric="cosine",
        spec=serverless_spec,
    )
def wait_till_index_loaded(pinecone, index_name):
    """Poll once per second until the index reports ready, then return a handle.

    Args:
        pinecone: Pinecone client instance.
        index_name (str): Name of the index to wait for.

    Returns:
        The Pinecone Index object once its status is ready.
    """
    while True:
        description = pinecone.describe_index(index_name)
        if not description.status.get("ready", False):
            logger.debug(f"Index '{index_name}' is not ready yet. Checking again in 1 second.")
            time.sleep(1)
            continue
        handle = pinecone.Index(index_name)
        logger.info(f"Index '{index_name}' is ready and is now accessible.")
        return handle
def get_index():
    """Get (or create) the Pinecone index used for image search.

    Creates the serverless index "imagesearch" if it does not exist yet and
    waits for it to become ready. On any failure the error is logged and
    None is returned instead of raising.

    Returns:
        The Pinecone Index object, or None if an error occurred.
    """
    # Kept for backward compatibility: other code may read the module-level `index`.
    global index
    index = None
    try:
        pc = Pinecone(api_key=pincone_api_key)
        index_name = "imagesearch"
        logger.info(f"Checking if the index '{index_name}' exists...")
        if not pc.has_index(index_name):
            logger.info(f"Index '{index_name}' does not exist. Creating a new index...")
            create_index(pc, index_name)
            logger.info(f"Index '{index_name}' creation initiated. Waiting for it to be ready...")
            # BUG FIX: arguments were swapped (index_name, pc); the helper's
            # signature is wait_till_index_loaded(pinecone, index_name), so the
            # old call invoked .describe_index on a string and crashed.
            index = wait_till_index_loaded(pc, index_name)
        else:
            index = pc.Index(index_name)
            logger.info(f"Index '{index_name}' already exists. Returning the existing index.")
    except Exception as e:
        # Log at ERROR level (was INFO) so failures surface in monitoring.
        logger.error(f"Error occurred while getting or creating the Pinecone index: {str(e)}", exc_info=True)
    return index
def process_and_upsert_data(index, data: pd.Series, url_key: str, id_key: str):
    """Embed the image referenced by one DataFrame row and upsert it into Pinecone.

    Reads the image URL and photo ID from `data`, computes a CLIP embedding for
    the image, and writes the vector (with URL/ID metadata) into the given index
    under the "image-search-dataset" namespace. Processing failures are logged
    rather than raised so a batch run can continue past bad rows.

    Args:
        index: Pinecone index to upsert into.
        data (pandas.Series): A single DataFrame row holding the URL and ID.
        url_key (str): Field name in the Series containing the image URL.
        id_key (str): Field name in the Series containing the photo ID.

    Raises:
        ValueError: If `url_key` or `id_key` is absent from `data`.
    """
    # Guard clause: both fields must be present in this row.
    if url_key not in data or id_key not in data:
        raise ValueError(f"Missing required keys: '{url_key}' or '{id_key}' in the data")
    try:
        logger.info("Started to process and upsert the data")
        url, photo_id = data[url_key], data[id_key]
        vector = {
            "id": photo_id,
            "values": clip_model.get_image_embedding(url),
            "metadata": {"url": url, "photo_id": photo_id},
        }
        index.upsert(vectors=[vector], namespace="image-search-dataset")
        logger.info(f"Successfully upserted data for photo_id {photo_id} with URL {url}")
    except ValueError as ve:
        logger.error(f"ValueError: {ve}")
    except Exception as e:
        logger.error(f"Error processing row with photo_id {data.get(id_key, 'unknown')}: {e}")