Spaces:
Sleeping
Sleeping
| import io | |
| from fastapi import APIRouter, Depends, HTTPException, status, Body | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel, field_validator | |
| from typing import Literal | |
| from bson import ObjectId | |
| # Database and Auth | |
| from .db import sku_collection | |
| from .auth import get_current_active_user | |
| # Barcode Generation | |
| from .utils import generate_ean13_code | |
| import barcode | |
| from barcode.writer import ImageWriter | |
| # PDF Generation | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import A4 | |
| from reportlab.lib.units import mm | |
| from reportlab.lib.utils import ImageReader | |
| from PIL import Image | |
| # === PYDANTIC MODELS === | |
| class BarcodeGenerateRequest(BaseModel): | |
| sku_id: str | |
| output: Literal["png", "single_pdf", "a4_pdf"] = "png" | |
| def validate_sku_id(cls, v): | |
| if not ObjectId.is_valid(v): | |
| raise ValueError(f"Invalid ObjectId: {v}") | |
| return v | |
| # === HELPER FUNCTIONS (Private) === | |
| async def _get_sku_or_404(sku_id: str): | |
| """Fetches a SKU from DB or raises HTTPException.""" | |
| if not ObjectId.is_valid(sku_id): | |
| raise HTTPException(status_code=400, detail=f"Invalid SKU ID format: {sku_id}") | |
| sku = await sku_collection.find_one({"_id": ObjectId(sku_id)}) | |
| if not sku: | |
| raise HTTPException(status_code=404, detail=f"SKU not found: {sku_id}") | |
| return sku | |
| async def _get_next_barcode_sequence() -> int: | |
| """ | |
| Finds the highest 'barcode.sequence' in the collection | |
| and returns the next number (e.g., max is 5, returns 6). | |
| If no barcodes exist, returns 1. | |
| """ | |
| # Find the document with the highest sequence number | |
| # sort_descending = [("barcode.sequence", -1)] | |
| cursor = sku_collection.find( | |
| {"barcode.sequence": {"$exists": True}}, | |
| projection={"barcode.sequence": 1} | |
| ).sort("barcode.sequence", -1).limit(1) | |
| highest_sku = await cursor.to_list(length=1) | |
| if not highest_sku: | |
| return 1 # No barcodes generated yet | |
| return highest_sku[0]["barcode"]["sequence"] + 1 | |
| def _generate_barcode_image_bytes(ean13_code: str) -> io.BytesIO: | |
| """Generates a PNG barcode image (with EAN-13 number) in memory.""" | |
| try: | |
| EAN13 = barcode.get_barcode_class('ean13') | |
| ean = EAN13(ean13_code, writer=ImageWriter()) | |
| # Save to a byte buffer in memory | |
| buffer = io.BytesIO() | |
| # NEW Options: Write the EAN-13 number below the barcode | |
| options = { | |
| "write_text": True, # <-- CHANGED | |
| "quiet_zone": 2.0, # Margin | |
| "module_height": 10.0, # Bar height | |
| "font_size": 10, # Font size for EAN-13 number | |
| "text_distance": 5.0 # Distance from bars to text | |
| } | |
| ean.write(buffer, options) | |
| buffer.seek(0) | |
| return buffer | |
| except Exception as e: | |
| print(f"Error generating barcode image: {e}") | |
| raise HTTPException(500, "Failed to generate barcode image.") | |
| def _draw_label_content(c: canvas.Canvas, x_start: float, y_start: float, label_width: float, label_height: float, sku: dict, barcode_image_buffer: io.BytesIO): | |
| """Helper to draw one label's content onto a PDF canvas.""" | |
| # --- Config --- | |
| padding = 3 * mm | |
| barcode_height = 15 * mm # Height of the barcode image | |
| # --- Coords (from bottom-left) --- | |
| y_name = y_start + label_height - padding - (4*mm) | |
| y_sku = y_name - (4*mm) | |
| y_barcode = y_start + padding | |
| x_content = x_start + padding | |
| # --- Draw Text --- | |
| c.setFont("Helvetica", 10) | |
| c.drawString(x_content, y_name, sku["name"][:30]) # Limit name length | |
| c.setFont("Helvetica", 8) | |
| c.drawString(x_content, y_sku, sku["sku_code"]) | |
| # --- Draw Barcode Image --- | |
| # Reset buffer | |
| barcode_image_buffer.seek(0) | |
| # Use PIL to read image and get dimensions | |
| pil_image = Image.open(barcode_image_buffer) | |
| img_width, img_height = pil_image.size | |
| # Calculate barcode width to fit, maintaining aspect ratio | |
| aspect_ratio = img_width / img_height | |
| barcode_width = barcode_height * aspect_ratio | |
| # Center the barcode | |
| barcode_x = x_start + (label_width - barcode_width) / 2 | |
| c.drawImage( | |
| ImageReader(pil_image), # Use ImageReader from reportlab | |
| barcode_x, | |
| y_barcode, | |
| width=barcode_width, | |
| height=barcode_height, | |
| mask='auto' | |
| ) | |
| def _generate_single_label_pdf_bytes(sku: dict, barcode_image_buffer: io.BytesIO) -> io.BytesIO: | |
| """Generates a 70mm x 30mm single label PDF in memory.""" | |
| label_width = 70 * mm | |
| label_height = 30 * mm | |
| buffer = io.BytesIO() | |
| # Create canvas with exact page size | |
| c = canvas.Canvas(buffer, pagesize=(label_width, label_height)) | |
| _draw_label_content(c, 0, 0, label_width, label_height, sku, barcode_image_buffer) | |
| c.showPage() | |
| c.save() | |
| buffer.seek(0) | |
| return buffer | |
| def _generate_a4_sheet_pdf_bytes(sku: dict, barcode_image_buffer: io.BytesIO) -> io.BytesIO: | |
| """Generates a full A4 sheet (3x8 grid) of labels.""" | |
| buffer = io.BytesIO() | |
| c = canvas.Canvas(buffer, pagesize=A4) # Default A4 | |
| # Label dimensions from spec | |
| label_width = 70 * mm | |
| label_height = 30 * mm | |
| # Grid: 3 columns, 8 rows | |
| num_cols = 3 | |
| num_rows = 8 | |
| # Calculate starting margins to center the 3x8 grid | |
| total_grid_width = num_cols * label_width | |
| total_grid_height = num_rows * label_height | |
| margin_x = (A4[0] - total_grid_width) / 2 | |
| margin_y = (A4[1] - total_grid_height) / 2 | |
| for row in range(num_rows): | |
| for col in range(num_cols): | |
| # Calculate (x, y) for bottom-left corner of this label | |
| # Y starts from top, so we reverse row index | |
| x_start = margin_x + (col * label_width) | |
| y_start = margin_y + ((num_rows - 1 - row) * label_height) | |
| _draw_label_content(c, x_start, y_start, label_width, label_height, sku, barcode_image_buffer) | |
| c.showPage() | |
| c.save() | |
| buffer.seek(0) | |
| return buffer | |
| # === API ROUTER === | |
| BarcodeRouter = APIRouter( | |
| prefix="/api/barcodes", | |
| tags=["Barcodes"], | |
| dependencies=[Depends(get_current_active_user)] # PROTECTS ALL ROUTES | |
| ) | |
| async def generate_barcode_for_sku( | |
| req: BarcodeGenerateRequest = Body(...), | |
| # user: dict = Depends(get_current_active_user) # We know user is valid | |
| ): | |
| """ | |
| This is the main endpoint. It checks if a barcode exists. | |
| - If YES: It generates the requested output (png/pdf) from existing data. | |
| - If NO: It generates a new barcode, saves it to the DB, | |
| then generates the requested output. | |
| """ | |
| sku = await _get_sku_or_404(req.sku_id) | |
| # --- 1. Check if barcode already exists --- | |
| if sku.get("barcode"): | |
| ean13_code = sku["barcode"]["ean13"] | |
| # --- 2. If not, generate and save a new one --- | |
| else: | |
| try: | |
| # Get next sequence number | |
| next_seq = await _get_next_barcode_sequence() | |
| # Generate the 13-digit code | |
| ean13_code = generate_ean13_code(next_seq) | |
| # Create the barcode sub-document (as per spec) | |
| barcode_data = { | |
| "ean13": ean13_code, | |
| "prefix": "999900", | |
| "sequence": next_seq | |
| } | |
| # Update the SKU in the database | |
| await sku_collection.update_one( | |
| {"_id": sku["_id"]}, | |
| {"$set": {"barcode": barcode_data}} | |
| ) | |
| # Update our local 'sku' variable | |
| sku["barcode"] = barcode_data | |
| except Exception as e: | |
| print(f"Error during new barcode generation/save: {e}") | |
| raise HTTPException(500, "Failed to generate or save new barcode.") | |
| # --- 3. Generate the requested output --- | |
| # First, always generate the base PNG image bytes | |
| try: | |
| image_buffer = _generate_barcode_image_bytes(ean13_code) | |
| except HTTPException as e: | |
| raise e # Re-raise image generation error | |
| # Return PNG | |
| if req.output == "png": | |
| return StreamingResponse(image_buffer, media_type="image/png") | |
| # Return Single Label PDF | |
| if req.output == "single_pdf": | |
| try: | |
| pdf_buffer = _generate_single_label_pdf_bytes(sku, image_buffer) | |
| return StreamingResponse( | |
| pdf_buffer, | |
| media_type="application/pdf", | |
| headers={"Content-Disposition": f"inline; filename={sku['sku_code']}_single.pdf"} | |
| ) | |
| except Exception as e: | |
| print(f"Error generating single PDF: {e}") | |
| raise HTTPException(500, "Failed to generate single label PDF.") | |
| # Return A4 Sheet PDF | |
| if req.output == "a4_pdf": | |
| try: | |
| pdf_buffer = _generate_a4_sheet_pdf_bytes(sku, image_buffer) | |
| return StreamingResponse( | |
| pdf_buffer, | |
| media_type="application/pdf", | |
| headers={"Content-Disposition": f"inline; filename={sku['sku_code']}_A4.pdf"} | |
| ) | |
| except Exception as e: | |
| print(f"Error generating A4 PDF: {e}") | |
| raise HTTPException(500, "Failed to generate A4 sheet PDF.") | |
| # Fallback (shouldn't be reached) | |
| raise HTTPException(400, "Invalid output type specified.") | |
| # === GET Endpoints (as per spec) === | |
| # These are simple 'getters' for *existing* barcodes. | |
| async def get_barcode_png(sku_id: str): | |
| sku = await _get_sku_or_404(sku_id) | |
| if not sku.get("barcode"): | |
| raise HTTPException(404, "Barcode not generated for this SKU yet.") | |
| image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"]) | |
| return StreamingResponse(image_buffer, media_type="image/png") | |
| async def get_barcode_pdf_single(sku_id: str): | |
| sku = await _get_sku_or_404(sku_id) | |
| if not sku.get("barcode"): | |
| raise HTTPException(404, "Barcode not generated for this SKU yet.") | |
| image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"]) | |
| pdf_buffer = _generate_single_label_pdf_bytes(sku, image_buffer) | |
| return StreamingResponse(pdf_buffer, media_type="application/pdf") | |
| async def get_barcode_pdf_a4(sku_id: str): | |
| sku = await _get_sku_or_404(sku_id) | |
| if not sku.get("barcode"): | |
| raise HTTPException(404, "Barcode not generated for this SKU yet.") | |
| image_buffer = _generate_barcode_image_bytes(sku["barcode"]["ean13"]) | |
| pdf_buffer = _generate_a4_sheet_pdf_bytes(sku, image_buffer) | |
| return StreamingResponse(pdf_buffer, media_type="application/pdf") |