File size: 5,073 Bytes
86eca65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414e8dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86eca65
414e8dd
86eca65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414e8dd
86eca65
 
 
 
 
 
 
 
 
 
414e8dd
86eca65
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import os
import logging
import re
import shutil
import tempfile
import time
from typing import Optional
import zipfile

import gdown
from pypdf import PdfReader
import docx as python_docx

logger = logging.getLogger(__name__)

def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract the plain text of a PDF, DOCX or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: ``'pdf'``, ``'docx'`` or ``'txt'``. Matching is
            case-insensitive and a leading dot is ignored, so ``'PDF'`` and
            ``'.pdf'`` are accepted as well.

    Returns:
        The extracted text with leading/trailing whitespace removed, or
        ``None`` when the type is unsupported, no text was found, or reading
        the file failed. Failures are logged, not raised.
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    # Normalise once so 'PDF' / '.pdf' take the same branch as 'pdf'.
    kind = file_type.strip().lower().lstrip('.')
    text_content = None
    try:
        if kind == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once; the extraction call is the
            # expensive part, so it must not run in both filter and value.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(f"{text}\n" for text in page_texts if text)
        elif kind == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif kind == 'txt':
            # Undecodable bytes are dropped rather than aborting the read.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None

        return text_content.strip()
    except Exception as e:
        # Best-effort contract: callers get None instead of an exception.
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None

def _make_extractor(kind):
    """Return a one-argument callable that extracts text from a *kind* file."""
    def _extract(path):
        return extract_text_from_file(path, kind)
    return _extract


# Maps each supported file extension to a callable taking a file path and
# returning the result of extract_text_from_file for that extension.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    kind: _make_extractor(kind) for kind in ('pdf', 'docx', 'txt')
}

def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive ID from a URL, or validate a bare ID.

    The input is searched, in this order, for ``/folders/<id>``, ``/d/<id>``
    and ``id=<id>``. If none is present, the whitespace-stripped input is
    accepted as a bare ID only when it is longer than 10 characters and
    consists solely of letters, digits, ``_`` and ``-``.

    Args:
        url_or_id: A Drive URL or a bare file/folder ID.

    Returns:
        The ID string, or ``None`` when the input is empty or no ID can be
        derived from it.
    """
    if not url_or_id:
        return None

    candidate = url_or_id.strip()

    # Order matters: a folder link wins over a file link, which wins over a
    # query-string id.
    url_patterns = (
        r"/folders/([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
        r"id=([a-zA-Z0-9_-]+)",
    )
    for pattern in url_patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)

    # Bare-ID fallback. Requiring the ID character set keeps unrecognised
    # URLs or free text from being passed on as if they were an ID.
    if len(candidate) > 10 and re.fullmatch(r"[a-zA-Z0-9_-]+", candidate):
        return candidate
    return None

def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single Google Drive file (e.g. users.csv) to ``target_path``.

    Args:
        file_id_or_url: Drive file ID or a URL containing one.
        target_path: Destination file path. Its parent directory is created
            when the path has one.

    Returns:
        ``True`` when gdown reports a download and a non-empty file exists at
        ``target_path`` afterwards; ``False`` for an invalid ID, a failed or
        empty download, or any exception (which is logged, not raised).
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error("[GDRIVE_SINGLE] Invalid ID")
        return False

    try:
        # A bare filename has an empty dirname, and os.makedirs('') raises,
        # so only create the parent directory when there is one.
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # NOTE(review): per the gdown docs, fuzzy=True enables fuzzy
        # extraction of the file ID from a URL; kept as in the original call.
        downloaded = gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        # gdown may signal failure by returning None. Without this check a
        # file left at target_path by an earlier run would count as success.
        if downloaded is None:
            logger.error("[GDRIVE_SINGLE] gdown reported a failed download.")
            return False

        if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
            logger.info("[GDRIVE_SINGLE] Success.")
            return True

        logger.error("[GDRIVE_SINGLE] Downloaded file is empty or missing.")
        return False
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False

def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into a directory.

    The folder is first downloaded into a temporary directory. If that
    directory then holds exactly one sub-directory, the contents of that
    sub-directory are moved; otherwise the top-level entries are moved.
    Despite the name, no archive extraction happens here.

    Args:
        folder_id_or_url: Drive folder ID or a URL containing one.
        target_dir_for_contents: Directory receiving the downloaded entries;
            created if missing.

    Returns:
        ``True`` when the download and the move succeeded; ``False`` for an
        invalid ID, a failed download, or any exception (logged, not raised).
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        logger.error("[GDRIVE] Invalid ID")
        return False

    temp_dir = tempfile.mkdtemp()
    try:
        downloaded = gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
        # gdown may signal failure by returning None instead of raising;
        # without this check the function would report success regardless.
        if downloaded is None:
            logger.error("[GDRIVE] gdown reported a failed folder download.")
            return False

        os.makedirs(target_dir_for_contents, exist_ok=True)

        # Unwrap a single wrapper directory so that its contents, not the
        # wrapper itself, end up in the target directory.
        src_root = temp_dir
        top_level = os.listdir(temp_dir)
        if len(top_level) == 1:
            only_entry = os.path.join(temp_dir, top_level[0])
            if os.path.isdir(only_entry):
                src_root = only_entry

        for item in os.listdir(src_root):
            shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))

        logger.info("[GDRIVE] Download complete.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        # Always discard the staging directory, including on failure.
        shutil.rmtree(temp_dir, ignore_errors=True)

def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP archive from Google Drive and extract it.

    The archive is downloaded into a private temporary directory that is
    removed afterwards, so concurrent calls do not share a file and no
    archive is left behind.

    Args:
        file_id_or_url: Drive file ID or a URL containing one.
        target_extraction_dir: Directory the archive is extracted into.

    Returns:
        ``True`` when the archive was downloaded and extracted; ``False`` for
        an invalid ID, a failed download, or any exception (logged, not
        raised).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error("[GDRIVE_ZIP] Invalid ID")
        return False

    # A per-call directory replaces the fixed path in the shared temp dir,
    # which could collide between calls and let a stale archive from an
    # earlier run be extracted after a failed download.
    temp_dir = tempfile.mkdtemp()
    temp_zip = os.path.join(temp_dir, "temp_download.zip")
    try:
        downloaded = gdown.download(id=file_id, output=temp_zip, quiet=False)
        # gdown may signal failure by returning None instead of raising.
        if downloaded is None or not os.path.exists(temp_zip):
            logger.error("[GDRIVE_ZIP] Download failed; no archive to extract.")
            return False

        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        # Remove the downloaded archive whether or not extraction succeeded.
        shutil.rmtree(temp_dir, ignore_errors=True)