File size: 3,957 Bytes
358c9e4
 
613fc49
358c9e4
 
f38d527
358c9e4
f38d527
fb3c5a2
358c9e4
2423011
358c9e4
 
 
 
 
 
8929016
 
 
358c9e4
2ac1935
358c9e4
2ac1935
 
358c9e4
2ac1935
 
 
 
 
 
 
 
 
 
f1172de
8929016
2ac1935
 
d0bbbfb
 
 
 
 
f38d527
 
8929016
518225a
f38d527
 
 
d0bbbfb
f38d527
 
d0bbbfb
f38d527
 
 
2ac1935
f38d527
 
0f81c79
 
 
 
 
604bbfe
0f81c79
604bbfe
 
0f81c79
 
 
 
f1172de
6c94fb7
358c9e4
f38d527
8929016
358c9e4
 
 
8929016
 
fb3c5a2
864b6b0
fb3c5a2
 
0f81c79
 
 
 
358c9e4
f38d527
358c9e4
6c94fb7
358c9e4
 
 
 
 
 
 
 
8929016
 
 
fb3c5a2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import logging
import os
import re
import shutil
import subprocess
import sys
import zipfile

import gradio as gr
from dotenv import load_dotenv

from utils import patient_info  # Importing patient_info from utils

# Configure logging once at import time: INFO level, with timestamp, logger
# name and level prepended to every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)

# Load environment variables (qa_tool_graph_rag reads ZIP_PATH and
# EXTRACT_PATH through os.getenv, so this must run before it is called).
load_dotenv()

def unzip_folder(zip_path, extract_path):
    """Extract a GraphRAG archive and return the directory holding its data.

    The archive at ``zip_path`` is unpacked into ``extract_path``. The data is
    expected to end up in ``<extract_path>/ragtest/ragtest`` (the zip carries
    an extra nested ``ragtest`` folder).

    Args:
        zip_path: Path of the zip file to extract.
        extract_path: Directory the archive is extracted into.

    Returns:
        ``<extract_path>/ragtest/ragtest`` when that path exists after
        extraction; otherwise ``<extract_path>/ragtest`` (an error is logged
        in that case).
    """
    top_level_dir = os.path.join(extract_path, "ragtest")
    # Guarantees the fallback directory exists even for an unexpected layout.
    os.makedirs(top_level_dir, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(extract_path)

    # The zip is expected to contain an additional ragtest folder one level down.
    nested_dir = os.path.join(top_level_dir, "ragtest")

    if not os.path.exists(nested_dir):
        logger.error(f"Expected directory {nested_dir} does not exist. Check the structure of the zip file.")
        # Unexpected layout: fall back to the outer directory.
        return top_level_dir

    logger.info(f"Extracted contents to {nested_dir}")
    logger.info(f"Contents of {nested_dir}:")
    for entry in os.listdir(nested_dir):
        logger.info(os.path.join(nested_dir, entry))

    return nested_dir

def run_graphrag_query(query, ragtest_dir):
    """Run a global-method GraphRAG query in a subprocess and return its output.

    Args:
        query: Query text, passed to ``graphrag.query`` as its positional
            argument.
        ragtest_dir: Directory passed as ``--root``; its entries are logged
            before the query runs.

    Returns:
        The subprocess's stdout when it exits with status 0; otherwise its
        stderr (which is also logged at ERROR level).

    Raises:
        OSError: If ``ragtest_dir`` cannot be listed or the interpreter cannot
            be started.
    """
    # Log the directory and its contents
    logger.info(f"Running GraphRAG query with root: {ragtest_dir}")
    logger.info(f"Contents of {ragtest_dir}:")
    for entry in os.listdir(ragtest_dir):
        logger.info(os.path.join(ragtest_dir, entry))

    # Use the interpreter running this process rather than whatever "python"
    # resolves to on PATH, so the child sees the same installed packages
    # (and the call does not fail where no "python" executable exists).
    command = [
        sys.executable, "-m", "graphrag.query",
        "--root", ragtest_dir,
        "--method", "global",
        query,
    ]

    # Argument list with the default shell=False: the query text is never
    # interpreted by a shell.
    result = subprocess.run(command, capture_output=True, text=True)

    if result.returncode == 0:
        return result.stdout

    logger.error(f"GraphRAG query failed with error: {result.stderr}")
    return result.stderr

def clean_response(response):
    """Strip everything up to and including the GraphRAG success marker.

    Args:
        response: Raw text produced by the GraphRAG query.

    Returns:
        The text following the first "SUCCESS: Global Search Response:"
        marker with surrounding whitespace removed, or ``response`` unchanged
        when the marker is absent.
    """
    marker = "SUCCESS: Global Search Response:"
    # partition splits on the first occurrence; `found` is empty when the
    # marker does not appear at all.
    _, found, tail = response.partition(marker)
    if not found:
        return response
    return tail.strip()
        
def qa_tool_graph_rag(user_question):
    """Answer a user question with a GraphRAG query over the zipped index.

    The archive named by the ``ZIP_PATH`` environment variable is extracted
    under ``EXTRACT_PATH`` (both have defaults), ``patient_info`` is prepended
    to the question, the query is run and the response is trimmed to the part
    after the success marker. The extracted directory is removed afterwards
    and the working directory is restored.

    Args:
        user_question: Question text entered by the user.

    Returns:
        A 4-tuple ``(answer, images, update, update)``. On success ``answer``
        is the cleaned query output, ``images`` is an empty list and both
        updates are ``gr.update(visible=True)``. If an exception is raised,
        ``answer`` is an "An error occurred: ..." message, ``images`` is empty
        and both updates are ``gr.update(visible=False)``.
    """
    original_dir = os.getcwd()  # Store the original directory
    output_dir = None  # Set only once extraction has succeeded; drives cleanup.
    try:
        zip_path = os.getenv('ZIP_PATH', '/home/user/app/ragtest.zip')
        extract_path = os.getenv('EXTRACT_PATH', '/home/user/app')

        output_dir = unzip_folder(zip_path, extract_path)

        # NOTE: this changes the working directory of the whole process until
        # the finally clause below restores it.
        os.chdir(extract_path)

        # Combine patient_info with user_question
        combined_input = f"{patient_info}\n\n{user_question}"

        # Run the GraphRAG query with the combined input
        raw_answer = run_graphrag_query(combined_input, output_dir)

        # Clean the response to remove everything before "SUCCESS: Global Search Response:"
        answer = clean_response(raw_answer)

        logger.info(f"GraphRAG answer generated: {answer}")

        images = []  # Adjust as needed for your application

        return answer, images, gr.update(visible=True), gr.update(visible=True)

    except Exception as e:
        # Top-level boundary for the UI: log with traceback and report the
        # failure to the user instead of raising.
        logger.exception(f"Error in GraphRAG processing: {str(e)}")
        return f"An error occurred: {str(e)}", [], gr.update(visible=False), gr.update(visible=False)

    finally:
        # Restore the working directory first so that a failing cleanup can
        # never leave the process stuck in extract_path.
        os.chdir(original_dir)

        if output_dir is not None:
            try:
                shutil.rmtree(output_dir)
            except OSError as cleanup_error:
                # Cleanup is best-effort: a leftover directory must not
                # replace the answer (or error message) being returned.
                logger.warning(f"Could not remove {output_dir}: {cleanup_error}")