Spaces:
Sleeping
Sleeping
| # 🔍 data_fetcher.py | |
| from firestore_client import get_firestore_client | |
| from schema_utils import has_field, resolve_field | |
| db = get_firestore_client() | |
| # Utility function to convert Gemini-style filters into Firestore queries | |
def apply_filters(query_ref, collection_name, filters):
    """Chain ``where`` clauses onto *query_ref* for each usable filter.

    Each filter is expected to be a dict with the keys ``field``, ``op``
    (optional, defaults to ``"=="``) and ``value``. The field name is first
    mapped through ``resolve_field`` and then checked with ``has_field`` for
    the given collection.

    Filters that cannot be applied are skipped silently rather than raising:
    non-dict entries, entries without a field name, entries whose value is
    ``None``, and fields that ``has_field`` rejects for this collection.

    Args:
        query_ref: Object exposing ``where(field, op, value)`` that returns
            the narrowed query.
        collection_name: Collection name passed to the schema helpers.
        filters: Iterable of filter dicts; ``None`` is treated as empty.

    Returns:
        The query after all usable filters have been applied (the original
        ``query_ref`` if none were).
    """
    # `filters` may arrive as an explicit null from the instruction payload.
    for f in filters or ():
        if not isinstance(f, dict):
            continue  # malformed entry: ignore, same as an invalid field

        raw_field = f.get("field")
        value = f.get("value")
        # Cheap checks first so the schema helpers are only consulted for
        # filters that could actually be applied.
        if not raw_field or value is None:
            continue

        field = resolve_field(collection_name, raw_field)
        if field and has_field(collection_name, field):
            # A missing *or* null operator falls back to equality.
            op = f.get("op") or "=="
            query_ref = query_ref.where(field, op, value)
        # else: silently ignore invalid field for this collection
    return query_ref
def fetch_data_from_firestore(instruction):
    """Fetch documents for every collection described in *instruction*.

    instruction: dict like
    {
        "collections": [
            {
                "name": "participants",
                "filters": [
                    {"field": "companyCode", "op": "==", "value": "QTX"},
                    {"field": "status", "op": "==", "value": "active"}
                ],
                "limit": 100
            },
            ...
        ]
    }

    A missing or null ``instruction``, ``collections`` or ``filters`` is
    treated as empty, and a missing or null ``limit`` defaults to 50.

    Returns:
        dict mapping each collection name to a list of document dicts, or to
        ``{"error": "<message>"}`` if fetching that collection raised. One
        failing collection does not prevent the others from being fetched.
    """
    results = {}
    collections = (instruction or {}).get("collections") or []
    for col in collections:
        name = col.get("name")
        filters = col.get("filters") or []
        limit = col.get("limit")
        if limit is None:
            # An explicit null must not be forwarded to `.limit()`.
            limit = 50

        try:
            # For "users", a filter on "uid" is served by a direct document
            # lookup keyed by the filter value. Use `.get` so a filter
            # without a "field" key is skipped here, as in apply_filters.
            uid_filter = None
            if name == "users":
                uid_filter = next(
                    (
                        f for f in filters
                        if isinstance(f, dict) and f.get("field") == "uid"
                    ),
                    None,
                )

            if uid_filter is not None:
                # NOTE(review): this path ignores the filter's "op" and any
                # other filters on the collection - confirm that is intended.
                doc_id = uid_filter["value"]
                doc = db.collection("users").document(doc_id).get()
                results[name] = [doc.to_dict()] if doc.exists else []
            else:
                col_ref = apply_filters(db.collection(name), name, filters)
                docs = col_ref.limit(limit).stream()
                results[name] = [doc.to_dict() for doc in docs]
        except Exception as e:
            # Per-collection boundary: report the failure in the result
            # instead of aborting the whole fetch.
            results[name] = {"error": str(e)}
    return results