vanitha
changed schema based on UI
199a6f1
from datetime import datetime
from typing import Any, Dict, List
from app.core.logging import get_logger
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
logger = get_logger(__name__)
from datetime import datetime, date
def normalize_dates(doc: dict) -> dict:
if isinstance(doc.get("dob"), date):
doc["dob"] = datetime.combine(doc["dob"], datetime.min.time())
return doc
def convert_objectid_to_str(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
for doc in documents:
if "_id" in doc and doc["_id"] is not None:
doc["_id"] = str(doc["_id"])
return documents
async def get_next_sequence(db: AsyncSession, seq_name: str) -> int:
"""Get next value from PostgreSQL sequence"""
try:
result = await db.execute(text(f"SELECT nextval('{seq_name}')"))
return result.scalar()
except Exception as e:
logger.error(f"Error getting sequence {seq_name}: {e}")
raise
async def create_sequence_if_not_exists(db: AsyncSession, seq_name: str, start_value: int = 1):
"""Create PostgreSQL sequence if it doesn't exist"""
try:
# Check if sequence exists
check_query = text("""
SELECT EXISTS (
SELECT 1 FROM pg_sequences
WHERE schemaname = 'public' AND sequencename = :seq_name
)
""")
result = await db.execute(check_query, {"seq_name": seq_name})
exists = result.scalar()
if not exists:
create_query = text(f"CREATE SEQUENCE {seq_name} START {start_value}")
await db.execute(create_query)
await db.commit()
logger.info(f"Created sequence {seq_name} starting at {start_value}")
except Exception as e:
logger.error(f"Error creating sequence {seq_name}: {e}")
raise
async def sync_pos_sequence(db: AsyncSession):
"""Sync POS sequence with existing data to prevent duplicates"""
try:
# Find the highest sequence number for current year
current_year = datetime.now().year
result = await db.execute(text(f"""
SELECT COALESCE(MAX(
CAST(SUBSTRING(sale_code FROM 'POS-{current_year}-([0-9]+)') AS INTEGER)
), 0) as max_seq
FROM trans.pos_sale
WHERE sale_code LIKE 'POS-{current_year}%'
"""))
max_seq = result.scalar() or 0
next_seq = max_seq + 1
# Set sequence to next available number
await db.execute(text(f"SELECT setval('pos_number_seq', {next_seq}, false)"))
logger.info(f"Synced POS sequence to {next_seq} based on existing data")
except Exception as e:
logger.error(f"Error syncing POS sequence: {e}")
raise
async def initialize_sequences(db: AsyncSession):
"""Initialize all required sequences for purchases module"""
sequences = [
("POS_number_seq", 1)
]
for seq_name, start_value in sequences:
await create_sequence_if_not_exists(db, seq_name, start_value)
def generate_sale_code(sequence_number: int, prefix: str = "POS") -> str:
"""
Generate Sale order number with format: POS-YYYY-NNNNNN
Args:
sequence_number: Next sequence value
prefix: POS number prefix (default: "POS")
Returns:
Formatted POS number like "POS-2024-000001"
"""
current_year = datetime.now().year
return f"{prefix}-{current_year}-{sequence_number:06d}"
async def get_next_sale_code(db: AsyncSession, prefix: str = "POS") -> str:
"""Get next available POS number"""
# Ensure sequence exists before getting next value
await create_sequence_if_not_exists(db, "pos_number_seq", 1)
seq_number = await get_next_sequence(db, "pos_number_seq")
return generate_sale_code(seq_number, prefix)