InvestingTest / main.py
Mbonea's picture
Expand economy document ingestion
6541314
import asyncio
import logging
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError, HTTPException
from starlette.status import HTTP_400_BAD_REQUEST
from App.routers.stocks.routes import router as stocks_router
from App.routers.funds.routes import router as funds_router
from App.routers.bonds.routes import router as bonds_router
from App.routers.tasks.routes import router as tasks_router
from App.routers.users.routes import router as users_router
from App.routers.portfolio.routes import router as portfolio_router
from App.routers.admin.routes import router as admin_router
from App.routers.economy.config import load_local_env
from App.routers.economy.routes import api_router as economy_api_router, router as economy_router
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from App.schemas import ResponseModel, AppException
from App.scheduler import scheduled_economy_loop, scheduled_market_loop, scheduled_news_loop
from db import init_db, close_db, clear_db
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
load_local_env()
app = FastAPI(
title="Uwekezaji API", description="Stock Market Data API", redirect_slashes=False
)
def _json_safe(value):
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
if isinstance(value, dict):
return {key: _json_safe(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_safe(item) for item in value]
if isinstance(value, tuple):
return [_json_safe(item) for item in value]
return value
@app.exception_handler(AppException)
async def custom_http_exception_handler(request: Request, exc: AppException):
return JSONResponse(
status_code=exc.status_code,
content=ResponseModel(
success=False,
message=getattr(exc, "message", str(exc.detail)),
data=getattr(exc, "data", None),
).model_dump(),
)
@app.exception_handler(HTTPException)
async def generic_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content=ResponseModel(
success=False,
message=str(exc.detail),
data=None,
).model_dump(),
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content=ResponseModel(
success=False,
message="Validation error",
data={"errors": _json_safe(exc.errors())},
).model_dump(),
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(stocks_router)
app.include_router(funds_router)
app.include_router(bonds_router)
app.include_router(tasks_router)
app.include_router(users_router)
app.include_router(portfolio_router)
app.include_router(admin_router)
app.include_router(economy_router)
app.include_router(economy_api_router)
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
@app.on_event("startup")
async def startup_event():
logger.info("[startup] database initialization started")
await init_db()
logger.info("[startup] database initialization complete")
asyncio.create_task(scheduled_market_loop())
logger.info("[startup] scheduled market background loop started")
asyncio.create_task(scheduled_news_loop())
logger.info("[startup] scheduled news background loop started")
asyncio.create_task(scheduled_economy_loop())
logger.info("[startup] economy pipeline loop registered")
@app.on_event("shutdown")
async def shutdown_event():
await close_db()
@app.get("/")
async def root():
return {"message": "Welcome to Uwekezaji API"}