File size: 6,393 Bytes
ca6e669
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import logging
import re
import shutil
import tempfile
import zipfile
import requests
from typing import Optional
from bs4 import BeautifulSoup

import gdown
from pypdf import PdfReader
import docx as python_docx

# Module-level logger named after this module, per the standard logging convention.
logger = logging.getLogger(__name__)

def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file.

    Args:
        file_path: Path to the file on disk.
        file_type: One of 'pdf', 'docx', or 'txt' (lowercase).

    Returns:
        The stripped text content, or None if the type is unsupported,
        no text could be extracted, or an error occurred (errors are logged,
        never raised to the caller).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # extract_text() is expensive; call it once per page instead of
            # twice (the filter and the join each triggered a full extraction).
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(t + "\n" for t in page_texts if t)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # errors='ignore' drops undecodable bytes rather than failing.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None

        return text_content.strip()
    except Exception as e:
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None

# Maps each file extension the FAISS RAG pipeline accepts to the callable that
# extracts text from a file of that type. The lambda binds the extension as a
# default argument so each entry delegates to extract_text_from_file correctly.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    ext: (lambda path, _ext=ext: extract_text_from_file(path, _ext))
    for ext in ('pdf', 'docx', 'txt')
}
# CSV is bypassed directly in components to allow row-by-row chunks.
FAISS_RAG_SUPPORTED_EXTENSIONS['csv'] = lambda path: "CSV_HANDLED_NATIVELY"

def fetch_and_clean_url(url: str, output_txt_path: str) -> bool:
    """Fetch HTML from *url*, strip non-content markup, and save plain text.

    Args:
        url: HTTP(S) URL to fetch.
        output_txt_path: Destination path for the cleaned text file.

    Returns:
        True on success, False on any failure (errors are logged, not raised).
    """
    try:
        logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
        # A browser-like User-Agent avoids naive bot blocking on some sites.
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove non-content tags completely
        for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
            tag.decompose()

        # Attempt to find specific content divs if they exist (generalizes rentry and others)
        entry = soup.find('div', class_='entry-text')
        if entry:
            text = entry.get_text(separator='\n', strip=True)
        else:
            # Fallback for other sites or layout changes
            text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True)

        # Clean up excessive blank lines
        text = re.sub(r'\n\s*\n', '\n\n', text)

        # Only create a parent directory when the path actually has one:
        # os.makedirs('') raises FileNotFoundError for bare filenames, which
        # previously turned a valid relative path into a spurious failure.
        parent_dir = os.path.dirname(output_txt_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(text)

        logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}")
        return True
    except Exception as e:
        logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True)
        return False

def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive ID from a URL, or pass through a bare ID.

    Known Drive URL shapes are tried in order: folder link ('/folders/<id>'),
    file link ('/d/<id>'), then the 'id=' query parameter. When nothing
    matches, the input itself is returned as a bare ID if it is long enough
    to plausibly be one; otherwise None.
    """
    if not url_or_id:
        return None
    for pattern in (
        r"/folders/([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
        r"id=([a-zA-Z0-9_-]+)",
    ):
        found = re.search(pattern, url_or_id)
        if found:
            return found.group(1)
    return url_or_id if len(url_or_id) > 10 else None

def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single Google Drive file to *target_path* via gdown.

    Args:
        file_id_or_url: A Drive file ID or any Drive URL form accepted by
            get_id_from_gdrive_input.
        target_path: Destination file path.

    Returns:
        True if the file exists and is non-empty after download, else False.
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False

    try:
        # Only create a parent directory when the path actually has one:
        # os.makedirs('') raises FileNotFoundError for bare filenames.
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        # fuzzy=True lets gdown resolve full sharing URLs as well as bare IDs.
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        # Treat a missing or zero-byte result as a failed download.
        return os.path.exists(target_path) and os.path.getsize(target_path) > 0
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False

def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into *target_dir_for_contents*.

    gdown downloads the folder into a temp directory; if gdown produced a
    single top-level wrapper directory, its contents (not the wrapper) are
    moved into the target, flattening one level.

    Returns:
        True on success, False on any failure (errors are logged, not raised).
        The temp directory is always cleaned up.
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        return False

    temp_dir = tempfile.mkdtemp()
    try:
        gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
        os.makedirs(target_dir_for_contents, exist_ok=True)

        # List the download once (the original re-listed it three times).
        entries = os.listdir(temp_dir)
        src_root = temp_dir
        if len(entries) == 1 and os.path.isdir(os.path.join(temp_dir, entries[0])):
            # gdown wrapped everything in the folder's own name; skip that level.
            src_root = os.path.join(temp_dir, entries[0])

        for item in os.listdir(src_root):
            shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP file from Google Drive and extract it into a directory.

    Args:
        file_id_or_url: A Drive file ID or URL for a ZIP archive.
        target_extraction_dir: Directory to extract the archive into.

    Returns:
        True on success, False on any failure (errors are logged, not raised).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False

    # Unique temp name avoids collisions between concurrent downloads; the
    # original reused a fixed "temp_download.zip" and never deleted it.
    fd, temp_zip = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    try:
        gdown.download(id=file_id, output=temp_zip, quiet=False)
        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        # Always remove the downloaded archive (the original leaked it).
        if os.path.exists(temp_zip):
            os.remove(temp_zip)