File size: 4,130 Bytes
beb8990
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
Base Manager - Common CRUD operations for all Calendar services
"""

import sqlite3
import logging
from typing import Dict, Optional, List, Any
from datetime import datetime

logger = logging.getLogger(__name__)


class BaseManager:
    """Base manager for common SQLite database operations.

    Every call opens a short-lived connection to ``db_path`` and closes it
    before returning, so no connection state is shared between calls.

    NOTE: table names, column names, ``order_by`` and ``where_clause`` are
    interpolated into the SQL text as-is (SQLite cannot bind identifiers as
    parameters). They must come from trusted code, never from user input.
    Row *values* are always passed as bound parameters.
    """

    def __init__(self, db_path: str):
        """Remember the path of the SQLite database file to operate on."""
        self.db_path = db_path

    def execute_query(self, query: str, params: tuple = ()) -> List[Dict]:
        """Execute a SELECT query and return the rows as a list of dicts.

        Args:
            query: SQL text, using ``?`` placeholders for values.
            params: Values bound to the placeholders.

        Returns:
            One ``{column_name: value}`` dict per result row.
        """
        conn = sqlite3.connect(self.db_path)
        # sqlite3.Row exposes column names, which lets each row become a dict.
        conn.row_factory = sqlite3.Row
        try:
            cursor = conn.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()

    def execute_insert(self, query: str, params: tuple = ()) -> int:
        """Execute an INSERT query and return the last row ID.

        Raises:
            ValueError: If the insert violates a database constraint. The
                original ``sqlite3.IntegrityError`` is chained as the cause.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(query, params)
            conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError as e:
            logger.error(f"Database integrity error: {e}")
            # Chain the cause so the underlying constraint failure stays
            # visible in tracebacks.
            raise ValueError(f"Database constraint violation: {e}") from e
        finally:
            conn.close()

    def execute_update(self, query: str, params: tuple = ()) -> int:
        """Execute an UPDATE query and return the number of affected rows."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(query, params)
            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()

    def execute_delete(self, query: str, params: tuple = ()) -> int:
        """Execute a DELETE query and return the number of affected rows."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(query, params)
            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()

    def get_by_id(self, table: str, record_id: int) -> Optional[Dict]:
        """Return the row of ``table`` whose ``id`` equals ``record_id``.

        Returns:
            The row as a dict, or ``None`` when no row matches.
        """
        query = f"SELECT * FROM {table} WHERE id = ?"
        results = self.execute_query(query, (record_id,))
        return results[0] if results else None

    def get_all(self, table: str, limit: int = 100, offset: int = 0, order_by: str = "id DESC") -> List[Dict]:
        """Return one page of rows from ``table``.

        Args:
            table: Table name (interpolated into the SQL; trusted input only).
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before the page starts.
            order_by: ORDER BY expression (interpolated into the SQL; trusted
                input only).
        """
        query = f"SELECT * FROM {table} ORDER BY {order_by} LIMIT ? OFFSET ?"
        return self.execute_query(query, (limit, offset))

    def count_records(self, table: str, where_clause: str = "", params: tuple = ()) -> int:
        """Count the rows of ``table``, optionally filtered.

        Args:
            table: Table name (interpolated into the SQL; trusted input only).
            where_clause: Optional condition text without the ``WHERE``
                keyword; use ``?`` placeholders for values.
            params: Values bound to the placeholders in ``where_clause``.
        """
        query = f"SELECT COUNT(*) as count FROM {table}"
        if where_clause:
            query += f" WHERE {where_clause}"
        result = self.execute_query(query, params)
        return result[0]["count"] if result else 0

    def _column_names(self, table: str) -> List[str]:
        """Return the column names of ``table`` without fetching any rows."""
        conn = sqlite3.connect(self.db_path)
        try:
            # LIMIT 0 yields no rows but still populates cursor.description.
            cursor = conn.execute(f"SELECT * FROM {table} LIMIT 0")
            return [column[0] for column in cursor.description]
        finally:
            conn.close()

    def update_record(self, table: str, record_id: int, updates: Dict[str, Any]) -> bool:
        """Update the row of ``table`` whose ``id`` equals ``record_id``.

        When the table has an ``updated_at`` column it is set to the current
        local time in ISO 8601 format, overriding any value supplied for it.
        Tables without that column are updated with the given fields only.

        Args:
            table: Table name (interpolated into the SQL; trusted input only).
            record_id: Value of the ``id`` column identifying the row.
            updates: Mapping of column name to new value. Column names are
                interpolated into the SQL; values are bound as parameters.
                The mapping is not modified.

        Returns:
            ``True`` if a row was updated; ``False`` if ``updates`` is empty
            or no row has the given id.
        """
        if not updates:
            return False

        # Work on a copy so the caller's dict is never mutated.
        fields = dict(updates)

        # Only stamp updated_at when the table really has that column;
        # SQLite column names are case-insensitive.
        existing = {name.lower() for name in self._column_names(table)}
        if "updated_at" in existing:
            fields["updated_at"] = datetime.now().isoformat()

        set_clauses = [f"{field} = ?" for field in fields]
        params = [*fields.values(), record_id]

        query = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE id = ?"
        affected_rows = self.execute_update(query, tuple(params))
        return affected_rows > 0

    def delete_record(self, table: str, record_id: int) -> bool:
        """Delete the row of ``table`` whose ``id`` equals ``record_id``.

        Returns:
            ``True`` if a row was deleted, ``False`` if none matched.
        """
        query = f"DELETE FROM {table} WHERE id = ?"
        affected_rows = self.execute_delete(query, (record_id,))
        return affected_rows > 0

    def table_exists(self, table_name: str) -> bool:
        """Return whether a table named ``table_name`` exists in the database."""
        query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
        result = self.execute_query(query, (table_name,))
        return len(result) > 0