import os import zipfile import re import shutil import logging import subprocess from dotenv import load_dotenv import gradio as gr from utils import patient_info # Importing patient_info from utils logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Load environment variables load_dotenv() def unzip_folder(zip_path, extract_path): output_dir = os.path.join(extract_path, "ragtest") os.makedirs(output_dir, exist_ok=True) with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(extract_path) # Adjust to handle the extra ragtest folder inside the zip actual_output_dir = os.path.join(output_dir, "ragtest") if os.path.exists(actual_output_dir): logger.info(f"Extracted contents to {actual_output_dir}") logger.info(f"Contents of {actual_output_dir}:") for file in os.listdir(actual_output_dir): logger.info(os.path.join(actual_output_dir, file)) else: logger.error(f"Expected directory {actual_output_dir} does not exist. Check the structure of the zip file.") actual_output_dir = output_dir # fallback in case the structure is not as expected return actual_output_dir def run_graphrag_query(query, ragtest_dir): # Log the directory and its contents logger.info(f"Running GraphRAG query with root: {ragtest_dir}") logger.info(f"Contents of {ragtest_dir}:") for file in os.listdir(ragtest_dir): logger.info(os.path.join(ragtest_dir, file)) # Define the command command = [ "python", "-m", "graphrag.query", "--root", ragtest_dir, "--method", "global", query ] # Run the command result = subprocess.run(command, capture_output=True, text=True) # Return the output or error message if result.returncode == 0: return result.stdout else: logger.error(f"GraphRAG query failed with error: {result.stderr}") return result.stderr def clean_response(response): # Find the position of "SUCCESS: Global Search Response:" search_str = "SUCCESS: Global Search Response:" start_index = response.find(search_str) # If the search string is found, return the substring starting from after this string if start_index != -1: # Add the length of search_str to start_index to begin after the search string return response[start_index + len(search_str):].strip() else: # If the search string is not found, return the original response return response def qa_tool_graph_rag(user_question): original_dir = os.getcwd() # Store the original directory try: zip_path = os.getenv('ZIP_PATH', '/home/user/app/ragtest.zip') extract_path = os.getenv('EXTRACT_PATH', '/home/user/app') output_dir = unzip_folder(zip_path, extract_path) os.chdir(extract_path) # Combine patient_info with user_question combined_input = f"{patient_info}\n\n{user_question}" # Run the GraphRAG query with the combined input raw_answer = run_graphrag_query(combined_input, output_dir) # Clean the response to remove everything before "SUCCESS: Global Search Response:" answer = clean_response(raw_answer) logger.info(f"GraphRAG answer generated: {answer}") images = [] # Adjust as needed for your application return answer, images, gr.update(visible=True), gr.update(visible=True) except Exception as e: logger.error(f"Error in GraphRAG processing: {str(e)}") return f"An error occurred: {str(e)}", [], gr.update(visible=False), gr.update(visible=False) finally: if 'output_dir' in locals(): shutil.rmtree(output_dir) os.chdir(original_dir)