Spaces:
Runtime error
Runtime error
File size: 4,754 Bytes
825942f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | from typing import List, Any, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql.selectable import Select
def get_pagination(
total_items: int,
page: int = 1,
size: int = 50
) -> dict:
"""Get pagination details.
Args:
total_items: Total number of items
page: Current page number (1-based)
size: Items per page
Returns:
Dictionary with pagination details
"""
page = max(1, page)
size = max(1, min(size, 100))
total_pages = (total_items + size - 1) // size
page = min(page, total_pages) if total_pages > 0 else 1
return {
'current_page': page,
'page_size': size,
'total_pages': total_pages,
'is_first_page': page == 1,
'is_last_page': page >= total_pages,
'previous_page': page - 1 if page > 1 else None,
'next_page': page + 1 if page < total_pages else None,
'total_items': total_items
}
def get_clusters_sqlalchemy(query: "Select", column: str, session: "Session") -> List[Tuple[Any, int]]:
"""
A SQLAlchemy helper function.
Takes a query and runs it.
If there is any WHERE clause in the query that filters for including / excluding the `column` or uses column `IN` or column `NOT IN`, thoses WHERE clauses should
be removed before executing the query
Then runs a GROUP BY on the results of the query.
Returns that result.
Basically provides an interface for creating buttons that user can press to add / remove tags
Example:
[
{ 'category': 'A', name: 'Apple' },
{ 'category': 'A', name: 'Astronaut' },
{ 'category': 'B', name: 'Banana' },
{ 'category': 'B', name: 'Bird' },
{ 'category': 'B', name: 'Bison' },
]
CASE 1:
if we run this table through the normal get_clusters, we should get:
[
{ 'category': 'A', count: 2 },
{ 'category': 'B', count: 3 },
]
CASE 2:
if we run this table through the query:
WHERE name == 'Apple', and cluster this via get_clusters, we should get:
[
{ 'category': 'A', count: 1 },
]
CASE 3:
if we run this table through the query:
WHERE category = 'A'
the where clause should be removed and we should get:
[
{ 'category': 'A', count: 2 },
{ 'category': 'B', count: 3 },
]
CASE 4:
if we run this table through the query:
WHERE category IN ('A')
the where clause should be removed and we should get:
[
{ 'category': 'A', count: 2 },
{ 'category': 'B', count: 3 },
]
as we don't want WHERE clauses that filter for including / excluding the `column` or uses column `IN` or column `NOT IN` to affect the clustering
CASE 5:
if we run this table through the query:
WHERE category NOT IN ('A')
the where clause should be removed and we should get:
[
{ 'category': 'A', count: 2 },
{ 'category': 'B', count: 3 },
]
"""
from sqlalchemy import select, func
# Create a new select statement based on the original query
modified_query = query
# Remove WHERE clauses that filter on the specified column
if hasattr(query, '_where_criteria') and query._where_criteria:
new_criteria = []
for criterion in query._where_criteria:
# Skip criteria that involve the specified column
if hasattr(criterion, 'left') and hasattr(criterion.left, 'name'):
if criterion.left.name == column:
continue
# Also skip IN/NOT IN clauses for the column
if hasattr(criterion, 'operator') and criterion.operator.__name__ in ('in_op', 'notin_op'):
if criterion.left.name == column:
continue
new_criteria.append(criterion)
# Create new query with filtered WHERE clauses
if len(new_criteria) != len(query._where_criteria):
modified_query = select(*query.selected_columns)
if new_criteria:
modified_query = modified_query.where(*new_criteria)
# Create a subquery once and reuse it
subq = modified_query.subquery()
# Create the grouping query using the same subquery reference
group_query = (
select(
subq.c[column],
func.count().label('count')
)
.select_from(subq)
.group_by(subq.c[column])
.order_by(subq.c[column])
)
# Execute and return results
results = session.execute(group_query).all()
return [(row[0], row[1]) for row in results]
# run.vim:vert term pytest tests/test_db_api_base.py
|