# InvestingTest / db.py
# Author: Mbonea
# Commit 0e076de: store economy payloads and enrich bond advice
from tortoise import Tortoise, connections
import os
import ssl
import uuid
from asyncpg import Connection
# Create the TLS context once at import time so every pooled connection
# reuses the same object.
ssl_context = ssl.create_default_context()

# Local fallback database file name (used when no writable /data volume exists).
SQLITE_DB = "db.sqlite3"

TORTOISE_ORM = {
    "connections": {
        "default": {
            "engine": "tortoise.backends.asyncpg",
            # Tortoise forwards everything inside "credentials" to asyncpg,
            # so driver options such as "ssl" and "statement_cache_size"
            # must live here.  A sibling "connect_args" key is ignored by
            # Tortoise, which previously left TLS and the
            # pgbouncer-required statement_cache_size=0 unapplied.
            "credentials": {
                "host": "aws-0-us-east-2.pooler.supabase.com",
                "port": "5432",
                "user": os.getenv("DB_USER"),
                "password": os.getenv("DB_PASSWORD"),
                "database": "postgres",
                "min_size": 1,
                "max_size": 10,
                "timeout": 30,
                "max_queries": 50000,
                "max_inactive_connection_lifetime": 300.0,
                # Supabase's pooler (pgbouncer, transaction mode) cannot use
                # prepared statements; disable asyncpg's statement cache.
                "statement_cache_size": 0,
                "ssl": ssl_context,
            },
        }
    },
    "apps": {
        "models": {
            "models": [
                "App.routers.stocks.models",
                "App.routers.tasks.models",
                "App.routers.funds.models",
                "App.routers.admin.models",
                "App.routers.users.models",
                "App.routers.portfolio.models",
                "App.routers.bonds.models",
                "App.routers.economy.models",
                "aerich.models",
            ],
            "default_connection": "default",
        }
    },
}

# Prefer a persistent SQLite file under /data when that volume exists
# (non-Windows container deployments); otherwise fall back to a local file.
DATA_DIR = os.getenv("UWEKEZAJI_DATA_DIR", "/data/InvestingTest")
SQLITE_DB_PATH = os.getenv(
    "SQLITE_DB_PATH",
    os.path.join(DATA_DIR, "db.sqlite3") if os.name != "nt" and os.path.isdir("/data") else SQLITE_DB,
)
if SQLITE_DB_PATH != SQLITE_DB:
    # A dedicated data directory was resolved: make sure it exists and
    # switch the default connection to SQLite instead of Postgres.
    os.makedirs(os.path.dirname(SQLITE_DB_PATH), exist_ok=True)
    TORTOISE_ORM["connections"]["default"] = {
        "engine": "tortoise.backends.sqlite",
        "credentials": {"file_path": SQLITE_DB_PATH},
    }
async def init_db():
    """Initialise Tortoise, create schemas, then run backfills and seed loads.

    Schema backfills run before seed syncs so that any columns/tables they
    add exist by the time seed data is written.
    """
    await Tortoise.init(config=TORTOISE_ORM)
    await Tortoise.generate_schemas()
    # Ordered startup steps; each is awaited sequentially, same as before.
    startup_steps = (
        ensure_user_auth_profile_schema,
        ensure_admin_schema,
        ensure_stock_news_schema,
        ensure_economy_schema,
        sync_stock_reference_seed_data,
        sync_fund_info_seed_data,
        sync_stock_company_seed_data,
        sync_stock_fundamental_seed_data,
        sync_stock_dividend_seed_data,
        ensure_default_admin_user,
    )
    for step in startup_steps:
        await step()
async def close_db():
    """Close all Tortoise connections (call once during application shutdown)."""
    await Tortoise.close_connections()
async def clear_db():
    """Delete every row from every registered model (test/reset helper).

    Uses a ``{}`` default so this is a no-op instead of an AttributeError
    when called before ``Tortoise.init`` has populated ``Tortoise.apps``.
    """
    for model in Tortoise.apps.get("models", {}).values():
        await model.all().delete()
async def ensure_user_auth_profile_schema():
    """Backfill auth/profile columns for older local SQLite databases.

    Discovers each table's current columns via ``PRAGMA table_info`` and adds
    any missing ones with ``ALTER TABLE``.  Only runs against SQLite; other
    dialects are left untouched.
    """
    connection = connections.get("default")
    if connection.capabilities.dialect != "sqlite":
        return
    # table name -> {column name: ALTER statement that adds it}
    migrations = {
        "users": {
            "display_name": 'ALTER TABLE "users" ADD COLUMN "display_name" VARCHAR(100)',
            "phone_number": 'ALTER TABLE "users" ADD COLUMN "phone_number" VARCHAR(30)',
            "location": 'ALTER TABLE "users" ADD COLUMN "location" VARCHAR(120)',
            "bio": 'ALTER TABLE "users" ADD COLUMN "bio" TEXT',
            "authenticator_secret": 'ALTER TABLE "users" ADD COLUMN "authenticator_secret" VARCHAR(64)',
            "authenticator_enabled": 'ALTER TABLE "users" ADD COLUMN "authenticator_enabled" INT NOT NULL DEFAULT 0',
            "authenticator_login_enabled": 'ALTER TABLE "users" ADD COLUMN "authenticator_login_enabled" INT NOT NULL DEFAULT 1',
            "require_authenticator_for_password_login": 'ALTER TABLE "users" ADD COLUMN "require_authenticator_for_password_login" INT NOT NULL DEFAULT 0',
            "is_admin": 'ALTER TABLE "users" ADD COLUMN "is_admin" INT NOT NULL DEFAULT 0',
            "is_active": 'ALTER TABLE "users" ADD COLUMN "is_active" INT NOT NULL DEFAULT 1',
            "must_change_password": 'ALTER TABLE "users" ADD COLUMN "must_change_password" INT NOT NULL DEFAULT 0',
            "support_notes": 'ALTER TABLE "users" ADD COLUMN "support_notes" TEXT',
        },
        "portfolio_bonds": {
            "holding_number": 'ALTER TABLE "portfolio_bonds" ADD COLUMN "holding_number" INT',
            "holding_status": 'ALTER TABLE "portfolio_bonds" ADD COLUMN "holding_status" VARCHAR(30)',
        },
    }
    for table, column_sql in migrations.items():
        rows = await connection.execute_query_dict(f'PRAGMA table_info("{table}")')
        if not rows:
            # Table does not exist yet; issuing ALTER TABLE against it would
            # fail, so skip (generate_schemas will create it with all columns).
            continue
        existing_columns = {row["name"] for row in rows}
        for column, sql in column_sql.items():
            if column not in existing_columns:
                await connection.execute_script(sql)
async def sync_fund_info_seed_data():
    """Best-effort load of curated mutual fund metadata from the bundled seed JSON."""
    try:
        from App.routers.funds.seed import sync_fund_info_from_json as loader

        await loader()
    except Exception:
        # Missing or unreadable seed data must never block application startup.
        pass
async def ensure_default_admin_user():
    """Best-effort creation of the bootstrap admin account."""
    try:
        from App.routers.admin.seed import ensure_default_admin as seed_admin

        await seed_admin()
    except Exception:
        # Admin seeding is optional; startup proceeds regardless.
        pass
async def sync_stock_company_seed_data():
    """Best-effort load of curated stock company metadata from bundled research JSON."""
    try:
        from App.routers.stocks.seed import sync_stock_company_info_from_json as loader

        await loader()
    except Exception:
        # Seed failures are swallowed so startup never depends on this data.
        pass
async def sync_stock_reference_seed_data():
    """Best-effort load of stock issued-share reference data from the bundled seed JSON."""
    try:
        from App.routers.stocks.seed import sync_stock_reference_data_from_json as loader

        await loader()
    except Exception:
        # Seed failures are swallowed so startup never depends on this data.
        pass
async def sync_stock_dividend_seed_data():
    """Best-effort load of curated stock dividend history from bundled research JSON."""
    try:
        from App.routers.stocks.seed import sync_stock_dividends_from_json as loader

        await loader()
    except Exception:
        # Seed failures are swallowed so startup never depends on this data.
        pass
async def sync_stock_fundamental_seed_data():
    """Best-effort load of cached stock valuation/fundamental snapshots from bundled data."""
    try:
        from App.routers.stocks.seed import sync_stock_fundamentals_from_json as loader

        await loader()
    except Exception:
        # Seed failures are swallowed so startup never depends on this data.
        pass
async def ensure_admin_schema():
    """Create admin audit/learning tables for older local SQLite databases.

    Every statement uses CREATE TABLE IF NOT EXISTS, so this is safe to run
    on each startup.  Non-SQLite dialects are skipped; presumably their
    schema is managed by aerich migrations — confirm against deployment.
    """
    connection = connections.get("default")
    # Only backfill local SQLite files; never touch a managed database.
    if connection.capabilities.dialect != "sqlite":
        return
    await connection.execute_script(
        """
        CREATE TABLE IF NOT EXISTS "admin_audit_logs" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "action" VARCHAR(120) NOT NULL,
            "entity_type" VARCHAR(80),
            "entity_id" VARCHAR(120),
            "details" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "admin_id" CHAR(36) REFERENCES "users" ("id") ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS "learning_articles" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "title" VARCHAR(200) NOT NULL,
            "slug" VARCHAR(220) NOT NULL UNIQUE,
            "category" VARCHAR(80) NOT NULL,
            "summary" TEXT,
            "content" TEXT NOT NULL,
            "difficulty" VARCHAR(30) NOT NULL DEFAULT 'beginner',
            "status" VARCHAR(20) NOT NULL DEFAULT 'draft',
            "sort_order" INT NOT NULL DEFAULT 0,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "created_by_id" CHAR(36) REFERENCES "users" ("id") ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS "learning_quizzes" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "title" VARCHAR(200) NOT NULL,
            "slug" VARCHAR(220) NOT NULL UNIQUE,
            "category" VARCHAR(80) NOT NULL,
            "description" TEXT,
            "difficulty" VARCHAR(30) NOT NULL DEFAULT 'beginner',
            "passing_score" INT NOT NULL DEFAULT 70,
            "status" VARCHAR(20) NOT NULL DEFAULT 'draft',
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "article_id" INT REFERENCES "learning_articles" ("id") ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS "learning_quiz_questions" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "question_type" VARCHAR(40) NOT NULL DEFAULT 'scenario',
            "prompt" TEXT NOT NULL,
            "scenario" TEXT,
            "life_situation" TEXT,
            "calculation_notes" TEXT,
            "options" JSON,
            "correct_answer" JSON,
            "explanation" TEXT,
            "points" INT NOT NULL DEFAULT 1,
            "sort_order" INT NOT NULL DEFAULT 0,
            "quiz_id" INT NOT NULL REFERENCES "learning_quizzes" ("id") ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS "learning_quiz_attempts" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "score" VARCHAR(40) NOT NULL,
            "passed" INT NOT NULL DEFAULT 0,
            "answers" JSON,
            "feedback" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "quiz_id" INT NOT NULL REFERENCES "learning_quizzes" ("id") ON DELETE CASCADE,
            "user_id" CHAR(36) NOT NULL REFERENCES "users" ("id") ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS "admin_support_notes" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "note" TEXT NOT NULL,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "admin_id" CHAR(36) REFERENCES "users" ("id") ON DELETE CASCADE,
            "user_id" CHAR(36) NOT NULL REFERENCES "users" ("id") ON DELETE CASCADE
        );
        """
    )
async def ensure_economy_schema():
    """Create economy pipeline tables for older SQLite databases.

    Idempotent: every statement uses CREATE TABLE IF NOT EXISTS.  Only runs
    against local SQLite databases; other dialects are skipped.
    """
    connection = connections.get("default")
    # Only backfill local SQLite files; never touch a managed database.
    if connection.capabilities.dialect != "sqlite":
        return
    await connection.execute_script(
        """
        CREATE TABLE IF NOT EXISTS "economic_documents" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "source" VARCHAR(80) NOT NULL,
            "document_type" VARCHAR(80) NOT NULL,
            "title" VARCHAR(300) NOT NULL,
            "report_date" DATE,
            "year" INT,
            "month" INT,
            "source_url" VARCHAR(1000) NOT NULL UNIQUE,
            "local_path" VARCHAR(1000) NOT NULL,
            "sha256" VARCHAR(64) NOT NULL,
            "mime_type" VARCHAR(120),
            "downloaded_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "parse_status" VARCHAR(40) NOT NULL DEFAULT 'downloaded',
            "raw_metadata" JSON
        );
        CREATE TABLE IF NOT EXISTS "dse_market_reports" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "report_date" DATE NOT NULL UNIQUE,
            "source_url" VARCHAR(1000) NOT NULL,
            "local_path" VARCHAR(1000) NOT NULL,
            "screenshot_dir" VARCHAR(1000),
            "equity_turnover" VARCHAR(40),
            "etf_turnover" VARCHAR(40),
            "government_bond_face_value" VARCHAR(40),
            "government_bond_transaction_value" VARCHAR(40),
            "corporate_bond_value" VARCHAR(40),
            "parse_status" VARCHAR(40) NOT NULL DEFAULT 'pending',
            "schema_version" VARCHAR(40) NOT NULL DEFAULT 'dse-market-report-v1',
            "raw_payload" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS "secondary_bond_trades" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "report_id" INT NOT NULL REFERENCES "dse_market_reports" ("id") ON DELETE CASCADE,
            "trade_date" DATE NOT NULL,
            "bond_no" VARCHAR(80) NOT NULL,
            "matched_bond_id" INT REFERENCES "bonds" ("id") ON DELETE CASCADE,
            "term_years" REAL,
            "coupon_rate" REAL,
            "issue_date" DATE,
            "maturity_date" DATE,
            "amount_tzs" VARCHAR(40),
            "deals" INT,
            "price_percent" REAL,
            "yield_percent" REAL,
            "raw_row" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS "secondary_bond_snapshots" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "bond_id" INT REFERENCES "bonds" ("id") ON DELETE CASCADE,
            "bond_no" VARCHAR(80) NOT NULL,
            "snapshot_date" DATE NOT NULL,
            "latest_trade_date" DATE NOT NULL,
            "latest_price_percent" REAL,
            "latest_yield_percent" REAL,
            "weighted_avg_price_percent" REAL,
            "weighted_avg_yield_percent" REAL,
            "total_amount_tzs" VARCHAR(40),
            "total_deals" INT NOT NULL DEFAULT 0,
            "liquidity_score" REAL NOT NULL DEFAULT 0,
            "stale_after_days" INT NOT NULL DEFAULT 45,
            "valuation_quality" VARCHAR(40) NOT NULL DEFAULT 'secondary_market',
            "raw_payload" JSON,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE("bond_no", "snapshot_date")
        );
        CREATE TABLE IF NOT EXISTS "market_alerts" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "alert_date" DATE NOT NULL,
            "alert_type" VARCHAR(80) NOT NULL,
            "severity" VARCHAR(30) NOT NULL DEFAULT 'info',
            "asset_type" VARCHAR(40),
            "asset_id" INT,
            "title" VARCHAR(300) NOT NULL,
            "comment" TEXT NOT NULL,
            "evidence" JSON,
            "opportunity_score" REAL,
            "confidence" REAL NOT NULL DEFAULT 0.5,
            "source_report_id" INT REFERENCES "dse_market_reports" ("id") ON DELETE CASCADE,
            "source_trade_ids" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS "market_intelligence_snapshots" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "snapshot_date" DATE NOT NULL UNIQUE,
            "schema_version" VARCHAR(40) NOT NULL DEFAULT 'market-intelligence-v1',
            "overall_comment" TEXT NOT NULL,
            "inflation_comment" TEXT,
            "secondary_bond_comment" TEXT,
            "equity_comment" TEXT,
            "etf_comment" TEXT,
            "rules_comment" TEXT,
            "geopolitical_comment" TEXT,
            "highlights" JSON,
            "strange_activity" JSON,
            "opportunities" JSON,
            "risks" JSON,
            "source_document_ids" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS "economy_pipeline_runs" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "run_key" VARCHAR(120) NOT NULL UNIQUE,
            "status" VARCHAR(40) NOT NULL DEFAULT 'running',
            "started_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "finished_at" TIMESTAMP,
            "report_date" DATE,
            "steps" JSON,
            "error" TEXT,
            "upload_status" VARCHAR(40),
            "upload_response_code" INT,
            "upload_response" TEXT,
            "retry_count" INT NOT NULL DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS "economy_ingest_payloads" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "report_date" DATE NOT NULL UNIQUE,
            "schema_version" VARCHAR(80),
            "payload" JSON NOT NULL,
            "documents_count" INT NOT NULL DEFAULT 0,
            "secondary_bonds_count" INT NOT NULL DEFAULT 0,
            "alerts_count" INT NOT NULL DEFAULT 0,
            "source" VARCHAR(80) NOT NULL DEFAULT 'api',
            "ingested_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        """
    )
async def ensure_stock_news_schema():
    """Backfill stock news tables/columns for older SQLite databases.

    Creates the table if missing, then uses PRAGMA table_info to add the
    sentiment columns that shipped after the table's first version.
    """
    connection = connections.get("default")
    # Only backfill local SQLite files; never touch a managed database.
    if connection.capabilities.dialect != "sqlite":
        return
    await connection.execute_script(
        """
        CREATE TABLE IF NOT EXISTS "stock_news_articles" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            "query_type" VARCHAR(30) NOT NULL,
            "section" VARCHAR(40) NOT NULL,
            "headline" TEXT NOT NULL,
            "source_name" VARCHAR(200),
            "source_url" VARCHAR(1000),
            "canonical_url" VARCHAR(1000),
            "published_at" TIMESTAMP,
            "summary" TEXT,
            "article_excerpt" TEXT,
            "image_url" VARCHAR(1000),
            "extracted_images" JSON,
            "tags" JSON,
            "sentiment_label" VARCHAR(20),
            "sentiment_score" REAL,
            "summary_provider" VARCHAR(60),
            "raw_payload" JSON,
            "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            "stock_id" INT NOT NULL REFERENCES "stocks" ("id") ON DELETE CASCADE
        );
        """
    )
    # Older databases may have the table without the sentiment columns;
    # discover what exists and ALTER in anything missing.
    rows = await connection.execute_query_dict(
        'PRAGMA table_info("stock_news_articles")'
    )
    existing_columns = {row["name"] for row in rows}
    column_sql = {
        "sentiment_label": 'ALTER TABLE "stock_news_articles" ADD COLUMN "sentiment_label" VARCHAR(20)',
        "sentiment_score": 'ALTER TABLE "stock_news_articles" ADD COLUMN "sentiment_score" REAL',
    }
    for column, sql in column_sql.items():
        if column not in existing_columns:
            await connection.execute_script(sql)