File size: 3,878 Bytes
544529e
 
 
 
 
 
 
 
 
199a6f1
 
 
 
 
 
 
544529e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

from datetime import datetime
from typing import Any, Dict, List
from app.core.logging import get_logger
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

logger = get_logger(__name__)

from datetime import datetime, date

def normalize_dates(doc: dict) -> dict:
    """Promote a plain ``date`` stored under ``"dob"`` to a midnight ``datetime``.

    The dictionary is modified in place and the same object is returned.

    Args:
        doc: Dictionary that may contain a ``"dob"`` entry.

    Returns:
        The ``doc`` object that was passed in.
    """
    dob = doc.get("dob")
    # datetime is a subclass of date, so it must be excluded explicitly;
    # otherwise datetime.combine() would rebuild an existing datetime and
    # silently discard its time-of-day and tzinfo.
    if isinstance(dob, date) and not isinstance(dob, datetime):
        doc["dob"] = datetime.combine(dob, datetime.min.time())
    return doc

def convert_objectid_to_str(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Replace each document's non-null ``"_id"`` value with its ``str()`` form.

    Documents are updated in place; entries without an ``"_id"`` key, or whose
    ``"_id"`` is ``None``, are left untouched.

    Args:
        documents: List of dictionaries to process.

    Returns:
        The same list object that was passed in.
    """
    for record in documents:
        identifier = record.get("_id")
        if identifier is None:
            # Missing key and explicit None are both skipped.
            continue
        record["_id"] = str(identifier)
    return documents

async def get_next_sequence(db: AsyncSession, seq_name: str) -> int:
    """Advance a PostgreSQL sequence and return the value it yields.

    Args:
        db: Session used to run the ``nextval`` query.
        seq_name: Name of the sequence to advance.

    Returns:
        The scalar produced by ``nextval`` for ``seq_name``.

    Raises:
        Exception: Any error raised while running the query is logged and
            re-raised unchanged.
    """
    try:
        # NOTE: seq_name is interpolated directly into the SQL text, so
        # callers must only pass trusted sequence names.
        nextval_stmt = text(f"SELECT nextval('{seq_name}')")
        outcome = await db.execute(nextval_stmt)
        next_value = outcome.scalar()
    except Exception as e:
        logger.error(f"Error getting sequence {seq_name}: {e}")
        raise
    return next_value

async def create_sequence_if_not_exists(db: AsyncSession, seq_name: str, start_value: int = 1):
    """Create a PostgreSQL sequence unless one with that name already exists.

    The existence check looks only at sequences in the ``public`` schema. When
    the sequence is missing it is created and the session is committed; when
    it is already present nothing is executed or committed.

    Args:
        db: Session used to query the catalog and run the DDL.
        seq_name: Name of the sequence. It is interpolated into the DDL
            statement, so callers must only pass trusted identifiers.
        start_value: First value the new sequence should produce.

    Raises:
        Exception: Any error raised while checking or creating the sequence
            is logged and re-raised unchanged.
    """
    try:
        # PostgreSQL folds unquoted identifiers to lower case, so the catalog
        # stores e.g. "POS_number_seq" as "pos_number_seq". Compare against
        # the folded form; otherwise a mixed-case name never matches and the
        # CREATE below would be attempted (and fail) on every call.
        check_query = text("""
            SELECT EXISTS (
                SELECT 1 FROM pg_sequences
                WHERE schemaname = 'public' AND sequencename = :seq_name
            )
        """)
        result = await db.execute(check_query, {"seq_name": seq_name.lower()})
        exists = result.scalar()

        if not exists:
            # int() guarantees only a number reaches the DDL text.
            # IF NOT EXISTS keeps the statement safe when another session
            # creates the sequence between the check above and this point.
            create_query = text(
                f"CREATE SEQUENCE IF NOT EXISTS {seq_name} START {int(start_value)}"
            )
            await db.execute(create_query)
            await db.commit()
            logger.info(f"Created sequence {seq_name} starting at {start_value}")

    except Exception as e:
        logger.error(f"Error creating sequence {seq_name}: {e}")
        raise


async def sync_pos_sequence(db: AsyncSession):
    """Realign ``pos_number_seq`` with the sale codes already stored.

    Reads the largest numeric suffix among this year's ``POS-<year>-<n>``
    codes in ``trans.pos_sale`` and sets the sequence so that the next
    ``nextval`` call returns that number plus one.

    Args:
        db: Session used to run both queries.

    Raises:
        Exception: Any error raised along the way is logged and re-raised
            unchanged.
    """
    try:
        year = datetime.now().year

        # Highest suffix used so far this year; COALESCE yields 0 for no rows.
        max_suffix_query = text(f"""
            SELECT COALESCE(MAX(
                CAST(SUBSTRING(sale_code FROM 'POS-{year}-([0-9]+)') AS INTEGER)
            ), 0) as max_seq
            FROM trans.pos_sale 
            WHERE sale_code LIKE 'POS-{year}%'
        """)
        lookup = await db.execute(max_suffix_query)
        highest_used = lookup.scalar() or 0
        upcoming = highest_used + 1

        # is_called=false makes the next nextval() return `upcoming` itself.
        setval_query = text(f"SELECT setval('pos_number_seq', {upcoming}, false)")
        await db.execute(setval_query)
        logger.info(f"Synced POS sequence to {upcoming} based on existing data")

    except Exception as e:
        logger.error(f"Error syncing POS sequence: {e}")
        raise


async def initialize_sequences(db: AsyncSession):
    """Ensure every sequence required for POS sale numbering exists.

    Each ``(name, start_value)`` pair listed below is passed to
    ``create_sequence_if_not_exists``.

    Args:
        db: Session forwarded to ``create_sequence_if_not_exists``.
    """
    sequences = [
        # Lower case on purpose: it matches the name used by the other helpers
        # in this module and the form PostgreSQL stores for unquoted
        # identifiers, so the existence check can find it on later runs.
        ("pos_number_seq", 1),
    ]
    for seq_name, start_value in sequences:
        await create_sequence_if_not_exists(db, seq_name, start_value)

def generate_sale_code(sequence_number: int, prefix: str = "POS") -> str:
    """
    Build a sale code shaped like ``<prefix>-<year>-<number>``.

    The year is the current local year and the number is zero-padded to at
    least six digits.

    Args:
        sequence_number: Sequence value to embed in the code
        prefix: Leading tag of the code (default: "POS")

    Returns:
        Formatted code such as "POS-2024-000001"
    """
    year = datetime.now().year
    return "{}-{}-{:06d}".format(prefix, year, sequence_number)

async def get_next_sale_code(db: AsyncSession, prefix: str = "POS") -> str:
    """Reserve the next sequence value and format it as a sale code.

    Args:
        db: Session forwarded to the sequence helpers.
        prefix: Leading tag passed through to ``generate_sale_code``.

    Returns:
        The code produced by ``generate_sale_code`` for the reserved value.
    """
    sequence_name = "pos_number_seq"
    # Make sure the sequence is there before drawing from it.
    await create_sequence_if_not_exists(db, sequence_name, 1)
    next_number = await get_next_sequence(db, sequence_name)
    return generate_sale_code(next_number, prefix)