rohanshaw's picture
Upload 8 files
cfd8098 verified
import io
from fastapi import APIRouter, Depends, HTTPException, status, Body
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, field_validator
from typing import Literal
from bson import ObjectId
# Database and Auth
from .db import sku_collection
from .auth import get_current_active_user
# Barcode Generation
from .utils import generate_ean13_code
import barcode
from barcode.writer import ImageWriter
# PDF Generation
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import mm
from reportlab.lib.utils import ImageReader
from PIL import Image
# === PYDANTIC MODELS ===
class BarcodeGenerateRequest(BaseModel):
sku_id: str
output: Literal["png", "single_pdf", "a4_pdf"] = "png"
@field_validator('sku_id')
def validate_sku_id(cls, v):
if not ObjectId.is_valid(v):
raise ValueError(f"Invalid ObjectId: {v}")
return v
# === HELPER FUNCTIONS (Private) ===
async def _get_sku_or_404(sku_id: str):
"""Fetches a SKU from DB or raises HTTPException."""
if not ObjectId.is_valid(sku_id):
raise HTTPException(status_code=400, detail=f"Invalid SKU ID format: {sku_id}")
sku = await sku_collection.find_one({"_id": ObjectId(sku_id)})
if not sku:
raise HTTPException(status_code=404, detail=f"SKU not found: {sku_id}")
return sku
async def _get_next_barcode_sequence() -> int:
"""
Finds the highest 'barcode.sequence' in the collection
and returns the next number (e.g., max is 5, returns 6).
If no barcodes exist, returns 1.
"""
# Find the document with the highest sequence number
# sort_descending = [("barcode.sequence", -1)]
cursor = sku_collection.find(
{"barcode.sequence": {"$exists": True}},
projection={"barcode.sequence": 1}
).sort("barcode.sequence", -1).limit(1)
highest_sku = await cursor.to_list(length=1)
if not highest_sku:
return 1 # No barcodes generated yet
return highest_sku[0]["barcode"]["sequence"] + 1
def _generate_barcode_image_bytes(ean13_code: str) -> io.BytesIO:
"""Generates a PNG barcode image (with EAN-13 number) in memory."""
try:
EAN13 = barcode.get_barcode_class('ean13')
ean = EAN13(ean13_code, writer=ImageWriter())
# Save to a byte buffer in memory
buffer = io.BytesIO()
# NEW Options: Write the EAN-13 number below the barcode
options = {
"write_text": True, # <-- CHANGED
"quiet_zone": 2.0, # Margin
"module_height": 10.0, # Bar height
"font_size": 10, # Font size for EAN-13 number
"text_distance": 5.0 # Distance from bars to text
}
ean.write(buffer, options)
buffer.seek(0)
return buffer
except Exception as e:
print(f"Error generating barcode image: {e}")
raise HTTPException(500, "Failed to generate barcode image.")
def _draw_label_content(c: canvas.Canvas, x_start: float, y_start: float, label_width: float, label_height: float, sku: dict, barcode_image_buffer: io.BytesIO):
"""Helper to draw one label's content onto a PDF canvas."""
# --- Config ---
padding = 3 * mm
barcode_height = 15 * mm # Height of the barcode image
# --- Coords (from bottom-left) ---
y_name = y_start + label_height - padding - (4*mm)
y_sku = y_name - (4*mm)
y_barcode = y_start + padding
x_content = x_start + padding
# --- Draw Text ---
c.setFont("Helvetica", 10)
c.drawString(x_content, y_name, sku["name"][:30]) # Limit name length
c.setFont("Helvetica", 8)
c.drawString(x_content, y_sku, sku["sku_code"])
# --- Draw Barcode Image ---
# Reset buffer
barcode_image_buffer.seek(0)
# Use PIL to read image and get dimensions
pil_image = Image.open(barcode_image_buffer)
img_width, img_height = pil_image.size
# Calculate barcode width to fit, maintaining aspect ratio
aspect_ratio = img_width / img_height
barcode_width = barcode_height * aspect_ratio
# Center the barcode
barcode_x = x_start + (label_width - barcode_width) / 2
c.drawImage(
ImageReader(pil_image), # Use ImageReader from reportlab
barcode_x,
y_barcode,
width=barcode_width,
height=barcode_height,
mask='auto'
)
def _generate_single_label_pdf_bytes(sku: dict, barcode_image_buffer: io.BytesIO) -> io.BytesIO:
"""Generates a 70mm x 30mm single label PDF in memory."""
label_width = 70 * mm
label_height = 30 * mm
buffer = io.BytesIO()
# Create canvas with exact page size
c = canvas.Canvas(buffer, pagesize=(label_width, label_height))
_draw_label_content(c, 0, 0, label_width, label_height, sku, barcode_image_buffer)
c.showPage()
c.save()
buffer.seek(0)
return buffer
def _generate_a4_sheet_pdf_bytes(sku: dict, barcode_image_buffer: io.BytesIO) -> io.BytesIO:
"""Generates a full A4 sheet (3x8 grid) of labels."""
buffer = io.BytesIO()
c = canvas.Canvas(buffer, pagesize=A4) # Default A4
# Label dimensions from spec
label_width = 70 * mm
label_height = 30 * mm
# Grid: 3 columns, 8 rows
num_cols = 3
num_rows = 8
# Calculate starting margins to center the 3x8 grid
total_grid_width = num_cols * label_width
total_grid_height = num_rows * label_height
margin_x = (A4[0] - total_grid_width) / 2
margin_y = (A4[1] - total_grid_height) / 2
for row in range(num_rows):
for col in range(num_cols):
# Calculate (x, y) for bottom-left corner of this label
# Y starts from top, so we reverse row index
x_start = margin_x + (col * label_width)
y_start = margin_y + ((num_rows - 1 - row) * label_height)
_draw_label_content(c, x_start, y_start, label_width, label_height, sku, barcode_image_buffer)
c.showPage()
c.save()
buffer.seek(0)
return buffer
# === API ROUTER ===
BarcodeRouter = APIRouter(
prefix="/api/barcodes",
tags=["Barcodes"],
dependencies=[Depends(get_current_active_user)] # PROTECTS ALL ROUTES
)
@BarcodeRouter.post("/generate",
summary="Generate or Get Barcode for an SKU",
responses={
200: {
"content": {
"image/png": {},
"application/pdf": {}
},
"description": "Returns the barcode as PNG, single PDF, or A4 sheet PDF."
}
})
async def generate_barcode_for_sku(
req: BarcodeGenerateRequest = Body(...),
# user: dict = Depends(get_current_active_user) # We know user is valid
):
"""
This is the main endpoint. It checks if a barcode exists.
- If YES: It generates the requested output (png/pdf) from existing data.
- If NO: It generates a new barcode, saves it to the DB,
then generates the requested output.
"""
sku = await _get_sku_or_404(req.sku_id)
# --- 1. Check if barcode already exists ---
if sku.get("barcode"):
ean13_code = sku["barcode"]["ean13"]
# --- 2. If not, generate and save a new one ---
else:
try:
# Get next sequence number
next_seq = await _get_next_barcode_sequence()
# Generate the 13-digit code
ean13_code = generate_ean13_code(next_seq)
# Create the barcode sub-document (as per spec)
barcode_data = {
"ean13": ean13_code,
"prefix": "999900",
"sequence": next_seq
}
# Update the SKU in the database
await sku_collection.update_one(
{"_id": sku["_id"]},
{"$set": {"barcode": barcode_data}}
)
# Update our local 'sku' variable
sku["barcode"] = barcode_data
except Exception as e:
print(f"Error during new barcode generation/save: {e}")
raise HTTPException(500, "Failed to generate or save new barcode.")
# --- 3. Generate the requested output ---
# First, always generate the base PNG image bytes
try:
image_buffer = _generate_barcode_image_bytes(ean13_code)
except HTTPException as e:
raise e # Re-raise image generation error
# Return PNG
if req.output == "png":
return StreamingResponse(image_buffer, media_type="image/png")
# Return Single Label PDF
if req.output == "single_pdf":
try:
pdf_buffer = _generate_single_label_pdf_bytes(sku, image_buffer)
return StreamingResponse(
pdf_buffer,
media_type="application/pdf",
headers={"Content-Disposition": f"inline; filename={sku['sku_code']}_single.pdf"}
)
except Exception as e:
print(f"Error generating single PDF: {e}")
raise HTTPException(500, "Failed to generate single label PDF.")
# Return A4 Sheet PDF
if req.output == "a4_pdf":
try:
pdf_buffer = _generate_a4_sheet_pdf_bytes(sku, image_buffer)
return StreamingResponse(
pdf_buffer,
media_type="application/pdf",
headers={"Content-Disposition": f"inline; filename={sku['sku_code']}_A4.pdf"}
)
except Exception as e:
print(f"Error generating A4 PDF: {e}")
raise HTTPException(500, "Failed to generate A4 sheet PDF.")
# Fallback (shouldn't be reached)
raise HTTPException(400, "Invalid output type specified.")
# === GET Endpoints (as per spec) ===
# These are simple 'getters' for *existing* barcodes.
@BarcodeRouter.get("/{sku_id}/png", summary="Get existing barcode as PNG")
async def get_barcode_png(sku_id: str):
sku = await _get_sku_or_404(sku_id)
if not sku.get("barcode"):
raise HTTPException(404, "Barcode not generated for this SKU yet.")
image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"])
return StreamingResponse(image_buffer, media_type="image/png")
@BarcodeRouter.get("/{sku_id}/pdf/single", summary="Get existing barcode as single PDF label")
async def get_barcode_pdf_single(sku_id: str):
sku = await _get_sku_or_404(sku_id)
if not sku.get("barcode"):
raise HTTPException(404, "Barcode not generated for this SKU yet.")
image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"])
pdf_buffer = _generate_single_label_pdf_bytes(sku, image_buffer)
return StreamingResponse(pdf_buffer, media_type="application/pdf")
@BarcodeRouter.get("/{sku_id}/pdf/a4", summary="Get existing barcode as A4 PDF sheet")
async def get_barcode_pdf_a4(sku_id: str):
sku = await _get_sku_or_404(sku_id)
if not sku.get("barcode"):
raise HTTPException(404, "Barcode not generated for this SKU yet.")
image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"])
pdf_buffer = _generate_a4_sheet_pdf_bytes(sku, image_buffer)
return StreamingResponse(pdf_buffer, media_type="application/pdf")