| | import logging |
| | import os |
| | from datetime import datetime |
| | from langchain_community.graphs import Neo4jGraph |
| | from src.shared.common_fn import create_gcs_bucket_folder_name_hashed, delete_uploaded_local_file, load_embedding_model |
| | from src.document_sources.gcs_bucket import delete_file_from_gcs |
| | from src.shared.constants import BUCKET_UPLOAD |
| | from src.entities.source_node import sourceNode |
| | import json |
| |
|
| |
|
class graphDBdataAccess:
    """Data-access layer over a ``Neo4jGraph`` connection.

    Centralises every Cypher query the ingestion pipeline uses to create,
    update, inspect and delete Document / Chunk / entity nodes, plus
    vector-index maintenance (KNN similarity graph, duplicate detection,
    index re-creation).
    """

    def __init__(self, graph: Neo4jGraph):
        # Shared Neo4j connection; all methods funnel queries through it.
        self.graph = graph

    def update_exception_db(self, file_name, exp_msg):
        """Mark the Document node for *file_name* as ``Failed`` (or
        ``Cancelled`` if the user already cancelled it) and store *exp_msg*
        on the node.

        Args:
            file_name: ``fileName`` property of the Document node.
            exp_msg: error text to persist in ``d.errorMessage``.

        Raises:
            Exception: re-raised with the underlying error text if the
                status update itself fails.
        """
        try:
            job_status = "Failed"
            result = self.get_current_status_document_node(file_name)
            # NOTE(review): assumes a matching Document node exists —
            # result[0] raises IndexError otherwise; confirm with callers.
            is_cancelled_status = result[0]['is_cancelled']
            if bool(is_cancelled_status):
                job_status = 'Cancelled'
            self.graph.query(
                """MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""",
                {"fName": file_name, "status": job_status, "error_msg": exp_msg})
        except Exception as e:
            error_message = str(e)
            logging.error(f"Error in updating document node status as failed: {error_message}")
            # Chain the original exception so the root cause is not lost.
            raise Exception(error_message) from e

    def create_source_node(self, obj_source_node: sourceNode):
        """Create (or merge) the Document node for a newly registered source.

        All metadata known at registration time is written in a single
        MERGE/SET; the chunk/node/relationship counters start at zero and
        the status starts as ``"New"``.

        Args:
            obj_source_node: populated ``sourceNode`` describing the file.

        Raises:
            Exception: if the write fails; the Document node is additionally
                marked as Failed via :meth:`update_exception_db`.
        """
        try:
            job_status = "New"
            logging.info("creating source node if does not exist")
            self.graph.query(
                """MERGE(d:Document {fileName :$fn}) SET d.fileSize = $fs, d.fileType = $ft ,
                d.status = $st, d.url = $url, d.awsAccessKeyId = $awsacc_key_id,
                d.fileSource = $f_source, d.createdAt = $c_at, d.updatedAt = $u_at,
                d.processingTime = $pt, d.errorMessage = $e_message, d.nodeCount= $n_count,
                d.relationshipCount = $r_count, d.model= $model, d.gcsBucket=$gcs_bucket,
                d.gcsBucketFolder= $gcs_bucket_folder, d.language= $language,d.gcsProjectId= $gcs_project_id,
                d.is_cancelled=False, d.total_chunks=0, d.processed_chunk=0,
                d.access_token=$access_token""",
                {"fn": obj_source_node.file_name, "fs": obj_source_node.file_size,
                 "ft": obj_source_node.file_type, "st": job_status,
                 "url": obj_source_node.url,
                 "awsacc_key_id": obj_source_node.awsAccessKeyId, "f_source": obj_source_node.file_source,
                 "c_at": obj_source_node.created_at,
                 # updatedAt mirrors createdAt on first insert.
                 "u_at": obj_source_node.created_at, "pt": 0, "e_message": '', "n_count": 0, "r_count": 0,
                 "model": obj_source_node.model,
                 "gcs_bucket": obj_source_node.gcsBucket,
                 "gcs_bucket_folder": obj_source_node.gcsBucketFolder,
                 "language": obj_source_node.language, "gcs_project_id": obj_source_node.gcsProjectId,
                 "access_token": obj_source_node.access_token})
        except Exception as e:
            error_message = str(e)
            logging.info(f"error_message = {error_message}")
            # BUG FIX: original passed `self` as an extra positional argument
            # (self.update_exception_db(self, ...)), which raised TypeError
            # and masked the real error whenever this path ran.
            self.update_exception_db(obj_source_node.file_name, error_message)
            raise Exception(error_message) from e

    def update_source_node(self, obj_source_node: sourceNode):
        """Update only the Document properties that are set on *obj_source_node*.

        Builds a property map from the non-empty/non-None fields and applies
        it with a single ``SET d += $props`` so absent fields are untouched.

        Args:
            obj_source_node: partially populated ``sourceNode``; its
                ``file_name`` selects the Document node to update.

        Raises:
            Exception: if the write fails; the Document node is additionally
                marked as Failed via :meth:`update_exception_db`.
        """
        try:
            params = {}
            if obj_source_node.file_name is not None and obj_source_node.file_name != '':
                params['fileName'] = obj_source_node.file_name

            if obj_source_node.status is not None and obj_source_node.status != '':
                params['status'] = obj_source_node.status

            if obj_source_node.created_at is not None:
                params['createdAt'] = obj_source_node.created_at

            if obj_source_node.updated_at is not None:
                params['updatedAt'] = obj_source_node.updated_at

            if obj_source_node.processing_time is not None and obj_source_node.processing_time != 0:
                # processing_time is a timedelta; store it as seconds.
                params['processingTime'] = round(obj_source_node.processing_time.total_seconds(), 2)

            if obj_source_node.node_count is not None:
                params['nodeCount'] = obj_source_node.node_count

            if obj_source_node.relationship_count is not None:
                params['relationshipCount'] = obj_source_node.relationship_count

            if obj_source_node.model is not None and obj_source_node.model != '':
                params['model'] = obj_source_node.model

            if obj_source_node.total_chunks is not None and obj_source_node.total_chunks != 0:
                params['total_chunks'] = obj_source_node.total_chunks

            if obj_source_node.is_cancelled is not None:
                params['is_cancelled'] = obj_source_node.is_cancelled

            if obj_source_node.processed_chunk is not None:
                params['processed_chunk'] = obj_source_node.processed_chunk

            if obj_source_node.retry_condition is not None:
                params['retry_condition'] = obj_source_node.retry_condition

            param = {"props": params}

            # Use logging (not print) so the message follows the module's
            # logging configuration like every other diagnostic here.
            logging.info(f'Base Param value 1 : {param}')
            query = "MERGE(d:Document {fileName :$props.fileName}) SET d += $props"
            logging.info("Update source node properties")
            self.graph.query(query, param)
        except Exception as e:
            error_message = str(e)
            # BUG FIX: original referenced self.file_name, which does not
            # exist on this class and raised AttributeError, hiding the
            # real failure. The file name lives on the source-node object.
            self.update_exception_db(obj_source_node.file_name, error_message)
            raise Exception(error_message) from e

    def get_source_list(self):
        """Return all Document nodes, newest first.

        Returns:
            list[dict]: one property map per Document node that has a
            ``fileName``, ordered by ``updatedAt`` descending.
        """
        logging.info("Get existing files list from graph")
        query = "MATCH(d:Document) WHERE d.fileName IS NOT NULL RETURN d ORDER BY d.updatedAt DESC"
        result = self.graph.query(query)
        list_of_json_objects = [entry['d'] for entry in result]
        return list_of_json_objects

    def update_KNN_graph(self):
        """Create SIMILAR relationships between Chunks whose embedding
        cosine score meets the ``KNN_MIN_SCORE`` threshold.

        Only runs when the ``vector`` index exists; chunks that already have
        5+ SIMILAR relationships are skipped.
        """
        index = self.graph.query("""show indexes yield * where type = 'VECTOR' and name = 'vector'""")

        # NOTE(review): float(knn_min_score) raises if KNN_MIN_SCORE is not
        # set in the environment — confirm deployment always provides it.
        knn_min_score = os.environ.get('KNN_MIN_SCORE')
        if len(index) > 0:
            logging.info('update KNN graph')
            self.graph.query("""MATCH (c:Chunk)
                                WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
                                CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score
                                WHERE node <> c and score >= $score MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score
                                """,
                             {"score": float(knn_min_score)}
                             )
        else:
            logging.info("Vector index does not exist, So KNN graph not update")

    def connection_check_and_get_vector_dimensions(self):
        """Report DB connectivity plus the vector-index dimension vs. the
        dimension of the configured embedding model.

        Returns:
            dict: always contains ``message``; when dimension info is
            available it also has ``db_vector_dimension``,
            ``application_dimension`` and possibly ``chunks_exists``.
        """
        db_vector_dimension = self.graph.query("""SHOW INDEXES YIELD *
                                                  WHERE type = 'VECTOR' AND name = 'vector'
                                                  RETURN options.indexConfig['vector.dimensions'] AS vector_dimensions
                                                  """)

        result_chunks = self.graph.query("""match (c:Chunk) return size(c.embedding) as embeddingSize, count(*) as chunks,
                                            count(c.embedding) as hasEmbedding
                                            """)

        embedding_model = os.getenv('EMBEDDING_MODEL')
        embeddings, application_dimension = load_embedding_model(embedding_model)
        logging.info(f'embedding model:{embeddings} and dimesion:{application_dimension}')

        # NOTE(review): if self.graph is falsy this method implicitly
        # returns None — confirm callers handle that.
        if self.graph:
            if len(db_vector_dimension) > 0:
                return {'db_vector_dimension': db_vector_dimension[0]['vector_dimensions'],
                        'application_dimension': application_dimension, 'message': "Connection Successful"}
            else:
                if len(db_vector_dimension) == 0 and len(result_chunks) == 0:
                    logging.info("Chunks and vector index does not exists in database")
                    return {'db_vector_dimension': 0, 'application_dimension': application_dimension,
                            'message': "Connection Successful", "chunks_exists": False}
                elif len(db_vector_dimension) == 0 and result_chunks[0]['hasEmbedding'] == 0 and result_chunks[0][
                        'chunks'] > 0:
                    return {'db_vector_dimension': 0, 'application_dimension': application_dimension,
                            'message': "Connection Successful", "chunks_exists": True}
                else:
                    return {'message': "Connection Successful"}

    def execute_query(self, query, param=None):
        """Run *query* with optional *param* map and return the raw result."""
        return self.graph.query(query, param)

    def get_current_status_document_node(self, file_name):
        """Fetch the status/progress properties of one Document node.

        Args:
            file_name: ``fileName`` property of the Document node.

        Returns:
            list[dict]: zero or one rows with Status, processingTime,
            counters, cancellation flag and file source.
        """
        query = """
                MATCH(d:Document {fileName : $file_name}) RETURN d.status AS Status , d.processingTime AS processingTime,
                d.nodeCount AS nodeCount, d.model as model, d.relationshipCount as relationshipCount,
                d.total_chunks AS total_chunks , d.fileSize as fileSize,
                d.is_cancelled as is_cancelled, d.processed_chunk as processed_chunk, d.fileSource as fileSource
                """
        param = {"file_name": file_name}
        return self.execute_query(query, param)

    def delete_file_from_graph(self, filenames, source_types, deleteEntities: str, merged_dir: str, uri):
        """Delete documents (and optionally their orphaned entities) from the
        graph, plus their cached copies on disk or GCS.

        Args:
            filenames: JSON-encoded list of file names.
            source_types: JSON-encoded list of matching file sources.
            deleteEntities: ``"true"`` to also delete entities that belong
                only to the deleted documents.
            merged_dir: local directory holding merged upload files.
            uri: graph URI, used to derive the GCS folder hash.

        Returns:
            tuple: (query result, number of files processed).
        """
        filename_list = list(map(str.strip, json.loads(filenames)))
        source_types_list = list(map(str.strip, json.loads(source_types)))
        gcs_file_cache = os.environ.get('GCS_FILE_CACHE')

        # Remove the cached file for each document first (GCS or local).
        for (file_name, source_type) in zip(filename_list, source_types_list):
            merged_file_path = os.path.join(merged_dir, file_name)
            if source_type == 'local file' and gcs_file_cache == 'True':
                folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
                delete_file_from_gcs(BUCKET_UPLOAD, folder_name, file_name)
            else:
                logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
                delete_uploaded_local_file(merged_file_path, file_name)
        query_to_delete_document = """
            MATCH (d:Document) where d.fileName in $filename_list and d.fileSource in $source_types_list
            with collect(d) as documents
            unwind documents as d
            optional match (d)<-[:PART_OF]-(c:Chunk)
            detach delete c, d
            return count(*) as deletedChunks
            """
        query_to_delete_document_and_entities = """
            match (d:Document) where d.fileName IN $filename_list and d.fileSource in $source_types_list
            detach delete d
            with collect(d) as documents
            unwind documents as d
            match (d)<-[:PART_OF]-(c:Chunk)
            detach delete c
            with *
            match (c)-[:HAS_ENTITY]->(e)
            where not exists { (e)<-[:HAS_ENTITY]-()-[:PART_OF]->(d2) where not d2 in documents }
            detach delete e
            """
        param = {"filename_list": filename_list, "source_types_list": source_types_list}
        # BUG FIX: the two log messages were swapped between the branches —
        # the entity-deleting branch now logs "with their entities".
        if deleteEntities == "true":
            result = self.execute_query(query_to_delete_document_and_entities, param)
            logging.info(
                f"Deleting {len(filename_list)} documents = '{filename_list}' from '{source_types_list}' with their entities from database")
        else:
            result = self.execute_query(query_to_delete_document, param)
            logging.info(
                f"Deleting {len(filename_list)} documents = '{filename_list}' from '{source_types_list}' from database")
        return result, len(filename_list)

    def list_unconnected_nodes(self):
        """List up to 100 entity nodes with no connection to any other
        entity, plus the total count of such nodes.

        Returns:
            tuple: (list of node rows with their documents and chunk
            connection counts, total-count row).
        """
        query = """
                MATCH (e:!Chunk&!Document)
                WHERE NOT exists { (e)--(:!Chunk&!Document) }
                OPTIONAL MATCH (doc:Document)<-[:PART_OF]-(c:Chunk)-[:HAS_ENTITY]->(e)
                RETURN e {.*, embedding:null, elementId:elementId(e), labels:labels(e)} as e,
                collect(distinct doc.fileName) as documents, count(distinct c) as chunkConnections
                ORDER BY e.id ASC
                LIMIT 100
                """
        query_total_nodes = """
                MATCH (e:!Chunk&!Document)
                WHERE NOT exists { (e)--(:!Chunk&!Document) }
                RETURN count(*) as total
                """
        nodes_list = self.execute_query(query)
        total_nodes = self.execute_query(query_total_nodes)
        return nodes_list, total_nodes[0]

    def delete_unconnected_nodes(self, unconnected_entities_list):
        """Detach-delete the nodes whose element ids are listed in the
        JSON-encoded *unconnected_entities_list*."""
        entities_list = list(map(str.strip, json.loads(unconnected_entities_list)))
        query = """
                MATCH (e) WHERE elementId(e) IN $elementIds
                DETACH DELETE e
                """
        param = {"elementIds": entities_list}
        return self.execute_query(query, param)

    def get_duplicate_nodes_list(self):
        """Find likely-duplicate entity nodes by id substring match, edit
        distance and embedding cosine similarity.

        Thresholds come from the ``DUPLICATE_SCORE_VALUE`` and
        ``DUPLICATE_TEXT_DISTANCE`` environment variables.

        Returns:
            tuple: (list of duplicate groups, total-count row).
        """
        # NOTE(review): raises if either env var is unset — confirm
        # deployment always provides both.
        score_value = float(os.environ.get('DUPLICATE_SCORE_VALUE'))
        text_distance = int(os.environ.get('DUPLICATE_TEXT_DISTANCE'))
        # This query is a str.format template ({return_statement} is filled
        # in below), so literal Cypher braces are doubled as {{ }}.
        query_duplicate_nodes = """
                MATCH (n:!Chunk&!Document) 
                WITH n 
                WHERE n.embedding IS NOT NULL 
                AND n.id IS NOT NULL 
                AND NOT toLower(n.id) CONTAINS 'figure' // Exclude IDs containing 'figure'
                WITH n 
                WITH n ORDER BY count {{ (n)--() }} DESC, size(n.id) DESC // updated
                WITH collect(n) as nodes
                UNWIND nodes as n
                WITH n, [other in nodes 
                // only one pair, same labels e.g. Person with Person
                WHERE elementId(n) < elementId(other) and labels(n) = labels(other)
                // at least embedding similarity of X
                AND
                (
                // either contains each other as substrings or has a text edit distinct of less than 3
                (size(other.id) > 2 AND toLower(n.id) CONTAINS toLower(other.id)) OR 
                (size(n.id) > 2 AND toLower(other.id) CONTAINS toLower(n.id))
                OR (size(n.id)>5 AND apoc.text.distance(toLower(n.id), toLower(other.id)) < $duplicate_text_distance)
                OR
                vector.similarity.cosine(other.embedding, n.embedding) > $duplicate_score_value
                )] as similar
                WHERE size(similar) > 0
                // remove duplicate subsets
                with collect([n]+similar) as all
                CALL {{ with all
                unwind all as nodes
                with nodes, all
                // skip current entry if it's smaller and a subset of any other entry
                where none(other in all where other <> nodes and size(other) > size(nodes) and size(apoc.coll.subtract(nodes, other))=0)
                return head(nodes) as n, tail(nodes) as similar
                }}
                OPTIONAL MATCH (doc:Document)<-[:PART_OF]-(c:Chunk)-[:HAS_ENTITY]->(n)
                {return_statement}
                """
        return_query_duplicate_nodes = """
                RETURN n {.*, embedding:null, elementId:elementId(n), labels:labels(n)} as e, 
                [s in similar | s {.id, .description, labels:labels(s), elementId: elementId(s)}] as similar,
                collect(distinct doc.fileName) as documents, count(distinct c) as chunkConnections
                ORDER BY e.id ASC
                """
        return_query_duplicate_nodes_total = "RETURN COUNT(DISTINCT(n)) as total"

        param = {"duplicate_score_value": score_value, "duplicate_text_distance": text_distance}

        nodes_list = self.execute_query(query_duplicate_nodes.format(return_statement=return_query_duplicate_nodes),
                                        param=param)
        total_nodes = self.execute_query(
            query_duplicate_nodes.format(return_statement=return_query_duplicate_nodes_total), param=param)
        return nodes_list, total_nodes[0]

    def merge_duplicate_nodes(self, duplicate_nodes_list):
        """Merge each group of duplicate nodes into its first node using
        ``apoc.refactor.mergeNodes`` (discarding conflicting properties and
        merging relationships).

        Args:
            duplicate_nodes_list: JSON-encoded list of rows, each with
                ``firstElementId`` and ``similarElementIds``.

        Returns:
            Query result with the total number of merged nodes.
        """
        nodes_list = json.loads(duplicate_nodes_list)
        # Use logging (not print) to match the rest of the module.
        logging.info(f'Nodes list to merge {nodes_list}')
        query = """
                UNWIND $rows AS row
                CALL { with row
                MATCH (first) WHERE elementId(first) = row.firstElementId
                MATCH (rest) WHERE elementId(rest) IN row.similarElementIds
                WITH first, collect (rest) as rest
                WITH [first] + rest as nodes
                CALL apoc.refactor.mergeNodes(nodes,
                {properties:"discard",mergeRels:true, produceSelfRel:false, preserveExistingSelfRels:false, singleElementAsArray:true})
                YIELD node
                RETURN size(nodes) as mergedCount
                }
                RETURN sum(mergedCount) as totalMerged
                """
        param = {"rows": nodes_list}
        return self.execute_query(query, param)

    def drop_create_vector_index(self, isVectorIndexExist):
        """Drop and re-create the ``vector`` index when its dimension differs
        from the configured embedding model's dimension.

        Args:
            isVectorIndexExist: ``"true"`` when an index already exists and
                must be dropped first.

        Returns:
            str: success message.
        """
        embedding_model = os.getenv('EMBEDDING_MODEL')
        embeddings, dimension = load_embedding_model(embedding_model)

        if isVectorIndexExist == 'true':
            self.graph.query("""drop index vector""")

        self.graph.query("""CREATE VECTOR INDEX `vector` if not exists for (c:Chunk) on (c.embedding)
                            OPTIONS {indexConfig: {
                            `vector.dimensions`: $dimensions,
                            `vector.similarity_function`: 'cosine'
                            }}
                            """,
                         {
                             "dimensions": dimension
                         }
                         )
        # Typo fix in user-facing message ("succesfully" -> "successfully").
        return "Drop and Re-Create vector index successfully"
|