File size: 3,012 Bytes
358c9e4
 
 
 
f38d527
358c9e4
f38d527
358c9e4
 
 
 
 
 
 
 
8929016
 
 
358c9e4
2ac1935
358c9e4
2ac1935
 
358c9e4
2ac1935
 
 
 
 
 
 
 
 
 
f1172de
8929016
2ac1935
 
d0bbbfb
 
 
 
 
f38d527
 
8929016
518225a
f38d527
 
 
d0bbbfb
f38d527
 
d0bbbfb
f38d527
 
 
2ac1935
f38d527
 
f1172de
6c94fb7
358c9e4
f38d527
8929016
358c9e4
 
 
8929016
 
58d53e4
358c9e4
f38d527
358c9e4
6c94fb7
358c9e4
 
 
 
 
 
 
 
8929016
 
 
6c94fb7
2ac1935
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import logging
import os
import shutil
import subprocess
import sys
import zipfile

import gradio as gr
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

def unzip_folder(zip_path, extract_path):
    """Extract a zip archive and return the directory holding the ragtest data.

    The archive is unpacked into ``extract_path``. The data is expected to end
    up in ``<extract_path>/ragtest/ragtest`` (the archive carries an extra
    ``ragtest`` level); when that nested directory is absent, the outer
    ``<extract_path>/ragtest`` directory is returned instead.

    Args:
        zip_path: Path of the zip archive to extract.
        extract_path: Directory the archive is extracted into.

    Returns:
        The nested ``ragtest/ragtest`` directory if it exists after
        extraction, otherwise ``<extract_path>/ragtest``.
    """
    ragtest_root = os.path.join(extract_path, "ragtest")
    # Created up front, so the fallback directory exists even when the
    # archive does not contain a top-level "ragtest" folder.
    os.makedirs(ragtest_root, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(extract_path)

    # The zip is expected to wrap its payload in a second "ragtest" folder.
    nested_root = os.path.join(ragtest_root, "ragtest")

    if not os.path.exists(nested_root):
        logger.error(f"Expected directory {nested_root} does not exist. Check the structure of the zip file.")
        # Unexpected layout: fall back to the outer directory.
        return ragtest_root

    logger.info(f"Extracted contents to {nested_root}")
    logger.info(f"Contents of {nested_root}:")
    for entry in os.listdir(nested_root):
        logger.info(os.path.join(nested_root, entry))

    return nested_root

def run_graphrag_query(query, ragtest_dir):
    """Run a GraphRAG "global" query in a child process and return its output.

    Args:
        query: Question text, passed as the final command-line argument.
        ragtest_dir: Directory passed to ``--root``; its entries are logged
            before the query is started.

    Returns:
        The child's stdout when it exits with status 0; otherwise its stderr,
        which is also logged at ERROR level.

    Raises:
        OSError: If ``ragtest_dir`` cannot be listed or the interpreter
            cannot be launched.
    """
    # Log the directory and its contents to make path problems easy to spot.
    logger.info(f"Running GraphRAG query with root: {ragtest_dir}")
    logger.info(f"Contents of {ragtest_dir}:")
    for entry in os.listdir(ragtest_dir):
        logger.info(os.path.join(ragtest_dir, entry))

    # Launch the module with the interpreter running this process rather than
    # whatever "python" resolves to on PATH (which may be missing or belong
    # to a different environment). Fall back to "python" only when
    # sys.executable is unavailable.
    interpreter = sys.executable or "python"
    command = [
        interpreter, "-m", "graphrag.query",
        "--root", ragtest_dir,
        "--method", "global",
        query,
    ]

    # Argument list (no shell), so the query text is passed through verbatim.
    result = subprocess.run(command, capture_output=True, text=True)

    if result.returncode == 0:
        return result.stdout

    logger.error(f"GraphRAG query failed with error: {result.stderr}")
    return result.stderr

def qa_tool_graph_rag(user_question):
    """Answer a question with GraphRAG using a freshly extracted data archive.

    The archive at ``ZIP_PATH`` is extracted into ``EXTRACT_PATH`` (both read
    from the environment, with defaults under ``/home/user/app``), the
    process working directory is switched to ``EXTRACT_PATH`` for the query,
    and the extracted directory is removed afterwards.

    Args:
        user_question: Question text forwarded to ``run_graphrag_query``.

    Returns:
        A 4-tuple ``(answer, images, update, update)``. On success ``answer``
        is the query output, ``images`` is an empty list and both updates are
        ``gr.update(visible=True)``. On any exception ``answer`` is an error
        message, ``images`` is empty and both updates are
        ``gr.update(visible=False)``.
    """
    original_dir = os.getcwd()  # Restored in the finally block below
    output_dir = None  # Set only once extraction succeeded; drives cleanup
    try:
        zip_path = os.getenv('ZIP_PATH', '/home/user/app/ragtest.zip')
        extract_path = os.getenv('EXTRACT_PATH', '/home/user/app')

        output_dir = unzip_folder(zip_path, extract_path)

        os.chdir(extract_path)

        answer = run_graphrag_query(user_question, output_dir)

        logger.info(f"GraphRAG answer generated: {answer}")

        images = []  # Adjust as needed for your application

        return answer, images, gr.update(visible=True), gr.update(visible=True)

    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error in GraphRAG processing: {str(e)}")
        return f"An error occurred: {str(e)}", [], gr.update(visible=False), gr.update(visible=False)

    finally:
        try:
            if output_dir is not None:
                shutil.rmtree(output_dir)
        except OSError as cleanup_error:
            # A failed cleanup must not replace the value being returned
            # above, so it is logged instead of propagated.
            logger.warning(f"Could not remove {output_dir}: {cleanup_error}")
        finally:
            # Always return to the original directory, even if cleanup failed.
            os.chdir(original_dir)