from datetime import datetime from typing import Any, Dict, List from app.core.logging import get_logger from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession logger = get_logger(__name__) from datetime import datetime, date def normalize_dates(doc: dict) -> dict: if isinstance(doc.get("dob"), date): doc["dob"] = datetime.combine(doc["dob"], datetime.min.time()) return doc def convert_objectid_to_str(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: for doc in documents: if "_id" in doc and doc["_id"] is not None: doc["_id"] = str(doc["_id"]) return documents async def get_next_sequence(db: AsyncSession, seq_name: str) -> int: """Get next value from PostgreSQL sequence""" try: result = await db.execute(text(f"SELECT nextval('{seq_name}')")) return result.scalar() except Exception as e: logger.error(f"Error getting sequence {seq_name}: {e}") raise async def create_sequence_if_not_exists(db: AsyncSession, seq_name: str, start_value: int = 1): """Create PostgreSQL sequence if it doesn't exist""" try: # Check if sequence exists check_query = text(""" SELECT EXISTS ( SELECT 1 FROM pg_sequences WHERE schemaname = 'public' AND sequencename = :seq_name ) """) result = await db.execute(check_query, {"seq_name": seq_name}) exists = result.scalar() if not exists: create_query = text(f"CREATE SEQUENCE {seq_name} START {start_value}") await db.execute(create_query) await db.commit() logger.info(f"Created sequence {seq_name} starting at {start_value}") except Exception as e: logger.error(f"Error creating sequence {seq_name}: {e}") raise async def sync_pos_sequence(db: AsyncSession): """Sync POS sequence with existing data to prevent duplicates""" try: # Find the highest sequence number for current year current_year = datetime.now().year result = await db.execute(text(f""" SELECT COALESCE(MAX( CAST(SUBSTRING(sale_code FROM 'POS-{current_year}-([0-9]+)') AS INTEGER) ), 0) as max_seq FROM trans.pos_sale WHERE sale_code LIKE 'POS-{current_year}%' """)) max_seq = result.scalar() or 0 next_seq = max_seq + 1 # Set sequence to next available number await db.execute(text(f"SELECT setval('pos_number_seq', {next_seq}, false)")) logger.info(f"Synced POS sequence to {next_seq} based on existing data") except Exception as e: logger.error(f"Error syncing POS sequence: {e}") raise async def initialize_sequences(db: AsyncSession): """Initialize all required sequences for purchases module""" sequences = [ ("POS_number_seq", 1) ] for seq_name, start_value in sequences: await create_sequence_if_not_exists(db, seq_name, start_value) def generate_sale_code(sequence_number: int, prefix: str = "POS") -> str: """ Generate Sale order number with format: POS-YYYY-NNNNNN Args: sequence_number: Next sequence value prefix: POS number prefix (default: "POS") Returns: Formatted POS number like "POS-2024-000001" """ current_year = datetime.now().year return f"{prefix}-{current_year}-{sequence_number:06d}" async def get_next_sale_code(db: AsyncSession, prefix: str = "POS") -> str: """Get next available POS number""" # Ensure sequence exists before getting next value await create_sequence_if_not_exists(db, "pos_number_seq", 1) seq_number = await get_next_sequence(db, "pos_number_seq") return generate_sale_code(seq_number, prefix)