# Quotes / app.py
# sivakumar330's picture
# Update app.py
# 6d18a2d verified
# app.py - Complete Flask application for eBook management with Hugging Face Dataset Storage
# Guaranteed working download for all file types
import os
import sys
import uuid
import datetime
import tempfile
import json
import hashlib
import random
import mimetypes
from flask import Flask, request, jsonify, redirect, send_file, render_template_string, url_for, make_response, abort
from flask_cors import CORS
from werkzeug.utils import secure_filename
# huggingface_hub is an optional dependency: record whether it could be
# imported and print installation instructions when it could not.
try:
    from huggingface_hub import HfApi
except ImportError:
    HF_HUB_AVAILABLE = False
    print("=" * 60)
    print("⚠️ WARNING: huggingface_hub module not installed!")
    print("Please install it using: pip install huggingface-hub")
    print("=" * 60)
else:
    HF_HUB_AVAILABLE = True
import requests
from pathlib import Path
import shutil
# Flask application object; CORS is enabled for frontend communication.
app = Flask(__name__)
CORS(app)

# ===== CONFIGURATION =====
# Upload restrictions: accepted extensions and the 50MB size ceiling.
ALLOWED_EXTENSIONS = {'docx', 'pdf', 'txt'}
MAX_FILE_SIZE = 50 * 1024 ** 2

# Hugging Face settings, read from the environment (Space Secrets).
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO", "sivakumar330/Ebook")

# Local scratch directory for temporary file operations; created at import.
CACHE_FOLDER = '/tmp/ebook_cache'
os.makedirs(CACHE_FOLDER, exist_ok=True)
# Hub client: stays None unless the library imported and a token is set.
api = None
if HF_TOKEN and HF_HUB_AVAILABLE:
    try:
        api = HfApi()
        print("βœ… Hugging Face Hub API initialized successfully")
    except Exception as e:
        # Any failure here leaves the app in local-fallback mode.
        print(f"⚠️ Failed to initialize Hugging Face API: {e}")
        api = None
# Default cover images for eBooks, expanded from their Unsplash photo ids.
DEFAULT_COVERS = [
    f"https://images.unsplash.com/photo-{photo_id}?w=500"
    for photo_id in (
        "1512820790803-83ca734da794",
        "1495446815901-a7297e633e8d",
        "1532012197267-da84d127e765",
        "1456513080510-7bf3a84b82f8",
        "1524995997946-a1c2e315a42f",
        "1544716278-ca5e3f4abd8c",
        "1526243741027-444d633d7365",
    )
]
# ===== HELPER FUNCTIONS =====
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively;
    a name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def get_file_size_format(size_bytes):
    """Render a byte count as a short human-readable string.

    Values below 1024 are shown as whole bytes ("B"); larger values are shown
    with two decimals in "KB" or, from 1024 * 1024 bytes upward, in "MB".
    """
    kib = 1024
    mib = kib * kib
    if size_bytes >= mib:
        return f"{size_bytes / mib:.2f} MB"
    if size_bytes >= kib:
        return f"{size_bytes / kib:.2f} KB"
    return f"{size_bytes} B"
def get_file_hash(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is read in 4096-byte blocks so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def get_mime_type(filename):
    """Map a filename to a MIME type using the text after its last dot.

    Known extensions are pdf, epub, txt and docx (case-insensitive); anything
    else, including a name without a dot, yields 'application/octet-stream'.
    """
    _, dot, suffix = filename.rpartition('.')
    ext = suffix.lower() if dot else ''
    known_types = {
        'pdf': 'application/pdf',
        'epub': 'application/epub+zip',
        'txt': 'text/plain',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    }
    if ext in known_types:
        return known_types[ext]
    return 'application/octet-stream'
# ===== HUGGING FACE DATASET OPERATIONS =====
def _read_local_metadata():
    """Return the metadata dict cached under CACHE_FOLDER, or {} if unusable."""
    local_metadata = os.path.join(CACHE_FOLDER, 'all_metadata.json')
    try:
        with open(local_metadata, 'r') as f:
            cached = json.load(f)
    except FileNotFoundError:
        # No cache yet: a normal first-run condition, not an error.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding errors.
        print(f"⚠️ Error reading local metadata cache: {e}")
        return {}
    return cached if isinstance(cached, dict) else {}


def load_metadata():
    """Load all eBook metadata, preferring the Hugging Face dataset.

    Returns:
        dict: metadata keyed by eBook id. When the Hub is not configured the
        local cache file is used. When the Hub is configured:

        * HTTP 200 -> the parsed remote ``metadata/all_metadata.json``;
        * HTTP 404 -> ``{}`` (the metadata file has not been created yet);
        * any other status or a request error -> the local cache copy, so a
          transient failure is not mistaken for an empty catalogue that a
          later save would then write back over the real one.

    Never raises for I/O problems; the worst case is an empty dict.
    """
    if not HF_HUB_AVAILABLE or not api or not HF_TOKEN:
        # Fallback to local file for development
        return _read_local_metadata()

    try:
        metadata_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/metadata/all_metadata.json"
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        response = requests.get(metadata_url, headers=headers, timeout=10)
        if response.status_code == 200:
            return response.json()
        if response.status_code == 404:
            # Only a genuine "not found" means there is no metadata yet.
            print(f"πŸ“ Metadata not found (status: {response.status_code}), creating new...")
            return {}
        print(f"⚠️ Error loading metadata: unexpected status {response.status_code}")
    except Exception as e:
        print(f"⚠️ Error loading metadata: {e}")

    # Remote copy unavailable: use the last state saved on this machine.
    return _read_local_metadata()
def save_metadata(metadata):
    """Persist all eBook metadata locally and, when configured, to the Hub.

    Args:
        metadata: dict of eBook records keyed by id; must be JSON-serialisable.

    Returns:
        bool: always True. The local copy under CACHE_FOLDER is written first
        as a backup, and a failed remote upload is only logged because that
        local copy is treated as sufficient.
    """
    # Always save locally first as backup
    local_metadata = os.path.join(CACHE_FOLDER, 'all_metadata.json')
    try:
        with open(local_metadata, 'w') as f:
            json.dump(metadata, f, indent=2)
    except Exception as e:
        print(f"⚠️ Error saving local metadata: {e}")

    if not HF_HUB_AVAILABLE or not api or not HF_TOKEN:
        print("⚠️ Hugging Face Hub not available, metadata saved locally only")
        return True

    tmp_path = None
    try:
        # Serialise to a temporary file that upload_file can read from disk.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as tmp:
            tmp_path = tmp.name
            json.dump(metadata, tmp, indent=2)
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo="metadata/all_metadata.json",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        print(f"βœ… Metadata saved successfully: {len(metadata)} ebooks")
        return True
    except Exception as e:
        print(f"❌ Error saving metadata: {e}")
        # Metadata already saved locally, so return True
        return True
    finally:
        # Remove the temporary file on success and on failure alike so that
        # failed uploads do not accumulate files in the temp directory.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def upload_file_to_dataset(file_path, remote_path):
    """Push the local file *file_path* to *remote_path* in the dataset repo.

    Returns True on success, False when the Hub is not configured or the
    upload raised (the error is printed, not propagated).
    """
    hub_ready = HF_HUB_AVAILABLE and api and HF_TOKEN
    if not hub_ready:
        print("⚠️ Hugging Face Hub not available, cannot upload file")
        return False

    upload_args = dict(
        path_or_fileobj=file_path,
        path_in_repo=remote_path,
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    try:
        api.upload_file(**upload_args)
    except Exception as e:
        print(f"❌ Error uploading file: {e}")
        return False
    return True
def delete_file_from_dataset(remote_path):
    """Remove *remote_path* from the dataset repo.

    Returns True on success, False when the Hub is not configured or the
    deletion raised (the error is printed, not propagated).
    """
    hub_ready = HF_HUB_AVAILABLE and api and HF_TOKEN
    if not hub_ready:
        print("⚠️ Hugging Face Hub not available, cannot delete file")
        return False

    delete_args = dict(
        path_in_repo=remote_path,
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    try:
        api.delete_file(**delete_args)
    except Exception as e:
        print(f"❌ Error deleting file: {e}")
        return False
    return True
def get_file_url(remote_path):
    """Build the ``resolve/main`` URL of *remote_path* inside the dataset repo."""
    repo_root = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main"
    return f"{repo_root}/{remote_path}"
def download_file_from_dataset(remote_path):
    """Download a dataset file into CACHE_FOLDER and return its local path.

    Args:
        remote_path: path of the file inside the dataset repository.

    Returns:
        str | None: ``CACHE_FOLDER/<basename of remote_path>`` on success;
        None when the server does not answer 200 or any error occurs (the
        error is printed, not raised).

    The body is streamed into a temporary file in CACHE_FOLDER and moved into
    place with os.replace only once complete, so an interrupted or concurrent
    download never leaves a truncated file at the returned path. The streamed
    response is used as a context manager so its connection is always
    released.
    """
    tmp_path = None
    try:
        file_url = get_file_url(remote_path)
        local_filename = os.path.join(CACHE_FOLDER, os.path.basename(remote_path))
        headers = {}
        if HF_TOKEN:
            headers["Authorization"] = f"Bearer {HF_TOKEN}"

        with requests.get(file_url, headers=headers, stream=True, timeout=30) as response:
            if response.status_code != 200:
                return None
            with tempfile.NamedTemporaryFile(dir=CACHE_FOLDER, delete=False) as tmp:
                tmp_path = tmp.name
                for chunk in response.iter_content(chunk_size=8192):
                    tmp.write(chunk)

        # Publish the finished file in one step.
        os.replace(tmp_path, local_filename)
        tmp_path = None
        return local_filename
    except Exception as e:
        print(f"❌ Error downloading file: {e}")
        return None
    finally:
        # Discard a partial download left behind by a failure.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
# ===== SIMPLE HTML PAGE FOR TESTING =====
# Static test page returned by the root endpoint. Its script lists the eBooks
# from /api/ebooks and offers download/view buttons. Every value coming from
# the API is HTML-escaped before it is written through innerHTML, because
# titles and authors are user-supplied at upload time.
INDEX_HTML = '''
<!DOCTYPE html>
<html>
<head>
<title>eBook Download Test</title>
<style>
body { font-family: Arial; padding: 20px; background: #f5f5f5; }
.container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 10px; }
.ebook { padding: 10px; margin: 10px 0; border: 1px solid #ddd; border-radius: 5px; }
.btn { padding: 8px 15px; margin: 5px; border: none; border-radius: 5px; cursor: pointer; }
.download { background: #4CAF50; color: white; }
.view { background: #2196F3; color: white; }
</style>
</head>
<body>
<div class="container">
<h1>πŸ“š eBook Download Test</h1>
<div id="ebooks"></div>
</div>
<script>
// Escape a value so it is rendered as text, never parsed as markup.
function escapeHtml(value) {
    const entities = {'&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;'};
    return String(value == null ? '' : value).replace(/[&<>"']/g, ch => entities[ch]);
}
async function loadEbooks() {
    const container = document.getElementById('ebooks');
    const response = await fetch('/api/ebooks');
    const ebooks = await response.json();
    // The API answers with an {error: ...} object instead of a list on failure.
    if (!response.ok || !Array.isArray(ebooks)) {
        container.textContent = 'Failed to load eBooks.';
        return;
    }
    const html = ebooks.map(ebook => `
<div class="ebook">
<h3>${escapeHtml(ebook.title)}</h3>
<p>By: ${escapeHtml(ebook.author)} | Size: ${escapeHtml(ebook.size_formatted)} | Downloads: ${escapeHtml(ebook.download_count)}</p>
<button class="btn download" data-id="${escapeHtml(ebook.id)}" onclick="downloadEbook(this.dataset.id)">πŸ“₯ Download</button>
<button class="btn view" data-id="${escapeHtml(ebook.id)}" onclick="viewEbook(this.dataset.id)">πŸ‘οΈ View PDF</button>
</div>
`).join('');
    container.innerHTML = html;
}
function downloadEbook(id) {
    window.location.href = `/api/download/${encodeURIComponent(id)}`;
}
function viewEbook(id) {
    window.location.href = `/api/view/${encodeURIComponent(id)}`;
}
loadEbooks();
</script>
</body>
</html>
'''
# ===== API ENDPOINTS =====
@app.route('/', methods=['GET'])
def index():
    """Serve the built-in HTML test page at the site root."""
    page = INDEX_HTML
    return page
@app.route('/api/upload', methods=['POST'])
def upload_ebook():
    """Upload an eBook and register its metadata.

    Expects a multipart form with a ``file`` part and the fields ``title``,
    ``author``, ``category``, ``uploaded_by`` (required) and ``image_url``.

    The file is stored in the Hugging Face dataset when the Hub is configured;
    otherwise it is copied under ``CACHE_FOLDER/uploads/<stored_path>``, the
    same location the view, download and delete endpoints look in.

    Returns:
        201 with the new metadata record on success; 400 for a missing or
        disallowed file, a missing ``uploaded_by`` or an oversized file;
        500 when storage fails or any unexpected error occurs.
    """
    temp_path = None
    try:
        # Check if file exists in request
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        # Check file type
        if not allowed_file(file.filename):
            return jsonify({'error': f'File type not allowed. Allowed types: {", ".join(ALLOWED_EXTENSIONS)}'}), 400

        # Get metadata from form
        title = request.form.get('title', '').strip()
        author = request.form.get('author', '').strip()
        category = request.form.get('category', 'General')
        uploaded_by = request.form.get('uploaded_by', '').strip()
        image_url = request.form.get('image_url', '').strip()
        if not uploaded_by:
            return jsonify({'error': 'User ID (uploaded_by) is required'}), 400

        # Save file temporarily; the finally clause below removes it on every
        # exit path, including exceptions.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            temp_path = tmp.name
        file.save(temp_path)
        file_size = os.path.getsize(temp_path)

        # Check file size
        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Max size: {MAX_FILE_SIZE/1024/1024}MB'}), 400

        # Generate unique ID and filename
        now = datetime.datetime.now()
        timestamp = now.strftime('%Y%m%d_%H%M%S')
        unique_id = str(uuid.uuid4())
        # Take the extension from the name that passed allowed_file():
        # secure_filename() can drop the stem (and with it the dot) when the
        # stem has no ASCII characters, e.g. turning "<non-ascii>.pdf" into "pdf".
        file_ext = file.filename.rsplit('.', 1)[1].lower()
        original_filename = secure_filename(file.filename)
        if not original_filename.lower().endswith(f".{file_ext}"):
            original_filename = f"ebook.{file_ext}"

        # Organize by year/month
        year_month = now.strftime('%Y/%m')
        stored_filename = f"{timestamp}_{unique_id}_{original_filename}"
        remote_path = f"ebooks/{year_month}/{stored_filename}"

        # Hash the actual content while the temporary copy still exists.
        file_hash = get_file_hash(temp_path)

        # One decision about the storage backend, reused below for reporting.
        use_hf = bool(HF_HUB_AVAILABLE and api and HF_TOKEN)
        if use_hf:
            if not upload_file_to_dataset(temp_path, remote_path):
                return jsonify({'error': 'Failed to upload file to Hugging Face dataset'}), 500
        else:
            # Fallback: save locally under uploads/<stored_path> so the other
            # endpoints, which join CACHE_FOLDER/uploads with stored_path,
            # can find the file again.
            local_path = os.path.join(CACHE_FOLDER, 'uploads', remote_path)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            shutil.copy2(temp_path, local_path)
            print(f"πŸ“ File saved locally: {local_path}")

        # Select random default cover if no image URL provided
        if not image_url:
            image_url = random.choice(DEFAULT_COVERS)

        # Create metadata
        ebook_metadata = {
            'id': unique_id,
            'filename': original_filename,
            'stored_filename': stored_filename,
            'stored_path': remote_path,
            'title': title or original_filename,
            'author': author or 'Unknown',
            'category': category,
            'image_url': image_url,
            'uploaded_by': uploaded_by,
            'size': file_size,
            'size_formatted': get_file_size_format(file_size),
            'file_hash': file_hash,
            'upload_date': now.isoformat(),
            'last_accessed': None,
            'download_count': 0,
            'storage_type': 'huggingface_dataset' if use_hf else 'local_fallback',
            'file_extension': file_ext
        }

        # Load existing metadata and add new entry
        all_metadata = load_metadata()
        all_metadata[unique_id] = ebook_metadata

        # Save metadata back to dataset
        if save_metadata(all_metadata):
            storage_msg = "Hugging Face dataset" if use_hf else "local storage (fallback)"
            return jsonify({
                'message': f'eBook uploaded successfully to {storage_msg}',
                'ebook': ebook_metadata
            }), 201
        return jsonify({'error': 'Failed to save metadata'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up temp file
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
@app.route('/api/ebooks', methods=['GET'])
def get_ebooks():
    """Return every eBook record as a JSON list, newest upload first.

    Responds 200 with the list, or 500 with an ``error`` message if anything
    raises.
    """
    try:
        catalogue = load_metadata()
        # Copy each record so the response list does not alias the loaded dict.
        records = [record.copy() for record in catalogue.values()]
        newest_first = sorted(
            records,
            key=lambda record: record.get('upload_date', ''),
            reverse=True,
        )
        return jsonify(newest_first), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/ebooks/<ebook_id>', methods=['GET'])
def get_ebook(ebook_id):
    """Return the metadata record of one eBook.

    Responds 200 with the record, 404 when the id is unknown, or 500 with an
    ``error`` message if anything raises.
    """
    try:
        catalogue = load_metadata()
        if ebook_id in catalogue:
            return jsonify(catalogue[ebook_id]), 200
        return jsonify({'error': 'eBook not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/view/<ebook_id>', methods=['GET'])
def view_ebook(ebook_id):
    """Serve a PDF eBook inline so the browser can display it.

    Non-PDF records are redirected to the download endpoint. A successful
    view increments ``download_count`` and updates ``last_accessed``.

    Returns:
        200 with the PDF bytes; a redirect for non-PDF files; 404 when the
        record or its file cannot be found; 500 on unexpected errors.
    """
    try:
        metadata = load_metadata()
        if ebook_id not in metadata:
            return jsonify({'error': 'eBook not found'}), 404
        ebook_data = metadata[ebook_id]

        # Check if it's a PDF
        if ebook_data.get('file_extension') != 'pdf':
            # For non-PDF files, force download
            return redirect(f"/api/download/{ebook_id}")

        # Get the file path. An empty or missing stored_path is treated as
        # "no file" instead of being joined into a directory path.
        stored_path = ebook_data.get('stored_path')
        file_path = None
        if stored_path:
            if HF_HUB_AVAILABLE and api and HF_TOKEN:
                # Download from HF
                file_path = download_file_from_dataset(stored_path)
            else:
                # Try local file
                local_path = os.path.join(CACHE_FOLDER, 'uploads', stored_path)
                if os.path.isfile(local_path):
                    file_path = local_path
        if not file_path or not os.path.isfile(file_path):
            return jsonify({'error': 'File not found'}), 404

        # Read the file before touching the counters so a failed read is not
        # recorded as a view.
        with open(file_path, 'rb') as f:
            file_data = f.read()

        # Increment download count
        ebook_data['download_count'] = ebook_data.get('download_count', 0) + 1
        ebook_data['last_accessed'] = datetime.datetime.now().isoformat()
        # Update metadata
        metadata[ebook_id] = ebook_data
        save_metadata(metadata)

        # Tolerate records without a filename and keep the quoted header
        # value well-formed.
        display_name = str(ebook_data.get('filename') or f"{ebook_id}.pdf")
        display_name = display_name.replace('"', '').replace('\r', '').replace('\n', '')

        response = make_response(file_data)
        response.headers['Content-Type'] = 'application/pdf'
        response.headers['Content-Disposition'] = f'inline; filename="{display_name}"'
        # Use the real payload size; the size stored in metadata may be
        # missing or stale.
        response.headers['Content-Length'] = str(len(file_data))
        return response
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/download/<ebook_id>', methods=['GET'])
def download_ebook(ebook_id):
    """Serve an eBook as an attachment so the browser always downloads it.

    A successful download increments ``download_count`` and updates
    ``last_accessed``.

    Returns:
        200 with the file bytes and attachment/no-cache headers; 404 when the
        record or its file cannot be found; 500 on unexpected errors.
    """
    try:
        metadata = load_metadata()
        if ebook_id not in metadata:
            return jsonify({'error': 'eBook not found'}), 404
        ebook_data = metadata[ebook_id]

        # Get the file path. An empty or missing stored_path is treated as
        # "no file" instead of being joined into a directory path.
        stored_path = ebook_data.get('stored_path')
        file_path = None
        if stored_path:
            if HF_HUB_AVAILABLE and api and HF_TOKEN:
                # Download from HF
                file_path = download_file_from_dataset(stored_path)
            else:
                # Try local file
                local_path = os.path.join(CACHE_FOLDER, 'uploads', stored_path)
                if os.path.isfile(local_path):
                    file_path = local_path
        if not file_path or not os.path.isfile(file_path):
            return jsonify({'error': 'File not found'}), 404

        # Read the file before touching the counters so a failed read is not
        # recorded as a download.
        with open(file_path, 'rb') as f:
            file_data = f.read()

        # Increment download count
        ebook_data['download_count'] = ebook_data.get('download_count', 0) + 1
        ebook_data['last_accessed'] = datetime.datetime.now().isoformat()
        # Update metadata
        metadata[ebook_id] = ebook_data
        save_metadata(metadata)

        # Tolerate records without a filename and keep the quoted header
        # value well-formed.
        download_name = str(ebook_data.get('filename') or ebook_id)
        download_name = download_name.replace('"', '').replace('\r', '').replace('\n', '')

        # Serve with attachment headers (FORCES DOWNLOAD)
        response = make_response(file_data)
        response.headers['Content-Type'] = 'application/octet-stream'
        response.headers['Content-Disposition'] = f'attachment; filename="{download_name}"'
        # Use the real payload size; the size stored in metadata may be
        # missing or stale.
        response.headers['Content-Length'] = str(len(file_data))
        response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        response.headers['Pragma'] = 'no-cache'
        response.headers['Expires'] = '0'
        return response
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/ebooks/<ebook_id>', methods=['DELETE'])
def delete_ebook(ebook_id):
    """Delete an eBook and its metadata; only the uploader may do so.

    Expects a JSON body ``{"user_id": "..."}``. The caller is identified
    solely by that client-supplied value, which is compared with the record's
    ``uploaded_by`` field.

    Returns:
        200 with ``file_deleted`` on success; 400 when no user id is supplied
        (including a missing, non-JSON or non-object body); 404 for an
        unknown id; 403 when the user is not the uploader; 500 on unexpected
        errors.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the request gets the 400 below.
        data = request.get_json(silent=True)
        user_id = data.get('user_id') if isinstance(data, dict) else None
        if not user_id:
            return jsonify({'error': 'User ID required'}), 400

        metadata = load_metadata()
        if ebook_id not in metadata:
            return jsonify({'error': 'eBook not found'}), 404
        ebook_data = metadata[ebook_id]

        # Check if user is the uploader (a record without an uploader never
        # matches, because user_id is non-empty here).
        if ebook_data.get('uploaded_by') != user_id:
            return jsonify({'error': 'Unauthorized: Only the uploader can delete this eBook'}), 403

        # Get remote path
        if 'stored_path' in ebook_data:
            remote_path = ebook_data['stored_path']
        else:
            remote_path = f"ebooks/{ebook_data['stored_filename']}"

        # Delete file from dataset (if Hugging Face is available)
        file_deleted = True
        if HF_HUB_AVAILABLE and api and HF_TOKEN:
            file_deleted = delete_file_from_dataset(remote_path)
        else:
            # Try to delete local file; isfile() keeps a bad path that points
            # at a directory from reaching os.remove().
            local_path = os.path.join(CACHE_FOLDER, 'uploads', remote_path)
            if os.path.isfile(local_path):
                os.remove(local_path)

        # Remove from metadata
        del metadata[ebook_id]
        if save_metadata(metadata):
            return jsonify({
                'message': 'eBook deleted successfully',
                'file_deleted': file_deleted
            }), 200
        return jsonify({'error': 'Failed to update metadata'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/ebooks/search', methods=['GET'])
def search_ebooks():
    """Filter eBooks by a free-text query and/or an exact category.

    Query parameters:
        q: case-insensitive substring matched against title, author and
           filename.
        category: case-insensitive exact match on the record's category.

    Both filters are optional; when both are given a record must satisfy
    both. Responds 200 with the matching records, or 500 on error.
    """
    try:
        needle = request.args.get('q', '').lower()
        wanted_category = request.args.get('category', '')
        catalogue = load_metadata()

        hits = []
        for record in catalogue.values():
            # Text filter: the first matching field is enough.
            if needle and not any(
                needle in record.get(field, '').lower()
                for field in ('title', 'author', 'filename')
            ):
                continue
            # Category filter.
            if wanted_category and wanted_category.lower() != record.get('category', '').lower():
                continue
            hits.append(record.copy())
        return jsonify(hits), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/ebooks/user/<user_id>', methods=['GET'])
def get_user_ebooks(user_id):
    """Return the eBooks whose ``uploaded_by`` field equals *user_id*.

    Responds 200 with a (possibly empty) list, or 500 on error.
    """
    try:
        catalogue = load_metadata()
        owned = [
            record.copy()
            for record in catalogue.values()
            if record.get('uploaded_by') == user_id
        ]
        return jsonify(owned), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return aggregate statistics over all eBook records.

    The response reports the number of eBooks, their combined size in MB,
    the total download count, and per-category / per-user / per-extension
    record counts, plus the dataset name and the active storage type.
    Responds 200, or 500 on error.
    """
    try:
        catalogue = load_metadata()
        bytes_total = 0
        downloads_total = 0
        by_category = {}
        by_user = {}
        by_extension = {}
        # (tally dict, record field, label used when the field is absent)
        groupings = (
            (by_category, 'category', 'General'),
            (by_user, 'uploaded_by', 'unknown'),
            (by_extension, 'file_extension', 'unknown'),
        )

        for record in catalogue.values():
            bytes_total += record.get('size', 0)
            downloads_total += record.get('download_count', 0)
            for tally, field, fallback in groupings:
                label = record.get(field, fallback)
                tally[label] = tally.get(label, 0) + 1

        if HF_HUB_AVAILABLE and HF_TOKEN:
            storage_label = 'Hugging Face Dataset (Permanent)'
        else:
            storage_label = 'Local Storage (Fallback)'

        return jsonify({
            'total_ebooks': len(catalogue),
            'total_size_mb': round(bytes_total / (1024 * 1024), 2),
            'total_downloads': downloads_total,
            'category_distribution': by_category,
            'user_distribution': by_user,
            'file_type_distribution': by_extension,
            'dataset_repo': HF_DATASET_REPO,
            'storage_type': storage_label
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/health', methods=['GET'])
def health_check():
    """Report service health together with the Hub configuration flags.

    Responds 200 with status 'healthy', a timestamp and the eBook count when
    the metadata can be loaded; otherwise 500 with status 'unhealthy' and the
    error text.
    """
    try:
        catalogue = load_metadata()
        healthy_report = {
            'status': 'healthy',
            'timestamp': datetime.datetime.now().isoformat(),
            'dataset': HF_DATASET_REPO,
            'total_ebooks': len(catalogue),
            'hf_hub_available': HF_HUB_AVAILABLE,
            'hf_token_configured': bool(HF_TOKEN)
        }
        return jsonify(healthy_report), 200
    except Exception as e:
        failure_report = {
            'status': 'unhealthy',
            'error': str(e),
            'dataset': HF_DATASET_REPO,
            'hf_hub_available': HF_HUB_AVAILABLE,
            'hf_token_configured': bool(HF_TOKEN)
        }
        return jsonify(failure_report), 500
@app.route('/api/init', methods=['POST'])
def initialize_dataset():
    """Create an empty metadata store when none exists yet.

    Responds 200 either way: with ``metadata_created`` after saving an empty
    store, or with the current ``ebooks_count`` when records already exist.
    Responds 500 if saving fails or anything raises.
    """
    try:
        existing = load_metadata()
        if existing:
            return jsonify({
                'message': 'Dataset already initialized',
                'ebooks_count': len(existing),
                'dataset': HF_DATASET_REPO
            }), 200

        if not save_metadata({}):
            return jsonify({'error': 'Failed to initialize metadata'}), 500

        return jsonify({
            'message': 'Dataset initialized successfully',
            'metadata_created': True,
            'dataset': HF_DATASET_REPO
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ===== MAIN =====
if __name__ == '__main__':
    # Script entry point: print a configuration banner, try one metadata load
    # as a smoke test, then start the server on $PORT (default 7860).
    port = int(os.environ.get('PORT', 7860))
    print(
        "=" * 60,
        "πŸš€ eBook Management Server Starting...",
        "=" * 60,
        f"πŸ“š Dataset: {HF_DATASET_REPO}",
        f"πŸ”— Hugging Face Hub available: {HF_HUB_AVAILABLE}",
        f"πŸ”‘ HF Token configured: {bool(HF_TOKEN)}",
        f"πŸ“ Allowed file types: {', '.join(ALLOWED_EXTENSIONS)}",
        f"πŸ“Š Max file size: {MAX_FILE_SIZE/1024/1024}MB",
        f"πŸ“‚ Cache folder: {CACHE_FOLDER}",
        "=" * 60,
        "βœ… DOWNLOAD ENDPOINTS:",
        " πŸ‘‰ /api/download/<ebook_id> - FORCES download for ALL files",
        " πŸ‘‰ /api/view/<ebook_id> - Views PDF in browser",
        "=" * 60,
        sep="\n",
    )
    # Try to load metadata on startup
    try:
        metadata = load_metadata()
        print(f"βœ… Loaded {len(metadata)} eBooks from storage")
    except Exception as e:
        print(f"⚠️ Could not load metadata: {e}")
    app.run(host='0.0.0.0', port=port, debug=False)