Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from typing import Any, Dict, List | |
| from app.core.logging import get_logger | |
| from sqlalchemy import text | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| logger = get_logger(__name__) | |
| from datetime import datetime, date | |
| def normalize_dates(doc: dict) -> dict: | |
| if isinstance(doc.get("dob"), date): | |
| doc["dob"] = datetime.combine(doc["dob"], datetime.min.time()) | |
| return doc | |
| def convert_objectid_to_str(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| for doc in documents: | |
| if "_id" in doc and doc["_id"] is not None: | |
| doc["_id"] = str(doc["_id"]) | |
| return documents | |
| async def get_next_sequence(db: AsyncSession, seq_name: str) -> int: | |
| """Get next value from PostgreSQL sequence""" | |
| try: | |
| result = await db.execute(text(f"SELECT nextval('{seq_name}')")) | |
| return result.scalar() | |
| except Exception as e: | |
| logger.error(f"Error getting sequence {seq_name}: {e}") | |
| raise | |
| async def create_sequence_if_not_exists(db: AsyncSession, seq_name: str, start_value: int = 1): | |
| """Create PostgreSQL sequence if it doesn't exist""" | |
| try: | |
| # Check if sequence exists | |
| check_query = text(""" | |
| SELECT EXISTS ( | |
| SELECT 1 FROM pg_sequences | |
| WHERE schemaname = 'public' AND sequencename = :seq_name | |
| ) | |
| """) | |
| result = await db.execute(check_query, {"seq_name": seq_name}) | |
| exists = result.scalar() | |
| if not exists: | |
| create_query = text(f"CREATE SEQUENCE {seq_name} START {start_value}") | |
| await db.execute(create_query) | |
| await db.commit() | |
| logger.info(f"Created sequence {seq_name} starting at {start_value}") | |
| except Exception as e: | |
| logger.error(f"Error creating sequence {seq_name}: {e}") | |
| raise | |
| async def sync_pos_sequence(db: AsyncSession): | |
| """Sync POS sequence with existing data to prevent duplicates""" | |
| try: | |
| # Find the highest sequence number for current year | |
| current_year = datetime.now().year | |
| result = await db.execute(text(f""" | |
| SELECT COALESCE(MAX( | |
| CAST(SUBSTRING(sale_code FROM 'POS-{current_year}-([0-9]+)') AS INTEGER) | |
| ), 0) as max_seq | |
| FROM trans.pos_sale | |
| WHERE sale_code LIKE 'POS-{current_year}%' | |
| """)) | |
| max_seq = result.scalar() or 0 | |
| next_seq = max_seq + 1 | |
| # Set sequence to next available number | |
| await db.execute(text(f"SELECT setval('pos_number_seq', {next_seq}, false)")) | |
| logger.info(f"Synced POS sequence to {next_seq} based on existing data") | |
| except Exception as e: | |
| logger.error(f"Error syncing POS sequence: {e}") | |
| raise | |
| async def initialize_sequences(db: AsyncSession): | |
| """Initialize all required sequences for purchases module""" | |
| sequences = [ | |
| ("POS_number_seq", 1) | |
| ] | |
| for seq_name, start_value in sequences: | |
| await create_sequence_if_not_exists(db, seq_name, start_value) | |
| def generate_sale_code(sequence_number: int, prefix: str = "POS") -> str: | |
| """ | |
| Generate Sale order number with format: POS-YYYY-NNNNNN | |
| Args: | |
| sequence_number: Next sequence value | |
| prefix: POS number prefix (default: "POS") | |
| Returns: | |
| Formatted POS number like "POS-2024-000001" | |
| """ | |
| current_year = datetime.now().year | |
| return f"{prefix}-{current_year}-{sequence_number:06d}" | |
| async def get_next_sale_code(db: AsyncSession, prefix: str = "POS") -> str: | |
| """Get next available POS number""" | |
| # Ensure sequence exists before getting next value | |
| await create_sequence_if_not_exists(db, "pos_number_seq", 1) | |
| seq_number = await get_next_sequence(db, "pos_number_seq") | |
| return generate_sale_code(seq_number, prefix) | |