vikramvasudevan committed on
Commit
e96aee8
·
verified ·
1 Parent(s): 8900e79

Upload folder using huggingface_hub

Browse files
Files changed (6) hide show
  1. app.py +0 -3
  2. db.py +23 -15
  3. main.py +13 -6
  4. modules/firebase/listener.py +1 -1
  5. modules/firebase/messaging.py +29 -8
  6. server.py +2 -0
app.py CHANGED
@@ -20,11 +20,8 @@ from chat_utils import (
20
  limited_chat_wrapper,
21
  )
22
  from config import SanatanConfig
23
- from db import SanatanDatabase
24
- from drive_downloader import ZipDownloader
25
  from graph_helper import generate_graph
26
  from modules.youtube_metadata.app import initialize_youtube_metadata_and_poll
27
- from nalayiram_helper import delete_taniyan
28
  import pycountry
29
 
30
  # Logging
 
20
  limited_chat_wrapper,
21
  )
22
  from config import SanatanConfig
 
 
23
  from graph_helper import generate_graph
24
  from modules.youtube_metadata.app import initialize_youtube_metadata_and_poll
 
25
  import pycountry
26
 
27
  # Logging
db.py CHANGED
@@ -783,6 +783,15 @@ class SanatanDatabase:
783
 
784
  return sorted(list(values))
785
 
 
 
 
 
 
 
 
 
 
786
  def build_global_index_for_scripture(self, scripture: dict, force: bool = False):
787
  scripture_name = scripture["name"]
788
  chapter_order = scripture.get("chapter_order", None)
@@ -823,6 +832,13 @@ class SanatanDatabase:
823
  chapter_order_mapping,
824
  )
825
 
 
 
 
 
 
 
 
826
  # Fetch all records (keep embeddings for upsert)
827
  MAX_RETRIES = 3
828
  RETRY_DELAY = 5 # seconds
@@ -832,15 +848,14 @@ class SanatanDatabase:
832
  results = collection.get(include=["metadatas", "documents", "embeddings"])
833
  break # success → exit loop
834
  except Exception as e:
835
- logger.error(
836
- "build_global_index_for_all_scriptures:%s Error getting data from chromadb (attempt %s/%s)",
837
- scripture_name,
838
- attempt,
839
- MAX_RETRIES,
840
- exc_info=True,
841
- )
842
-
843
  if attempt == MAX_RETRIES:
 
 
 
 
 
 
 
844
  # still failing after 3 attempts
845
  return
846
  time.sleep(RETRY_DELAY) # wait before retry
@@ -850,13 +865,6 @@ class SanatanDatabase:
850
  documents = results["documents"]
851
  embeddings = results.get("embeddings", [None] * len(ids))
852
 
853
- if not force and metadatas and "_global_index" in metadatas[0]:
854
- logger.warning(
855
- "build_global_index_for_all_scriptures:%s: global index already available. skipping collection",
856
- scripture_name,
857
- )
858
- return
859
-
860
  # Create a DataFrame for metadata sorting
861
  df = pd.DataFrame(metadatas)
862
  df["_id"] = ids
 
783
 
784
  return sorted(list(values))
785
 
786
+ def is_global_index_available_for_scripture(self, scripture: dict):
787
+ scripture_name = scripture["name"]
788
+ collection_name = scripture["collection_name"]
789
+ collection = self.chroma_client.get_or_create_collection(name=collection_name)
790
+ results = collection.get(include=["metadatas"], limit=1)
791
+ metadatas = results["metadatas"]
792
+ return True if (metadatas and "_global_index" in metadatas[0]) else False
793
+
794
+
795
  def build_global_index_for_scripture(self, scripture: dict, force: bool = False):
796
  scripture_name = scripture["name"]
797
  chapter_order = scripture.get("chapter_order", None)
 
832
  chapter_order_mapping,
833
  )
834
 
835
+ if(not force and self.is_global_index_available_for_scripture(scripture)):
836
+ logger.warning(
837
+ "build_global_index_for_all_scriptures:%s: global index already available. skipping collection",
838
+ scripture_name,
839
+ )
840
+ return
841
+
842
  # Fetch all records (keep embeddings for upsert)
843
  MAX_RETRIES = 3
844
  RETRY_DELAY = 5 # seconds
 
848
  results = collection.get(include=["metadatas", "documents", "embeddings"])
849
  break # success → exit loop
850
  except Exception as e:
 
 
 
 
 
 
 
 
851
  if attempt == MAX_RETRIES:
852
+ logger.error(
853
+ "build_global_index_for_all_scriptures:%s Error getting data from chromadb (attempt %s/%s)",
854
+ scripture_name,
855
+ attempt,
856
+ MAX_RETRIES,
857
+ exc_info=True,
858
+ )
859
  # still failing after 3 attempts
860
  return
861
  time.sleep(RETRY_DELAY) # wait before retry
 
865
  documents = results["documents"]
866
  embeddings = results.get("embeddings", [None] * len(ids))
867
 
 
 
 
 
 
 
 
868
  # Create a DataFrame for metadata sorting
869
  df = pd.DataFrame(metadatas)
870
  df["_id"] = ids
main.py CHANGED
@@ -32,6 +32,7 @@ logger.setLevel(logging.INFO)
32
 
33
 
34
  def init():
 
35
  load_dotenv(override=True)
36
  try:
37
  SanatanDatabase().test_sanity()
@@ -49,24 +50,30 @@ def init():
49
 
50
  # add global index
51
  # delete taniyan records
52
- SanatanDatabase().delete_taniyans_in_divya_prabandham()
53
- SanatanDatabase().build_global_index_for_all_scriptures()
 
 
 
 
54
  # Launch the whole thing in a background thread
 
55
  yt_init_thread = threading.Thread(target=initialize_youtube_metadata_and_poll, daemon=True)
56
  yt_init_thread.start()
 
57
 
58
 
59
  @asynccontextmanager
60
  async def fn_lifespan(app: FastAPI):
 
 
 
 
61
  logging.info("🚀 Starting Firestore listener...")
62
  start_firestore_listener()
63
  yield
64
  logging.info("🛑 Firestore listener shutdown (no explicit cleanup needed).")
65
 
66
- ###### Initialize the database ######
67
- init()
68
- ###### Initialize the database ######
69
-
70
  app = FastAPI(title="Sanatan AI Unified Server",lifespan=fn_lifespan)
71
  limiter = Limiter(key_func=get_remote_address)
72
  app.state.limiter = limiter
 
32
 
33
 
34
  def init():
35
+ logger.info("Application Initializing ...")
36
  load_dotenv(override=True)
37
  try:
38
  SanatanDatabase().test_sanity()
 
50
 
51
  # add global index
52
  # delete taniyan records
53
+ logger.info("STARTED: Deleting Taniyans ...")
54
+ sanatanDatabase = SanatanDatabase()
55
+ sanatanDatabase.delete_taniyans_in_divya_prabandham()
56
+ logger.info("STARTED: Building Global Index for all scriptures ...")
57
+ sanatanDatabase.build_global_index_for_all_scriptures()
58
+ logger.info("FINISHED: Building Global Index for all scriptures ...")
59
  # Launch the whole thing in a background thread
60
+ logger.info("STARTED: Initializing youtube metadata poller...")
61
  yt_init_thread = threading.Thread(target=initialize_youtube_metadata_and_poll, daemon=True)
62
  yt_init_thread.start()
63
+ logger.info("FINISHED: Initializing youtube metadata poller...")
64
 
65
 
66
  @asynccontextmanager
67
  async def fn_lifespan(app: FastAPI):
68
+ logging.info("🚀 Starting Lifespan ...")
69
+ ###### Initialize the database ######
70
+ init()
71
+ ###### Initialize the database ######
72
  logging.info("🚀 Starting Firestore listener...")
73
  start_firestore_listener()
74
  yield
75
  logging.info("🛑 Firestore listener shutdown (no explicit cleanup needed).")
76
 
 
 
 
 
77
  app = FastAPI(title="Sanatan AI Unified Server",lifespan=fn_lifespan)
78
  limiter = Limiter(key_func=get_remote_address)
79
  app.state.limiter = limiter
modules/firebase/listener.py CHANGED
@@ -14,7 +14,7 @@ db = admin_firestore.client()
14
  _last_known_status = {}
15
 
16
  def handle_changes(doc_snapshot, changes, read_time):
17
- print("handle_changes fired!", changes)
18
 
19
  for change in changes:
20
  doc = change.document
 
14
  _last_known_status = {}
15
 
16
  def handle_changes(doc_snapshot, changes, read_time):
17
+ print("handle_changes fired!")
18
 
19
  for change in changes:
20
  doc = change.document
modules/firebase/messaging.py CHANGED
@@ -1,15 +1,17 @@
1
  import json
2
  import os
 
 
3
  from dotenv import load_dotenv
4
  import firebase_admin
5
  from firebase_admin import messaging, credentials
 
6
  from google.oauth2.service_account import Credentials
7
  from pydantic import BaseModel
8
  from fastapi import HTTPException
9
 
10
  load_dotenv()
11
 
12
-
13
  class FcmRequest(BaseModel):
14
  token: str
15
  title: str
@@ -137,22 +139,41 @@ class FcmService:
137
  return {"status": "success", "message_id": response}
138
 
139
  except Exception as e:
 
140
  raise HTTPException(status_code=500, detail=str(e))
141
 
142
 
143
- async def send_broadcast(self, title: str, body: str, data: dict | None = None):
144
- """Send a broadcast notification to all users subscribed to 'global_broadcast'."""
145
- print(f"send_broadcast: title={title} ; body = {body}; data={data}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
146
  return await self.send_to_topic(
147
  title=title,
148
  body=body,
149
- topic=os.getenv("BROADCAST_TOPIC_NAME","global_broadcast"), # use local broadcast if available for dev env.
150
  data=data or {"type": "broadcast"},
151
  channel_id="broadcast_channel",
152
- is_data_only_message=False
153
  )
154
-
155
-
156
 
157
  # ✅ Singleton instance
158
  fcm_service = FcmService()
 
1
  import json
2
  import os
3
+ import time
4
+ import uuid
5
  from dotenv import load_dotenv
6
  import firebase_admin
7
  from firebase_admin import messaging, credentials
8
+ from firebase_admin import firestore as admin_firestore
9
  from google.oauth2.service_account import Credentials
10
  from pydantic import BaseModel
11
  from fastapi import HTTPException
12
 
13
  load_dotenv()
14
 
 
15
  class FcmRequest(BaseModel):
16
  token: str
17
  title: str
 
139
  return {"status": "success", "message_id": response}
140
 
141
  except Exception as e:
142
+ print(f"Error Sending Message to topic {topic}", e)
143
  raise HTTPException(status_code=500, detail=str(e))
144
 
145
 
146
+ async def send_broadcast(self, id:str, title: str, body: str, data: dict | None = None):
147
+ """Send a broadcast notification and persist it to Firestore."""
148
+ broadcast_doc = {
149
+ "id" : id,
150
+ "title": title,
151
+ "body": body,
152
+ "data": data or {"type": "broadcast"},
153
+ "sent_at": int(time.time()),
154
+ "created_at": admin_firestore.SERVER_TIMESTAMP,
155
+ }
156
+
157
+ # 1️⃣ Persist to Firestore
158
+ try:
159
+ db = admin_firestore.client()
160
+ doc_ref = db.collection("broadcasts").document()
161
+ doc_ref.set(broadcast_doc) # NOTE: Admin SDK call is synchronous and blocks the event loop until the write completes; offload (e.g. asyncio.to_thread) if latency matters
162
+ print(f"[Firestore] Broadcast persisted: {doc_ref.id}")
163
+ except Exception as e:
164
+ print(f"[Firestore ERROR] Failed to store broadcast: {e}")
165
+
166
+ # 2️⃣ Send FCM broadcast
167
+ print(f"send_broadcast: title={title} ; body={body}; data={data}")
168
+
169
  return await self.send_to_topic(
170
  title=title,
171
  body=body,
172
+ topic=os.getenv("BROADCAST_TOPIC_NAME", "global_broadcast"),
173
  data=data or {"type": "broadcast"},
174
  channel_id="broadcast_channel",
175
+ is_data_only_message=False,
176
  )
 
 
177
 
178
  # ✅ Singleton instance
179
  fcm_service = FcmService()
server.py CHANGED
@@ -621,6 +621,7 @@ async def summarize_scripture_verse(req: ScriptureRequest):
621
  return response
622
 
623
  class BroadcastRequest(BaseModel):
 
624
  title: str
625
  body: str
626
  data: dict | None = None
@@ -628,6 +629,7 @@ class BroadcastRequest(BaseModel):
628
  @router.post("/send_broadcast_message")
629
  async def send_broadcast_message(req: BroadcastRequest):
630
  response = await fcm_service.send_broadcast(
 
631
  title=req.title,
632
  body=req.body,
633
  data=req.data
 
621
  return response
622
 
623
  class BroadcastRequest(BaseModel):
624
+ id: str
625
  title: str
626
  body: str
627
  data: dict | None = None
 
629
  @router.post("/send_broadcast_message")
630
  async def send_broadcast_message(req: BroadcastRequest):
631
  response = await fcm_service.send_broadcast(
632
+ id=req.id,
633
  title=req.title,
634
  body=req.body,
635
  data=req.data