Mbonea commited on
Commit
c8a9a0a
·
1 Parent(s): d17dd76

Add background task progress logging

Browse files
App/routers/bonds/routes.py CHANGED
@@ -1,3 +1,4 @@
 
1
  from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
2
  from tortoise.contrib.pydantic.creator import pydantic_queryset_creator
3
  from tortoise.transactions import in_transaction
@@ -12,6 +13,7 @@ from datetime import date
12
 
13
 
14
  router = APIRouter(prefix="/bonds", tags=["Bonds"])
 
15
 
16
 
17
  def _bond_time_left(maturity_date):
@@ -104,6 +106,7 @@ async def run_bond_import_task(task_id: int):
104
  updated_count = 0
105
  failed_count = 0
106
  processed_isins = set()
 
107
 
108
  try:
109
  async for bond_data in scraper.scrape_all_bond_data():
@@ -112,7 +115,11 @@ async def run_bond_import_task(task_id: int):
112
  continue
113
 
114
  if bond_data.isin and bond_data.isin in processed_isins:
115
- print(f"Skipping duplicate ISIN in current scrape: {bond_data.isin}")
 
 
 
 
116
  continue
117
 
118
  async with in_transaction(): # Ensure atomicity for each bond
@@ -133,18 +140,33 @@ async def run_bond_import_task(task_id: int):
133
  if existing_bond:
134
  await Bond.filter(id=existing_bond.id).update(**bond_data.dict(exclude_unset=True))
135
  updated_count += 1
136
- print(f"Updated bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")
137
  else:
138
  await Bond.create(**bond_data.dict())
139
  created_count += 1
140
- print(f"Created bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")
141
 
142
  if bond_data.isin:
143
  processed_isins.add(bond_data.isin)
 
 
 
 
 
 
 
 
 
 
144
 
145
  except Exception as db_e:
146
  failed_count += 1
147
- print(f"Database error for bond au_no {bond_data.auction_number}: {db_e}")
 
 
 
 
 
148
 
149
  summary = {
150
  "created": created_count,
@@ -153,10 +175,10 @@ async def run_bond_import_task(task_id: int):
153
  "message": "Bond import process finished."
154
  }
155
  await ImportTask.filter(id=task_id).update(status="completed", details=summary)
156
- print(f"Bond import task {task_id} completed. Summary: {summary}")
157
 
158
  except Exception as e:
159
- print(f"Fatal error in bond import task {task_id}: {e}")
160
  await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})
161
 
162
 
 
1
+ import logging
2
  from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
3
  from tortoise.contrib.pydantic.creator import pydantic_queryset_creator
4
  from tortoise.transactions import in_transaction
 
13
 
14
 
15
  router = APIRouter(prefix="/bonds", tags=["Bonds"])
16
+ logger = logging.getLogger(__name__)
17
 
18
 
19
  def _bond_time_left(maturity_date):
 
106
  updated_count = 0
107
  failed_count = 0
108
  processed_isins = set()
109
+ logger.info("[bond-import:%s] background import started", task_id)
110
 
111
  try:
112
  async for bond_data in scraper.scrape_all_bond_data():
 
115
  continue
116
 
117
  if bond_data.isin and bond_data.isin in processed_isins:
118
+ logger.info(
119
+ "[bond-import:%s] duplicate isin=%s skipped",
120
+ task_id,
121
+ bond_data.isin,
122
+ )
123
  continue
124
 
125
  async with in_transaction(): # Ensure atomicity for each bond
 
140
  if existing_bond:
141
  await Bond.filter(id=existing_bond.id).update(**bond_data.dict(exclude_unset=True))
142
  updated_count += 1
143
+ action = "updated"
144
  else:
145
  await Bond.create(**bond_data.dict())
146
  created_count += 1
147
+ action = "created"
148
 
149
  if bond_data.isin:
150
  processed_isins.add(bond_data.isin)
151
+ logger.info(
152
+ "[bond-import:%s] auction=%s isin=%s %s, created=%s updated=%s failed=%s",
153
+ task_id,
154
+ bond_data.auction_number,
155
+ bond_data.isin,
156
+ action,
157
+ created_count,
158
+ updated_count,
159
+ failed_count,
160
+ )
161
 
162
  except Exception as db_e:
163
  failed_count += 1
164
+ logger.error(
165
+ "[bond-import:%s] database error for auction=%s: %s",
166
+ task_id,
167
+ bond_data.auction_number,
168
+ db_e,
169
+ )
170
 
171
  summary = {
172
  "created": created_count,
 
175
  "message": "Bond import process finished."
176
  }
177
  await ImportTask.filter(id=task_id).update(status="completed", details=summary)
178
+ logger.info("[bond-import:%s] completed: %s", task_id, summary)
179
 
180
  except Exception as e:
181
+ logger.exception("[bond-import:%s] failed: %s", task_id, e)
182
  await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})
183
 
184
 
App/routers/bonds/utils.py CHANGED
@@ -1,4 +1,5 @@
1
  import asyncio
 
2
  from curl_cffi.requests import AsyncSession,RequestsError
3
  from bs4 import BeautifulSoup
4
  from App.routers.bonds.schemas import BondCreate # Adjust import path
@@ -9,6 +10,8 @@ import re
9
  from datetime import datetime as dt
10
  from typing import Tuple, Optional, List, Dict, Any
11
 
 
 
12
  def parse_bond_title_details(title_str: str) -> Tuple[Optional[float], Optional[str], str, Optional[int]]:
13
  coupon_rate_val = None
14
  maturity_years_val = None
@@ -72,7 +75,7 @@ def parse_date_flexible(date_str: str, default=None) -> Optional[dt]:
72
  return dt.strptime(date_str, fmt).date()
73
  except ValueError:
74
  continue
75
- print(f"Warning: Could not parse date string: {date_str}")
76
  return default
77
 
78
  def get_summary_item_value(summary_list: List[Dict[str, Any]], item_desc_key: str, default=None) -> Optional[str]:
@@ -116,9 +119,9 @@ class BondDataScraper:
116
  response.raise_for_status()
117
  return response.json() if is_json else response.text
118
  except RequestsError as e: # Updated exception type for curl_cffi
119
- print(f"HTTP error fetching {url}: {e}") # curl_cffi errors might not have response.status_code directly
120
  except Exception as e:
121
- print(f"Unexpected error fetching {url}: {e}")
122
  return None
123
 
124
  async def _parse_main_tbonds_page(self, html_content: str) -> List[Dict[str, Any]]:
@@ -128,13 +131,13 @@ class BondDataScraper:
128
  soup = BeautifulSoup(html_content, 'html.parser')
129
  table = soup.find('table', class_='tbond-table') # Or id="DataTables_Table_0"
130
  if not table:
131
- print("Main T-Bonds table not found.")
132
  return []
133
 
134
  parsed_rows = []
135
  tbody = table.find('tbody')
136
  if not tbody:
137
- print("Tbody not found in main T-Bonds table.")
138
  return []
139
 
140
  for row in tbody.find_all('tr'):
@@ -150,7 +153,7 @@ class BondDataScraper:
150
 
151
  view_button = cols[4].find('button', id='showSummaryDetails')
152
  if not view_button or not view_button.get('value'):
153
- print(f"Skipping row, view button or value not found for auction: {auction_number_text}")
154
  continue
155
 
156
  button_value_parts = view_button['value'].split('_')
@@ -165,7 +168,7 @@ class BondDataScraper:
165
  'au_days_part': au_days_part,
166
  })
167
  except Exception as e:
168
- print(f"Error parsing main table row: {e}. Row: {[c.get_text(strip=True) for c in cols]}")
169
  return parsed_rows
170
 
171
  async def _fetch_bond_details(self, session: AsyncSession, au_no: int, au_days_part: str) -> Optional[Dict[str, Any]]:
@@ -174,7 +177,7 @@ class BondDataScraper:
174
 
175
  def _parse_bond_details_json(self, json_data: Dict[str, Any], initial_data: Dict[str, Any]) -> Optional[BondCreate]:
176
  if not json_data or json_data.get("message") != "SUCCESS":
177
- print(f"Bond details JSON invalid or not successful for au_no: {initial_data.get('au_no')}")
178
  return None
179
 
180
  summary_list = json_data.get("tbondSummary", [])
@@ -183,7 +186,7 @@ class BondDataScraper:
183
 
184
  auction_date_obj = parse_date_flexible(initial_data['table_auction_date_str'])
185
  if not auction_date_obj: # Critical, skip if no valid auction date
186
- print(f"Critical: Could not parse auction_date for au_no: {initial_data.get('au_no')}")
187
  return None
188
 
189
 
@@ -200,7 +203,7 @@ class BondDataScraper:
200
  try:
201
  face_value_val = int(float(face_value_str.replace(",", "")) * 1_000_000)
202
  except ValueError:
203
- print(f"Could not parse face_value: {face_value_str} for au_no: {initial_data.get('au_no')}")
204
 
205
  price_per_100_str = get_summary_item_value(summary_list, "MINIMUM SUCCESSFUL PRICE / 100") # Or WAP?
206
  price_per_100_val = None
@@ -208,7 +211,7 @@ class BondDataScraper:
208
  try:
209
  price_per_100_val = float(price_per_100_str)
210
  except ValueError:
211
- print(f"Could not parse price_per_100: {price_per_100_str} for au_no: {initial_data.get('au_no')}")
212
 
213
 
214
  holding_number_str = json_data.get("auctionNumber") # This is the "1" from "AUCTION NUMBER 1 HELD ON..."
@@ -241,25 +244,42 @@ class BondDataScraper:
241
 
242
  main_page_html = await self._fetch_content(session, self.TBONDS_URL, method="GET")
243
  if not main_page_html:
244
- print("Failed to fetch main T-Bonds page.")
245
  return
246
 
247
  initial_bond_rows = await self._parse_main_tbonds_page(main_page_html)
248
 
249
- print(f"Found {len(initial_bond_rows)} initial bond rows from main table.")
 
250
 
251
- for row_data in initial_bond_rows:
252
  if row_data["au_no"] in existing_auction_numbers:
253
- print(f"Skipping existing bond details for au_no: {row_data['au_no']}")
 
 
 
 
 
254
  continue
255
 
256
- print(f"Fetching details for au_no: {row_data['au_no']}...")
 
 
 
 
 
257
  await asyncio.sleep(0.5) # Small delay to be polite
258
 
259
  details_json = await self._fetch_bond_details(session, row_data['au_no'], row_data['au_days_part'])
260
  if details_json:
261
  bond_create_obj = self._parse_bond_details_json(details_json, row_data)
262
  if bond_create_obj:
 
 
 
 
 
 
263
  yield bond_create_obj
264
  else:
265
- print(f"Failed to fetch or parse details for au_no: {row_data['au_no']}")
 
1
  import asyncio
2
+ import logging
3
  from curl_cffi.requests import AsyncSession,RequestsError
4
  from bs4 import BeautifulSoup
5
  from App.routers.bonds.schemas import BondCreate # Adjust import path
 
10
  from datetime import datetime as dt
11
  from typing import Tuple, Optional, List, Dict, Any
12
 
13
+ logger = logging.getLogger(__name__)
14
+
15
  def parse_bond_title_details(title_str: str) -> Tuple[Optional[float], Optional[str], str, Optional[int]]:
16
  coupon_rate_val = None
17
  maturity_years_val = None
 
75
  return dt.strptime(date_str, fmt).date()
76
  except ValueError:
77
  continue
78
+ logger.warning("Could not parse date string: %s", date_str)
79
  return default
80
 
81
  def get_summary_item_value(summary_list: List[Dict[str, Any]], item_desc_key: str, default=None) -> Optional[str]:
 
119
  response.raise_for_status()
120
  return response.json() if is_json else response.text
121
  except RequestsError as e: # Updated exception type for curl_cffi
122
+ logger.error("HTTP error fetching %s: %s", url, e) # curl_cffi errors might not have response.status_code directly
123
  except Exception as e:
124
+ logger.error("Unexpected error fetching %s: %s", url, e)
125
  return None
126
 
127
  async def _parse_main_tbonds_page(self, html_content: str) -> List[Dict[str, Any]]:
 
131
  soup = BeautifulSoup(html_content, 'html.parser')
132
  table = soup.find('table', class_='tbond-table') # Or id="DataTables_Table_0"
133
  if not table:
134
+ logger.warning("Main T-Bonds table not found.")
135
  return []
136
 
137
  parsed_rows = []
138
  tbody = table.find('tbody')
139
  if not tbody:
140
+ logger.warning("Tbody not found in main T-Bonds table.")
141
  return []
142
 
143
  for row in tbody.find_all('tr'):
 
153
 
154
  view_button = cols[4].find('button', id='showSummaryDetails')
155
  if not view_button or not view_button.get('value'):
156
+ logger.warning("Skipping row, view button or value not found for auction: %s", auction_number_text)
157
  continue
158
 
159
  button_value_parts = view_button['value'].split('_')
 
168
  'au_days_part': au_days_part,
169
  })
170
  except Exception as e:
171
+ logger.warning("Error parsing main table row: %s. Row: %s", e, [c.get_text(strip=True) for c in cols])
172
  return parsed_rows
173
 
174
  async def _fetch_bond_details(self, session: AsyncSession, au_no: int, au_days_part: str) -> Optional[Dict[str, Any]]:
 
177
 
178
  def _parse_bond_details_json(self, json_data: Dict[str, Any], initial_data: Dict[str, Any]) -> Optional[BondCreate]:
179
  if not json_data or json_data.get("message") != "SUCCESS":
180
+ logger.warning("Bond details JSON invalid or not successful for au_no: %s", initial_data.get("au_no"))
181
  return None
182
 
183
  summary_list = json_data.get("tbondSummary", [])
 
186
 
187
  auction_date_obj = parse_date_flexible(initial_data['table_auction_date_str'])
188
  if not auction_date_obj: # Critical, skip if no valid auction date
189
+ logger.warning("Critical: could not parse auction_date for au_no: %s", initial_data.get("au_no"))
190
  return None
191
 
192
 
 
203
  try:
204
  face_value_val = int(float(face_value_str.replace(",", "")) * 1_000_000)
205
  except ValueError:
206
+ logger.warning("Could not parse face_value %s for au_no: %s", face_value_str, initial_data.get("au_no"))
207
 
208
  price_per_100_str = get_summary_item_value(summary_list, "MINIMUM SUCCESSFUL PRICE / 100") # Or WAP?
209
  price_per_100_val = None
 
211
  try:
212
  price_per_100_val = float(price_per_100_str)
213
  except ValueError:
214
+ logger.warning("Could not parse price_per_100 %s for au_no: %s", price_per_100_str, initial_data.get("au_no"))
215
 
216
 
217
  holding_number_str = json_data.get("auctionNumber") # This is the "1" from "AUCTION NUMBER 1 HELD ON..."
 
244
 
245
  main_page_html = await self._fetch_content(session, self.TBONDS_URL, method="GET")
246
  if not main_page_html:
247
+ logger.error("Failed to fetch main T-Bonds page.")
248
  return
249
 
250
  initial_bond_rows = await self._parse_main_tbonds_page(main_page_html)
251
 
252
+ total_rows = len(initial_bond_rows)
253
+ logger.info("[bond-scraper] main table loaded, rows=%s", total_rows)
254
 
255
+ for index, row_data in enumerate(initial_bond_rows, start=1):
256
  if row_data["au_no"] in existing_auction_numbers:
257
+ logger.info(
258
+ "[bond-scraper] %s/%s auction=%s already stored, skipping detail fetch",
259
+ index,
260
+ total_rows,
261
+ row_data["au_no"],
262
+ )
263
  continue
264
 
265
+ logger.info(
266
+ "[bond-scraper] %s/%s auction=%s fetching details",
267
+ index,
268
+ total_rows,
269
+ row_data["au_no"],
270
+ )
271
  await asyncio.sleep(0.5) # Small delay to be polite
272
 
273
  details_json = await self._fetch_bond_details(session, row_data['au_no'], row_data['au_days_part'])
274
  if details_json:
275
  bond_create_obj = self._parse_bond_details_json(details_json, row_data)
276
  if bond_create_obj:
277
+ logger.info(
278
+ "[bond-scraper] %s/%s auction=%s parsed details",
279
+ index,
280
+ total_rows,
281
+ row_data["au_no"],
282
+ )
283
  yield bond_create_obj
284
  else:
285
+ logger.warning("Failed to fetch or parse details for au_no: %s", row_data["au_no"])
App/routers/funds/runner.py CHANGED
@@ -7,6 +7,8 @@ Usage (from FastAPI BackgroundTasks):
7
  """
8
  import asyncio
9
  import logging
 
 
10
  from concurrent.futures import ThreadPoolExecutor
11
  from typing import List, Optional
12
 
@@ -69,13 +71,21 @@ def _get_scrapers(manager_name: str = "all") -> List[BaseFundScraper]:
69
  def _run_scraper_sync(scraper: BaseFundScraper) -> List[FundRecord]:
70
  """Run sync scraper; returns flat list of all FundRecord objects."""
71
  try:
 
 
72
  results = scraper.scrape_all()
73
  records: List[FundRecord] = []
74
  for fund_records in results.values():
75
  records.extend(fund_records)
 
 
 
 
 
 
76
  return records
77
  except Exception as e:
78
- logger.error(f"[{scraper.manager_name}] scrape failed: {e}")
79
  return []
80
 
81
 
@@ -98,7 +108,16 @@ async def _save_records(records: List[FundRecord], manager_name: str, website: s
98
  by_fund.setdefault(rec.fund_name, []).append(rec)
99
 
100
  # 3. For each fund, upsert MutualFund then bulk-insert new FundPerformance rows
101
- for fund_name, fund_records in by_fund.items():
 
 
 
 
 
 
 
 
 
102
  meta = _FUND_META.get(fund_name, {})
103
  currency = meta.get("currency") or (fund_records[0].currency if fund_records else "TZS")
104
 
@@ -160,6 +179,15 @@ async def _save_records(records: List[FundRecord], manager_name: str, website: s
160
  if new_rows:
161
  await FundPerformance.bulk_create(new_rows, ignore_conflicts=True)
162
  stats["rows_added"] += len(new_rows)
 
 
 
 
 
 
 
 
 
163
 
164
  return stats
165
 
@@ -169,27 +197,71 @@ async def run_import(manager_name: str = "all") -> dict:
169
  Entry point called by FastAPI BackgroundTasks.
170
  Runs sync scrapers in a thread pool, then saves results to DB.
171
  """
 
 
172
  scrapers = _get_scrapers(manager_name)
173
  loop = asyncio.get_event_loop()
174
  all_stats: dict = {"managers": {}}
 
 
 
 
 
 
175
 
176
  with ThreadPoolExecutor(max_workers=len(scrapers)) as pool:
177
  futures = {
178
  loop.run_in_executor(pool, _run_scraper_sync, s): s
179
  for s in scrapers
180
  }
181
- for future, scraper in futures.items():
 
182
  try:
 
 
 
 
 
 
 
183
  records = await future
 
 
 
 
 
 
 
 
184
  stats = await _save_records(
185
  records,
186
  manager_name=scraper.manager_name,
187
  website=getattr(scraper, "base_url", ""),
188
  )
189
  all_stats["managers"][scraper.manager_name] = stats
190
- logger.info(f"[{scraper.manager_name}] import done: {stats}")
 
 
 
 
 
 
 
191
  except Exception as e:
192
- logger.error(f"[{scraper.manager_name}] import error: {e}")
 
 
 
 
 
 
 
193
  all_stats["managers"][scraper.manager_name] = {"error": str(e)}
194
 
 
 
 
 
 
 
195
  return all_stats
 
7
  """
8
  import asyncio
9
  import logging
10
+ import time
11
+ import uuid
12
  from concurrent.futures import ThreadPoolExecutor
13
  from typing import List, Optional
14
 
 
71
  def _run_scraper_sync(scraper: BaseFundScraper) -> List[FundRecord]:
72
  """Run sync scraper; returns flat list of all FundRecord objects."""
73
  try:
74
+ logger.info("[fund-import] %s scrape started", scraper.manager_name)
75
+ started_at = time.monotonic()
76
  results = scraper.scrape_all()
77
  records: List[FundRecord] = []
78
  for fund_records in results.values():
79
  records.extend(fund_records)
80
+ logger.info(
81
+ "[fund-import] %s scrape finished, records=%s duration=%.1fs",
82
+ scraper.manager_name,
83
+ len(records),
84
+ time.monotonic() - started_at,
85
+ )
86
  return records
87
  except Exception as e:
88
+ logger.error("[fund-import] %s scrape failed: %s", scraper.manager_name, e)
89
  return []
90
 
91
 
 
108
  by_fund.setdefault(rec.fund_name, []).append(rec)
109
 
110
  # 3. For each fund, upsert MutualFund then bulk-insert new FundPerformance rows
111
+ total_funds = len(by_fund)
112
+ for index, (fund_name, fund_records) in enumerate(by_fund.items(), start=1):
113
+ logger.info(
114
+ "[fund-import] %s save %s/%s %s started, records=%s",
115
+ manager_name,
116
+ index,
117
+ total_funds,
118
+ fund_name,
119
+ len(fund_records),
120
+ )
121
  meta = _FUND_META.get(fund_name, {})
122
  currency = meta.get("currency") or (fund_records[0].currency if fund_records else "TZS")
123
 
 
179
  if new_rows:
180
  await FundPerformance.bulk_create(new_rows, ignore_conflicts=True)
181
  stats["rows_added"] += len(new_rows)
182
+ logger.info(
183
+ "[fund-import] %s save %s/%s %s done, new=%s skipped_total=%s",
184
+ manager_name,
185
+ index,
186
+ total_funds,
187
+ fund_name,
188
+ len(new_rows),
189
+ stats["rows_skipped"],
190
+ )
191
 
192
  return stats
193
 
 
197
  Entry point called by FastAPI BackgroundTasks.
198
  Runs sync scrapers in a thread pool, then saves results to DB.
199
  """
200
+ run_id = uuid.uuid4().hex[:8]
201
+ started_at = time.monotonic()
202
  scrapers = _get_scrapers(manager_name)
203
  loop = asyncio.get_event_loop()
204
  all_stats: dict = {"managers": {}}
205
+ logger.info(
206
+ "[fund-import:%s] background import started, target=%s managers=%s",
207
+ run_id,
208
+ manager_name,
209
+ [scraper.manager_name for scraper in scrapers],
210
+ )
211
 
212
  with ThreadPoolExecutor(max_workers=len(scrapers)) as pool:
213
  futures = {
214
  loop.run_in_executor(pool, _run_scraper_sync, s): s
215
  for s in scrapers
216
  }
217
+ total = len(futures)
218
+ for index, (future, scraper) in enumerate(futures.items(), start=1):
219
  try:
220
+ logger.info(
221
+ "[fund-import:%s] manager %s/%s %s waiting for scrape result",
222
+ run_id,
223
+ index,
224
+ total,
225
+ scraper.manager_name,
226
+ )
227
  records = await future
228
+ logger.info(
229
+ "[fund-import:%s] manager %s/%s %s saving %s record(s)",
230
+ run_id,
231
+ index,
232
+ total,
233
+ scraper.manager_name,
234
+ len(records),
235
+ )
236
  stats = await _save_records(
237
  records,
238
  manager_name=scraper.manager_name,
239
  website=getattr(scraper, "base_url", ""),
240
  )
241
  all_stats["managers"][scraper.manager_name] = stats
242
+ logger.info(
243
+ "[fund-import:%s] manager %s/%s %s done: %s",
244
+ run_id,
245
+ index,
246
+ total,
247
+ scraper.manager_name,
248
+ stats,
249
+ )
250
  except Exception as e:
251
+ logger.error(
252
+ "[fund-import:%s] manager %s/%s %s failed: %s",
253
+ run_id,
254
+ index,
255
+ total,
256
+ scraper.manager_name,
257
+ e,
258
+ )
259
  all_stats["managers"][scraper.manager_name] = {"error": str(e)}
260
 
261
+ logger.info(
262
+ "[fund-import:%s] background import finished in %.1fs: %s",
263
+ run_id,
264
+ time.monotonic() - started_at,
265
+ all_stats,
266
+ )
267
  return all_stats
App/scheduler.py CHANGED
@@ -7,6 +7,8 @@ All external market checks run only three times per day:
7
 
8
  import asyncio
9
  import logging
 
 
10
  from datetime import datetime, timedelta
11
  from zoneinfo import ZoneInfo
12
 
@@ -16,6 +18,22 @@ MARKET_SYNC_TIMES = [(10, 0), (14, 0), (17, 0)]
16
  DAR_TZ = ZoneInfo("Africa/Dar_es_Salaam")
17
 
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  async def refresh_stocks() -> None:
20
  """Refresh historical prices, current snapshot data, and corporate actions."""
21
  from App.routers.stocks.crud import (
@@ -30,10 +48,19 @@ async def refresh_stocks() -> None:
30
 
31
  stocks = await Stock.all()
32
  if stocks:
33
- logger.info("[scheduler] Refreshing price history for %s stock(s)", len(stocks))
 
34
  today = date_type.today()
35
-
36
- for stock in stocks:
 
 
 
 
 
 
 
 
37
  try:
38
  latest = (
39
  await StockPriceData.filter(stock=stock).order_by("-date").first()
@@ -43,15 +70,15 @@ async def refresh_stocks() -> None:
43
  data = await fetch_dse_stock_data(stock.symbol, days=days)
44
  if not data.get("success"):
45
  logger.warning(
46
- "[scheduler] %s: price-history API returned failure",
 
 
47
  stock.symbol,
48
  )
 
49
  continue
50
 
51
  raw = data.get("data") or []
52
- if not raw:
53
- continue
54
-
55
  existing_dates = set(
56
  await StockPriceData.filter(stock=stock).values_list(
57
  "date", flat=True
@@ -66,37 +93,63 @@ async def refresh_stocks() -> None:
66
 
67
  if new_rows:
68
  await bulk_insert_price_data(stock, new_rows)
69
- logger.info(
70
- "[scheduler] %s: inserted %s new price record(s)",
71
- stock.symbol,
72
- len(new_rows),
73
- )
 
 
 
 
74
  except Exception as exc:
 
75
  logger.error(
76
- "[scheduler] %s: price-history refresh failed: %s",
 
 
77
  stock.symbol,
78
  exc,
79
  )
 
 
 
 
 
80
  else:
81
- logger.info("[scheduler] No stocks in DB; skipping price-history refresh")
82
 
83
  scraper = DseScraper()
84
 
85
  try:
 
86
  market_rows, _ = await scraper.fetch_market_snapshot()
87
  if market_rows:
88
  stats = await upsert_stock_reference_records(market_rows)
89
- logger.info("[scheduler] DSE market snapshot sync complete: %s", stats)
 
 
 
 
 
 
90
  except Exception as exc:
91
- logger.error("[scheduler] DSE market snapshot sync failed: %s", exc)
92
 
93
  try:
 
94
  actions, _ = await scraper.fetch_corporate_actions()
95
  if actions:
96
  stats = await upsert_corporate_actions(actions)
97
- logger.info("[scheduler] DSE corporate actions sync complete: %s", stats)
 
 
 
 
 
 
98
  except Exception as exc:
99
- logger.error("[scheduler] DSE corporate actions sync failed: %s", exc)
100
 
101
 
102
  async def refresh_funds() -> None:
@@ -104,10 +157,11 @@ async def refresh_funds() -> None:
104
  try:
105
  from App.routers.funds.runner import run_import
106
 
 
107
  stats = await run_import("all")
108
- logger.info("[scheduler] Fund refresh complete: %s", stats)
109
  except Exception as exc:
110
- logger.error("[scheduler] Fund refresh failed: %s", exc)
111
 
112
 
113
  async def refresh_bonds() -> None:
@@ -153,13 +207,24 @@ async def refresh_bonds() -> None:
153
  **bond_data.dict(exclude_unset=True)
154
  )
155
  updated += 1
 
156
  else:
157
  await Bond.create(**bond_data.dict())
158
  created += 1
159
  existing_auction_numbers.add(bond_data.auction_number)
 
160
 
161
  if bond_data.isin:
162
  processed_isins.add(bond_data.isin)
 
 
 
 
 
 
 
 
 
163
  except Exception as exc:
164
  failed += 1
165
  logger.error(
@@ -199,10 +264,21 @@ async def scheduled_market_loop() -> None:
199
  """Refresh stocks, funds, and bonds at the configured Dar es Salaam sync windows."""
200
  while True:
201
  delay = _seconds_until_next_market_sync()
202
- logger.info("[scheduler] Next market sync in %.0f seconds", delay)
 
 
 
 
 
203
  await asyncio.sleep(delay)
204
- logger.info("[scheduler] Scheduled market sync triggered")
205
- await refresh_stocks()
206
- await refresh_funds()
207
- await refresh_bonds()
208
- logger.info("[scheduler] Scheduled market sync complete")
 
 
 
 
 
 
 
7
 
8
  import asyncio
9
  import logging
10
+ import time
11
+ import uuid
12
  from datetime import datetime, timedelta
13
  from zoneinfo import ZoneInfo
14
 
 
18
  DAR_TZ = ZoneInfo("Africa/Dar_es_Salaam")
19
 
20
 
21
+ async def _run_refresh_phase(run_id: str, name: str, refresh_func) -> None:
22
+ started_at = time.monotonic()
23
+ logger.info("[scheduler:%s] %s started", run_id, name)
24
+ try:
25
+ await refresh_func()
26
+ except Exception:
27
+ logger.exception("[scheduler:%s] %s failed", run_id, name)
28
+ return
29
+ logger.info(
30
+ "[scheduler:%s] %s finished in %.1fs",
31
+ run_id,
32
+ name,
33
+ time.monotonic() - started_at,
34
+ )
35
+
36
+
37
  async def refresh_stocks() -> None:
38
  """Refresh historical prices, current snapshot data, and corporate actions."""
39
  from App.routers.stocks.crud import (
 
48
 
49
  stocks = await Stock.all()
50
  if stocks:
51
+ total = len(stocks)
52
+ logger.info("[scheduler] stocks: price history started for %s stock(s)", total)
53
  today = date_type.today()
54
+ inserted_total = 0
55
+ failed_total = 0
56
+
57
+ for index, stock in enumerate(stocks, start=1):
58
+ logger.info(
59
+ "[scheduler] stocks: price history %s/%s %s started",
60
+ index,
61
+ total,
62
+ stock.symbol,
63
+ )
64
  try:
65
  latest = (
66
  await StockPriceData.filter(stock=stock).order_by("-date").first()
 
70
  data = await fetch_dse_stock_data(stock.symbol, days=days)
71
  if not data.get("success"):
72
  logger.warning(
73
+ "[scheduler] stocks: price history %s/%s %s API returned failure",
74
+ index,
75
+ total,
76
  stock.symbol,
77
  )
78
+ failed_total += 1
79
  continue
80
 
81
  raw = data.get("data") or []
 
 
 
82
  existing_dates = set(
83
  await StockPriceData.filter(stock=stock).values_list(
84
  "date", flat=True
 
93
 
94
  if new_rows:
95
  await bulk_insert_price_data(stock, new_rows)
96
+ inserted_total += len(new_rows)
97
+ logger.info(
98
+ "[scheduler] stocks: price history %s/%s %s done, raw=%s new=%s",
99
+ index,
100
+ total,
101
+ stock.symbol,
102
+ len(raw),
103
+ len(new_rows),
104
+ )
105
  except Exception as exc:
106
+ failed_total += 1
107
  logger.error(
108
+ "[scheduler] stocks: price history %s/%s %s failed: %s",
109
+ index,
110
+ total,
111
  stock.symbol,
112
  exc,
113
  )
114
+ logger.info(
115
+ "[scheduler] stocks: price history complete, inserted=%s failed=%s",
116
+ inserted_total,
117
+ failed_total,
118
+ )
119
  else:
120
+ logger.info("[scheduler] stocks: no rows in DB; skipping price-history refresh")
121
 
122
  scraper = DseScraper()
123
 
124
  try:
125
+ logger.info("[scheduler] stocks: DSE market snapshot started")
126
  market_rows, _ = await scraper.fetch_market_snapshot()
127
  if market_rows:
128
  stats = await upsert_stock_reference_records(market_rows)
129
+ logger.info(
130
+ "[scheduler] stocks: DSE market snapshot complete, rows=%s stats=%s",
131
+ len(market_rows),
132
+ stats,
133
+ )
134
+ else:
135
+ logger.info("[scheduler] stocks: DSE market snapshot complete, rows=0")
136
  except Exception as exc:
137
+ logger.error("[scheduler] stocks: DSE market snapshot failed: %s", exc)
138
 
139
  try:
140
+ logger.info("[scheduler] stocks: DSE corporate actions started")
141
  actions, _ = await scraper.fetch_corporate_actions()
142
  if actions:
143
  stats = await upsert_corporate_actions(actions)
144
+ logger.info(
145
+ "[scheduler] stocks: DSE corporate actions complete, rows=%s stats=%s",
146
+ len(actions),
147
+ stats,
148
+ )
149
+ else:
150
+ logger.info("[scheduler] stocks: DSE corporate actions complete, rows=0")
151
  except Exception as exc:
152
+ logger.error("[scheduler] stocks: DSE corporate actions failed: %s", exc)
153
 
154
 
155
  async def refresh_funds() -> None:
 
157
  try:
158
  from App.routers.funds.runner import run_import
159
 
160
+ logger.info("[scheduler] funds: import started")
161
  stats = await run_import("all")
162
+ logger.info("[scheduler] funds: import complete: %s", stats)
163
  except Exception as exc:
164
+ logger.error("[scheduler] funds: import failed: %s", exc)
165
 
166
 
167
  async def refresh_bonds() -> None:
 
207
  **bond_data.dict(exclude_unset=True)
208
  )
209
  updated += 1
210
+ action = "updated"
211
  else:
212
  await Bond.create(**bond_data.dict())
213
  created += 1
214
  existing_auction_numbers.add(bond_data.auction_number)
215
+ action = "created"
216
 
217
  if bond_data.isin:
218
  processed_isins.add(bond_data.isin)
219
+ logger.info(
220
+ "[scheduler] bonds: auction=%s isin=%s %s, created=%s updated=%s failed=%s",
221
+ bond_data.auction_number,
222
+ bond_data.isin,
223
+ action,
224
+ created,
225
+ updated,
226
+ failed,
227
+ )
228
  except Exception as exc:
229
  failed += 1
230
  logger.error(
 
264
  """Refresh stocks, funds, and bonds at the configured Dar es Salaam sync windows."""
265
  while True:
266
  delay = _seconds_until_next_market_sync()
267
+ next_run = datetime.now(DAR_TZ) + timedelta(seconds=delay)
268
+ logger.info(
269
+ "[scheduler] Next market sync at %s, in %.0f seconds",
270
+ next_run.isoformat(timespec="seconds"),
271
+ delay,
272
+ )
273
  await asyncio.sleep(delay)
274
+ run_id = uuid.uuid4().hex[:8]
275
+ started_at = time.monotonic()
276
+ logger.info("[scheduler:%s] Scheduled market sync started", run_id)
277
+ await _run_refresh_phase(run_id, "stocks", refresh_stocks)
278
+ await _run_refresh_phase(run_id, "funds", refresh_funds)
279
+ await _run_refresh_phase(run_id, "bonds", refresh_bonds)
280
+ logger.info(
281
+ "[scheduler:%s] Scheduled market sync complete in %.1fs",
282
+ run_id,
283
+ time.monotonic() - started_at,
284
+ )
main.py CHANGED
@@ -1,4 +1,5 @@
1
  import asyncio
 
2
  from fastapi import FastAPI, Request
3
  from fastapi.middleware.cors import CORSMiddleware
4
  from fastapi.responses import JSONResponse
@@ -17,6 +18,12 @@ from App.scheduler import scheduled_market_loop
17
 
18
  from db import init_db, close_db, clear_db
19
 
 
 
 
 
 
 
20
  app = FastAPI(title="Uwekezaji API", description="Stock Market Data API", redirect_slashes=False)
21
 
22
 
@@ -78,8 +85,11 @@ app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
78
  # Database initialization and cleanup
79
  @app.on_event("startup")
80
  async def startup_event():
 
81
  await init_db()
 
82
  asyncio.create_task(scheduled_market_loop())
 
83
 
84
 
85
  @app.on_event("shutdown")
 
1
  import asyncio
2
+ import logging
3
  from fastapi import FastAPI, Request
4
  from fastapi.middleware.cors import CORSMiddleware
5
  from fastapi.responses import JSONResponse
 
18
 
19
  from db import init_db, close_db, clear_db
20
 
21
+ logging.basicConfig(
22
+ level=logging.INFO,
23
+ format="%(asctime)s %(levelname)s %(name)s: %(message)s",
24
+ )
25
+ logger = logging.getLogger(__name__)
26
+
27
  app = FastAPI(title="Uwekezaji API", description="Stock Market Data API", redirect_slashes=False)
28
 
29
 
 
85
  # Database initialization and cleanup
86
  @app.on_event("startup")
87
  async def startup_event():
88
+ logger.info("[startup] database initialization started")
89
  await init_db()
90
+ logger.info("[startup] database initialization complete")
91
  asyncio.create_task(scheduled_market_loop())
92
+ logger.info("[startup] scheduled market background loop started")
93
 
94
 
95
  @app.on_event("shutdown")