# 🔍 data_fetcher.py from firestore_client import get_firestore_client from schema_utils import has_field, resolve_field db = get_firestore_client() # Utility function to convert Gemini-style filters into Firestore queries def apply_filters(query_ref, collection_name, filters): for f in filters: raw_field = f.get("field") field = resolve_field(collection_name, raw_field) if raw_field else None op = f.get("op", "==") value = f.get("value") if field and value is not None and has_field(collection_name, field): query_ref = query_ref.where(field, op, value) # else: silently ignore invalid field for this collection return query_ref def fetch_data_from_firestore(instruction): """ instruction: dict like { "collections": [ { "name": "participants", "filters": [ {"field": "companyCode", "op": "==", "value": "QTX"}, {"field": "status", "op": "==", "value": "active"} ], "limit": 100 }, ... ] } """ results = {} for col in instruction.get("collections", []): name = col.get("name") filters = col.get("filters", []) limit = col.get("limit", 50) try: if name == "users" and any(f["field"] == "uid" for f in filters): uid_filter = next(f for f in filters if f["field"] == "uid") doc_id = uid_filter["value"] doc = db.collection("users").document(doc_id).get() if doc.exists: results[name] = [doc.to_dict()] else: results[name] = [] else: col_ref = db.collection(name) col_ref = apply_filters(col_ref, name, filters) docs = col_ref.limit(limit).stream() results[name] = [doc.to_dict() for doc in docs] except Exception as e: results[name] = {"error": str(e)} return results