Spaces:
Running
Running
File size: 6,393 Bytes
ca6e669 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | import os
import logging
import re
import shutil
import tempfile
import zipfile
import requests
from typing import Optional
from bs4 import BeautifulSoup
import gdown
from pypdf import PdfReader
import docx as python_docx
logger = logging.getLogger(__name__)
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
text_content = None
try:
if file_type == 'pdf':
reader = PdfReader(file_path)
text_content = "".join(page.extract_text() + "\n" for page in reader.pages if page.extract_text())
elif file_type == 'docx':
doc = python_docx.Document(file_path)
text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
elif file_type == 'txt':
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text_content = f.read()
else:
logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
return None
if not text_content or not text_content.strip():
logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
return None
return text_content.strip()
except Exception as e:
logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
return None
FAISS_RAG_SUPPORTED_EXTENSIONS = {
'pdf': lambda path: extract_text_from_file(path, 'pdf'),
'docx': lambda path: extract_text_from_file(path, 'docx'),
'txt': lambda path: extract_text_from_file(path, 'txt'),
'csv': lambda path: "CSV_HANDLED_NATIVELY" # Bypassed directly in components to allow row-by-row chunks
}
def fetch_and_clean_url(url: str, output_txt_path: str) -> bool:
"""Fetches HTML from a URL, cleans it, and saves it as pure text."""
try:
logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Remove non-content tags completely
for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
tag.decompose()
# Attempt to find specific content divs if they exist (generalizes rentry and others)
entry = soup.find('div', class_='entry-text')
if entry:
text = entry.get_text(separator='\n', strip=True)
else:
# Fallback for other sites or layout changes
text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True)
# Clean up excessive blank lines
text = re.sub(r'\n\s*\n', '\n\n', text)
# Ensure directory exists
os.makedirs(os.path.dirname(output_txt_path), exist_ok=True)
with open(output_txt_path, 'w', encoding='utf-8') as f:
f.write(text)
logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}")
return True
except Exception as e:
logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True)
return False
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
if not url_or_id: return None
match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id)
if match_folder: return match_folder.group(1)
match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id)
if match_file_d: return match_file_d.group(1)
match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id)
if match_uc: return match_uc.group(1)
return url_or_id if len(url_or_id) > 10 else None
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
file_id = get_id_from_gdrive_input(file_id_or_url)
if not file_id: return False
try:
os.makedirs(os.path.dirname(target_path), exist_ok=True)
gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
return True
return False
except Exception as e:
logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
return False
def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
folder_id = get_id_from_gdrive_input(folder_id_or_url)
if not folder_id: return False
temp_dir = tempfile.mkdtemp()
try:
gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
if not os.path.exists(target_dir_for_contents):
os.makedirs(target_dir_for_contents)
src_root = temp_dir
if len(os.listdir(temp_dir)) == 1 and os.path.isdir(os.path.join(temp_dir, os.listdir(temp_dir)[0])):
src_root = os.path.join(temp_dir, os.listdir(temp_dir)[0])
for item in os.listdir(src_root):
shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))
return True
except Exception as e:
logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
return False
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
file_id = get_id_from_gdrive_input(file_id_or_url)
if not file_id: return False
temp_zip = os.path.join(tempfile.gettempdir(), "temp_download.zip")
try:
gdown.download(id=file_id, output=temp_zip, quiet=False)
with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
zip_ref.extractall(target_extraction_dir)
return True
except Exception as e:
logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
return False |