File size: 7,226 Bytes
0ae355f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import os
import uuid
import sqlalchemy
from typing import Iterable
from sqlalchemy.orm import Session
from sqlalchemy.dialects import sqlite

from app.assets.helpers import utcnow
from app.assets.database.models import Asset, AssetCacheState, AssetInfo, AssetInfoTag, AssetInfoMeta

MAX_BIND_PARAMS = 800

def _chunk_rows(rows: list[dict], cols_per_row: int, max_bind_params: int) -> Iterable[list[dict]]:
    if not rows:
        return []
    rows_per_stmt = max(1, max_bind_params // max(1, cols_per_row))
    for i in range(0, len(rows), rows_per_stmt):
        yield rows[i:i + rows_per_stmt]

def _iter_chunks(seq, n: int):
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

def _rows_per_stmt(cols: int) -> int:
    """Number of rows per INSERT so that cols-per-row binds stay within
    MAX_BIND_PARAMS; always at least 1 row even for very wide rows."""
    effective_cols = cols if cols >= 1 else 1
    return max(MAX_BIND_PARAMS // effective_cols, 1)


def seed_from_paths_batch(
    session: Session,
    *,
    specs: list[dict],
    owner_id: str = "",
) -> dict:
    """Bulk-seed Asset / AssetCacheState / AssetInfo rows from file specs.

    Each spec is a dict with keys:
      - abs_path: str
      - size_bytes: int
      - mtime_ns: int
      - info_name: str
      - tags: list[str]
      - fname: Optional[str]

    Strategy: insert one placeholder Asset (hash=NULL) per unique path, then
    race to claim the AssetCacheState row for that path with ON CONFLICT DO
    NOTHING. Paths that won the claim get an AssetInfo plus tag/meta child
    rows; losers have their placeholder Asset deleted again.

    Returns counters: {"inserted_infos", "won_states", "lost_states"}.
    """
    if not specs:
        return {"inserted_infos": 0, "won_states": 0, "lost_states": 0}

    now = utcnow()
    asset_rows: list[dict] = []
    state_rows: list[dict] = []
    path_to_asset: dict[str, str] = {}
    asset_to_info: dict[str, dict] = {}  # asset_id -> prepared info row
    path_list: list[str] = []

    for sp in specs:
        ap = os.path.abspath(sp["abs_path"])
        if ap in path_to_asset:
            # Duplicate path inside one batch: keep the first spec. Previously
            # a later duplicate overwrote the mapping, leaving an orphaned
            # seed Asset and an AssetInfo pointing at an asset whose
            # cache-state claim could never win.
            continue
        aid = str(uuid.uuid4())
        iid = str(uuid.uuid4())
        path_list.append(ap)
        path_to_asset[ap] = aid

        asset_rows.append(
            {
                "id": aid,
                "hash": None,  # placeholder; content hash computed later
                "size_bytes": sp["size_bytes"],
                "mime_type": None,
                "created_at": now,
            }
        )
        state_rows.append(
            {
                "asset_id": aid,
                "file_path": ap,
                "mtime_ns": sp["mtime_ns"],
            }
        )
        asset_to_info[aid] = {
            "id": iid,
            "owner_id": owner_id,
            "name": sp["info_name"],
            "asset_id": aid,
            "preview_id": None,
            "user_metadata": {"filename": sp["fname"]} if sp["fname"] else None,
            "created_at": now,
            "updated_at": now,
            "last_access_time": now,
            # Private bookkeeping, NOT AssetInfo columns — stripped before INSERT.
            "_tags": sp["tags"],
            "_filename": sp["fname"],
        }

    # Insert all seed Assets (hash=NULL); 5 bind params per row.
    ins_asset = sqlite.insert(Asset)
    for chunk in _iter_chunks(asset_rows, _rows_per_stmt(5)):
        session.execute(ins_asset, chunk)

    # Try to claim AssetCacheState by unique file_path; pre-existing paths
    # silently keep their old row (ON CONFLICT DO NOTHING).
    ins_state = (
        sqlite.insert(AssetCacheState)
        .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path])
    )
    for chunk in _iter_chunks(state_rows, _rows_per_stmt(3)):
        session.execute(ins_state, chunk)

    # A path "won" iff its cache-state row now carries one of OUR asset ids.
    winners_by_path: set[str] = set()
    for chunk in _iter_chunks(path_list, MAX_BIND_PARAMS):
        result = session.execute(
            sqlalchemy.select(AssetCacheState.file_path)
            .where(AssetCacheState.file_path.in_(chunk))
            .where(AssetCacheState.asset_id.in_([path_to_asset[p] for p in chunk]))
        )
        winners_by_path.update(result.scalars().all())

    losers_by_path = set(path_list) - winners_by_path
    lost_assets = [path_to_asset[p] for p in losers_by_path]
    if lost_assets:  # losers get their placeholder Asset removed again
        for id_chunk in _iter_chunks(lost_assets, MAX_BIND_PARAMS):
            session.execute(sqlalchemy.delete(Asset).where(Asset.id.in_(id_chunk)))

    if not winners_by_path:
        return {"inserted_infos": 0, "won_states": 0, "lost_states": len(losers_by_path)}

    # Insert AssetInfo only for winners.
    # BUGFIX: the "_tags"/"_filename" bookkeeping keys must be stripped first;
    # they are not AssetInfo columns and SQLAlchemy raises
    # "Unconsumed column names" on unknown parameter keys.
    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
    insertable_info_rows = [
        {k: v for k, v in row.items() if not k.startswith("_")}
        for row in winner_info_rows
    ]
    ins_info = (
        sqlite.insert(AssetInfo)
        .on_conflict_do_nothing(index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name])
    )
    for chunk in _iter_chunks(insertable_info_rows, _rows_per_stmt(9)):
        session.execute(ins_info, chunk)

    # Find which of OUR generated info ids actually landed (conflicting
    # asset_id/owner_id/name combinations kept their pre-existing rows).
    all_info_ids = [row["id"] for row in winner_info_rows]
    inserted_info_ids: set[str] = set()
    for chunk in _iter_chunks(all_info_ids, MAX_BIND_PARAMS):
        result = session.execute(
            sqlalchemy.select(AssetInfo.id).where(AssetInfo.id.in_(chunk))
        )
        inserted_info_ids.update(result.scalars().all())

    # Build tag + meta child rows only for infos that were actually inserted.
    tag_rows: list[dict] = []
    meta_rows: list[dict] = []
    for row in winner_info_rows:
        iid = row["id"]
        if iid not in inserted_info_ids:
            continue
        for t in row["_tags"]:
            tag_rows.append({
                "asset_info_id": iid,
                "tag_name": t,
                "origin": "automatic",
                "added_at": now,
            })
        if row["_filename"]:
            meta_rows.append(
                {
                    "asset_info_id": iid,
                    "key": "filename",
                    "ordinal": 0,
                    "val_str": row["_filename"],
                    "val_num": None,
                    "val_bool": None,
                    "val_json": None,
                }
            )

    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows, max_bind_params=MAX_BIND_PARAMS)
    return {
        "inserted_infos": len(inserted_info_ids),
        "won_states": len(winners_by_path),
        "lost_states": len(losers_by_path),
    }


def bulk_insert_tags_and_meta(
    session: Session,
    *,
    tag_rows: list[dict],
    meta_rows: list[dict],
    max_bind_params: int,
) -> None:
    """Batch-insert rows into asset_info_tags and asset_info_meta, skipping
    duplicates via ON CONFLICT DO NOTHING.

    Expected row shapes:
      - tag_rows: asset_info_id, tag_name, origin, added_at
      - meta_rows: asset_info_id, key, ordinal, val_str, val_num, val_bool, val_json
    """
    # (rows, target model, binds per row, conflict index columns) — tags first,
    # then meta, preserving the original execution order.
    targets = (
        (tag_rows, AssetInfoTag, 4,
         [AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]),
        (meta_rows, AssetInfoMeta, 7,
         [AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal]),
    )
    for rows, model, width, conflict_cols in targets:
        if not rows:
            continue
        stmt = sqlite.insert(model).on_conflict_do_nothing(index_elements=conflict_cols)
        for batch in _chunk_rows(rows, cols_per_row=width, max_bind_params=max_bind_params):
            session.execute(stmt, batch)