# questrag-backend/app/services/file_service.py
# Last commit 0db4df6 by eeshanyaj: "fixed minor loading error"
import os
from pathlib import Path
from typing import Optional, Dict, Any
from fastapi import UploadFile, HTTPException
import pytesseract
from PIL import Image
import PyPDF2
from docx import Document
from io import BytesIO
from app.utils.file_utils import (
validate_file_type, validate_file_size, generate_unique_filename,
save_upload_file, ALLOWED_IMAGE_TYPES,
# ALLOWED_DOC_TYPES,
ALLOWED_AUDIO_TYPES
)
from app.config import settings
class FileService:
    """File processing service for images, PDFs, documents, and audio.

    Each ``process_*`` method (and ``transcribe_audio``) validates the upload,
    stores it at ``<upload_dir>/<category>/<user_id>/<unique filename>`` and
    returns a dict describing the stored file together with whatever text
    could be extracted from it.
    """

    def __init__(self):
        """Create the upload root (from ``settings.UPLOAD_DIR``) if missing."""
        self.upload_dir = Path(settings.UPLOAD_DIR)
        self.upload_dir.mkdir(parents=True, exist_ok=True)
        # The Whisper pipeline is expensive to build, so it is created lazily
        # on the first call to transcribe_audio().
        self._whisper_pipe = None
        print("✅ FileService initialized")

    async def process_image(self, file: UploadFile, user_id: str) -> Dict[str, Any]:
        """
        Upload image + OCR extraction.

        Args:
            file: Uploaded image file
            user_id: User ID (for file organization)

        Returns:
            Dict with file_id, file_path (relative to the upload root),
            file_type, extracted_text, size and original_filename.
            ``extracted_text`` is empty when OCR fails.

        Raises:
            HTTPException: 400 if the type or size validation fails.
        """
        if not validate_file_type(file, ALLOWED_IMAGE_TYPES):
            raise HTTPException(400, "Invalid image type. Allowed: JPG, PNG, WEBP")
        if not validate_file_size(file):
            raise HTTPException(400, "File too large (max 10MB)")

        # Save file
        filename = generate_unique_filename(file.filename)
        filepath = self.upload_dir / "images" / user_id / filename
        await save_upload_file(file, filepath)

        # OCR extraction is best-effort: a failure yields empty text rather
        # than failing the upload.
        # NOTE(review): pytesseract runs synchronously inside this coroutine;
        # consider offloading it to a worker thread if it stalls the server.
        try:
            # The context manager closes the underlying file handle promptly.
            with Image.open(filepath) as image:
                text = pytesseract.image_to_string(image)
        except Exception as e:
            print(f"⚠️ OCR failed: {e}")
            text = ""

        return {
            "file_id": filename,
            "file_path": str(filepath.relative_to(self.upload_dir)),
            "file_type": "image",
            "extracted_text": text.strip(),
            "size": filepath.stat().st_size,
            "original_filename": file.filename,
        }

    async def process_pdf(self, file: UploadFile, user_id: str) -> Dict[str, Any]:
        """
        Upload PDF + text extraction.

        Args:
            file: Uploaded PDF file
            user_id: User ID

        Returns:
            Dict with file_id, file_path, file_type, extracted_text, pages,
            size and original_filename. If extraction fails part-way, the
            text gathered so far is still returned.

        Raises:
            HTTPException: 400 if the content type is not ``application/pdf``
                or the size validation fails.
        """
        if file.content_type != "application/pdf":
            raise HTTPException(400, "Invalid PDF file")
        if not validate_file_size(file):
            raise HTTPException(400, "File too large (max 10MB)")

        # Save
        filename = generate_unique_filename(file.filename)
        filepath = self.upload_dir / "documents" / user_id / filename
        await save_upload_file(file, filepath)

        # Extract text page by page; collected in a list and joined once.
        page_texts = []
        pages = 0
        try:
            with open(filepath, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                pages = len(pdf_reader.pages)
                for page in pdf_reader.pages:
                    # Guard against a page yielding no text object at all.
                    page_texts.append(page.extract_text() or "")
        except Exception as e:
            print(f"⚠️ PDF extraction failed: {e}")
        text = "\n".join(page_texts)

        return {
            "file_id": filename,
            "file_path": str(filepath.relative_to(self.upload_dir)),
            "file_type": "pdf",
            "extracted_text": text.strip(),
            "pages": pages,
            "size": filepath.stat().st_size,
            "original_filename": file.filename,
        }

    async def process_docx(self, file: UploadFile, user_id: str) -> Dict[str, Any]:
        """
        Upload DOCX + text extraction.

        Args:
            file: Uploaded DOCX file
            user_id: User ID

        Returns:
            Dict with file_id, file_path, file_type, extracted_text, size and
            original_filename. ``extracted_text`` is the document's paragraphs
            joined by newlines, or empty if extraction fails.

        Raises:
            HTTPException: 400 if the content type is not the DOCX MIME type
                or the size validation fails.
        """
        if file.content_type != "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            raise HTTPException(400, "Invalid DOCX file")
        if not validate_file_size(file):
            raise HTTPException(400, "File too large (max 10MB)")

        # Save
        filename = generate_unique_filename(file.filename)
        filepath = self.upload_dir / "documents" / user_id / filename
        await save_upload_file(file, filepath)

        # Extract (best-effort)
        text = ""
        try:
            doc = Document(filepath)
            text = "\n".join(p.text for p in doc.paragraphs)
        except Exception as e:
            print(f"⚠️ DOCX extraction failed: {e}")

        return {
            "file_id": filename,
            "file_path": str(filepath.relative_to(self.upload_dir)),
            "file_type": "docx",
            "extracted_text": text.strip(),
            "size": filepath.stat().st_size,
            "original_filename": file.filename,
        }

    async def process_text_file(self, file: UploadFile, user_id: str) -> Dict[str, Any]:
        """
        Upload TXT file.

        Args:
            file: Uploaded text file
            user_id: User ID

        Returns:
            Dict with file_id, file_path, file_type, extracted_text, size and
            original_filename.

        Raises:
            HTTPException: 400 if the content type is not ``text/plain`` or
                the size validation fails.
        """
        if file.content_type != "text/plain":
            raise HTTPException(400, "Invalid text file")
        if not validate_file_size(file):
            raise HTTPException(400, "File too large (max 10MB)")

        filename = generate_unique_filename(file.filename)
        filepath = self.upload_dir / "documents" / user_id / filename
        await save_upload_file(file, filepath)

        text = ""
        try:
            # errors="replace" keeps the readable part of files that are not
            # valid UTF-8 instead of discarding their whole content.
            with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
                text = f.read()
        except Exception as e:
            print(f"⚠️ Text file read failed: {e}")

        return {
            "file_id": filename,
            "file_path": str(filepath.relative_to(self.upload_dir)),
            "file_type": "text",
            "extracted_text": text.strip(),
            "size": filepath.stat().st_size,
            "original_filename": file.filename,
        }

    # ========================================================================
    # Speech-to-text via the HuggingFace Transformers Whisper pipeline.
    # (An earlier variant called the paid OpenAI Whisper API, model
    # "whisper-1"; it is available in version control history.)
    # ========================================================================
    async def transcribe_audio(self, file: UploadFile, user_id: str) -> Dict[str, Any]:
        """
        Speech-to-text using HuggingFace Transformers Whisper (FREE!).

        Args:
            file: Uploaded audio file
            user_id: User ID

        Returns:
            Dict with file_id, file_path, file_type, transcription, size and
            original_filename.

        Raises:
            HTTPException: 400 if the type or size validation fails; 500 if
                loading the model or transcribing fails (the saved audio file
                is left in place in that case).
        """
        if not validate_file_type(file, ALLOWED_AUDIO_TYPES):
            raise HTTPException(400, "Invalid audio type. Allowed: MP3, WAV, WEBM, OGG, M4A")
        if not validate_file_size(file):
            raise HTTPException(400, "File too large (max 10MB)")

        # Save audio
        filename = generate_unique_filename(file.filename)
        filepath = self.upload_dir / "audio" / user_id / filename
        await save_upload_file(file, filepath)

        # Transcribe using HuggingFace Transformers Whisper (FREE!)
        transcription = ""
        try:
            # Imported here so the heavy dependencies are only needed when
            # transcription is actually requested.
            from transformers import pipeline
            import torch

            # Lazy load model (only first time). getattr() also covers
            # instances that were created without running __init__.
            if getattr(self, "_whisper_pipe", None) is None:
                print("🎤 Loading Whisper model (one-time)...")
                device = 0 if torch.cuda.is_available() else -1
                self._whisper_pipe = pipeline(
                    "automatic-speech-recognition",
                    model="openai/whisper-small",  # Small = fast, good accuracy
                    device=device,
                )
                print("✅ Whisper model loaded")

            # Transcribe
            # NOTE(review): this call is synchronous inside a coroutine;
            # consider offloading it if it stalls other requests.
            result = self._whisper_pipe(str(filepath))
            transcription = result["text"]
        except Exception as e:
            print(f"⚠️ Whisper transcription failed: {e}")
            raise HTTPException(500, f"Transcription failed: {str(e)}") from e

        return {
            "file_id": filename,
            "file_path": str(filepath.relative_to(self.upload_dir)),
            "file_type": "audio",
            "transcription": transcription,
            "size": filepath.stat().st_size,
            "original_filename": file.filename,
        }

    def delete_file(self, file_path: str, user_id: str) -> bool:
        """
        Delete uploaded file.

        The path is resolved and must stay inside the upload root, and the
        directory right after the category (``<category>/<user_id>/...``, the
        layout used by the upload methods of this class) must equal
        ``user_id``. A plain substring match is not sufficient: it would
        accept an empty user ID, other users' files whose names happen to
        contain the ID, and ``..``/absolute paths that leave the upload root.

        Args:
            file_path: Relative file path (from upload_dir)
            user_id: User ID (for security check)

        Returns:
            bool: True if the file was deleted; False if the check fails, the
            file does not exist, or deletion raises.
        """
        try:
            if not user_id or not file_path:
                return False

            base_dir = self.upload_dir.resolve()
            target = (base_dir / file_path).resolve()

            # Security: reject anything that escapes the upload root.
            try:
                relative_parts = target.relative_to(base_dir).parts
            except ValueError:
                return False

            # Security: ensure file belongs to user (exact component match).
            if len(relative_parts) < 3 or relative_parts[1] != user_id:
                return False

            if target.is_file():
                target.unlink()
                return True
            return False
        except Exception as e:
            print(f"⚠️ File deletion failed: {e}")
            return False
# ============================================================================
# GLOBAL SERVICE INSTANCE
# ============================================================================
# Shared module-level instance, constructed when this module is imported.
# FileService.__init__ creates the upload directory (settings.UPLOAD_DIR) if
# it is missing, so importing this module has that filesystem side effect.
file_service = FileService()