import csv
import os  # used throughout for environment variables and path handling

from langchain_community.graphs import Neo4jGraph
from src.shared.constants import (BUCKET_UPLOAD, PROJECT_ID, QUERY_TO_GET_CHUNKS,
                                  QUERY_TO_DELETE_EXISTING_ENTITIES,
                                  QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION,
                                  QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY,
                                  START_FROM_BEGINNING,
                                  START_FROM_LAST_PROCESSED_POSITION,
                                  DELETE_ENTITIES_AND_START_FROM_BEGINNING)
from src.shared.schema_extraction import schema_extraction_from_text
from dotenv import load_dotenv
from datetime import datetime
import logging
from src.create_chunks import CreateChunksofDocument
from src.graphDB_dataAccess import graphDBdataAccess
from src.document_sources.local_file import get_documents_from_file_by_path
from src.entities.source_node import sourceNode
from src.llm import get_graph_from_llm
from src.document_sources.gcs_bucket import *
from src.document_sources.s3_bucket import *
from src.document_sources.wikipedia import *
from src.document_sources.youtube import *
from src.shared.common_fn import *
from src.make_relationships import *
from src.document_sources.web_pages import *
import re
from langchain_community.document_loaders import WikipediaLoader, WebBaseLoader
import warnings
from pytube import YouTube
import sys
import shutil
import urllib.parse
import json

warnings.filterwarnings("ignore")
load_dotenv()
logging.basicConfig(format='%(asctime)s - %(message)s', level='INFO')
# The OpenAI API key must be supplied via the environment (e.g. in the .env file loaded
# above) rather than hard-coded in source control.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")


def create_source_node_graph_url_local(graph, model, local_folder_path, source_type):
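    """
    Create a Document source node in Neo4j for every supported file in a local folder.

    Scans local_folder_path for .pdf/.md files via get_local_files_info, builds a
    sourceNode for each and persists it through graphDBdataAccess. Returns
    (lst_file_name, success_count, failed_count), where lst_file_name holds per-file
    status dictionaries.
    """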
    success_count = 0
    failed_count = 0
    lst_file_name = []

    if not os.path.isdir(local_folder_path):
        raise NotADirectoryError(f"The path {local_folder_path} is not a valid directory.")

    lst_file_metadata = get_local_files_info(local_folder_path)

    for file_metadata in lst_file_metadata:
        obj_source_node = sourceNode()
        obj_source_node.file_name = file_metadata['fileName']
        obj_source_node.file_size = file_metadata['fileSize']
        obj_source_node.url = file_metadata['url']
        obj_source_node.file_source = source_type
        obj_source_node.model = model
        obj_source_node.file_type = file_metadata['fileType']
        obj_source_node.localFolder = local_folder_path
        obj_source_node.created_at = datetime.now()

        try:
            graphDb_data_Access = graphDBdataAccess(graph)
            graphDb_data_Access.create_source_node(obj_source_node)
            success_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Success'})
        except Exception as e:
            failed_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Failed'})
    return lst_file_name, success_count, failed_count


def get_local_files_info(local_folder_path):
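    """
    Collect metadata for the .pdf and .md files in a local folder.

    Returns a list of dictionaries with fileName, fileSize (bytes), a file:// url and
    fileType ('pdf' or 'markdown').
    """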
    file_metadata_list = []

    for file_name in os.listdir(local_folder_path):
        file_path = os.path.join(local_folder_path, file_name)

        if os.path.isfile(file_path) and (file_name.endswith('.pdf') or file_name.endswith('.md')):
            file_type = 'pdf' if file_name.endswith('.pdf') else 'markdown'
            file_metadata_list.append({
                'fileName': file_name,
                'fileSize': os.path.getsize(file_path),
                'url': f"file://{file_path}",
                'fileType': file_type
            })

    return file_metadata_list


def create_source_node_graph_url_s3(graph, model, source_url, aws_access_key_id, aws_secret_access_key, source_type):
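    """
    Create a Document source node for every PDF found at the given S3 location.

    get_s3_files_info is expected to return a list of dictionaries with 'file_key' and
    'file_size_bytes'. Raises if no PDF files are found. Returns
    (lst_file_name, success_count, failed_count).
    """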
    lst_file_name = []
    files_info = get_s3_files_info(source_url, aws_access_key_id=aws_access_key_id,
                                   aws_secret_access_key=aws_secret_access_key)
    if len(files_info) == 0:
        raise Exception('No pdf files found.')
    logging.info(f'files info : {files_info}')
    success_count = 0
    failed_count = 0

    for file_info in files_info:
        file_name = file_info['file_key']
        obj_source_node = sourceNode()
        obj_source_node.file_name = file_name.split('/')[-1]
        obj_source_node.file_type = 'pdf'
        obj_source_node.file_size = file_info['file_size_bytes']
        obj_source_node.file_source = source_type
        obj_source_node.model = model
        obj_source_node.url = str(source_url + file_name)
        obj_source_node.awsAccessKeyId = aws_access_key_id
        obj_source_node.created_at = datetime.now()
        try:
            graphDb_data_Access = graphDBdataAccess(graph)
            graphDb_data_Access.create_source_node(obj_source_node)
            success_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Success'})
        except Exception as e:
            failed_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Failed'})
    return lst_file_name, success_count, failed_count


def create_source_node_graph_url_gcs(graph, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, source_type,
                                     credentials):
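    """
    Create a Document source node for every PDF in a GCS bucket/folder.

    get_gcs_bucket_files_info is expected to return metadata dictionaries with fileName,
    fileSize, url, gcsBucketFolder and gcsProjectId. Returns
    (lst_file_name, success_count, failed_count).
    """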
    success_count = 0
    failed_count = 0
    lst_file_name = []

    lst_file_metadata = get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, credentials)
    for file_metadata in lst_file_metadata:
        obj_source_node = sourceNode()
        obj_source_node.file_name = file_metadata['fileName']
        obj_source_node.file_size = file_metadata['fileSize']
        obj_source_node.url = file_metadata['url']
        obj_source_node.file_source = source_type
        obj_source_node.model = model
        obj_source_node.file_type = 'pdf'
        obj_source_node.gcsBucket = gcs_bucket_name
        obj_source_node.gcsBucketFolder = file_metadata['gcsBucketFolder']
        obj_source_node.gcsProjectId = file_metadata['gcsProjectId']
        obj_source_node.created_at = datetime.now()
        obj_source_node.access_token = credentials.token

        try:
            graphDb_data_Access = graphDBdataAccess(graph)
            graphDb_data_Access.create_source_node(obj_source_node)
            success_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Success',
                                  'gcsBucketName': gcs_bucket_name, 'gcsBucketFolder': obj_source_node.gcsBucketFolder,
                                  'gcsProjectId': obj_source_node.gcsProjectId})
        except Exception as e:
            failed_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Failed',
                                  'gcsBucketName': gcs_bucket_name, 'gcsBucketFolder': obj_source_node.gcsBucketFolder,
                                  'gcsProjectId': obj_source_node.gcsProjectId})
    return lst_file_name, success_count, failed_count


def create_source_node_graph_web_url(graph, model, source_url, source_type):
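    """
    Create a Document source node for a generic web page.

    Loads the page with WebBaseLoader to read its title, language and approximate size,
    then persists a sourceNode. Returns (lst_file_name, success_count, failed_count).
    """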
    success_count = 0
    failed_count = 0
    lst_file_name = []
    pages = WebBaseLoader(source_url, verify_ssl=False).load()
    if pages is None or len(pages) == 0:
        failed_count += 1
        message = f"Unable to read data for given url : {source_url}"
        raise Exception(message)
    obj_source_node = sourceNode()
    obj_source_node.file_type = 'text'
    obj_source_node.file_source = source_type
    obj_source_node.model = model
    obj_source_node.url = urllib.parse.unquote(source_url)
    obj_source_node.created_at = datetime.now()
    obj_source_node.file_name = pages[0].metadata['title']
    obj_source_node.language = pages[0].metadata['language']
    obj_source_node.file_size = sys.getsizeof(pages[0].page_content)

    graphDb_data_Access = graphDBdataAccess(graph)
    graphDb_data_Access.create_source_node(obj_source_node)
    lst_file_name.append(
        {'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size, 'url': obj_source_node.url,
         'status': 'Success'})
    success_count += 1
    return lst_file_name, success_count, failed_count


def create_source_node_graph_url_youtube(graph, model, source_url, source_type):
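    """
    Create a Document source node for a YouTube video.

    Validates the URL via check_url_source, reads the video title with pytube and checks
    that a transcript is available through get_youtube_combined_transcript before
    persisting the node. Returns (lst_file_name, success_count, failed_count).
    """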
    youtube_url, language = check_url_source(source_type=source_type, yt_url=source_url)
    success_count = 0
    failed_count = 0
    lst_file_name = []
    obj_source_node = sourceNode()
    obj_source_node.file_type = 'text'
    obj_source_node.file_source = source_type
    obj_source_node.model = model
    obj_source_node.url = youtube_url
    obj_source_node.created_at = datetime.now()
    match = re.search(r'(?:v=)([0-9A-Za-z_-]{11})\s*', obj_source_node.url)
    logging.info(f"match value: {match}")
    if match is None:
        # Guard against URLs without a v= parameter, which would otherwise raise AttributeError below.
        raise Exception(f"Could not extract a video id from the YouTube url : {obj_source_node.url}")
    obj_source_node.file_name = YouTube(obj_source_node.url).title
    transcript = get_youtube_combined_transcript(match.group(1))
    if transcript is None or len(transcript) == 0:
        message = f"Youtube transcript is not available for : {obj_source_node.file_name}"
        raise Exception(message)
    else:
        obj_source_node.file_size = sys.getsizeof(transcript)

    graphDb_data_Access = graphDBdataAccess(graph)
    graphDb_data_Access.create_source_node(obj_source_node)
    lst_file_name.append(
        {'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size, 'url': obj_source_node.url,
         'status': 'Success'})
    success_count += 1
    return lst_file_name, success_count, failed_count


def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type):
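    """
    Create a Document source node for a Wikipedia article.

    Resolves the query and language via check_url_source, loads at most one page with
    WikipediaLoader and persists its metadata. Returns
    (lst_file_name, success_count, failed_count).
    """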
    success_count = 0
    failed_count = 0
    lst_file_name = []

    wiki_query_id, language = check_url_source(source_type=source_type, wiki_query=wiki_query)
    logging.info(f"Creating source node for {wiki_query_id.strip()}, {language}")
    pages = WikipediaLoader(query=wiki_query_id.strip(), lang=language, load_max_docs=1,
                            load_all_available_meta=True).load()
    if pages is None or len(pages) == 0:
        failed_count += 1
        message = f"Unable to read data for given Wikipedia url : {wiki_query}"
        raise Exception(message)
    else:
        obj_source_node = sourceNode()
        obj_source_node.file_name = wiki_query_id.strip()
        obj_source_node.file_type = 'text'
        obj_source_node.file_source = source_type
        obj_source_node.file_size = sys.getsizeof(pages[0].page_content)
        obj_source_node.model = model
        obj_source_node.url = urllib.parse.unquote(pages[0].metadata['source'])
        obj_source_node.created_at = datetime.now()
        obj_source_node.language = language
        graphDb_data_Access = graphDBdataAccess(graph)
        graphDb_data_Access.create_source_node(obj_source_node)
        success_count += 1
        lst_file_name.append(
            {'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size, 'url': obj_source_node.url,
             'language': obj_source_node.language, 'status': 'Success'})
    return lst_file_name, success_count, failed_count


def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName,
                                       allowedNodes, allowedRelationship, retry_condition):
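    """
    Extract a knowledge graph from a local file.

    On a fresh run (retry_condition is None) the file at merged_file_path is loaded via
    get_documents_from_file_by_path and split into pages before processing; on a retry the
    already-stored chunks are reused. Delegates the actual work to processing_source.
    """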
    logging.info(f'Process file name :{fileName}')
    if retry_condition is None:
        file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path, fileName)
        if pages is None or len(pages) == 0:
            raise Exception(f'File content is not available for file : {file_name}')
        return processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes,
                                 allowedRelationship, True, merged_file_path)
    else:
        return processing_source(uri, userName, password, database, model, fileName, [], allowedNodes,
                                 allowedRelationship, True, merged_file_path, retry_condition)


def processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship,
                      is_uploaded_from_local=None, merged_file_path=None, retry_condition=None):
| """ |
| Extracts a Neo4jGraph from a PDF file based on the model. |
| |
| Args: |
| uri: URI of the graph to extract |
| db_name : db_name is database name to connect graph db |
| userName: Username to use for graph creation ( if None will use username from config file ) |
| password: Password to use for graph creation ( if None will use password from config file ) |
| file: File object containing the PDF file to be used |
| model: Type of model to use ('Diffbot'or'OpenAI GPT') |
| |
| Returns: |
| Json response to API with fileName, nodeCount, relationshipCount, processingTime, |
| status and model as attributes. |
| """ |
    start_time = datetime.now()
    graph = create_graph_database_connection(uri, userName, password, database)
    graphDb_data_Access = graphDBdataAccess(graph)

    total_chunks, chunkId_chunkDoc_list = get_chunkId_chunkDoc_list(graph, file_name, pages, retry_condition)
    logging.info(f'total_chunks : {total_chunks}')
    logging.info(f'chunkId_chunkDoc_list : {chunkId_chunkDoc_list}')
    result = graphDb_data_Access.get_current_status_document_node(file_name)
    logging.info(f'current status of document node : {result}')
    select_chunks_with_retry = 0
    node_count = 0
    rel_count = 0

    if len(result) > 0:
        if result[0]['Status'] != 'Processing':
            obj_source_node = sourceNode()
            status = "Processing"
            obj_source_node.file_name = file_name
            obj_source_node.status = status
            obj_source_node.total_chunks = total_chunks
            obj_source_node.model = model
            if retry_condition == START_FROM_LAST_PROCESSED_POSITION:
                node_count = result[0]['nodeCount']
                rel_count = result[0]['relationshipCount']
                select_chunks_with_retry = result[0]['processed_chunk']
                obj_source_node.processed_chunk = select_chunks_with_retry
            logging.info(file_name)
            logging.info(obj_source_node)
            graphDb_data_Access.update_source_node(obj_source_node)

            logging.info('Update the status as Processing')
            update_graph_chunk_processed = int(os.environ.get('UPDATE_GRAPH_CHUNKS_PROCESSED'))

            is_cancelled_status = False
            job_status = "Completed"
|
| for i in range(0, len(chunkId_chunkDoc_list), update_graph_chunk_processed): |
| select_chunks_upto = i + update_graph_chunk_processed |
| logging.info(f'Selected Chunks upto: {select_chunks_upto}') |
| if len(chunkId_chunkDoc_list) <= select_chunks_upto: |
| select_chunks_upto = len(chunkId_chunkDoc_list) |
| selected_chunks = chunkId_chunkDoc_list[i:select_chunks_upto] |
|
|
| result = graphDb_data_Access.get_current_status_document_node(file_name) |
| is_cancelled_status = result[0]['is_cancelled'] |
| logging.info(f"Value of is_cancelled : {result[0]['is_cancelled']}") |
| if bool(is_cancelled_status): |
| job_status = "Cancelled" |
| logging.info('Exit from running loop of processing file') |
| break |
| else: |
| node_count, rel_count = processing_chunks(selected_chunks, graph, uri, userName, password, database, |
| file_name, model, allowedNodes, allowedRelationship, |
| node_count, rel_count) |
| end_time = datetime.now() |
| processed_time = end_time - start_time |
|
|
| obj_source_node = sourceNode() |
| obj_source_node.file_name = file_name |
| obj_source_node.updated_at = end_time |
| obj_source_node.processing_time = processed_time |
| obj_source_node.processed_chunk = select_chunks_upto + select_chunks_with_retry |
| obj_source_node.node_count = node_count |
| obj_source_node.relationship_count = rel_count |
| graphDb_data_Access.update_source_node(obj_source_node) |
|
|
| result = graphDb_data_Access.get_current_status_document_node(file_name) |
| is_cancelled_status = result[0]['is_cancelled'] |
| if bool(is_cancelled_status): |
| logging.info(f'Is_cancelled True at the end extraction') |
| job_status = 'Cancelled' |
| logging.info(f'Job Status at the end : {job_status}') |
| end_time = datetime.now() |
| processed_time = end_time - start_time |
| obj_source_node = sourceNode() |
| obj_source_node.file_name = file_name |
| obj_source_node.status = job_status |
| obj_source_node.processing_time = processed_time |
|
|
| graphDb_data_Access.update_source_node(obj_source_node) |
| logging.info('Updated the nodeCount and relCount properties in Document node') |
| logging.info(f'file:{file_name} extraction has been completed') |
|
|
| |
|
|
| if is_uploaded_from_local: |
| gcs_file_cache = os.environ.get('GCS_FILE_CACHE') |
| |
| |
| |
| |
| |
|
|
| return { |
| "fileName": file_name, |
| "nodeCount": node_count, |
| "relationshipCount": rel_count, |
| "processingTime": round(processed_time.total_seconds(), 2), |
| "status": job_status, |
| "model": model, |
| "success_count": 1 |
| } |
| else: |
| logging.info('File does not process because it\'s already in Processing status') |
| else: |
| error_message = "Unable to get the status of docuemnt node." |
| logging.error(error_message) |
| raise Exception(error_message) |
|
|
|
|
| def processing_chunks(chunkId_chunkDoc_list, graph, uri, userName, password, database, file_name, model, allowedNodes, |
| allowedRelationship, node_count, rel_count): |
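    """
    Run the LLM extraction for one batch of chunks and persist the results.

    Recreates the Neo4j connection if the driver is closed, refreshes chunk embeddings via
    update_embedding_create_vector_index, asks the LLM for graph documents, saves them and
    links them back to their chunks. Returns the running (node_count, rel_count) totals.
    """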
    if graph is not None:
        if graph._driver._closed:
            graph = create_graph_database_connection(uri, userName, password, database)
    else:
        graph = create_graph_database_connection(uri, userName, password, database)

    update_embedding_create_vector_index(graph, chunkId_chunkDoc_list, file_name)
    logging.info("Get graph document list from models")
    graph_documents = get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
    cleaned_graph_documents = handle_backticks_nodes_relationship_id_type(graph_documents)
    save_graphDocuments_in_neo4j(graph, cleaned_graph_documents)
    chunks_and_graphDocuments_list = get_chunk_and_graphDocument(cleaned_graph_documents, chunkId_chunkDoc_list)
    merge_relationship_between_chunk_and_entites(graph, chunks_and_graphDocuments_list)

    distinct_nodes = set()
    relations = []
    for graph_document in graph_documents:
        # Count each (id, type) pair only once across the batch.
        for node in graph_document.nodes:
            node_id = node.id
            node_type = node.type
            if (node_id, node_type) not in distinct_nodes:
                distinct_nodes.add((node_id, node_type))
        # Every extracted relationship counts towards the running total.
        for relation in graph_document.relationships:
            relations.append(relation.type)

    node_count += len(distinct_nodes)
    rel_count += len(relations)
    logging.info(f'node count internal func:{node_count}')
    logging.info(f'relation count internal func:{rel_count}')
    return node_count, rel_count


def get_chunkId_chunkDoc_list(graph, file_name, pages, retry_condition):
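    """
    Return the chunks to process for a file as (total_chunks, chunkId_chunkDoc_list).

    On a fresh run the pages are cleaned, split into chunks and linked to the Document
    node. On a retry the chunks already stored in the graph are fetched, and for
    START_FROM_LAST_PROCESSED_POSITION only the not-yet-processed tail is returned.
    """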
    if retry_condition is None:
        logging.info("Break down file into chunks")
        bad_chars = ['"', "\n", "'"]
        for i in range(0, len(pages)):
            text = pages[i].page_content
            for j in bad_chars:
                if j == '\n':
                    text = text.replace(j, ' ')
                else:
                    text = text.replace(j, '')
            pages[i] = Document(page_content=str(text), metadata=pages[i].metadata)
        create_chunks_obj = CreateChunksofDocument(pages, graph)
        chunks = create_chunks_obj.split_file_into_chunks()
        chunkId_chunkDoc_list = create_relation_between_chunks(graph, file_name, chunks)
        return len(chunks), chunkId_chunkDoc_list

    else:
        chunkId_chunkDoc_list = []
        chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename": file_name})
        for chunk in chunks:
            chunk_doc = Document(page_content=chunk['text'],
                                 metadata={'id': chunk['id'], 'position': chunk['position']})
            chunkId_chunkDoc_list.append({'chunk_id': chunk['id'], 'chunk_doc': chunk_doc})

        if retry_condition == START_FROM_LAST_PROCESSED_POSITION:
            logging.info("Retry : start_from_last_processed_position")
            starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION, params={"filename": file_name})
            if starting_chunk[0]["position"] < len(chunkId_chunkDoc_list):
                return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]

            elif starting_chunk[0]["position"] == len(chunkId_chunkDoc_list):
                starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY,
                                             params={"filename": file_name})
                return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]

            else:
                raise Exception(
                    f"All chunks of {file_name} are already processed. If you want to re-process, please start from the beginning.")

        else:
            logging.info(f"Retry : start_from_beginning with chunks {len(chunkId_chunkDoc_list)}")
            return len(chunks), chunkId_chunkDoc_list


def get_source_list_from_graph(uri, userName, password, db_name=None):
| """ |
| Args: |
| uri: URI of the graph to extract |
| db_name: db_name is database name to connect to graph db |
| userName: Username to use for graph creation ( if None will use username from config file ) |
| password: Password to use for graph creation ( if None will use password from config file ) |
| file: File object containing the PDF file to be used |
| model: Type of model to use ('Diffbot'or'OpenAI GPT') |
| Returns: |
| Returns a list of sources that are in the database by querying the graph and |
| sorting the list by the last updated date. |
| """ |
| logging.info("Get existing files list from graph") |
| graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password) |
| graph_DB_dataAccess = graphDBdataAccess(graph) |
| if not graph._driver._closed: |
| logging.info(f"closing connection for sources_list api") |
| graph._driver.close() |
| return graph_DB_dataAccess.get_source_list() |
|
|
|
|
def update_graph(graph):
    """
    Update the graph with SIMILAR relationships where the embedding scores match.
    """
    graph_DB_dataAccess = graphDBdataAccess(graph)
    graph_DB_dataAccess.update_KNN_graph()


def connection_check_and_get_vector_dimensions(graph):
    """
    Check the Neo4j connection and read the configured vector index dimensions.

    Args:
        graph: Neo4jGraph connection object

    Returns:
        The connection status and vector dimension details reported by graphDBdataAccess.
    """
    graph_DB_dataAccess = graphDBdataAccess(graph)
    return graph_DB_dataAccess.connection_check_and_get_vector_dimensions()


def merge_chunks_local(file_name, total_chunks, chunk_dir, merged_dir):
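    """
    Merge locally uploaded file parts into a single file and return its size in bytes.

    Reads {file_name}_part_{i} for i in 1..total_chunks from chunk_dir, appends each part
    to merged_dir/file_name and deletes the part after it has been copied.
    """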
    if not os.path.exists(merged_dir):
        os.mkdir(merged_dir)
    logging.info(f'Merged File Path: {merged_dir}')
    merged_file_path = os.path.join(merged_dir, file_name)
    with open(merged_file_path, "wb") as write_stream:
        for i in range(1, total_chunks + 1):
            chunk_file_path = os.path.join(chunk_dir, f"{file_name}_part_{i}")
            logging.info(f'Chunk File Path While Merging Parts:{chunk_file_path}')
            with open(chunk_file_path, "rb") as chunk_file:
                shutil.copyfileobj(chunk_file, write_stream)
            os.unlink(chunk_file_path)
    logging.info("Chunks merged successfully; returning the merged file size")

    file_size = os.path.getsize(merged_file_path)
    return file_size


def upload_file(graph, model, chunk, chunk_number: int, total_chunks: int, originalname, uri, chunk_dir, merged_dir):
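    """
    Store one uploaded chunk of a file and, on the last chunk, merge the parts.

    Chunks are written either to the GCS upload bucket (when GCS_FILE_CACHE is 'True') or
    to chunk_dir on disk. When chunk_number == total_chunks the parts are merged, a
    Document source node is created and a summary dict is returned; otherwise a progress
    message string is returned.
    """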
    gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
    logging.info(f'gcs file cache: {gcs_file_cache}')

    if gcs_file_cache == 'True':
        folder_name = create_gcs_bucket_folder_name_hashed(uri, originalname)
        upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD, folder_name)
    else:
        if not os.path.exists(chunk_dir):
            os.mkdir(chunk_dir)

        chunk_file_path = os.path.join(chunk_dir, f"{originalname}_part_{chunk_number}")
        logging.info(f'Chunk File Path: {chunk_file_path}')

        with open(chunk_file_path, "wb") as chunk_file:
            chunk_file.write(chunk.file.read())

    if int(chunk_number) == int(total_chunks):
        if gcs_file_cache == 'True':
            file_size = merge_file_gcs(BUCKET_UPLOAD, originalname, folder_name, int(total_chunks))
        else:
            file_size = merge_chunks_local(originalname, int(total_chunks), chunk_dir, merged_dir)

        logging.info("File merged successfully")
        file_extension = originalname.split('.')[-1]
        obj_source_node = sourceNode()
        obj_source_node.file_name = originalname
        obj_source_node.file_type = file_extension
        obj_source_node.file_size = file_size
        obj_source_node.file_source = 'local file'
        obj_source_node.model = model
        obj_source_node.created_at = datetime.now()
        graphDb_data_Access = graphDBdataAccess(graph)

        graphDb_data_Access.create_source_node(obj_source_node)
        return {'file_size': file_size, 'file_name': originalname, 'file_extension': file_extension,
                'message': f"Chunk {chunk_number}/{total_chunks} saved"}
    return f"Chunk {chunk_number}/{total_chunks} saved"


def get_labels_and_relationtypes(graph):
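    """
    Return the node labels and relationship types currently present in the database,
    excluding internal labels/types such as Chunk, PART_OF, NEXT_CHUNK, HAS_ENTITY and
    _Bloom_Perspective_ (capped at 100 each).
    """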
    query = """
        RETURN collect {
            CALL db.labels() yield label
            WHERE NOT label IN ['Chunk','_Bloom_Perspective_']
            return label order by label limit 100 } as labels,
        collect {
            CALL db.relationshipTypes() yield relationshipType as type
            WHERE NOT type IN ['PART_OF', 'NEXT_CHUNK', 'HAS_ENTITY', '_Bloom_Perspective_']
            return type order by type LIMIT 100 } as relationshipTypes
    """
    graphDb_data_Access = graphDBdataAccess(graph)
    result = graphDb_data_Access.execute_query(query)
    if result is None:
        result = []
    return result


def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):
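    """
    Mark the given files as cancelled and clean up their cached copies.

    filenames and source_types are JSON-encoded lists. Each Document node is flagged as
    cancelled; for local files cached in GCS the GCS copy is removed, otherwise the merged
    local file is deleted.
    """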
    filename_list = list(map(str.strip, json.loads(filenames)))
    source_types_list = list(map(str.strip, json.loads(source_types)))
    gcs_file_cache = os.environ.get('GCS_FILE_CACHE')

    for (file_name, source_type) in zip(filename_list, source_types_list):
        obj_source_node = sourceNode()
        obj_source_node.file_name = file_name
        obj_source_node.is_cancelled = True
        obj_source_node.status = 'Cancelled'
        obj_source_node.updated_at = datetime.now()
        graphDb_data_Access = graphDBdataAccess(graph)
        graphDb_data_Access.update_source_node(obj_source_node)
        obj_source_node = None
        merged_file_path = os.path.join(merged_dir, file_name)
        if source_type == 'local file' and gcs_file_cache == 'True':
            folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
            delete_file_from_gcs(BUCKET_UPLOAD, folder_name, file_name)
        else:
            logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
            delete_uploaded_local_file(merged_file_path, file_name)
    return "Cancelled the processing job successfully"


def populate_graph_schema_from_text(text, model, is_schema_description_cheked):
| """_summary_ |
| |
| Args: |
| graph (Neo4Graph): Neo4jGraph connection object |
| input_text (str): rendom text from PDF or user input. |
| model (str): AI model to use extraction from text |
| |
| Returns: |
| data (list): list of lebels and relationTypes |
| """ |
| result = schema_extraction_from_text(text, model, is_schema_description_cheked) |
| return {"labels": result.labels, "relationshipTypes": result.relationshipTypes} |
|
|
|
|
| def set_status_retry(graph, file_name, retry_condition): |
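    """
    Mark a Document node for reprocessing according to the given retry condition.

    Resets processed_chunk for the 'start from beginning' modes, and for
    DELETE_ENTITIES_AND_START_FROM_BEGINNING also deletes the existing entities and
    zeroes the node/relationship counts before updating the source node.
    """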
    graphDb_data_Access = graphDBdataAccess(graph)
    obj_source_node = sourceNode()
    status = "Reprocess"
    obj_source_node.file_name = file_name
    obj_source_node.status = status
    obj_source_node.retry_condition = retry_condition
    obj_source_node.is_cancelled = False
    if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
        obj_source_node.processed_chunk = 0
    if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
        graph.query(QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename": file_name})
        obj_source_node.node_count = 0
        obj_source_node.relationship_count = 0
    logging.info(obj_source_node)
    graphDb_data_Access.update_source_node(obj_source_node)

# Ad-hoc driver script: ingest every supported file in LOCAL_FOLDER_PATH and export the
# unconnected nodes to a CSV file for review.
if __name__ == "__main__":
    load_dotenv()
    neo4j_uri = os.getenv("NEO4J_URI")
    username = os.getenv("NEO4J_USERNAME")
    password = os.getenv("NEO4J_PASSWORD")
    model = os.getenv("VITE_LLM_MODELS")
    graph = create_graph_database_connection(uri=neo4j_uri, userName=username, password=password, database='neo4j')
    GCS_PROJECT_NAME = 'metal_am'
    GCS_BUCKET_NAME = "user_guide_images"
    LOCAL_FOLDER_PATH = 'user_guide_docs_single'
    SOURCE_TYPE = 'local url'

    directory = LOCAL_FOLDER_PATH
    success_count = 0
    failed_count = 0
    lst_file_name = []
    # Create a Document source node for every supported file in the folder, then run the
    # extraction for each file in the directory.
    if not os.path.isdir(LOCAL_FOLDER_PATH):
        raise NotADirectoryError(f"The path {LOCAL_FOLDER_PATH} is not a valid directory.")

    lst_file_metadata = get_local_files_info(LOCAL_FOLDER_PATH)

    for file_metadata in lst_file_metadata:
        obj_source_node = sourceNode()
        obj_source_node.file_name = file_metadata['fileName']
        obj_source_node.file_size = file_metadata['fileSize']
        obj_source_node.url = file_metadata['url']
        obj_source_node.file_source = SOURCE_TYPE
        obj_source_node.model = model
        obj_source_node.file_type = file_metadata['fileType']
        obj_source_node.localFolder = LOCAL_FOLDER_PATH
        obj_source_node.created_at = datetime.now()

        try:
            graphDb_data_Access = graphDBdataAccess(graph)
            graphDb_data_Access.create_source_node(obj_source_node)
            success_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Success'})
        except Exception as e:
            failed_count += 1
            lst_file_name.append({'fileName': obj_source_node.file_name, 'fileSize': obj_source_node.file_size,
                                  'url': obj_source_node.url, 'status': 'Failed'})

    for filename in os.listdir(directory):
        merged_file_path = os.path.join(directory, filename)

        extract_graph_from_file_local_file(uri=neo4j_uri, userName=username, password=password, model=model,
                                           database='neo4j',
                                           merged_file_path=merged_file_path,
                                           fileName=filename,
                                           allowedNodes=None,
                                           allowedRelationship=None,
                                           retry_condition=None)
    graphDb_data_Access = graphDBdataAccess(graph)
    nodes_list, total_nodes = graphDb_data_Access.list_unconnected_nodes()
    logging.info(f'unconnected nodes: {nodes_list}')

    output_csv = 'unconnected_nodes.csv'

    # Write the unconnected nodes (plus the total count) to a CSV file for review.
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Node", "Total Nodes"])
        for node in nodes_list:
            writer.writerow([node, total_nodes])
    print(f"Data saved to {output_csv}")