# RAG-Brain / utils.py
# (Uploaded by SakibAhmed — revision 4dacb06)
import os
import logging
import re
import shutil
import tempfile
import zipfile
import requests
from typing import Optional
from bs4 import BeautifulSoup
import gdown
from pypdf import PdfReader
import docx as python_docx
logger = logging.getLogger(__name__)
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file.

    Args:
        file_path: Path to the file on disk.
        file_type: One of 'pdf', 'docx', or 'txt' (case-insensitive).

    Returns:
        The stripped text content, or None if the type is unsupported,
        no text could be extracted, or an error occurred.
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        file_type = file_type.lower()  # accept 'PDF', 'Docx', etc.
        if file_type == 'pdf':
            # Extract each page exactly once (the original called
            # page.extract_text() twice per page: once in the filter,
            # once in the join).
            page_texts = (page.extract_text() for page in PdfReader(file_path).pages)
            text_content = "".join(t + "\n" for t in page_texts if t)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # errors='ignore' drops undecodable bytes rather than failing.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None
        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None
        return text_content.strip()
    except Exception as e:
        # Broad catch is deliberate: extraction is best-effort and callers
        # treat None as "could not extract".
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None
# Map of supported file extensions to extraction callables.
# Each callable takes a file path and returns the extracted text (or None).
# The default argument (_ft=ext) binds the extension at definition time,
# avoiding Python's late-binding closure pitfall inside the comprehension.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    ext: (lambda path, _ft=ext: extract_text_from_file(path, _ft))
    for ext in ('pdf', 'docx', 'txt')
}
def fetch_and_clean_rentry(url: str, output_txt_path: str) -> bool:
    """Fetch HTML from a URL (specifically rentry.co), clean it, and save it as pure text.

    Args:
        url: The page to fetch.
        output_txt_path: Where to write the cleaned text (UTF-8).

    Returns:
        True on success, False on any network/parse/write failure.
    """
    try:
        logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Remove non-content tags completely
        for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
            tag.decompose()
        # Attempt to find the specific rentry content div
        entry = soup.find('div', class_='entry-text')
        if entry:
            text = entry.get_text(separator='\n', strip=True)
        else:
            # Fallback for other sites or layout changes
            text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True)
        # Clean up excessive blank lines
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Ensure Q&A identifiers always start on a new line to act as strong chunk boundaries
        text = re.sub(r'(?<!\n)([Qq](?:uestion|UESTION)?\s*\d*\s*:)', r'\n\n\1', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Ensure the target directory exists. Guard against a bare filename:
        # os.path.dirname('file.txt') == '' and os.makedirs('') raises
        # FileNotFoundError, so only create the directory when one is given.
        out_dir = os.path.dirname(output_txt_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(text.strip())
        logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}")
        return True
    except Exception as e:
        logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True)
        return False
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive file/folder ID from a share URL or raw ID.

    Tries known URL shapes in priority order (folder link, file '/d/' link,
    'id=' query parameter). If none match, the input itself is assumed to be
    a raw ID when it is longer than 10 characters; otherwise returns None.
    """
    if not url_or_id:
        return None
    id_patterns = (
        r"/folders/([a-zA-Z0-9_-]+)",  # folder share link
        r"/d/([a-zA-Z0-9_-]+)",        # file share link
        r"id=([a-zA-Z0-9_-]+)",        # uc?id= style link
    )
    for pattern in id_patterns:
        found = re.search(pattern, url_or_id)
        if found:
            return found.group(1)
    return url_or_id if len(url_or_id) > 10 else None
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single file from Google Drive to target_path.

    Args:
        file_id_or_url: Google Drive file ID or shareable URL.
        target_path: Destination path for the downloaded file.

    Returns:
        True when the file exists and is non-empty afterwards, else False.
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False
    try:
        # Guard against a bare filename: os.path.dirname('f.bin') == '' and
        # os.makedirs('') raises FileNotFoundError.
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        # fuzzy=True lets gdown resolve full share URLs as well as raw IDs.
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
        # A zero-byte file indicates a failed/blocked download.
        return os.path.exists(target_path) and os.path.getsize(target_path) > 0
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False
def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download an entire Google Drive folder and move its contents into target_dir_for_contents.

    If gdown wraps everything in a single top-level directory, that wrapper
    is flattened so the folder's files land directly in the target.

    Args:
        folder_id_or_url: Google Drive folder ID or shareable URL.
        target_dir_for_contents: Directory that receives the folder contents.

    Returns:
        True on success, False on any failure.
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        return False
    temp_dir = tempfile.mkdtemp()
    try:
        gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
        os.makedirs(target_dir_for_contents, exist_ok=True)
        # List the download once (the original re-listed the directory three
        # times, which is wasteful and race-prone).
        downloaded = os.listdir(temp_dir)
        src_root = temp_dir
        if len(downloaded) == 1 and os.path.isdir(os.path.join(temp_dir, downloaded[0])):
            # Flatten the single wrapper directory gdown sometimes creates.
            src_root = os.path.join(temp_dir, downloaded[0])
        for item in os.listdir(src_root):
            shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        # Always clean up the staging directory, even on failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP archive from Google Drive and extract it.

    Args:
        file_id_or_url: Google Drive file ID or shareable URL.
        target_extraction_dir: Directory the archive is extracted into.

    Returns:
        True on success, False on any failure.
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False
    # Use a unique temp path: the previous fixed name ("temp_download.zip"
    # in the shared temp dir) could collide between concurrent runs.
    fd, temp_zip = tempfile.mkstemp(suffix=".zip")
    os.close(fd)  # gdown opens the path itself; we only need the name
    try:
        gdown.download(id=file_id, output=temp_zip, quiet=False)
        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        # Always delete the downloaded archive — the original leaked it on
        # every call (success or failure).
        if os.path.exists(temp_zip):
            os.remove(temp_zip)