drrobot9 committed on
Commit
e4c5a04
·
1 Parent(s): 24f02de

Update app/tasks/rag_updater.py

Browse files
Files changed (1) hide show
  1. app/tasks/rag_updater.py +141 -141
app/tasks/rag_updater.py CHANGED
@@ -1,141 +1,141 @@
1
- # farmlingua_backend/app/tasks/rag_updater.py
2
- import os
3
- import sys
4
- from datetime import datetime, date
5
- import logging
6
- import requests
7
- from bs4 import BeautifulSoup
8
- from apscheduler.schedulers.background import BackgroundScheduler
9
-
10
- from langchain_community.vectorstores import FAISS
11
- from langchain_community.embeddings import SentenceTransformerEmbeddings
12
- from langchain_community.docstore.document import Document
13
- from langchain_text_splitters import RecursiveCharacterTextSplitter
14
-
15
- from app.utils import config
16
-
17
- BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
18
- if BASE_DIR not in sys.path:
19
- sys.path.insert(0, BASE_DIR)
20
-
21
- logging.basicConfig(
22
- format="%(asctime)s [%(levelname)s] %(message)s",
23
- level=logging.INFO
24
- )
25
-
26
- session = requests.Session()
27
-
28
- def fetch_weather_now():
29
- """Fetch current weather for all configured states."""
30
- docs = []
31
- for state in config.STATES:
32
- try:
33
- url = "http://api.weatherapi.com/v1/current.json"
34
- params = {
35
- "key": config.WEATHER_API_KEY,
36
- "q": f"{state}, Nigeria",
37
- "aqi": "no"
38
- }
39
- res = session.get(url, params=params, timeout=10)
40
- res.raise_for_status()
41
- data = res.json()
42
-
43
- if "current" in data:
44
- condition = data['current']['condition']['text']
45
- temp_c = data['current']['temp_c']
46
- humidity = data['current']['humidity']
47
- text = (
48
- f"Weather in {state}: {condition}, "
49
- f"Temperature: {temp_c}°C, Humidity: {humidity}%"
50
- )
51
- docs.append(Document(
52
- page_content=text,
53
- metadata={
54
- "source": "WeatherAPI",
55
- "location": state,
56
- "timestamp": datetime.utcnow().isoformat()
57
- }
58
- ))
59
- except Exception as e:
60
- logging.error(f"Weather fetch failed for {state}: {e}")
61
- return docs
62
-
63
- def fetch_harvestplus_articles():
64
- """Fetch ALL today's articles from HarvestPlus site."""
65
- try:
66
- res = session.get(config.DATA_SOURCES["harvestplus"], timeout=10)
67
- res.raise_for_status()
68
- soup = BeautifulSoup(res.text, "html.parser")
69
- articles = soup.find_all("article")
70
-
71
- docs = []
72
- today_str = date.today().strftime("%Y-%m-%d")
73
-
74
- for a in articles:
75
- content = a.get_text(strip=True)
76
- if content and len(content) > 100:
77
-
78
- if today_str in a.text or True:
79
- docs.append(Document(
80
- page_content=content,
81
- metadata={
82
- "source": "HarvestPlus",
83
- "timestamp": datetime.utcnow().isoformat()
84
- }
85
- ))
86
- return docs
87
- except Exception as e:
88
- logging.error(f"HarvestPlus fetch failed: {e}")
89
- return []
90
-
91
- def build_rag_vectorstore(reset=False):
92
- job_type = "FULL REBUILD" if reset else "INCREMENTAL UPDATE"
93
- logging.info(f"RAG update started — {job_type}")
94
-
95
- all_docs = fetch_weather_now() + fetch_harvestplus_articles()
96
-
97
- logging.info(f"Weather docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'WeatherAPI'])}")
98
- logging.info(f"News docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'HarvestPlus'])}")
99
-
100
- if not all_docs:
101
- logging.warning("No documents fetched, skipping update")
102
- return
103
-
104
- splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
105
- chunks = splitter.split_documents(all_docs)
106
-
107
- embedder = SentenceTransformerEmbeddings(model_name=config.EMBEDDING_MODEL)
108
-
109
- vectorstore_path = config.LIVE_VS_PATH
110
-
111
- if reset and os.path.exists(vectorstore_path):
112
- for file in os.listdir(vectorstore_path):
113
- file_path = os.path.join(vectorstore_path, file)
114
- try:
115
- os.remove(file_path)
116
- logging.info(f"Deleted old file: {file_path}")
117
- except Exception as e:
118
- logging.error(f"Failed to delete {file_path}: {e}")
119
-
120
- if os.path.exists(vectorstore_path) and not reset:
121
- vs = FAISS.load_local(
122
- vectorstore_path,
123
- embedder,
124
- allow_dangerous_deserialization=True
125
- )
126
- vs.add_documents(chunks)
127
- else:
128
- vs = FAISS.from_documents(chunks, embedder)
129
-
130
- os.makedirs(vectorstore_path, exist_ok=True)
131
- vs.save_local(vectorstore_path)
132
-
133
- logging.info(f"Vectorstore updated at {vectorstore_path}")
134
-
135
- def schedule_updates():
136
- scheduler = BackgroundScheduler()
137
- scheduler.add_job(build_rag_vectorstore, 'interval', hours=12, kwargs={"reset": False})
138
- scheduler.add_job(build_rag_vectorstore, 'interval', days=7, kwargs={"reset": True})
139
- scheduler.start()
140
- logging.info("Scheduler started — 12-hour incremental updates + weekly full rebuild")
141
- return scheduler
 
1
+ # farmlingua_backend/app/tasks/rag_updater.py
2
+ import os
3
+ import sys
4
+ from datetime import datetime, date
5
+ import logging
6
+ import requests
7
+ from bs4 import BeautifulSoup
8
+ from apscheduler.schedulers.background import BackgroundScheduler
9
+
10
+ from langchain_community.vectorstores import FAISS
11
+ from langchain_community.embeddings import SentenceTransformerEmbeddings
12
+ from langchain_community.docstore.document import Document
13
+ from langchain_text_splitters import RecursiveCharacterTextSplitter
14
+
15
+ from app.utils import config
16
+
17
+ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
18
+ if BASE_DIR not in sys.path:
19
+ sys.path.insert(0, BASE_DIR)
20
+
21
+ logging.basicConfig(
22
+ format="%(asctime)s [%(levelname)s] %(message)s",
23
+ level=logging.INFO
24
+ )
25
+
26
+ session = requests.Session()
27
+
28
+ def fetch_weather_now():
29
+ """Fetch current weather for all configured states."""
30
+ docs = []
31
+ for state in config.STATES:
32
+ try:
33
+ url = "http://api.weatherapi.com/v1/current.json"
34
+ params = {
35
+ "key": config.WEATHER_API_KEY,
36
+ "q": f"{state}, Nigeria",
37
+ "aqi": "no"
38
+ }
39
+ res = session.get(url, params=params, timeout=10)
40
+ res.raise_for_status()
41
+ data = res.json()
42
+
43
+ if "current" in data:
44
+ condition = data['current']['condition']['text']
45
+ temp_c = data['current']['temp_c']
46
+ humidity = data['current']['humidity']
47
+ text = (
48
+ f"Weather in {state}: {condition}, "
49
+ f"Temperature: {temp_c}°C, Humidity: {humidity}%"
50
+ )
51
+ docs.append(Document(
52
+ page_content=text,
53
+ metadata={
54
+ "source": "WeatherAPI",
55
+ "location": state,
56
+ "timestamp": datetime.utcnow().isoformat()
57
+ }
58
+ ))
59
+ except Exception as e:
60
+ logging.error(f"Weather fetch failed for {state}: {e}")
61
+ return docs
62
+
63
+ def fetch_harvestplus_articles():
64
+ """Fetch ALL today's articles from HarvestPlus site."""
65
+ try:
66
+ res = session.get(config.DATA_SOURCES["harvestplus"], timeout=10)
67
+ res.raise_for_status()
68
+ soup = BeautifulSoup(res.text, "html.parser")
69
+ articles = soup.find_all("article")
70
+
71
+ docs = []
72
+ today_str = date.today().strftime("%Y-%m-%d")
73
+
74
+ for a in articles:
75
+ content = a.get_text(strip=True)
76
+ if content and len(content) > 100:
77
+
78
+ if today_str in a.text or True:
79
+ docs.append(Document(
80
+ page_content=content,
81
+ metadata={
82
+ "source": "HarvestPlus",
83
+ "timestamp": datetime.utcnow().isoformat()
84
+ }
85
+ ))
86
+ return docs
87
+ except Exception as e:
88
+ logging.error(f"HarvestPlus fetch failed: {e}")
89
+ return []
90
+
91
+ def build_rag_vectorstore(reset=False):
92
+ job_type = "FULL REBUILD" if reset else "INCREMENTAL UPDATE"
93
+ logging.info(f"RAG update started — {job_type}")
94
+
95
+ all_docs = fetch_weather_now() + fetch_harvestplus_articles()
96
+
97
+ logging.info(f"Weather docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'WeatherAPI'])}")
98
+ logging.info(f"News docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'HarvestPlus'])}")
99
+
100
+ if not all_docs:
101
+ logging.warning("No documents fetched, skipping update")
102
+ return
103
+
104
+ splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
105
+ chunks = splitter.split_documents(all_docs)
106
+
107
+ embedder = SentenceTransformerEmbeddings(model_name=config.EMBEDDING_MODEL)
108
+
109
+ vectorstore_path = config.LIVE_VS_PATH
110
+
111
+ if reset and os.path.exists(vectorstore_path):
112
+ for file in os.listdir(vectorstore_path):
113
+ file_path = os.path.join(vectorstore_path, file)
114
+ try:
115
+ os.remove(file_path)
116
+ logging.info(f"Deleted old file: {file_path}")
117
+ except Exception as e:
118
+ logging.error(f"Failed to delete {file_path}: {e}")
119
+
120
+ if os.path.exists(vectorstore_path) and not reset:
121
+ vs = FAISS.load_local(
122
+ vectorstore_path,
123
+ embedder,
124
+ allow_dangerous_deserialization=True
125
+ )
126
+ vs.add_documents(chunks)
127
+ else:
128
+ vs = FAISS.from_documents(chunks, embedder)
129
+
130
+ os.makedirs(vectorstore_path, exist_ok=True)
131
+ vs.save_local(vectorstore_path)
132
+
133
+ logging.info(f"Vectorstore updated at {vectorstore_path}")
134
+
135
+ def schedule_updates():
136
+ scheduler = BackgroundScheduler()
137
+ scheduler.add_job(build_rag_vectorstore, 'interval', hours=12, kwargs={"reset": False})
138
+ scheduler.add_job(build_rag_vectorstore, 'interval', days=7, kwargs={"reset": True})
139
+ scheduler.start()
140
+ logging.info("Scheduler started — 12-hour incremental updates + weekly full rebuild")
141
+ return scheduler