File size: 4,130 Bytes
97f9138
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Dict, List, Optional

from src.config import config

class Database:
    """SQLite-backed store for URL processing records.

    Every public method opens its own short-lived connection through
    :meth:`get_connection`, so an instance holds no open handle between calls.
    Timestamps are stored as naive ISO-8601 strings representing UTC.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Bind to a database file and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file. Any falsy value
                (``None`` or ``""``) falls back to ``config.DATABASE_PATH``.
        """
        self.db_path = db_path or config.DATABASE_PATH
        self._init_db()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Stripping the
        tzinfo from an aware ``now(timezone.utc)`` yields exactly the same text
        format (no ``+00:00`` suffix) as rows written previously, so string
        ordering on ``created_at`` stays consistent across old and new rows.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @contextmanager
    def get_connection(self):
        """Yield a connection that commits on success and rolls back on error.

        Rows are produced as :class:`sqlite3.Row`, so callers can convert them
        with ``dict(row)``. The connection is always closed on exit.

        Raises:
            Any exception raised inside the ``with`` body, re-raised after the
            transaction has been rolled back.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare ``raise`` re-raises the active exception with its original traceback.
            raise
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``urls`` table and its indexes if they do not exist yet."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS urls (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    completed_at TEXT,
                    error_message TEXT,
                    chunk_count INTEGER DEFAULT 0,
                    metadata TEXT
                )
            """)
            # Indexes back the status filter and created_at ordering in get_all_urls.
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_status ON urls(status)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_created ON urls(created_at)
            """)

    def create_url_record(self, url_id: str, url: str) -> Dict:
        """Insert a new record in ``"pending"`` status and return its fields.

        Args:
            url_id: Primary key for the record.
            url: The URL; must be unique across the table.

        Returns:
            A dict with ``id``, ``url``, ``status``, ``created_at`` and
            ``updated_at`` (both timestamps identical).

        Raises:
            sqlite3.IntegrityError: If ``url_id`` or ``url`` already exists.
        """
        now = self._utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO urls (id, url, status, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (url_id, url, "pending", now, now))

        return {
            "id": url_id,
            "url": url,
            "status": "pending",
            "created_at": now,
            "updated_at": now
        }

    def get_url_record(self, url_id: str) -> Optional[Dict]:
        """Fetch one record by id.

        Returns:
            All columns as a dict, or ``None`` if no row has this id. The
            ``metadata`` column is returned as its stored JSON text, not decoded.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM urls WHERE id = ?", (url_id,))
            row = cursor.fetchone()

            if row:
                return dict(row)
            return None

    def update_url_status(self, url_id: str, status: str,
                          error_message: Optional[str] = None,
                          chunk_count: Optional[int] = None,
                          metadata: Optional[Dict] = None):
        """Set a record's status and bump ``updated_at``.

        Which extra columns are written depends on ``status``:

        * ``"completed"``: also sets ``completed_at`` and overwrites
          ``chunk_count`` (``None`` is stored as 0) and ``metadata``
          (JSON-encoded; a falsy value is stored as NULL).
        * ``"failed"``: also sets ``error_message``.
        * anything else: only ``status`` and ``updated_at`` change.

        Arguments that do not apply to the given status are ignored. An unknown
        ``url_id`` matches no rows and is silently a no-op.
        """
        now = self._utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()

            if status == "completed":
                cursor.execute("""
                    UPDATE urls 
                    SET status = ?, updated_at = ?, completed_at = ?, 
                        chunk_count = ?, metadata = ?
                    WHERE id = ?
                """, (status, now, now, chunk_count or 0,
                      json.dumps(metadata) if metadata else None, url_id))
            elif status == "failed":
                cursor.execute("""
                    UPDATE urls 
                    SET status = ?, updated_at = ?, error_message = ?
                    WHERE id = ?
                """, (status, now, error_message, url_id))
            else:
                cursor.execute("""
                    UPDATE urls 
                    SET status = ?, updated_at = ?
                    WHERE id = ?
                """, (status, now, url_id))

    def get_all_urls(self, status: Optional[str] = None) -> List[Dict]:
        """Return all records, newest ``created_at`` first.

        Args:
            status: If truthy, only records with exactly this status are
                returned; ``None`` or ``""`` returns every record.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status:
                cursor.execute("SELECT * FROM urls WHERE status = ? ORDER BY created_at DESC", (status,))
            else:
                cursor.execute("SELECT * FROM urls ORDER BY created_at DESC")

            return [dict(row) for row in cursor.fetchall()]

# Shared module-level instance. Building it at import time resolves the path
# from config.DATABASE_PATH and creates the ``urls`` schema if it is missing.
db: Database = Database(db_path=None)