# Tools / main.py
# jebin2's picture
# impro
# a638e13
from fastapi import FastAPI, Request, File, UploadFile, HTTPException, Form, Query
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader, select_autoescape
from custom_logger import logger_config
from image.image_base import ImageBase
from pydantic import BaseModel
from image.converter import Converter
from image.remove_metadata import RemoveMetadata
from image.remove_background import RemoveBackground
from PDF.images_to_pdf import ImagesToPDF
import mimetypes
import os
# Application instance that the routes and the startup hook in this file attach to.
app = FastAPI(
    title="Tools Collection",
    description="Collection of utility tools",
)
# Startup event to clean up old files
@app.on_event("startup")
async def startup_cleanup():
    """Ask ``ImageBase`` to clean up files older than 2 days at server startup.

    Any failure is logged as a warning and swallowed, so a cleanup problem
    never prevents the application from starting.
    """
    try:
        removed = ImageBase().cleanup_old_files(max_age_days=2)
        if removed > 0:
            summary = f"Startup cleanup: Removed {removed} old file(s)"
        else:
            summary = "Startup cleanup: No old files to remove"
        logger_config.info(summary)
    except Exception as e:
        logger_config.warning(f"Startup cleanup failed: {e}")
# Serve the files under image/javascript at the /image/javascript URL prefix.
app.mount(
    "/image/javascript",
    StaticFiles(directory="image/javascript"),
    name="image",
)

# Templates: looked up in the working directory first, then in ./image.
# Autoescaping is enabled for html and xml templates.
template_dirs = [".", "./image"]
env = Environment(
    autoescape=select_autoescape(["html", "xml"]),
    loader=FileSystemLoader(template_dirs),
)
# Available features
# Catalogue of tool groups, keyed by a short identifier. It is returned by
# /api/features, filtered by /api/search (on name, description and tags) and
# its keys drive /api/status.
# The icons and the "⋮" in the video description are stored as real Unicode
# characters; they had previously been saved as mis-decoded UTF-8 bytes.
FEATURES = {
    "image": {
        "name": "Image Tools",
        "description": "HEIC to PNG/JPG conversion and metadata removal",
        "icon": "🖼️",
        "features": ["convert", "remove_metadata", "remove_background"],
        "folder": "image",
        "tags": ["image", "heic", "png", "jpg", "convert", "metadata"],
    },
    "pdf": {
        "name": "PDF Tools",
        "description": "Convert images to PDF, merge PDFs, and more",
        "icon": "📄",
        "features": ["images_to_pdf"],
        "folder": "pdf",
        "tags": ["pdf", "merge", "convert", "images", "document"],
    },
    "video": {
        "name": "Video Tools",
        "description": "Video player with playlist management - add videos and convert via ⋮ menu",
        "icon": "🎬",
        "features": ["player"],
        "folder": "video",
        "tags": ["video", "player", "media", "watch", "convert", "playlist"],
    },
}
# Routes
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template and return it as the landing page."""
    page = env.get_template("index.html").render(request=request)
    return HTMLResponse(content=page)
# NOTE: the name ``image_tools`` is shared with the other /image/* page
# handlers in this file. Each route is registered when its decorator runs, so
# all of them are served, but later definitions rebind the module-level name.
@app.get("/image/convert", response_class=HTMLResponse)
async def image_tools(request: Request):
    """Render the ``image/convert.html`` template as an HTML response."""
    page = env.get_template("image/convert.html").render(request=request)
    return HTMLResponse(content=page)
# NOTE: the name ``image_tools`` is shared with the other /image/* page
# handlers in this file. Each route is registered when its decorator runs, so
# all of them are served, but later definitions rebind the module-level name.
@app.get("/image/remove_metadata", response_class=HTMLResponse)
async def image_tools(request: Request):
    """Render the ``image/remove_metadata.html`` template as an HTML response."""
    page = env.get_template("image/remove_metadata.html").render(request=request)
    return HTMLResponse(content=page)
# NOTE: the name ``image_tools`` is shared with the other /image/* page
# handlers in this file. Each route is registered when its decorator runs, so
# all of them are served, but later definitions rebind the module-level name.
@app.get("/image/remove_background", response_class=HTMLResponse)
async def image_tools(request: Request):
    """Render the ``image/remove_background.html`` template as an HTML response."""
    page = env.get_template("image/remove_background.html").render(request=request)
    return HTMLResponse(content=page)
@app.get("/video/player")
async def video_player():
    """Redirect (HTTP 302) to the externally hosted JellyJump player page."""
    player_url = "https://jebin2.github.io/JellyJump/player.html"
    return RedirectResponse(url=player_url, status_code=302)
@app.get("/pdf/images_to_pdf", response_class=HTMLResponse)
async def pdf_images_to_pdf(request: Request):
    """Render the ``PDF/images_to_pdf.html`` template as an HTML response."""
    page = env.get_template("PDF/images_to_pdf.html").render(request=request)
    return HTMLResponse(content=page)
@app.post("/pdf/upload")
async def upload_pdf_image(
    id: str = Form(...),
    image: UploadFile = File(...),
):
    """Read an uploaded image and hand its bytes to ``ImagesToPDF.upload``.

    Args:
        id: Client-supplied identifier passed to the converter with the bytes.
        image: The uploaded file; its full content is read into memory.

    Returns:
        A JSON success payload.

    Raises:
        HTTPException: 500 with the error text if anything fails.
    """
    try:
        pdf_tool = ImagesToPDF()
        raw_bytes = await image.read()
        pdf_tool.upload(id, raw_bytes)
        payload = {
            "success": True,
            "message": "Image uploaded successfully",
        }
        return JSONResponse(payload)
    except Exception as err:
        logger_config.error(f"Upload failed: {str(err)}")
        raise HTTPException(status_code=500, detail=str(err))
@app.post("/pdf/convert")
async def convert_images_to_pdf(
    ids: str = Form(...),
    output_name: str = Form("output.pdf"),
):
    """Build a PDF from the images identified by a comma-separated id list.

    Args:
        ids: Comma-separated image ids; blanks and surrounding whitespace
            are dropped.
        output_name: Name passed to the converter for the resulting PDF.

    Returns:
        A JSON payload whose ``new_filename`` is the last ``/``-separated
        component of the path returned by the converter.

    Raises:
        HTTPException: 500 with the error text if anything fails, including
            the converter returning ``None``.
    """
    try:
        # Parse comma-separated IDs
        image_ids = []
        for raw_id in ids.split(","):
            cleaned = raw_id.strip()
            if cleaned:
                image_ids.append(cleaned)

        pdf_path = ImagesToPDF().convert_to_pdf(image_ids, output_name)
        if pdf_path is None:
            raise ValueError("PDF conversion failed")

        payload = {
            "success": True,
            "message": "PDF created successfully",
            "new_filename": pdf_path.rsplit("/", 1)[-1],
        }
        return JSONResponse(payload)
    except Exception as err:
        logger_config.error(f"PDF conversion failed: {str(err)}")
        raise HTTPException(status_code=500, detail=str(err))
@app.get("/pdf/download")
async def download_pdf(id: str = Query(...)):
    """Send a file from the PDF converter's output directory to the client.

    Args:
        id: Plain file name inside ``ImagesToPDF().output_dir`` (presumably
            the ``new_filename`` returned by ``/pdf/convert`` -- confirm).

    Returns:
        A ``FileResponse`` for the file, using the guessed MIME type and
        falling back to ``application/pdf``.

    Raises:
        HTTPException: 400 if ``id`` is not a plain file name, 404 if the
            file does not exist, 500 for any other failure.
    """
    # ``id`` comes straight from the query string and is joined into a
    # filesystem path, so anything that could escape the output directory
    # (separators, "." / "..", drive prefixes) is rejected up front.
    if (
        id in ("", ".", "..")
        or "/" in id
        or "\\" in id
        or os.path.basename(id) != id
    ):
        raise HTTPException(status_code=400, detail="Invalid file id")

    try:
        converter = ImagesToPDF()
        file_path = f"{converter.output_dir}/{id}"
        # isfile (not exists) so that a directory is reported as "not found"
        # instead of failing later while the response is being sent.
        if os.path.isfile(file_path):
            media_type, _ = mimetypes.guess_type(file_path)
            return FileResponse(
                path=file_path,
                media_type=media_type or "application/pdf",
                filename=id,
            )
        raise HTTPException(status_code=404, detail="File not found")
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged; the
        # generic handler below would otherwise turn them into a 500.
        raise
    except Exception as e:
        logger_config.error(f"Download failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/image/upload")
async def upload_image(
    id: str = Form(...),
    image: UploadFile = File(...),
):
    """Pass an uploaded image and its id to ``ImageBase.upload``.

    Returns:
        A JSON success payload.

    Raises:
        HTTPException: 400 when ``ImageBase.upload`` raises ``ValueError``,
            500 for any other failure.
    """
    try:
        ImageBase().upload(id, image)
        # Return success response
        payload = {
            "success": True,
            "message": "Image uploaded successfully",
        }
        return JSONResponse(payload)
    except ValueError as bad_input:
        logger_config.error(f"Validation error: {str(bad_input)}")
        raise HTTPException(status_code=400, detail=str(bad_input))
    except Exception as err:
        logger_config.error(f"Unexpected error during upload: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(err)}",
        )
@app.post("/image/convert")
async def convert_image(
    id: str = Form(...),
    to_format: str = Form(...),
    quality: int = Form(100),
    scale: float = Form(1.0)
):
    """Convert a previously uploaded image to another format.

    Args:
        id: Identifier of the image to convert; ids ending in ``.svg`` are
            rejected.
        to_format: Target format, passed through to ``Converter``.
        quality: Output quality passed to the converter (default 100).
        scale: Scale factor passed to the converter (default 1.0).

    Returns:
        A JSON payload whose ``new_filename`` is the last ``/``-separated
        component of the converter's output path.

    Raises:
        HTTPException: 400 for an SVG input, a failed conversion (converter
            returned ``None``) or any other ``ValueError``; 500 otherwise.
    """
    try:
        # Check if input is SVG - PIL cannot read SVG files
        if id.lower().endswith('.svg'):
            raise ValueError("SVG files cannot be converted. SVG is only available as an output format.")

        converter = Converter()
        output_path = converter.convert_image(id, to_format, quality=quality, scale=scale)

        # Check if conversion failed
        if output_path is None:
            raise ValueError("Image conversion failed. The file may be corrupted or in an unsupported format.")

        # Return success response
        return JSONResponse({
            "success": True,
            "message": "Image converted successfully",
            "new_filename": output_path.split("/")[-1]
        })
    except ValueError as ve:
        logger_config.error(f"Validation error: {str(ve)}")
        raise HTTPException(
            status_code=400,
            detail=str(ve)
        )
    except Exception as e:
        # The log text used to say "during upload" (copied from the upload
        # handler), which pointed log readers at the wrong endpoint.
        logger_config.error(f"Unexpected error during convert: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )
@app.post("/image/remove_metadata")
async def remove_metadata(
    id: str = Form(...),
):
    """Run ``RemoveMetadata.process`` on an image and report what it returned.

    Args:
        id: Identifier of the image; ids ending in ``.svg`` are rejected.

    Returns:
        A JSON payload with ``new_filename`` (last ``/``-separated component
        of the output path) and ``other_info`` (the second value returned by
        ``RemoveMetadata.process``).

    Raises:
        HTTPException: 400 for an SVG input, a ``None`` output path or any
            other ``ValueError``; 500 otherwise.
    """
    try:
        # Check if input is SVG - PIL cannot read SVG files
        is_svg = id.lower().endswith(".svg")
        if is_svg:
            raise ValueError("SVG files cannot be processed. SVG is a vector format without embedded metadata.")

        cleaned_path, metadata = RemoveMetadata().process(id)

        # Check if processing failed
        if cleaned_path is None:
            raise ValueError("Metadata removal failed. The file may be corrupted or in an unsupported format.")

        # Return success response
        payload = {
            "success": True,
            "message": "Metadata removed successfully",
            "new_filename": cleaned_path.rsplit("/", 1)[-1],
            "other_info": metadata,
        }
        return JSONResponse(payload)
    except ValueError as bad_input:
        logger_config.error(f"Validation error: {str(bad_input)}")
        raise HTTPException(status_code=400, detail=str(bad_input))
    except Exception as err:
        logger_config.error(f"Unexpected error during remove_metadata: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(err)}",
        )
@app.post("/image/remove_background")
async def remove_background(
    id: str = Form(...)
):
    """Remove the background from a previously uploaded image.

    Args:
        id: Identifier of the image, passed to ``RemoveBackground.process``.

    Returns:
        A JSON payload whose ``new_filename`` is the last ``/``-separated
        component of the processor's output path.

    Raises:
        HTTPException: 400 when processing yields no output or raises
            ``ValueError``; 500 for any other failure.
    """
    try:
        removeBackground = RemoveBackground()
        output_path = removeBackground.process(id)

        # Mirror the sibling handlers: treat a missing output path as a failed
        # operation instead of crashing on ``None.split``.
        # NOTE(review): assumes process() signals failure with None, as the
        # converter and metadata tools are checked for -- confirm.
        if output_path is None:
            raise ValueError("Background removal failed. The file may be corrupted or in an unsupported format.")

        # Return success response (the message used to say "Image uploaded
        # successfully", copied from the upload handler).
        return JSONResponse({
            "success": True,
            "message": "Background removed successfully",
            "new_filename": output_path.split("/")[-1]
        })
    except ValueError as ve:
        logger_config.error(f"Validation error: {str(ve)}")
        raise HTTPException(
            status_code=400,
            detail=str(ve)
        )
    except Exception as e:
        logger_config.error(f"Unexpected error during remove_background: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )
@app.get("/image/download")
async def download_converted_image(
    id: str = Query(...)
):
    """Send a stored image back to the client.

    Args:
        id: Identifier resolved to a file path by ``ImageBase.download_url``;
            also used as the download file name.

    Returns:
        A ``FileResponse`` with the MIME type guessed from the file path.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while resolving
            the file, 500 for any other failure.
    """
    try:
        image_base = ImageBase()
        file_path = image_base.download_url(id)
        mime_type, _ = mimetypes.guess_type(file_path)
        return FileResponse(file_path, media_type=mime_type, filename=id)
    except ValueError as ve:
        logger_config.error(f"Validation error: {str(ve)}")
        raise HTTPException(
            status_code=400,
            detail=str(ve)
        )
    except Exception as e:
        # The log text used to say "during upload" (copied from the upload
        # handler), which pointed log readers at the wrong endpoint.
        logger_config.error(f"Unexpected error during download: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )
class DeleteRequest(BaseModel):
    """Request body for ``/image/delete``."""

    # Identifiers handed to ImageBase.delete by the delete endpoint.
    ids: list[str]
@app.post("/image/delete")
async def delete_images(request: DeleteRequest):
    """Delete the images listed in the request body.

    Args:
        request: JSON body carrying the ``ids`` to pass to
            ``ImageBase.delete``.

    Returns:
        A JSON success payload.

    Raises:
        HTTPException: 400 when ``ImageBase.delete`` raises ``ValueError``,
            500 for any other failure.
    """
    try:
        image_base = ImageBase()
        image_base.delete(request.ids)

        # Return success response
        return JSONResponse({
            "success": True,
            "message": "Image deleted successfully"
        })
    except ValueError as ve:
        logger_config.error(f"Validation error: {str(ve)}")
        raise HTTPException(
            status_code=400,
            detail=str(ve)
        )
    except Exception as e:
        # The log text used to say "during upload" (copied from the upload
        # handler), which pointed log readers at the wrong endpoint.
        logger_config.error(f"Unexpected error during delete: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )
@app.get("/api/features")
async def get_features():
    """Return the full feature catalogue under the ``features`` key."""
    catalogue = FEATURES
    return {"features": catalogue}
@app.get("/api/search")
async def search_features(q: str = ""):
    """Filter the feature catalogue by a case-insensitive substring.

    The query is matched against each feature's name, description and tags
    joined with spaces. An empty query returns every feature.
    """
    if not q:
        return {"features": FEATURES}

    needle = q.lower()

    def searchable_text(feature):
        # Search in name, description, and tags
        tags = " ".join(feature.get("tags", []))
        return f"{feature['name']} {feature['description']} {tags}".lower()

    matches = {
        key: feature
        for key, feature in FEATURES.items()
        if needle in searchable_text(feature)
    }
    return {"features": matches}
@app.get("/api/status")
async def get_feature_status():
    """Return a status entry for every feature key.

    Each entry is a fixed placeholder: no feature name, not busy and no
    process id.
    """
    return {
        key: {"feature_name": None, "is_busy": False, "process_id": None}
        for key in FEATURES
    }
# When executed as a script, serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)