# storage manager
import os
import json
import uuid
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from enum import Enum
from typing import List, Optional
from datetime import datetime
import uvicorn
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from aiocache import Cache
# Google Cloud Imports
from google.cloud import storage
from google.oauth2 import service_account
# --- CONFIGURATION ---
BUCKET_NAME = os.environ.get("GCP_BUCKET_NAME")
# Handle Credentials: If provided as a raw JSON string in env var
CREDENTIALS_JSON = os.environ.get("GCP_CREDENTIALS")
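# The env var is expected to hold the full service-account key as one JSON
# string, e.g. '{"type": "service_account", "project_id": "...", ...}'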
# --- STORAGE MAP ---
# Defines the folder structure inside the bucket
STORAGE_MAP = {
"song": {"folder": "songs/", "index": "songs/_songs.json"},
"pattern": {"folder": "patterns/", "index": "patterns/_patterns.json"},
"bank": {"folder": "banks/", "index": "banks/_banks.json"},
"sample": {"folder": "samples/", "index": "samples/_samples.json"},
"music": {"folder": "music/", "index": "music/_music.json"},
"note": {"folder": "notes/", "index": "notes/_notes.json"},
"default": {"folder": "misc/", "index": "misc/_misc.json"}
}
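# Example resolution: a song with id "abc123" is written to
# STORAGE_MAP["song"]["folder"] + "abc123.json" -> "songs/abc123.json",
# while its metadata entry lives in the "songs/_songs.json" index.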
# --- GLOBAL OBJECTS ---
gcs_client = None
bucket = None
# GCS handles high concurrency well
io_executor = ThreadPoolExecutor(max_workers=20)
cache = Cache(Cache.MEMORY)
INDEX_LOCK = asyncio.Lock() # Prevents race conditions during index writes
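# Index updates are read-modify-write against a single JSON blob, so writers
# are serialized (within this process) under INDEX_LOCK. A minimal sketch of
# the pattern every mutating endpoint follows ("meta" is illustrative):
#     async with INDEX_LOCK:
#         idx = await run_io(_read_json_sync, config["index"])
#         idx.insert(0, meta)
#         await run_io(_write_json_sync, config["index"], idx)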
# --- HELPERS ---
def get_gcs_client():
"""Initializes the GCS Client from environment variable string or file"""
if CREDENTIALS_JSON:
# Load credentials from the JSON string stored in secrets
cred_info = json.loads(CREDENTIALS_JSON)
creds = service_account.Credentials.from_service_account_info(
cred_info)
return storage.Client(credentials=creds)
else:
# Fallback to standard environment variable lookups (local dev)
return storage.Client()
async def run_io(func, *args, **kwargs):
"""Runs blocking GCS I/O in a thread pool"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(io_executor, lambda: func(*args, **kwargs))
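# Example: any blocking SDK call can be awaited without stalling the event
# loop, e.g. `exists = await run_io(blob.exists)` (used throughout below)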
# --- LIFESPAN ---
@asynccontextmanager
async def lifespan(app: FastAPI):
global gcs_client, bucket
try:
gcs_client = get_gcs_client()
bucket = gcs_client.bucket(BUCKET_NAME)
print(f"--- GCS CONNECTED: {BUCKET_NAME} ---")
except Exception as e:
print(f"!!! GCS CONNECTION FAILED: {e}")
yield
io_executor.shutdown()
app = FastAPI(lifespan=lifespan)
# --- CORS ---
# Only the origins listed here may call the API from a browser; keep this
# list restricted so strangers cannot use the API from arbitrary sites
ALLOWED_ORIGINS = [
    "http://localhost:3000",  # For local testing
    "https://test.1ink.us",
    "https://go.1ink.us",
    "https://noahcohn.com",
]
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS, # Uses the list above
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- DIRECT STORAGE LISTING ---
@app.get("/api/storage/files")
async def list_gcs_folder(folder: str = Query(..., description="Folder name, e.g., 'songs' or 'samples'")):
"""
Directly lists files in a GCS folder (ignoring the JSON index).
Useful for seeing what is actually on the disk.
"""
    # 1. Resolve the prefix from STORAGE_MAP, or fall back to the folder name.
    # This handles cases where the user types "song" but the folder is "songs/"
config = STORAGE_MAP.get(folder)
prefix = config["folder"] if config else f"{folder}/"
try:
# 2. Run GCS List Blobs in a thread (to keep server fast)
def _fetch_blobs():
# 'delimiter' makes it behave like a folder (doesn't show sub-sub-files)
blobs = bucket.list_blobs(prefix=prefix, delimiter="/")
file_list = []
            for blob in blobs:
                # Strip the folder prefix (e.g. "songs/beat1.json" -> "beat1.json");
                # slicing avoids also replacing the prefix if it recurs inside the name
                name = blob.name[len(prefix):]
                if name:
                    file_list.append({
                        "filename": name,
                        "size": blob.size,
                        "updated": blob.updated.isoformat() if blob.updated else None,
                        "url": blob.public_url or None
                    })
return file_list
files = await run_io(_fetch_blobs)
return {"folder": prefix, "count": len(files), "files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
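# Example request (hypothetical local host; see uvicorn.run at the bottom):
#     curl "http://localhost:7860/api/storage/files?folder=songs"
# -> {"folder": "songs/", "count": 1, "files": [{"filename": "abc.json", ...}]}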
# --- MODELS ---
class ItemPayload(BaseModel):
name: str
author: str
description: Optional[str] = ""
type: str = "song"
data: dict
rating: Optional[int] = None
class MetaData(BaseModel):
id: str
name: str
author: str
date: str
type: str
description: Optional[str] = ""
filename: str
rating: Optional[int] = None
genre: Optional[str] = None
last_played: Optional[str] = None # ISO timestamp
# --- GCS I/O HELPERS ---
def _read_json_sync(blob_path):
blob = bucket.blob(blob_path)
if blob.exists():
return json.loads(blob.download_as_text())
return []
def _write_json_sync(blob_path, data):
blob = bucket.blob(blob_path)
# Upload as JSON string with correct content type
blob.upload_from_string(
json.dumps(data),
content_type='application/json'
)
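# Both helpers are blocking; call sites dispatch them through run_io(), and
# index writes additionally happen under INDEX_LOCK.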
# --- ENDPOINTS ---
@app.get("/")
def home():
return {"status": "online", "provider": "Google Cloud Storage"}
# --- 0.5 HEALTH CHECK & TEST DATA ---
@app.post("/api/admin/sync-music")
async def sync_music_folder():
"""Scans the music/ folder and rebuilds the music index."""
config = STORAGE_MAP["music"]
report = {"added": 0, "removed": 0}
async with INDEX_LOCK:
try:
# 1. List all files in music/
blobs = await run_io(lambda: list(bucket.list_blobs(prefix=config["folder"])))
# Filter for audio files (FLAC, WAV, MP3)
audio_files = []
for b in blobs:
                fname = b.name[len(config["folder"]):]
if fname and not b.name.endswith(config["index"]):
lower = fname.lower()
if lower.endswith(('.flac', '.wav', '.mp3', '.ogg')):
audio_files.append({
"filename": fname,
"name": fname,
"size": b.size,
"url": b.public_url
})
# 2. Get current index
index_data = await run_io(_read_json_sync, config["index"])
if not isinstance(index_data, list):
index_data = []
# 3. Compare and update
index_map = {item["filename"]: item for item in index_data}
disk_set = set(f["filename"] for f in audio_files)
# Remove missing files
new_index = [item for item in index_data if item["filename"] in disk_set]
report["removed"] = len(index_data) - len(new_index)
# Add new files
for file_info in audio_files:
if file_info["filename"] not in index_map:
new_entry = {
"id": str(uuid.uuid4()),
"filename": file_info["filename"],
"name": file_info["name"],
"type": "music",
"date": datetime.now().strftime("%Y-%m-%d"),
"author": "Unknown",
"description": "",
"rating": None,
"genre": None,
"last_played": None,
"url": file_info["url"],
"size": file_info["size"]
}
new_index.insert(0, new_entry)
report["added"] += 1
if report["added"] > 0 or report["removed"] > 0:
await run_io(_write_json_sync, config["index"], new_index)
await cache.delete("library:music")
await cache.delete("library:all")
report["total"] = len(new_index)
return report
except Exception as e:
raise HTTPException(500, f"Failed to sync music: {str(e)}")
@app.post("/api/admin/seed-test-samples")
async def seed_test_samples():
"""Creates test sample entries for development."""
config = STORAGE_MAP["sample"]
test_samples = [
{
"id": "test-flac-001",
"name": "Test Ambient Track.flac",
"filename": "test-flac-001.flac",
"type": "sample",
"author": "Test Artist",
"date": "2024-02-09",
"description": "Test ambient track",
"rating": 8,
"genre": "ambient"
},
{
"id": "test-wav-002",
"name": "Test Bass Line.wav",
"filename": "test-wav-002.wav",
"type": "sample",
"author": "Test Artist",
"date": "2024-02-09",
"description": "Test bass line",
"rating": 7,
"genre": "bass"
},
{
"id": "test-flac-003",
"name": "Unrated Demo.flac",
"filename": "test-flac-003.flac",
"type": "sample",
"author": "Unknown",
"date": "2024-02-09",
"description": "Demo without rating",
"rating": None,
"genre": None
}
]
async with INDEX_LOCK:
try:
# Read existing
index_data = await run_io(_read_json_sync, config["index"])
if not isinstance(index_data, list):
index_data = []
# Add test samples (avoid duplicates)
existing_ids = {item.get("id") for item in index_data}
added = 0
for sample in test_samples:
if sample["id"] not in existing_ids:
index_data.insert(0, sample)
added += 1
await run_io(_write_json_sync, config["index"], index_data)
await cache.delete("library:sample")
await cache.delete("library:all")
return {"success": True, "added": added, "total": len(index_data)}
except Exception as e:
raise HTTPException(500, f"Failed to seed: {str(e)}")
@app.get("/api/health")
async def health_check():
"""Returns storage manager status and index counts."""
status = {}
for item_type, config in STORAGE_MAP.items():
if item_type == "default":
continue
try:
items = await run_io(_read_json_sync, config["index"])
count = len(items) if isinstance(items, list) else 0
status[item_type] = {"count": count, "status": "ok"}
except Exception as e:
status[item_type] = {"count": 0, "status": "error", "error": str(e)}
return {"status": "online", "storage": status}
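# Example /api/health response shape (counts illustrative):
#     {"status": "online", "storage": {"song": {"count": 3, "status": "ok"}, ...}}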
# --- 1. LISTING (Cached) ---
class SortBy(str, Enum):
date = "date"
rating = "rating"
name = "name"
last_played = "last_played"
genre = "genre"
@app.get("/api/songs", response_model=List[MetaData])
async def list_library(
type: Optional[str] = Query(None),
sort_by: SortBy = Query(SortBy.date),
sort_desc: bool = Query(True),
genre: Optional[str] = Query(None),
min_rating: Optional[int] = Query(None, ge=1, le=10)
):
cache_key = f"library:{type or 'all'}:{sort_by}:{sort_desc}:{genre}:{min_rating}"
    cached = await cache.get(cache_key)
    if cached is not None:
        return cached
search_types = [type] if type else ["song", "pattern", "bank", "sample", "music"]
results = []
for t in search_types:
config = STORAGE_MAP.get(t, STORAGE_MAP["default"])
try:
# Fetch index file from GCS
items = await run_io(_read_json_sync, config["index"])
if isinstance(items, list):
results.extend(items)
except Exception as e:
print(f"Error listing {t}: {e}")
# Filter by genre
if genre:
results = [r for r in results if r.get("genre") == genre]
# Filter by min rating
if min_rating is not None:
results = [r for r in results if (r.get("rating") or 0) >= min_rating]
    # Sort results. Sort only the entries that have the field, then append the
    # rest, so missing values land at the end for both sort directions
    # (a single keyed sort with reverse=True would push them to the front)
    has_val = [r for r in results if r.get(sort_by.value) is not None]
    missing_val = [r for r in results if r.get(sort_by.value) is None]
    has_val.sort(key=lambda r: r[sort_by.value], reverse=sort_desc)
    results = has_val + missing_val
await cache.set(cache_key, results, ttl=30)
return results
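# Example query (parameter values illustrative):
#     GET /api/songs?type=sample&sort_by=rating&sort_desc=true&min_rating=7
# returns the sample index entries rated 7 or higher, best-rated first.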
# --- 2. UPLOAD JSON ---
@app.post("/api/songs")
async def upload_item(payload: ItemPayload):
item_id = str(uuid.uuid4())
date_str = datetime.now().strftime("%Y-%m-%d")
item_type = payload.type if payload.type in STORAGE_MAP else "song"
config = STORAGE_MAP[item_type]
filename = f"{item_id}.json"
full_path = f"{config['folder']}{filename}" # e.g., songs/uuid.json
meta = {
"id": item_id,
"name": payload.name,
"author": payload.author,
"date": date_str,
"type": item_type,
"description": payload.description,
"filename": filename,
"rating": payload.rating
}
# Add meta to the actual data file too
payload.data["_cloud_meta"] = meta
async with INDEX_LOCK:
try:
# 1. Write the Data File
await run_io(_write_json_sync, full_path, payload.data)
# 2. Update the Index
            def _update_index():
                current = _read_json_sync(config["index"])
                if not isinstance(current, list):
                    current = []
                current.insert(0, meta)
                _write_json_sync(config["index"], current)
await run_io(_update_index)
# Clear cache
await cache.clear()
return {"success": True, "id": item_id}
except Exception as e:
raise HTTPException(500, f"Upload failed: {str(e)}")
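# Example POST body (values illustrative; "data" is the raw item payload):
#     {"name": "My Beat", "author": "someone", "type": "song",
#      "description": "demo", "data": {"bpm": 120}}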
# --- 2.5 UPDATE JSON (PUT) ---
@app.put("/api/songs/{item_id}")
async def update_item(item_id: str, payload: ItemPayload):
# Verify type configuration
item_type = payload.type if payload.type in STORAGE_MAP else "song"
config = STORAGE_MAP[item_type]
# Assume filename is {id}.json (standard convention for this app)
filename = f"{item_id}.json"
full_path = f"{config['folder']}{filename}"
    # The original ID is preserved, but the rest of the metadata is refreshed.
    # 'date' is deliberately set to the update time (not the creation date) so
    # the refreshed entry sorts as newest and bubbles to the top of listings.
    date_str = datetime.now().strftime("%Y-%m-%d")
new_meta = {
"id": item_id,
"name": payload.name,
"author": payload.author,
"date": date_str, # Defaulting to now if not found
"type": item_type,
"description": payload.description,
"filename": filename,
"rating": payload.rating
}
# Add meta to data
payload.data["_cloud_meta"] = new_meta
async with INDEX_LOCK:
try:
# 1. Check if file exists (optional, but good for 404)
# blob = bucket.blob(full_path)
# if not await run_io(blob.exists):
# raise HTTPException(404, "Item not found")
# 2. Write the Data File (Overwrite)
await run_io(_write_json_sync, full_path, payload.data)
# 3. Update the Index
def _update_index_logic():
current = _read_json_sync(config["index"])
if not isinstance(current, list):
current = []
                # Remove any existing entry for this ID, then insert the
                # refreshed metadata at the top of the index
                existing_index = next((i for i, item in enumerate(
                    current) if item.get("id") == item_id), -1)
                if existing_index != -1:
                    current.pop(existing_index)
current.insert(0, new_meta)
_write_json_sync(config["index"], current)
await run_io(_update_index_logic)
# Clear cache
await cache.clear()
return {"success": True, "id": item_id, "action": "updated"}
except Exception as e:
raise HTTPException(500, f"Update failed: {str(e)}")
# --- 3. FETCH METADATA (without full data) ---
@app.get("/api/songs/{item_id}/meta")
async def get_item_metadata(item_id: str, type: Optional[str] = Query(None)):
"""
Returns only the metadata for an item without the full data payload.
Much faster for listing/details views.
"""
search_types = [type] if type else ["song", "pattern", "bank"]
for t in search_types:
config = STORAGE_MAP.get(t)
if not config:
continue
# Read from index (much faster than fetching full file)
index_data = await run_io(_read_json_sync, config["index"])
if isinstance(index_data, list):
entry = next((item for item in index_data if item.get("id") == item_id), None)
if entry:
return entry
raise HTTPException(404, "Item not found")
# --- 3.5 FETCH JSON ITEM ---
@app.get("/api/songs/{item_id}")
async def get_item(item_id: str, type: Optional[str] = Query(None)):
    # Try to find the file across the candidate types
    search_types = [type] if type else ["song", "pattern", "bank"]
    for t in search_types:
        config = STORAGE_MAP.get(t)
        if not config:
            continue  # Unknown type: skip instead of crashing on None
        filepath = f"{config['folder']}{item_id}.json"
# Check existence efficiently
blob = bucket.blob(filepath)
exists = await run_io(blob.exists)
if exists:
data = await run_io(blob.download_as_text)
return json.loads(data)
raise HTTPException(404, "Item not found")
# --- 4. STREAMING SAMPLES (Upload & Download) ---
@app.post("/api/samples")
async def upload_sample(
file: UploadFile = File(...),
author: str = Form(...),
description: str = Form(""),
rating: Optional[int] = Form(None)
):
sample_id = str(uuid.uuid4())
ext = os.path.splitext(file.filename)[1]
storage_filename = f"{sample_id}{ext}"
config = STORAGE_MAP["sample"]
full_path = f"{config['folder']}{storage_filename}"
meta = {
"id": sample_id,
"name": file.filename,
"author": author,
"date": datetime.now().strftime("%Y-%m-%d"),
"type": "sample",
"description": description,
"filename": storage_filename,
"rating": rating
}
async with INDEX_LOCK:
try:
# 1. Stream Upload to GCS
blob = bucket.blob(full_path)
            # The GCS Python client has no simple async streaming upload, but
            # upload_from_file is efficient; we hand it the spooled temp file
            # that FastAPI provides
await run_io(blob.upload_from_file, file.file, content_type=file.content_type)
# 2. Update Index
            def _update_idx():
                idx = _read_json_sync(config["index"])
                if not isinstance(idx, list):
                    idx = []
                idx.insert(0, meta)
                _write_json_sync(config["index"], idx)
await run_io(_update_idx)
await cache.delete("library:sample")
return {"success": True, "id": sample_id}
except Exception as e:
raise HTTPException(500, str(e))
@app.get("/api/samples/{sample_id}")
async def get_sample(sample_id: str):
config = STORAGE_MAP["sample"]
# 1. Lookup in Index (to get original filename/extension)
idx = await run_io(_read_json_sync, config["index"])
    entry = next((i for i in idx if i.get("id") == sample_id), None)
if not entry:
raise HTTPException(404, "Sample not found in index")
blob_path = f"{config['folder']}{entry['filename']}"
blob = bucket.blob(blob_path)
if not await run_io(blob.exists):
raise HTTPException(404, "File missing from storage")
# 2. Stream Download
# GCS blob.open() returns a file-like object we can stream
def iterfile():
with blob.open("rb") as f:
while chunk := f.read(1024 * 1024): # 1MB chunks
yield chunk
return StreamingResponse(
iterfile(),
media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{entry["name"]}"'}
)
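# Example download (id illustrative); -OJ saves under the Content-Disposition
# name, and the 1MB-chunk generator keeps memory use flat for large files:
#     curl -OJ "http://localhost:7860/api/samples/<sample-id>"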
# --- 4.4 RECORD PLAY ---
@app.post("/api/samples/{sample_id}/play")
async def record_play(sample_id: str):
"""Records that a sample was played by updating last_played timestamp."""
config = STORAGE_MAP["sample"]
index_path = config["index"]
now = datetime.now().isoformat()
async with INDEX_LOCK:
try:
index_data = await run_io(_read_json_sync, index_path)
if not isinstance(index_data, list):
raise HTTPException(status_code=500, detail="Sample index is corrupted.")
entry = next((item for item in index_data if item.get("id") == sample_id), None)
if not entry:
raise HTTPException(status_code=404, detail="Sample not found.")
entry["last_played"] = now
await run_io(_write_json_sync, index_path, index_data)
await cache.delete("library:sample")
await cache.delete("library:all")
return {"success": True, "id": sample_id, "last_played": now}
except HTTPException:
raise
except Exception as e:
logging.error(f"Failed to record play for {sample_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to record play: {str(e)}")
# --- 4.5 UPDATE SAMPLE METADATA ---
class SampleMetaUpdatePayload(BaseModel):
"""Payload for updating a sample's metadata."""
name: Optional[str] = None
description: Optional[str] = None
rating: Optional[int] = Field(
None, ge=1, le=10, description="A rating from 1 to 10.")
genre: Optional[str] = None
last_played: Optional[str] = None # ISO timestamp
@app.put("/api/samples/{sample_id}")
async def update_sample_metadata(sample_id: str, payload: SampleMetaUpdatePayload):
"""
Updates the metadata (name, description, rating) for a sample (e.g., a FLAC file).
This only modifies the JSON index, not the audio file itself.
"""
config = STORAGE_MAP["sample"]
index_path = config["index"]
async with INDEX_LOCK:
try:
# 1. Read the index
index_data = await run_io(_read_json_sync, index_path)
if not isinstance(index_data, list):
raise HTTPException(
status_code=500, detail="Sample index is corrupted.")
# 2. Find the item and its index
entry_index = next((i for i, item in enumerate(
index_data) if item.get("id") == sample_id), -1)
if entry_index == -1:
raise HTTPException(
status_code=404, detail="Sample not found in index.")
entry = index_data[entry_index]
update_happened = False
# 3. Update metadata fields if provided in the payload
if payload.name is not None and payload.name != entry.get("name"):
entry["name"] = payload.name
update_happened = True
if payload.description is not None and payload.description != entry.get("description"):
entry["description"] = payload.description
update_happened = True
if payload.rating is not None and payload.rating != entry.get("rating"):
entry["rating"] = payload.rating
update_happened = True
if payload.genre is not None and payload.genre != entry.get("genre"):
entry["genre"] = payload.genre
update_happened = True
if payload.last_played is not None and payload.last_played != entry.get("last_played"):
entry["last_played"] = payload.last_played
update_happened = True
if not update_happened:
return {"success": True, "id": sample_id, "action": "no_change", "message": "No new data provided to update."}
# 4. Write the updated index back to GCS
await run_io(_write_json_sync, index_path, index_data)
# 5. Clear relevant cache
await cache.delete("library:sample")
await cache.delete("library:all")
return {"success": True, "id": sample_id, "action": "metadata_updated"}
except HTTPException:
raise # Re-raise FastAPI exceptions
except Exception as e:
logging.error(
f"Failed to update sample metadata for {sample_id}: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to update sample metadata: {str(e)}")
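# Example (values illustrative): only the fields present in the payload change:
#     PUT /api/samples/<sample-id>  with body {"rating": 9, "genre": "ambient"}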
# --- 4.6 MUSIC ENDPOINTS ---
@app.get("/api/music/{music_id}")
async def get_music_file(music_id: str):
"""Streams a music file from the music folder."""
config = STORAGE_MAP["music"]
# 1. Lookup in Index
idx = await run_io(_read_json_sync, config["index"])
    entry = next((i for i in idx if i.get("id") == music_id), None)
if not entry:
raise HTTPException(404, "Music track not found in index")
blob_path = f"{config['folder']}{entry['filename']}"
blob = bucket.blob(blob_path)
if not await run_io(blob.exists):
raise HTTPException(404, "File missing from storage")
# 2. Stream Download
def iterfile():
with blob.open("rb") as f:
while chunk := f.read(1024 * 1024): # 1MB chunks
yield chunk
# Determine content type
lower_name = entry['filename'].lower()
if lower_name.endswith('.flac'):
media_type = 'audio/flac'
elif lower_name.endswith('.wav'):
media_type = 'audio/wav'
    elif lower_name.endswith('.mp3'):
        media_type = 'audio/mpeg'
    elif lower_name.endswith('.ogg'):
        # .ogg files are indexed by sync-music, so serve them correctly too
        media_type = 'audio/ogg'
    else:
        media_type = 'audio/mpeg'
return StreamingResponse(
iterfile(),
media_type=media_type,
headers={"Content-Disposition": f'inline; filename="{entry["name"]}"'}
)
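# Because Content-Disposition is "inline", a browser can play the stream
# directly, e.g. <audio src="/api/music/<music-id>" controls>.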
@app.put("/api/music/{music_id}")
async def update_music_metadata(music_id: str, payload: SampleMetaUpdatePayload):
"""Updates metadata for a music track."""
config = STORAGE_MAP["music"]
index_path = config["index"]
async with INDEX_LOCK:
try:
index_data = await run_io(_read_json_sync, index_path)
if not isinstance(index_data, list):
raise HTTPException(status_code=500, detail="Music index is corrupted.")
entry_index = next((i for i, item in enumerate(index_data) if item.get("id") == music_id), -1)
if entry_index == -1:
raise HTTPException(status_code=404, detail="Music track not found.")
entry = index_data[entry_index]
update_happened = False
if payload.name is not None and payload.name != entry.get("name"):
entry["name"] = payload.name
update_happened = True
if payload.rating is not None and payload.rating != entry.get("rating"):
entry["rating"] = payload.rating
update_happened = True
if payload.genre is not None and payload.genre != entry.get("genre"):
entry["genre"] = payload.genre
update_happened = True
if payload.last_played is not None and payload.last_played != entry.get("last_played"):
entry["last_played"] = payload.last_played
update_happened = True
if payload.description is not None and payload.description != entry.get("description"):
entry["description"] = payload.description
update_happened = True
if update_happened:
await run_io(_write_json_sync, index_path, index_data)
await cache.delete("library:music")
await cache.delete("library:all")
return {"success": True, "id": music_id, "action": "metadata_updated" if update_happened else "no_change"}
except HTTPException:
raise
except Exception as e:
logging.error(f"Failed to update music metadata: {e}")
raise HTTPException(status_code=500, detail=f"Failed to update: {str(e)}")
# --- 5. SMART SYNC (The "Magic" Button) ---
@app.post("/api/admin/sync")
async def sync_gcs_storage():
"""
Scans Google Cloud Storage to rebuild JSON indexes based on actual files.
"""
report = {}
async with INDEX_LOCK:
for item_type, config in STORAGE_MAP.items():
if item_type == "default" or item_type == "music":
continue # Skip music in default sync, handle separately
added = 0
removed = 0
try:
# 1. List ALL objects in this folder prefix
# prefix="songs/" returns "songs/123.json", "songs/456.json", etc.
blobs = await run_io(lambda: list(bucket.list_blobs(prefix=config["folder"])))
# Filter out the index file itself
actual_files = []
for b in blobs:
                    # Strip the folder prefix to get just the filename (e.g., "123.json")
                    fname = b.name[len(config["folder"]):]
# Ensure it's not the index file
if fname and not b.name.endswith(config["index"]):
actual_files.append(fname)
                # 2. Get Current Index
                index_data = await run_io(_read_json_sync, config["index"])
                if not isinstance(index_data, list):
                    index_data = []
                # 3. Compare
                index_map = {item["filename"]: item for item in index_data}
disk_set = set(actual_files)
# Find Ghosts (In Index, Not on Disk)
new_index = []
for item in index_data:
if item["filename"] in disk_set:
new_index.append(item)
else:
removed += 1
# Find Orphans (On Disk, Not in Index)
for filename in actual_files:
if filename not in index_map:
# Create new entry
new_entry = {
# Generate new ID or parse from filename if possible
"id": str(uuid.uuid4()),
"filename": filename,
"type": item_type,
"date": datetime.now().strftime("%Y-%m-%d"),
"name": filename,
"author": "Unknown",
"description": "Auto-discovered via Sync",
"genre": None,
"last_played": None
}
# If JSON, peek inside for metadata
if filename.endswith(".json") and item_type in ["song", "pattern", "bank"]:
try:
b = bucket.blob(
f"{config['folder']}{filename}")
content = json.loads(b.download_as_text())
if "name" in content:
new_entry["name"] = content["name"]
if "author" in content:
new_entry["author"] = content["author"]
                            except Exception:
                                # Metadata peek failed; keep filename-based defaults
                                pass
new_index.insert(0, new_entry)
added += 1
# 4. Save if changed
if added > 0 or removed > 0:
await run_io(_write_json_sync, config["index"], new_index)
report[item_type] = {"added": added,
"removed": removed, "status": "synced"}
except Exception as e:
report[item_type] = {"error": str(e)}
await cache.clear()
return report
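# Example report shape (values illustrative):
#     {"song": {"added": 1, "removed": 0, "status": "synced"},
#      "pattern": {"added": 0, "removed": 0, "status": "synced"}, ...}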
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)