project/ ├── main.py ├── routers/ │ ├── stocks/ │ │ ├── __init__.py │ │ ├── models.py │ │ ├── schemas.py │ │ ├── service.py │ │ └── routes.py │ ├── utt/ │ │ ├── __init__.py │ │ ├── models.py │ │ ├── schemas.py │ │ ├── service.py │ │ └── routes.py │ └── tasks/ │ ├── __init__.py │ ├── models.py │ ├── schemas.py │ └── routes.py # main.py from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse fromApp.routers.stocks.routes import router as stocks_router fromApp.routers.utt.routes import router as utt_router fromApp.routers.tasks.routes import router as tasks_router app = FastAPI() @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): return JSONResponse(status_code=exc.status_code, content={"success": False, "message": exc.detail}) @app.exception_handler(Exception) async def generic_exception_handler(request: Request, exc: Exception): return JSONResponse(status_code=500, content={"success": False, "message": str(exc)}) app.include_router(stocks_router) app.include_router(utt_router) app.include_router(tasks_router) # routers/tasks/models.py from tortoise import fields, models from enum import Enum class TaskStatus(str, Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class ImportTask(models.Model): id = fields.IntField(pk=True) task_type = fields.CharField(max_length=50) status = fields.CharEnumField(TaskStatus, default=TaskStatus.PENDING) details = fields.JSONField(null=True) created_at = fields.DatetimeField(auto_now_add=True) updated_at = fields.DatetimeField(auto_now=True) class Meta: table = "import_tasks" # routers/tasks/schemas.py from pydantic import BaseModel from datetime import datetime from enum import Enum class TaskStatus(str, Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class ImportTaskResponse(BaseModel): id: int task_type: str status: TaskStatus details: dict | None created_at: datetime updated_at: datetime class ResponseModel(BaseModel): success: bool message: str data: dict | list | None = None # routers/tasks/routes.py from fastapi import APIRouter, HTTPException from .models import ImportTask from .schemas import ImportTaskResponse, ResponseModel router = APIRouter(prefix="/tasks", tags=["Tasks"]) @router.get("/", response_model=ResponseModel) async def list_tasks(): tasks = await ImportTask.all().order_by("-created_at") return ResponseModel(success=True, message="List of tasks", data=[task for task in tasks]) @router.get("/{task_id}", response_model=ResponseModel) async def get_task(task_id: int): task = await ImportTask.get_or_none(id=task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") return ResponseModel(success=True, message="Task found", data=task) # You would follow the same modular structure for stocks and utt # Including their own models.py, schemas.py, routes.py, service.py # The routes would queue background tasks and use task_id for status tracking. project/ ├── main.py ├── routers/ │ ├── stocks/ │ │ ├── __init__.py │ │ ├── models.py │ │ ├── schemas.py │ │ ├── service.py │ │ └── routes.py │ ├── utt/ │ │ ├── __init__.py │ │ ├── models.py │ │ ├── schemas.py │ │ ├── service.py │ │ └── routes.py │ └── tasks/ │ ├── __init__.py │ ├── models.py │ ├── schemas.py │ └── routes.py # routers/stocks/models.py from tortoise import fields, models class Stock(models.Model): id = fields.IntField(pk=True) symbol = fields.CharField(max_length=10, unique=True) name = fields.CharField(max_length=100) sector = fields.CharField(max_length=100, null=True) class StockPriceData(models.Model): id = fields.IntField(pk=True) stock = fields.ForeignKeyField("models.Stock", related_name="prices") date = fields.DateField() opening_price = fields.FloatField() closing_price = fields.FloatField() high = fields.FloatField() low = fields.FloatField() volume = fields.IntField() turnover = fields.BigIntField() shares_in_issue = fields.BigIntField() market_cap = fields.BigIntField() class Meta: unique_together = ("stock", "date") # routers/stocks/schemas.py from pydantic import BaseModel from datetime import date from typing import List, Optional fromApp.routers.tasks.schemas import ResponseModel class StockBase(BaseModel): symbol: str name: str sector: Optional[str] = None class StockResponse(StockBase): id: int class StockPriceResponse(BaseModel): date: date opening_price: float closing_price: float high: float low: float volume: int turnover: int shares_in_issue: int market_cap: int class StockPriceListResponse(BaseModel): stock: StockResponse prices: List[StockPriceResponse] # routers/stocks/service.py from datetime import datetime async def fetch_stock_data(symbol: str): import httpx url = f"https://dse.co.tz/api/get/market/prices/for/range/duration?security_code={symbol}&days=5000&class=EQUITY" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code == 200: return response.json().get("data", []) return [] def parse_stock_api_row(row: dict) -> dict: return { "date": datetime.fromisoformat(row["trade_date"]).date(), "opening_price": row["opening_price"], "closing_price": row["closing_price"], "high": row["high"], "low": row["low"], "volume": row["volume"], "turnover": row["turnover"], "shares_in_issue": row["shares_in_issue"], "market_cap": row["market_cap"], } # routers/stocks/routes.py from fastapi import APIRouter, HTTPException, BackgroundTasks from tortoise.transactions import in_transaction from .models import Stock, StockPriceData from .schemas import StockResponse, StockPriceListResponse, ResponseModel from .service import fetch_stock_data, parse_stock_api_row fromApp.routers.tasks.models import ImportTask from typing import List router = APIRouter(prefix="/stocks", tags=["Stocks"]) @router.get("/", response_model=ResponseModel) async def list_stocks(): stocks = await Stock.all() return ResponseModel(success=True, message="List of stocks", data=stocks) @router.get("/{symbol}", response_model=ResponseModel) async def get_stock_prices(symbol: str): stock = await Stock.get_or_none(symbol=symbol) if not stock: raise HTTPException(status_code=404, detail="Stock not found") prices = await StockPriceData.filter(stock=stock).order_by("-date") return ResponseModel(success=True, message="Stock price data", data={"stock": stock, "prices": prices}) @router.post("/import/{symbol}", response_model=ResponseModel) async def queue_import_stock(symbol: str, background_tasks: BackgroundTasks): task = await ImportTask.create(task_type="stocks", status="pending", details={"symbol": symbol}) background_tasks.add_task(run_stock_import_task, task.id, symbol) return ResponseModel(success=True, message="Stock import task queued", data={"task_id": task.id}) async def run_stock_import_task(task_id: int, symbol: str): try: await ImportTask.filter(id=task_id).update(status="running") raw_data = await fetch_stock_data(symbol) if not raw_data: await ImportTask.filter(id=task_id).update(status="failed", details={"error": "No data"}) return first = raw_data[0] stock, _ = await Stock.get_or_create(symbol=first["company"], defaults={"name": first["fullName"]}) existing_dates = set(await StockPriceData.filter(stock=stock).values_list("date", flat=True)) records = [] for row in raw_data: parsed = parse_stock_api_row(row) if parsed["date"] not in existing_dates: records.append(StockPriceData(stock=stock, **parsed)) async with in_transaction(): await StockPriceData.bulk_create(records, ignore_conflicts=True) await ImportTask.filter(id=task_id).update(status="completed") except Exception as e: await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)}) # routers/utt/models.py from tortoise import fields, models class UTTFund(models.Model): id = fields.IntField(pk=True) symbol = fields.CharField(max_length=20, unique=True) name = fields.CharField(max_length=100) class UTTFundData(models.Model): id = fields.IntField(pk=True) fund = fields.ForeignKeyField("models.UTTFund", related_name="data") date = fields.DateField() nav_per_unit = fields.FloatField() sale_price_per_unit = fields.FloatField() repurchase_price_per_unit = fields.FloatField() outstanding_number_of_units = fields.BigIntField() net_asset_value = fields.BigIntField() class Meta: unique_together = ("fund", "date") # routers/utt/schemas.py from pydantic import BaseModel from datetime import date from typing import List fromApp.routers.tasks.schemas import ResponseModel class UTTFundResponse(BaseModel): id: int symbol: str name: str class UTTFundDataResponse(BaseModel): date: date nav_per_unit: float sale_price_per_unit: float repurchase_price_per_unit: float outstanding_number_of_units: int net_asset_value: int class UTTFundListResponse(BaseModel): fund: UTTFundResponse data: List[UTTFundDataResponse] # routers/utt/service.py from datetime import datetime import httpx async def fetch_all_utt_data(): url = "https://example.com/utt/api" # Placeholder async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code == 200: return response.json().get("data", []) return [] def parse_utt_api_row(row: dict) -> dict: return { "date": datetime.strptime(row["date_valued"], "%d-%m-%Y").date(), "nav_per_unit": float(row["nav_per_unit"]), "sale_price_per_unit": float(row["sale_price_per_unit"]), "repurchase_price_per_unit": float(row["repurchase_price_per_unit"]), "outstanding_number_of_units": int(float(row["outstanding_number_of_units"])), "net_asset_value": int(float(row["net_asset_value"])) } # routers/utt/routes.py from fastapi import APIRouter, BackgroundTasks, HTTPException from .models import UTTFund, UTTFundData from .schemas import UTTFundResponse, UTTFundListResponse, ResponseModel from .service import fetch_all_utt_data, parse_utt_api_row fromApp.routers.tasks.models import ImportTask router = APIRouter(prefix="/utt", tags=["UTT"]) @router.get("/", response_model=ResponseModel) async def list_funds(): funds = await UTTFund.all() return ResponseModel(success=True, message="List of UTT funds", data=funds) @router.get("/{symbol}", response_model=ResponseModel) async def get_fund_data(symbol: str): fund = await UTTFund.get_or_none(symbol=symbol) if not fund: raise HTTPException(status_code=404, detail="Fund not found") data = await UTTFundData.filter(fund=fund).order_by("-date") return ResponseModel(success=True, message="Fund data", data={"fund": fund, "data": data}) @router.post("/import-all", response_model=ResponseModel) async def queue_import_utt(background_tasks: BackgroundTasks): task = await ImportTask.create(task_type="utt", status="pending", details={}) background_tasks.add_task(run_utt_import_task, task.id) return ResponseModel(success=True, message="UTT import task queued", data={"task_id": task.id}) async def run_utt_import_task(task_id: int): from tortoise.transactions import in_transaction try: await ImportTask.filter(id=task_id).update(status="running") raw_data = await fetch_all_utt_data() if not raw_data: await ImportTask.filter(id=task_id).update(status="failed", details={"error": "No data"}) return for row in raw_data: symbol = row["internal_name"] name = row["scheme_name"] fund, _ = await UTTFund.get_or_create(symbol=symbol, defaults={"name": name}) parsed = parse_utt_api_row(row) exists = await UTTFundData.exists(fund=fund, date=parsed["date"]) if not exists: await UTTFundData.create(fund=fund, **parsed) await ImportTask.filter(id=task_id).update(status="completed") except Exception as e: await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})