Spaces:
Runtime error
Runtime error
File size: 3,957 Bytes
358c9e4 613fc49 358c9e4 f38d527 358c9e4 f38d527 fb3c5a2 358c9e4 2423011 358c9e4 8929016 358c9e4 2ac1935 358c9e4 2ac1935 358c9e4 2ac1935 f1172de 8929016 2ac1935 d0bbbfb f38d527 8929016 518225a f38d527 d0bbbfb f38d527 d0bbbfb f38d527 2ac1935 f38d527 0f81c79 604bbfe 0f81c79 604bbfe 0f81c79 f1172de 6c94fb7 358c9e4 f38d527 8929016 358c9e4 8929016 fb3c5a2 864b6b0 fb3c5a2 0f81c79 358c9e4 f38d527 358c9e4 6c94fb7 358c9e4 8929016 fb3c5a2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
import os
import zipfile
import re
import shutil
import logging
import subprocess
from dotenv import load_dotenv
import gradio as gr
from utils import patient_info # Importing patient_info from utils
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
def unzip_folder(zip_path, extract_path):
output_dir = os.path.join(extract_path, "ragtest")
os.makedirs(output_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# Adjust to handle the extra ragtest folder inside the zip
actual_output_dir = os.path.join(output_dir, "ragtest")
if os.path.exists(actual_output_dir):
logger.info(f"Extracted contents to {actual_output_dir}")
logger.info(f"Contents of {actual_output_dir}:")
for file in os.listdir(actual_output_dir):
logger.info(os.path.join(actual_output_dir, file))
else:
logger.error(f"Expected directory {actual_output_dir} does not exist. Check the structure of the zip file.")
actual_output_dir = output_dir # fallback in case the structure is not as expected
return actual_output_dir
def run_graphrag_query(query, ragtest_dir):
# Log the directory and its contents
logger.info(f"Running GraphRAG query with root: {ragtest_dir}")
logger.info(f"Contents of {ragtest_dir}:")
for file in os.listdir(ragtest_dir):
logger.info(os.path.join(ragtest_dir, file))
# Define the command
command = [
"python", "-m", "graphrag.query",
"--root", ragtest_dir,
"--method", "global",
query
]
# Run the command
result = subprocess.run(command, capture_output=True, text=True)
# Return the output or error message
if result.returncode == 0:
return result.stdout
else:
logger.error(f"GraphRAG query failed with error: {result.stderr}")
return result.stderr
def clean_response(response):
# Find the position of "SUCCESS: Global Search Response:"
search_str = "SUCCESS: Global Search Response:"
start_index = response.find(search_str)
# If the search string is found, return the substring starting from after this string
if start_index != -1:
# Add the length of search_str to start_index to begin after the search string
return response[start_index + len(search_str):].strip()
else:
# If the search string is not found, return the original response
return response
def qa_tool_graph_rag(user_question):
original_dir = os.getcwd() # Store the original directory
try:
zip_path = os.getenv('ZIP_PATH', '/home/user/app/ragtest.zip')
extract_path = os.getenv('EXTRACT_PATH', '/home/user/app')
output_dir = unzip_folder(zip_path, extract_path)
os.chdir(extract_path)
# Combine patient_info with user_question
combined_input = f"{patient_info}\n\n{user_question}"
# Run the GraphRAG query with the combined input
raw_answer = run_graphrag_query(combined_input, output_dir)
# Clean the response to remove everything before "SUCCESS: Global Search Response:"
answer = clean_response(raw_answer)
logger.info(f"GraphRAG answer generated: {answer}")
images = [] # Adjust as needed for your application
return answer, images, gr.update(visible=True), gr.update(visible=True)
except Exception as e:
logger.error(f"Error in GraphRAG processing: {str(e)}")
return f"An error occurred: {str(e)}", [], gr.update(visible=False), gr.update(visible=False)
finally:
if 'output_dir' in locals():
shutil.rmtree(output_dir)
os.chdir(original_dir) |