app / app.py
Ritesh1035's picture
Update app.py
9054418 verified
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, field_validator
from typing import List, Optional
from datetime import date, datetime
import yfinance as yf
import requests
import uvicorn
import os
from dotenv import load_dotenv
from supabase import create_client, Client
load_dotenv()
# Application instance; title, description and version are passed to FastAPI
# as API metadata.
app = FastAPI(
    title="Investment Portfolio Tracker API",
    description="Backend API for managing investment portfolios with Supabase",
    version="2.0.0"
)
# CORS is fully open: every origin, method and header is allowed, and
# credentials are allowed as well.
# NOTE(review): a wildcard origin together with allow_credentials=True is
# usually discouraged - consider listing the real frontend origins; confirm
# against the deployment before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Supabase Configuration
# Both values come from the environment (load_dotenv() has already run, so a
# local .env file is honoured). Import fails immediately if either is missing.
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not supabase_url or not supabase_key:
    raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
# Single module-level client shared by all request handlers in this file.
supabase: Client = create_client(supabase_url, supabase_key)
# Security
# Bearer scheme used as a dependency by get_current_user to obtain the token.
security = HTTPBearer()
# Auth Models
class UserSignUp(BaseModel):
    """Request body for POST /auth/signup."""
    # Plain strings: no e-mail format or password strength check is done here.
    email: str
    password: str
class UserSignIn(BaseModel):
    """Request body for POST /auth/signin."""
    # Plain strings: forwarded as-is to the Supabase password sign-in call.
    email: str
    password: str
class AuthResponse(BaseModel):
    """Response body returned by the sign-up and sign-in endpoints."""
    # Tokens are copied from the Supabase session object.
    access_token: str
    refresh_token: str
    # Identity fields are copied from the Supabase user object.
    user_id: str
    email: str
# Investment Models
class InvestmentCreate(BaseModel):
    """Request body for creating an investment (POST /investments/)."""

    asset_type: str
    ticker: str
    quantity: float
    buy_price: float
    # NOTE(review): buy_date is accepted as a free-form string; this model
    # performs no date-format check on it.
    buy_date: str

    @field_validator('asset_type')
    @classmethod
    def validate_asset_type(cls, v):
        """Accept only the two supported asset types."""
        supported = ('Stock', 'Crypto')
        if v in supported:
            return v
        raise ValueError('Asset type must be Stock or Crypto')

    @field_validator('quantity')
    @classmethod
    def validate_quantity(cls, v):
        """Reject zero or negative quantities."""
        non_positive = v <= 0
        if non_positive:
            raise ValueError('Quantity must be greater than 0')
        return v

    @field_validator('buy_price')
    @classmethod
    def validate_buy_price(cls, v):
        """Reject zero or negative purchase prices."""
        non_positive = v <= 0
        if non_positive:
            raise ValueError('Buy price must be greater than 0')
        return v

    @field_validator('ticker')
    @classmethod
    def validate_ticker(cls, v):
        """Reject blank tickers and normalise to upper case without padding."""
        is_blank = not v.strip()
        if is_blank:
            raise ValueError('Ticker cannot be empty')
        normalised = v.upper().strip()
        return normalised
class InvestmentUpdate(BaseModel):
    """Request body for PUT /investments/{investment_id}; both fields optional."""

    quantity: Optional[float] = None
    buy_price: Optional[float] = None

    @field_validator('quantity')
    @classmethod
    def validate_quantity(cls, v):
        """Allow an omitted quantity; otherwise it must be positive."""
        if v is None:
            return v
        if v <= 0:
            raise ValueError('Quantity must be greater than 0')
        return v

    @field_validator('buy_price')
    @classmethod
    def validate_buy_price(cls, v):
        """Allow an omitted price; otherwise it must be positive."""
        if v is None:
            return v
        if v <= 0:
            raise ValueError('Buy price must be greater than 0')
        return v
class Investment(BaseModel):
    """Investment record returned by the GET /investments endpoints."""
    # Stored columns of the investment row.
    id: int
    asset_type: str
    ticker: str
    quantity: float
    buy_price: float
    buy_date: str
    # Filled in by calculate_investment_metrics; None when no price was found.
    current_price: Optional[float] = None
    gain_loss: Optional[float] = None
    gain_loss_pct: Optional[float] = None
class InvestmentResponse(BaseModel):
    """Acknowledgement returned by the create, update and delete endpoints."""
    # Identifier of the affected investment row.
    id: int
    # Human-readable outcome text.
    message: str
class PortfolioSummary(BaseModel):
    """Aggregate totals returned by GET /summary/."""
    # Sum of quantity * buy_price over the user's investments.
    total_invested: float
    # Sum of quantity * current price over the user's investments.
    current_value: float
    # current_value - total_invested, and the same as a percentage of total_invested.
    gain_loss: float
    gain_loss_pct: float
class PriceResponse(BaseModel):
    """Response body of the /price/stock and /price/crypto endpoints."""
    # Upper-cased symbol that was requested.
    ticker: str
    # None when the lookup helper could not find a price.
    price: Optional[float]
    # ISO-8601 string produced by datetime.now().isoformat().
    timestamp: str
# Authentication Helper Functions
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer token to a Supabase user.

    Returns:
        A ``(user, token)`` tuple for the authenticated caller.

    Raises:
        HTTPException: 401 whenever the token cannot be resolved to a user,
            whatever the underlying reason.
    """
    unauthorized = HTTPException(status_code=401, detail="Invalid authentication token")
    try:
        token = credentials.credentials
        # Ask Supabase to verify the JWT and return the matching user.
        auth_response = supabase.auth.get_user(token)
        authenticated_user = auth_response.user
    except Exception:
        raise unauthorized
    if authenticated_user is None:
        raise unauthorized
    return authenticated_user, token
def get_user_supabase_client(token: str) -> Client:
    """Create a Supabase client with user's JWT token for RLS

    Builds a fresh client from the module-level URL/key and attaches ``token``
    to it. If any step raises, the error is printed and the shared
    module-level ``supabase`` client is returned instead.
    """
    try:
        # Create a new client instance with the user token
        client = create_client(supabase_url, supabase_key)
        # Set the JWT token for authenticated requests
        client.postgrest.auth(token)
        # NOTE(review): confirm set_session_from_url exists on the installed
        # supabase auth client; if it does not, this always takes the fallback.
        client.auth.set_session_from_url(f"#access_token={token}&token_type=bearer")
        return client
    except Exception as e:
        print(f"Error creating user client: {e}")
        # Fallback: use service client with manual user filtering
        # NOTE(review): in this branch the returned client does not carry the
        # user's token, so callers must filter by user_id themselves - confirm
        # no caller relies on RLS here.
        return supabase
def get_stock_price(ticker: str) -> Optional[float]:
    """Return the most recent daily close for ``ticker``, or ``None``.

    The NSE-suffixed symbol (``<ticker>.NS``) is tried first and the bare
    ticker second. Each candidate is looked up independently, so an error
    while querying the first symbol no longer prevents the second attempt.

    Args:
        ticker: Symbol without exchange suffix.

    Returns:
        The last non-missing ``Close`` value of the one-day history as a
        float, or ``None`` when neither candidate yields a price.
    """
    for candidate in (f"{ticker}.NS", ticker):
        try:
            history = yf.Ticker(candidate).history(period="1d")
            if history.empty:
                continue
            # Drop missing closes so a NaN is never reported as a price.
            closes = history['Close'].dropna()
            if not closes.empty:
                return float(closes.iloc[-1])
        except Exception:
            # Best-effort lookup: a failure for one symbol must not abort
            # the remaining candidates.
            continue
    return None
def get_crypto_price(symbol: str) -> Optional[float]:
    """Return the USD price of a coin from CoinGecko, or ``None``.

    The lower-cased ``symbol`` is tried as a CoinGecko id first (same order
    as before); if that fails and the symbol is a known ticker, the mapped
    id is tried next. Each distinct id is requested at most once.

    Args:
        symbol: A CoinGecko id or one of the mapped ticker symbols.

    Returns:
        The ``usd`` value as a float, or ``None`` when no attempt succeeds.
    """
    crypto_map = {
        'BTC': 'bitcoin',
        'ETH': 'ethereum',
        'ADA': 'cardano',
        'DOT': 'polkadot',
        'LINK': 'chainlink',
        'LTC': 'litecoin',
        'BCH': 'bitcoin-cash',
        'XRP': 'ripple',
    }
    raw_id = symbol.lower()
    candidates = [raw_id]
    mapped_id = crypto_map.get(symbol.upper())
    if mapped_id is not None and mapped_id != raw_id:
        candidates.append(mapped_id)

    for coin_id in candidates:
        try:
            # Pass the id via params so requests URL-encodes caller input.
            response = requests.get(
                "https://api.coingecko.com/api/v3/simple/price",
                params={"ids": coin_id, "vs_currencies": "usd"},
                timeout=5,
            )
            data = response.json()
            return float(data[coin_id]['usd'])
        except (requests.RequestException, KeyError, TypeError, ValueError):
            # Network error, non-JSON body, or id missing from the payload:
            # move on to the next candidate.
            continue
    return None
def calculate_investment_metrics(investment: dict) -> dict:
    """Attach current price and gain/loss figures to an investment dict.

    The dict is modified in place and also returned. ``current_price``,
    ``gain_loss`` and ``gain_loss_pct`` are set; the latter two are ``None``
    when no price could be fetched, otherwise rounded to two decimals.
    """
    # 'Stock' uses the stock lookup; every other asset type is treated as crypto.
    fetch_price = get_stock_price if investment['asset_type'] == 'Stock' else get_crypto_price
    price = fetch_price(investment['ticker'])
    investment['current_price'] = price

    if price is None:
        investment['gain_loss'] = None
        investment['gain_loss_pct'] = None
        return investment

    cost_basis = investment['quantity'] * investment['buy_price']
    market_value = investment['quantity'] * price
    profit = market_value - cost_basis
    if cost_basis > 0:
        profit_pct = profit / cost_basis * 100
    else:
        profit_pct = 0
    investment['gain_loss'] = round(profit, 2)
    investment['gain_loss_pct'] = round(profit_pct, 2)
    return investment
@app.get("/")
async def root():
    """Return the service name and version."""
    service_info = {
        "message": "Investment Portfolio Tracker API",
        "version": "2.0.0",
    }
    return service_info
# Authentication Endpoints
@app.post("/auth/signup", response_model=AuthResponse)
async def sign_up(user_data: UserSignUp):
    """Register a new user with Supabase and return the session tokens.

    Raises:
        HTTPException: 400 when Supabase returns no user, when it returns a
            user without a session, or when the sign-up call itself fails.
    """
    try:
        response = supabase.auth.sign_up({
            "email": user_data.email,
            "password": user_data.password
        })
        if response.user is None:
            raise HTTPException(status_code=400, detail="Failed to create user")
        if response.session is None:
            # A user without a session previously surfaced as an opaque
            # AttributeError message. Presumably this happens when e-mail
            # confirmation is pending - confirm against project settings.
            raise HTTPException(
                status_code=400,
                detail="User created but no session was returned; "
                       "email confirmation may be required before signing in"
            )
        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=response.user.id,
            email=response.user.email
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of re-wrapping them.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Sign up failed: {str(e)}") from e
@app.post("/auth/signin", response_model=AuthResponse)
async def sign_in(user_data: UserSignIn):
    """Authenticate with e-mail and password and return the session tokens.

    Raises:
        HTTPException: 401 when Supabase returns no user or no session, or
            when the sign-in call itself fails.
    """
    try:
        response = supabase.auth.sign_in_with_password({
            "email": user_data.email,
            "password": user_data.password
        })
        # Both objects are needed to build the response; a missing session is
        # treated like a missing user rather than crashing on attribute access.
        if response.user is None or response.session is None:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=response.user.id,
            email=response.user.email
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of re-wrapping them.
        raise
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"Sign in failed: {str(e)}") from e
@app.post("/auth/signout")
async def sign_out(current_user_data=Depends(get_current_user)):
    """Sign out via the Supabase auth client; requires a valid bearer token.

    Raises:
        HTTPException: 500 when the Supabase sign-out call fails.
    """
    # NOTE(review): sign_out() is invoked on the shared module-level client and
    # is not given the caller's token - confirm this actually invalidates the
    # caller's session.
    try:
        supabase.auth.sign_out()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Sign out failed: {str(e)}")
    return {"message": "Successfully signed out"}
@app.post("/investments/", response_model=InvestmentResponse)
async def add_investment(investment: InvestmentCreate, current_user_data=Depends(get_current_user)):
    """Store a new investment for the authenticated user.

    Returns:
        InvestmentResponse with the id of the inserted row.

    Raises:
        HTTPException: 500 when the insert returns no data or fails.
    """
    try:
        current_user, _ = current_user_data
        # Insert through the shared client with an explicit user_id so the
        # row is associated with the caller.
        result = supabase.table("investments").insert({
            "user_id": current_user.id,
            "asset_type": investment.asset_type,
            "ticker": investment.ticker,
            "quantity": investment.quantity,
            "buy_price": investment.buy_price,
            "buy_date": investment.buy_date
        }).execute()
        if not result.data:
            raise HTTPException(status_code=500, detail="Failed to create investment")
        return InvestmentResponse(
            id=result.data[0]["id"],
            message="Investment added successfully"
        )
    except HTTPException:
        # Keep the deliberate error detail instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add investment: {str(e)}") from e
@app.get("/investments/", response_model=List[Investment])
async def get_investments(current_user_data=Depends(get_current_user)):
    """List the caller's investments, each enriched with price metrics.

    Raises:
        HTTPException: 500 when the query or the enrichment fails.
    """
    try:
        current_user, _token = current_user_data
        # Rows are filtered explicitly by the caller's user id.
        query = supabase.table("investments").select("*").eq("user_id", current_user.id)
        rows = query.execute().data
        return [calculate_investment_metrics(row) for row in rows]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch investments: {str(e)}")
@app.get("/investments/{investment_id}", response_model=Investment)
async def get_investment(investment_id: int, current_user_data=Depends(get_current_user)):
    """Return one of the caller's investments, enriched with price metrics.

    Raises:
        HTTPException: 404 when no row with this id belongs to the caller;
            500 when the query or the enrichment fails.
    """
    try:
        current_user, _ = current_user_data
        result = (
            supabase.table("investments")
            .select("*")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not result.data:
            raise HTTPException(status_code=404, detail="Investment not found")
        return calculate_investment_metrics(result.data[0])
    except HTTPException:
        # Without this the 404 above was caught below and returned as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch investment: {str(e)}") from e
@app.put("/investments/{investment_id}", response_model=InvestmentResponse)
async def update_investment(investment_id: int, update_data: InvestmentUpdate, current_user_data=Depends(get_current_user)):
    """Update quantity and/or buy price of one of the caller's investments.

    Raises:
        HTTPException: 404 when no row with this id belongs to the caller;
            400 when the body contains no field to update; 500 when a
            database call fails.
    """
    try:
        current_user, _ = current_user_data
        # Check if investment exists and belongs to user
        check_result = (
            supabase.table("investments")
            .select("id")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not check_result.data:
            raise HTTPException(status_code=404, detail="Investment not found")

        # Only fields that were actually supplied are written.
        candidate_fields = {
            "quantity": update_data.quantity,
            "buy_price": update_data.buy_price,
        }
        update_dict = {
            column: value
            for column, value in candidate_fields.items()
            if value is not None
        }
        if not update_dict:
            raise HTTPException(status_code=400, detail="No valid fields to update")

        # The update is filtered by user id as well as by row id.
        (
            supabase.table("investments")
            .update(update_dict)
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        return InvestmentResponse(
            id=investment_id,
            message="Investment updated successfully"
        )
    except HTTPException:
        # Without this the 404/400 above were caught below and returned as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update investment: {str(e)}") from e
@app.delete("/investments/{investment_id}", response_model=InvestmentResponse)
async def delete_investment(investment_id: int, current_user_data=Depends(get_current_user)):
    """Delete one of the caller's investments.

    Raises:
        HTTPException: 404 when no row with this id belongs to the caller;
            500 when a database call fails.
    """
    try:
        current_user, _ = current_user_data
        # Check if investment exists and belongs to user
        check_result = (
            supabase.table("investments")
            .select("id")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not check_result.data:
            raise HTTPException(status_code=404, detail="Investment not found")

        # The delete is filtered by user id as well as by row id.
        (
            supabase.table("investments")
            .delete()
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        return InvestmentResponse(
            id=investment_id,
            message="Investment deleted successfully"
        )
    except HTTPException:
        # Without this the 404 above was caught below and returned as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to delete investment: {str(e)}") from e
@app.get("/summary/", response_model=PortfolioSummary)
async def get_portfolio_summary(current_user_data=Depends(get_current_user)):
    """Return invested total, current value and gain/loss for the caller.

    A holding whose live price cannot be fetched is carried at its cost
    (quantity * buy_price). Previously it counted towards the invested
    total but added nothing to the current value, so a failed price lookup
    showed up as a 100% loss on that holding.

    Raises:
        HTTPException: 500 when the query or the calculation fails.
    """
    try:
        current_user, _ = current_user_data
        # Rows are filtered explicitly by the caller's user id.
        result = supabase.table("investments").select("*").eq("user_id", current_user.id).execute()

        total_invested = 0.0
        total_current = 0.0
        # An empty portfolio simply leaves every total at zero.
        for investment in result.data or []:
            quantity = investment["quantity"]
            invested_value = quantity * investment["buy_price"]
            total_invested += invested_value

            if investment['asset_type'] == 'Stock':
                current_price = get_stock_price(investment['ticker'])
            else:
                current_price = get_crypto_price(investment['ticker'])

            if current_price is None:
                # No quote available: value at cost so the holding adds
                # neither gain nor loss.
                total_current += invested_value
            else:
                total_current += quantity * current_price

        gain_loss = total_current - total_invested
        gain_loss_pct = (gain_loss / total_invested * 100) if total_invested > 0 else 0
        return PortfolioSummary(
            total_invested=round(total_invested, 2),
            current_value=round(total_current, 2),
            gain_loss=round(gain_loss, 2),
            gain_loss_pct=round(gain_loss_pct, 2)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to calculate portfolio summary: {str(e)}") from e
@app.get("/price/stock/{ticker}", response_model=PriceResponse)
async def get_stock_price_endpoint(ticker: str):
    """Look up a stock price; the ticker is upper-cased before the lookup.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        latest_price = get_stock_price(ticker.upper())
        payload = PriceResponse(
            ticker=ticker.upper(),
            price=latest_price,
            timestamp=datetime.now().isoformat(),
        )
        return payload
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch stock price: {str(e)}")
@app.get("/price/crypto/{symbol}", response_model=PriceResponse)
async def get_crypto_price_endpoint(symbol: str):
    """Look up a crypto price; the symbol is upper-cased before the lookup.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        latest_price = get_crypto_price(symbol.upper())
        payload = PriceResponse(
            ticker=symbol.upper(),
            price=latest_price,
            timestamp=datetime.now().isoformat(),
        )
        return payload
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch crypto price: {str(e)}")
@app.get("/health")
async def health_check():
    """Report a fixed healthy status together with the current local time."""
    checked_at = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": checked_at}
import gradio as gr
# Create a simple Gradio interface that shows API info
def api_info():
    """Return the static Markdown overview shown on the Gradio page."""
    overview = """
# Investment Portfolio Tracker API
Your FastAPI backend is running!
## API Documentation
Access the interactive API docs at: `/docs`
## Health Check
Check API status at: `/health`
## Available Endpoints:
- **Authentication**: /auth/signup, /auth/signin, /auth/signout
- **Investments**: /investments/ (GET, POST, PUT, DELETE)
- **Analytics**: /summary/
- **Market Data**: /price/stock/{ticker}, /price/crypto/{symbol}
"""
    return overview
# Create Gradio interface
# api_info takes no inputs; its return value is rendered through a Markdown
# output component.
demo = gr.Interface(
    fn=api_info,
    inputs=[],
    outputs=gr.Markdown(),
    title="Investment Portfolio Tracker API",
    description="FastAPI Backend Service - Access API docs at /docs endpoint"
)
# Mount the Gradio app
# The value returned by mount_gradio_app is rebound to `app`; the UI is
# mounted under the /gradio path.
app = gr.mount_gradio_app(app, demo, path="/gradio")
# When run as a script, serve the combined app on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)