File size: 4,754 Bytes
825942f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from typing import List, Any, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
    from sqlalchemy.orm import Session
    from sqlalchemy.sql.selectable import Select

def get_pagination(
    total_items: int,
    page: int = 1,
    size: int = 50,
    *,
    max_size: int = 100,
) -> dict:
    """Get pagination details.

    Out-of-range inputs are clamped rather than rejected: ``page`` is forced
    into ``1..total_pages`` (or 1 when there are no pages), ``size`` into
    ``1..max_size`` and a negative ``total_items`` is treated as 0.

    Args:
        total_items: Total number of items
        page: Current page number (1-based)
        size: Items per page
        max_size: Upper bound applied to ``size`` (keyword-only)

    Returns:
        Dictionary with pagination details: ``current_page``, ``page_size``,
        ``total_pages``, ``is_first_page``, ``is_last_page``,
        ``previous_page``, ``next_page`` and ``total_items``.
        ``previous_page`` / ``next_page`` are ``None`` when there is no such
        page.
    """
    # A negative count would otherwise yield a negative number of pages.
    total_items = max(0, total_items)
    size = max(1, min(size, max(1, max_size)))

    # Ceiling division without going through floats.
    total_pages = (total_items + size - 1) // size

    # With zero pages the current page is still reported as 1.
    page = min(max(1, page), total_pages) if total_pages > 0 else 1

    return {
        'current_page': page,
        'page_size': size,
        'total_pages': total_pages,
        'is_first_page': page == 1,
        # ">=" so that an empty result set (0 pages, page 1) counts as last.
        'is_last_page': page >= total_pages,
        'previous_page': page - 1 if page > 1 else None,
        'next_page': page + 1 if page < total_pages else None,
        'total_items': total_items,
    }

def _filters_on_column(criterion: Any, column: str) -> bool:
    """Return True when ``criterion`` has a left operand named ``column``."""
    # Identity check on purpose: SQL expression objects should not be
    # evaluated for truthiness.
    left = getattr(criterion, 'left', None)
    return left is not None and getattr(left, 'name', None) == column


def get_clusters_sqlalchemy(query: "Select", column: str, session: "Session") -> List[Tuple[Any, int]]:
    """Count the rows of ``query`` per distinct value of ``column``.

    Any top-level WHERE criterion of ``query`` whose left-hand operand is
    named ``column`` is dropped before counting. Equality, ``IN`` and
    ``NOT IN`` filters all carry the filtered column as their left operand,
    so they are all covered by that single check. The remaining statement is
    wrapped in a subquery, grouped by ``column`` and ordered by ``column``.

    This provides the data for tag-style buttons a user can press to add or
    remove a filter value: filtering on the clustered column itself must not
    make the other values disappear from the list.

    Example rows::

        [
            { 'category': 'A', 'name': 'Apple' },
            { 'category': 'A', 'name': 'Astronaut' },
            { 'category': 'B', 'name': 'Banana' },
            { 'category': 'B', 'name': 'Bird' },
            { 'category': 'B', 'name': 'Bison' },
        ]

    Clustering on ``category``:

    CASE 1: no WHERE clause                 -> [('A', 2), ('B', 3)]
    CASE 2: WHERE name == 'Apple'           -> [('A', 1)]
    CASE 3: WHERE category = 'A'            -> clause removed, [('A', 2), ('B', 3)]
    CASE 4: WHERE category IN ('A')         -> clause removed, [('A', 2), ('B', 3)]
    CASE 5: WHERE category NOT IN ('A')     -> clause removed, [('A', 2), ('B', 3)]

    Args:
        query: The select statement to cluster. ``column`` must be one of
            the columns it selects, since it is looked up on the subquery.
        column: Name of the column to group by.
        session: Session used to execute the grouping statement.

    Returns:
        A list of ``(value, count)`` tuples ordered by ``value``.

    NOTE(review): only top-level criteria are inspected; a filter on
    ``column`` nested inside a compound expression is left in place.
    NOTE(review): when a criterion is removed, the statement is rebuilt from
    the original's selected columns and the surviving criteria only. Nothing
    else from the original statement is passed to the rebuilt one, so
    explicit joins, ``select_from``, ``group_by``, ``limit`` etc. are
    presumably not carried over -- confirm callers only pass plain selects.
    """
    from sqlalchemy import select, func

    original_criteria = getattr(query, '_where_criteria', None) or ()
    kept_criteria = [
        criterion
        for criterion in original_criteria
        if not _filters_on_column(criterion, column)
    ]

    # Rebuild only when something was actually dropped; otherwise run the
    # caller's statement untouched.
    modified_query = query
    if len(kept_criteria) != len(original_criteria):
        modified_query = select(*query.selected_columns)
        if kept_criteria:
            modified_query = modified_query.where(*kept_criteria)

    # Build the subquery once so every reference below points at the same
    # alias.
    subq = modified_query.subquery()
    cluster_col = subq.c[column]

    group_query = (
        select(cluster_col, func.count().label('count'))
        .select_from(subq)
        .group_by(cluster_col)
        .order_by(cluster_col)
    )

    results = session.execute(group_query).all()
    return [(row[0], row[1]) for row in results]

# run.vim:vert term pytest tests/test_db_api_base.py