ed-cad-ref / utils.py
SakibAhmed's picture
Upload 3 files
414e8dd verified
import os
import logging
import re
import shutil
import tempfile
import time
from typing import Optional
import zipfile
import gdown
from pypdf import PdfReader
import docx as python_docx
logger = logging.getLogger(__name__)
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
text_content = None
try:
if file_type == 'pdf':
reader = PdfReader(file_path)
text_content = "".join(page.extract_text() + "\n" for page in reader.pages if page.extract_text())
elif file_type == 'docx':
doc = python_docx.Document(file_path)
text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
elif file_type == 'txt':
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text_content = f.read()
else:
logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
return None
if not text_content or not text_content.strip():
logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
return None
return text_content.strip()
except Exception as e:
logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
return None
FAISS_RAG_SUPPORTED_EXTENSIONS = {
'pdf': lambda path: extract_text_from_file(path, 'pdf'),
'docx': lambda path: extract_text_from_file(path, 'docx'),
'txt': lambda path: extract_text_from_file(path, 'txt'),
}
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
if not url_or_id: return None
match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id)
if match_folder: return match_folder.group(1)
match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id)
if match_file_d: return match_file_d.group(1)
match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id)
if match_uc: return match_uc.group(1)
return url_or_id if len(url_or_id) > 10 else None
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
"""
Downloads a single file (like users.csv) from GDrive to a specific path.
"""
logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
file_id = get_id_from_gdrive_input(file_id_or_url)
if not file_id:
logger.error("[GDRIVE_SINGLE] Invalid ID")
return False
try:
# Ensure dir exists
os.makedirs(os.path.dirname(target_path), exist_ok=True)
# fuzzy=True allows gdown to handle permissions more gracefully
gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
logger.info("[GDRIVE_SINGLE] Success.")
return True
else:
logger.error("[GDRIVE_SINGLE] Downloaded file is empty or missing.")
return False
except Exception as e:
logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
return False
def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
folder_id = get_id_from_gdrive_input(folder_id_or_url)
if not folder_id: return False
temp_dir = tempfile.mkdtemp()
try:
gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
if not os.path.exists(target_dir_for_contents):
os.makedirs(target_dir_for_contents)
src_root = temp_dir
if len(os.listdir(temp_dir)) == 1 and os.path.isdir(os.path.join(temp_dir, os.listdir(temp_dir)[0])):
src_root = os.path.join(temp_dir, os.listdir(temp_dir)[0])
for item in os.listdir(src_root):
shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))
logger.info(f"[GDRIVE] Download complete.")
return True
except Exception as e:
logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
return False
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
file_id = get_id_from_gdrive_input(file_id_or_url)
if not file_id: return False
temp_zip = os.path.join(tempfile.gettempdir(), "temp_download.zip")
try:
gdown.download(id=file_id, output=temp_zip, quiet=False)
with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
zip_ref.extractall(target_extraction_dir)
return True
except Exception as e:
logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
return False