Saurab Mishra commited on
Commit
e3ddf50
Β·
1 Parent(s): 5c95ec4

feat: handle chunked database assembly on pull for files >40MB

Browse files
Files changed (2) hide show
  1. mcp_server/cloud_server.py +32 -7
  2. sync.py +74 -46
mcp_server/cloud_server.py CHANGED
@@ -96,15 +96,40 @@ def pull_from_supabase():
96
  print(f"[CLOUD] Downloading {compressed_name} from Supabase...",
97
  file=sys.stderr)
98
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
  r = httpx.get(
100
  f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
101
  headers=headers,
102
  timeout=120,
103
  )
104
 
105
- # Fallback to uncompressed if .gz is missing (for backward compatibility)
106
  if r.status_code == 404:
107
- print(f"[CLOUD] ℹ️ Compressed not found, trying raw {remote_name}...", file=sys.stderr)
108
  r = httpx.get(
109
  f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{remote_name}",
110
  headers=headers,
@@ -114,16 +139,16 @@ def pull_from_supabase():
114
  with open(local_path, "wb") as f:
115
  f.write(r.content)
116
  size_mb = len(r.content) / (1024 * 1024)
117
- print(f"[CLOUD] βœ… Raw {remote_name} downloaded ({size_mb:.1f} MB).", file=sys.stderr)
118
  continue
119
 
120
  if r.status_code != 200:
121
- print(f"[CLOUD] ❌ {compressed_name} download failed ({r.status_code}): {r.text}",
122
  file=sys.stderr)
123
  return False
124
 
125
  # Decompress in memory and write
126
- print(f"[CLOUD] Decompressing {compressed_name}...", file=sys.stderr)
127
  decompressed_data = gzip.decompress(r.content)
128
 
129
  with open(local_path, "wb") as f:
@@ -131,10 +156,10 @@ def pull_from_supabase():
131
 
132
  download_mb = len(r.content) / (1024 * 1024)
133
  decompressed_mb = len(decompressed_data) / (1024 * 1024)
134
- print(f"[CLOUD] βœ… {remote_name} ready ({download_mb:.1f}MB β†’ {decompressed_mb:.1f}MB).",
135
  file=sys.stderr)
136
  except Exception as e:
137
- print(f"[CLOUD] ❌ Failed to download {remote_name}: {e}",
138
  file=sys.stderr)
139
  return False
140
 
 
96
  print(f"[CLOUD] Downloading {compressed_name} from Supabase...",
97
  file=sys.stderr)
98
  try:
99
+ # Try chunked download first (part000, part001, ...)
100
+ assembled = b""
101
+ part_idx = 0
102
+ while True:
103
+ chunk_name = f"{compressed_name}.part{part_idx:03d}"
104
+ r = httpx.get(
105
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{chunk_name}",
106
+ headers=headers, timeout=120,
107
+ )
108
+ if r.status_code == 200:
109
+ assembled += r.content
110
+ print(f"[CLOUD] \u2705 {chunk_name} ({len(r.content)/1024/1024:.1f} MB)", file=sys.stderr)
111
+ part_idx += 1
112
+ else:
113
+ break # No more chunks
114
+
115
+ if assembled:
116
+ print(f"[CLOUD] Decompressing assembled {compressed_name}...", file=sys.stderr)
117
+ decompressed_data = gzip.decompress(assembled)
118
+ with open(local_path, "wb") as f:
119
+ f.write(decompressed_data)
120
+ print(f"[CLOUD] \u2705 {remote_name} ready ({len(assembled)/1024/1024:.1f}MB -> {len(decompressed_data)/1024/1024:.1f}MB).", file=sys.stderr)
121
+ continue
122
+
123
+ # Fallback: try single .gz file
124
  r = httpx.get(
125
  f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
126
  headers=headers,
127
  timeout=120,
128
  )
129
 
130
+ # Fallback to uncompressed if .gz is missing
131
  if r.status_code == 404:
132
+ print(f"[CLOUD] \u2139\ufe0f Compressed not found, trying raw {remote_name}...", file=sys.stderr)
133
  r = httpx.get(
134
  f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{remote_name}",
135
  headers=headers,
 
139
  with open(local_path, "wb") as f:
140
  f.write(r.content)
141
  size_mb = len(r.content) / (1024 * 1024)
142
+ print(f"[CLOUD] \u2705 Raw {remote_name} downloaded ({size_mb:.1f} MB).", file=sys.stderr)
143
  continue
144
 
145
  if r.status_code != 200:
146
+ print(f"[CLOUD] \u274c {compressed_name} download failed ({r.status_code}): {r.text}",
147
  file=sys.stderr)
148
  return False
149
 
150
  # Decompress in memory and write
151
+ print(f"[CLOUD] Decompressing single {compressed_name}...", file=sys.stderr)
152
  decompressed_data = gzip.decompress(r.content)
153
 
154
  with open(local_path, "wb") as f:
 
156
 
157
  download_mb = len(r.content) / (1024 * 1024)
158
  decompressed_mb = len(decompressed_data) / (1024 * 1024)
159
+ print(f"[CLOUD] \u2705 {remote_name} ready ({download_mb:.1f}MB -> {decompressed_mb:.1f}MB).",
160
  file=sys.stderr)
161
  except Exception as e:
162
+ print(f"[CLOUD] \u274c Failed to download {remote_name}: {e}",
163
  file=sys.stderr)
164
  return False
165
 
sync.py CHANGED
@@ -119,7 +119,7 @@ def push():
119
 
120
  raw_size_mb = path.stat().st_size / (1024 * 1024)
121
 
122
- # We always compress before uploading
123
  compressed_name = f"{remote_name}.gz"
124
  print(f" Compressing {remote_name} ({raw_size_mb:.1f} MB)...", file=sys.stderr)
125
 
@@ -127,34 +127,44 @@ def push():
127
  file_content = gzip.compress(f_in.read())
128
 
129
  comp_size_mb = len(file_content) / (1024 * 1024)
130
- print(f" Uploading {compressed_name} ({comp_size_mb:.1f} MB)...", file=sys.stderr)
131
-
132
- # Try to remove existing file first (upsert)
133
- httpx.delete(
134
- f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
135
- headers=_headers(),
136
- timeout=30,
137
- )
138
-
139
- # Upload compressed file
140
- r = httpx.post(
141
- f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
142
- headers={
143
- **_headers(),
144
- "Content-Type": "application/gzip",
145
- "x-upsert": "true",
146
- },
147
- content=file_content,
148
- timeout=120,
149
- )
150
-
151
- if r.status_code in (200, 201):
152
- print(f" βœ… {compressed_name} uploaded successfully.", file=sys.stderr)
153
  else:
154
- print(
155
- f" ❌ {compressed_name} upload failed ({r.status_code}): {r.text}",
156
- file=sys.stderr,
 
157
  )
 
 
 
 
 
 
 
 
 
 
158
 
159
  print("\nβœ… Cloud sync (push) complete.", file=sys.stderr)
160
 
@@ -162,6 +172,7 @@ def push():
162
  def pull():
163
  """Download database and FAISS index from Supabase Storage."""
164
  import httpx
 
165
 
166
  _check_credentials()
167
 
@@ -171,39 +182,56 @@ def pull():
171
  }
172
 
173
  for remote_name, local_path in files_to_download.items():
 
174
  print(f" Downloading {remote_name}...", file=sys.stderr)
175
 
176
  try:
177
- r = httpx.get(
178
- f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{remote_name}",
179
- headers=_headers(),
180
- timeout=120,
181
- )
182
 
183
- if r.status_code != 200:
184
- print(
185
- f" ❌ {remote_name} download failed ({r.status_code}): {r.text}",
186
- file=sys.stderr,
 
 
 
 
187
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
188
  continue
189
 
190
- # Ensure parent directory exists
191
- Path(local_path).parent.mkdir(parents=True, exist_ok=True)
192
-
193
- with open(local_path, "wb") as f:
194
- f.write(r.content)
195
-
196
- size_mb = len(r.content) / (1024 * 1024)
197
- print(
198
- f" βœ… {remote_name} downloaded ({size_mb:.1f} MB).",
199
- file=sys.stderr,
200
  )
 
 
 
 
 
 
 
 
201
  except Exception as e:
202
  print(f" ❌ {remote_name}: {e}", file=sys.stderr)
203
 
204
  print("\nβœ… Cloud sync (pull) complete.", file=sys.stderr)
205
 
206
 
 
207
  def status():
208
  """Check what files exist in the Supabase Storage bucket."""
209
  import httpx
 
119
 
120
  raw_size_mb = path.stat().st_size / (1024 * 1024)
121
 
122
+ # Compress before uploading
123
  compressed_name = f"{remote_name}.gz"
124
  print(f" Compressing {remote_name} ({raw_size_mb:.1f} MB)...", file=sys.stderr)
125
 
 
127
  file_content = gzip.compress(f_in.read())
128
 
129
  comp_size_mb = len(file_content) / (1024 * 1024)
130
+
131
+ # Supabase free tier has a ~50MB object size limit β€” split into 40MB chunks
132
+ CHUNK_SIZE = 40 * 1024 * 1024 # 40 MB
133
+ if len(file_content) > CHUNK_SIZE:
134
+ chunks = [file_content[i:i+CHUNK_SIZE] for i in range(0, len(file_content), CHUNK_SIZE)]
135
+ print(f" Splitting {compressed_name} ({comp_size_mb:.1f} MB) into {len(chunks)} chunks...", file=sys.stderr)
136
+ for idx, chunk in enumerate(chunks):
137
+ chunk_name = f"{compressed_name}.part{idx:03d}"
138
+ httpx.delete(
139
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{chunk_name}",
140
+ headers=_headers(), timeout=30,
141
+ )
142
+ r = httpx.post(
143
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{chunk_name}",
144
+ headers={**_headers(), "Content-Type": "application/octet-stream", "x-upsert": "true"},
145
+ content=chunk,
146
+ timeout=180,
147
+ )
148
+ if r.status_code in (200, 201):
149
+ print(f" βœ… {chunk_name} ({len(chunk)/1024/1024:.1f} MB) uploaded.", file=sys.stderr)
150
+ else:
151
+ print(f" ❌ {chunk_name} failed ({r.status_code}): {r.text}", file=sys.stderr)
 
152
  else:
153
+ print(f" Uploading {compressed_name} ({comp_size_mb:.1f} MB)...", file=sys.stderr)
154
+ httpx.delete(
155
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
156
+ headers=_headers(), timeout=30,
157
  )
158
+ r = httpx.post(
159
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
160
+ headers={**_headers(), "Content-Type": "application/gzip", "x-upsert": "true"},
161
+ content=file_content,
162
+ timeout=120,
163
+ )
164
+ if r.status_code in (200, 201):
165
+ print(f" βœ… {compressed_name} uploaded successfully.", file=sys.stderr)
166
+ else:
167
+ print(f" ❌ {compressed_name} upload failed ({r.status_code}): {r.text}", file=sys.stderr)
168
 
169
  print("\nβœ… Cloud sync (push) complete.", file=sys.stderr)
170
 
 
172
  def pull():
173
  """Download database and FAISS index from Supabase Storage."""
174
  import httpx
175
+ import gzip
176
 
177
  _check_credentials()
178
 
 
182
  }
183
 
184
  for remote_name, local_path in files_to_download.items():
185
+ compressed_name = f"{remote_name}.gz"
186
  print(f" Downloading {remote_name}...", file=sys.stderr)
187
 
188
  try:
189
+ Path(local_path).parent.mkdir(parents=True, exist_ok=True)
 
 
 
 
190
 
191
+ # Try chunked download first (part000, part001, ...)
192
+ assembled = b""
193
+ part_idx = 0
194
+ while True:
195
+ chunk_name = f"{compressed_name}.part{part_idx:03d}"
196
+ r = httpx.get(
197
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{chunk_name}",
198
+ headers=_headers(), timeout=180,
199
  )
200
+ if r.status_code == 200:
201
+ assembled += r.content
202
+ print(f" βœ… {chunk_name} ({len(r.content)/1024/1024:.1f} MB)", file=sys.stderr)
203
+ part_idx += 1
204
+ else:
205
+ break # No more parts
206
+
207
+ if assembled:
208
+ # Decompress the reassembled chunks
209
+ data = gzip.decompress(assembled)
210
+ with open(local_path, "wb") as f:
211
+ f.write(data)
212
+ print(f" βœ… {remote_name} reassembled ({len(data)/1024/1024:.1f} MB uncompressed).", file=sys.stderr)
213
  continue
214
 
215
+ # Fallback: try downloading as a single compressed file
216
+ r = httpx.get(
217
+ f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET}/{compressed_name}",
218
+ headers=_headers(), timeout=180,
 
 
 
 
 
 
219
  )
220
+ if r.status_code == 200:
221
+ data = gzip.decompress(r.content)
222
+ with open(local_path, "wb") as f:
223
+ f.write(data)
224
+ print(f" βœ… {remote_name} downloaded ({len(data)/1024/1024:.1f} MB).", file=sys.stderr)
225
+ else:
226
+ print(f" ❌ {remote_name} not found in Supabase ({r.status_code}).", file=sys.stderr)
227
+
228
  except Exception as e:
229
  print(f" ❌ {remote_name}: {e}", file=sys.stderr)
230
 
231
  print("\nβœ… Cloud sync (pull) complete.", file=sys.stderr)
232
 
233
 
234
+
235
  def status():
236
  """Check what files exist in the Supabase Storage bucket."""
237
  import httpx