Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Depends, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel, field_validator | |
| from typing import List, Optional | |
| from datetime import date, datetime | |
| import yfinance as yf | |
| import requests | |
| import uvicorn | |
| import os | |
| from dotenv import load_dotenv | |
| from supabase import create_client, Client | |
# Pull variables from a local .env file into the process environment
# before any os.getenv() lookups below.
load_dotenv()

app = FastAPI(
    title="Investment Portfolio Tracker API",
    description="Backend API for managing investment portfolios with Supabase",
    version="2.0.0",
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy; confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # Supabase Configuration | |
# Both settings are mandatory; fail at import time if either is missing.
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not (supabase_url and supabase_key):
    raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")

# Module-level client shared by every request handler in this file.
supabase: Client = create_client(supabase_url, supabase_key)

# Security: extracts the bearer token from the Authorization header.
security = HTTPBearer()
| # Auth Models | |
class UserSignUp(BaseModel):
    """Request body for creating a new account."""

    # Both fields are forwarded unchanged to supabase.auth.sign_up().
    email: str
    password: str
class UserSignIn(BaseModel):
    """Request body for signing in with email and password."""

    # Both fields are forwarded unchanged to supabase.auth.sign_in_with_password().
    email: str
    password: str
class AuthResponse(BaseModel):
    """Tokens and identity returned by the sign-up and sign-in handlers."""

    # Taken from the Supabase session object.
    access_token: str
    refresh_token: str
    # Taken from the Supabase user object.
    user_id: str
    email: str
| # Investment Models | |
class InvestmentCreate(BaseModel):
    """Request body for adding an investment.

    Each ``validate_*`` method is registered with ``@field_validator`` so that
    pydantic actually runs it; as plain methods they would never be invoked
    and invalid payloads would be accepted.
    """

    asset_type: str
    ticker: str
    quantity: float
    buy_price: float
    # Stored as supplied; no format check is applied to the date string.
    buy_date: str

    @field_validator('asset_type')
    @classmethod
    def validate_asset_type(cls, v):
        """Allow only the two supported asset types."""
        if v not in ['Stock', 'Crypto']:
            raise ValueError('Asset type must be Stock or Crypto')
        return v

    @field_validator('quantity')
    @classmethod
    def validate_quantity(cls, v):
        """Reject zero or negative quantities."""
        if v <= 0:
            raise ValueError('Quantity must be greater than 0')
        return v

    @field_validator('buy_price')
    @classmethod
    def validate_buy_price(cls, v):
        """Reject zero or negative purchase prices."""
        if v <= 0:
            raise ValueError('Buy price must be greater than 0')
        return v

    @field_validator('ticker')
    @classmethod
    def validate_ticker(cls, v):
        """Reject blank tickers and normalise to upper case without padding."""
        if not v or not v.strip():
            raise ValueError('Ticker cannot be empty')
        return v.upper().strip()
class InvestmentUpdate(BaseModel):
    """Request body for a partial update; omitted fields stay ``None``.

    The validators are registered with ``@field_validator`` so that pydantic
    runs them; as plain methods they would never be invoked.
    """

    quantity: Optional[float] = None
    buy_price: Optional[float] = None

    @field_validator('quantity')
    @classmethod
    def validate_quantity(cls, v):
        """Reject zero or negative quantities when a value is supplied."""
        if v is not None and v <= 0:
            raise ValueError('Quantity must be greater than 0')
        return v

    @field_validator('buy_price')
    @classmethod
    def validate_buy_price(cls, v):
        """Reject zero or negative prices when a value is supplied."""
        if v is not None and v <= 0:
            raise ValueError('Buy price must be greater than 0')
        return v
class Investment(BaseModel):
    """An investment row together with optional live-price metrics."""

    id: int
    asset_type: str
    ticker: str
    quantity: float
    buy_price: float
    buy_date: str
    # The three metrics below default to None (no price available).
    current_price: Optional[float] = None
    gain_loss: Optional[float] = None
    gain_loss_pct: Optional[float] = None
class InvestmentResponse(BaseModel):
    """Acknowledgement returned by the add, update and delete handlers."""

    # Identifier of the affected investment.
    id: int
    # Human-readable outcome text.
    message: str
class PortfolioSummary(BaseModel):
    """Aggregate figures produced by the portfolio summary handler."""

    total_invested: float
    current_value: float
    # current_value minus total_invested, and the same as a percentage.
    gain_loss: float
    gain_loss_pct: float
class PriceResponse(BaseModel):
    """Result of a single price lookup."""

    ticker: str
    # None when the lookup helpers could not obtain a price.
    price: Optional[float]
    # ISO-8601 string produced with datetime.now().isoformat().
    timestamp: str
| # Authentication Helper Functions | |
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer token to a Supabase user.

    Returns:
        A ``(user, token)`` tuple.

    Raises:
        HTTPException: 401 when the lookup fails for any reason or yields no user.
    """
    try:
        token = credentials.credentials
        # Ask Supabase to verify the JWT and return its owner.
        authenticated_user = supabase.auth.get_user(token).user
    except Exception:
        # Every failure mode is reported to the client as the same 401.
        authenticated_user = None

    if authenticated_user is None:
        raise HTTPException(status_code=401, detail="Invalid authentication token")
    return authenticated_user, token
def get_user_supabase_client(token: str):
    """Build a Supabase client that carries the caller's JWT (for RLS).

    If creating or configuring the client raises, the error is printed and
    the shared module-level client is returned instead.
    """
    try:
        user_client = create_client(supabase_url, supabase_key)
        # Attach the JWT to PostgREST requests first, then to the auth session.
        user_client.postgrest.auth(token)
        session_fragment = f"#access_token={token}&token_type=bearer"
        user_client.auth.set_session_from_url(session_fragment)
    except Exception as e:
        print(f"Error creating user client: {e}")
        # Fallback: callers must then filter by user manually.
        return supabase
    return user_client
def get_stock_price(ticker: str) -> Optional[float]:
    """Return the latest daily close for ``ticker``, or None if unavailable.

    The ``.NS``-suffixed symbol is tried first and the bare ticker second.
    Any exception during lookup yields None.
    """
    try:
        for candidate in (f"{ticker}.NS", ticker):
            history = yf.Ticker(candidate).history(period="1d")
            if not history.empty:
                return float(history['Close'].iloc[-1])
        return None
    except Exception:
        return None
_COINGECKO_PRICE_URL = "https://api.coingecko.com/api/v3/simple/price"

# Ticker symbol -> CoinGecko coin id, used when the raw symbol is not an id.
_COINGECKO_IDS = {
    'BTC': 'bitcoin',
    'ETH': 'ethereum',
    'ADA': 'cardano',
    'DOT': 'polkadot',
    'LINK': 'chainlink',
    'LTC': 'litecoin',
    'BCH': 'bitcoin-cash',
    'XRP': 'ripple',
}


def _fetch_coingecko_usd(coin_id: str) -> float:
    """Request the USD price for one CoinGecko id; raises on any failure."""
    # Passing the id through ``params`` lets requests URL-encode it, so
    # caller-supplied text cannot inject extra query parameters.
    response = requests.get(
        _COINGECKO_PRICE_URL,
        params={"ids": coin_id, "vs_currencies": "usd"},
        timeout=5,
    )
    return float(response.json()[coin_id]['usd'])


def get_crypto_price(symbol: str) -> Optional[float]:
    """Return the USD price for ``symbol`` from CoinGecko, or None.

    The lower-cased symbol is tried as a coin id first; if that fails and the
    symbol has a different mapped id (e.g. ``BTC`` -> ``bitcoin``), the mapped
    id is tried next. An identical request is never sent twice.
    """
    try:
        raw_id = symbol.lower()
        mapped_id = _COINGECKO_IDS.get(symbol.upper(), raw_id)
    except AttributeError:
        # Non-string input: nothing sensible to look up.
        return None

    candidates = [raw_id] if mapped_id == raw_id else [raw_id, mapped_id]
    for coin_id in candidates:
        try:
            return _fetch_coingecko_usd(coin_id)
        except Exception:
            # Deliberately best-effort: network errors, bad JSON and missing
            # keys all mean "no price from this id"; move on to the next.
            continue
    return None
def calculate_investment_metrics(investment: dict) -> dict:
    """Add ``current_price``, ``gain_loss`` and ``gain_loss_pct`` to ``investment``.

    The dict is modified in place and also returned. When no price can be
    obtained, both gain/loss fields are set to None.
    """
    # 'Stock' uses the stock lookup; any other asset type uses the crypto one.
    lookup = get_stock_price if investment['asset_type'] == 'Stock' else get_crypto_price
    price = lookup(investment['ticker'])
    investment['current_price'] = price

    if price is None:
        investment['gain_loss'] = None
        investment['gain_loss_pct'] = None
        return investment

    cost_basis = investment['quantity'] * investment['buy_price']
    profit = investment['quantity'] * price - cost_basis
    # Guard the division for a zero (or negative) cost basis.
    profit_pct = (profit / cost_basis * 100) if cost_basis > 0 else 0
    investment['gain_loss'] = round(profit, 2)
    investment['gain_loss_pct'] = round(profit_pct, 2)
    return investment
# Registered on the app so the handler is actually reachable; without a
# route decorator it is never served.
@app.get("/")
async def root():
    """Return the API name and version."""
    return {"message": "Investment Portfolio Tracker API", "version": "2.0.0"}
| # Authentication Endpoints | |
@app.post("/auth/signup")
async def sign_up(user_data: UserSignUp):
    """Create an account through Supabase and return its tokens.

    Raises:
        HTTPException: 400 when Supabase returns no user or no session, or
            when the Supabase call itself fails.
    """
    try:
        response = supabase.auth.sign_up({
            "email": user_data.email,
            "password": user_data.password
        })
        if response.user is None:
            raise HTTPException(status_code=400, detail="Failed to create user")
        if response.session is None:
            # Without this guard the token access below fails with an
            # AttributeError and the client sees a cryptic message.
            raise HTTPException(
                status_code=400,
                detail="Sign up did not return a session; email confirmation may be required",
            )
        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=response.user.id,
            email=response.user.email
        )
    except HTTPException:
        # Keep our own errors intact instead of re-wrapping their text.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Sign up failed: {str(e)}") from e
@app.post("/auth/signin")
async def sign_in(user_data: UserSignIn):
    """Authenticate with email and password and return the session tokens.

    Raises:
        HTTPException: 401 when Supabase returns no user or session, or when
            the Supabase call itself fails.
    """
    # NOTE(review): this signs in on the shared module-level client; confirm
    # that doing so does not leave one user's session on a client used by
    # other requests.
    try:
        response = supabase.auth.sign_in_with_password({
            "email": user_data.email,
            "password": user_data.password
        })
        if response.user is None or response.session is None:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=response.user.id,
            email=response.user.email
        )
    except HTTPException:
        # Keep our own errors intact instead of re-wrapping their text.
        raise
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"Sign in failed: {str(e)}") from e
@app.post("/auth/signout")
async def sign_out(current_user_data=Depends(get_current_user)):
    """Sign out via Supabase; requires a valid bearer token.

    Raises:
        HTTPException: 500 when the Supabase call fails.
    """
    # NOTE(review): sign_out() is called on the shared module-level client and
    # is not given the caller's token; confirm it ends the caller's session.
    try:
        supabase.auth.sign_out()
        return {"message": "Successfully signed out"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Sign out failed: {str(e)}") from e
@app.post("/investments/")
async def add_investment(investment: InvestmentCreate, current_user_data=Depends(get_current_user)):
    """Insert a new investment row owned by the authenticated user.

    Raises:
        HTTPException: 500 when the insert returns no data or the call fails.
    """
    try:
        current_user, _token = current_user_data
        # The shared client is used with an explicit user_id, so ownership is
        # set here rather than derived from the caller's token.
        row = {
            "user_id": current_user.id,
            "asset_type": investment.asset_type,
            "ticker": investment.ticker,
            "quantity": investment.quantity,
            "buy_price": investment.buy_price,
            "buy_date": investment.buy_date,
        }
        result = supabase.table("investments").insert(row).execute()
        if not result.data:
            raise HTTPException(status_code=500, detail="Failed to create investment")
        return InvestmentResponse(
            id=result.data[0]["id"],
            message="Investment added successfully"
        )
    except HTTPException:
        # Keep our own errors intact instead of re-wrapping their text.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add investment: {str(e)}") from e
@app.get("/investments/")
async def get_investments(current_user_data=Depends(get_current_user)):
    """Return every investment of the authenticated user with price metrics.

    Raises:
        HTTPException: 500 when the query or metric calculation fails.
    """
    try:
        current_user, _token = current_user_data
        # Rows are filtered explicitly by the authenticated user's id.
        result = supabase.table("investments").select("*").eq("user_id", current_user.id).execute()
        return [calculate_investment_metrics(row) for row in result.data]
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch investments: {str(e)}") from e
@app.get("/investments/{investment_id}")
async def get_investment(investment_id: int, current_user_data=Depends(get_current_user)):
    """Return one of the authenticated user's investments with price metrics.

    Raises:
        HTTPException: 404 when no matching row belongs to the user; 500 on
            any other failure.
    """
    try:
        current_user, _token = current_user_data
        result = (
            supabase.table("investments")
            .select("*")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not result.data:
            raise HTTPException(status_code=404, detail="Investment not found")
        return calculate_investment_metrics(result.data[0])
    except HTTPException:
        # Without this clause the 404 above is caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch investment: {str(e)}") from e
@app.put("/investments/{investment_id}")
async def update_investment(investment_id: int, update_data: InvestmentUpdate, current_user_data=Depends(get_current_user)):
    """Update quantity and/or buy price of one of the user's investments.

    Raises:
        HTTPException: 404 when no matching row belongs to the user; 400 when
            the body supplies neither field; 500 on any other failure.
    """
    try:
        current_user, _token = current_user_data

        # Check that the investment exists and belongs to the user.
        check_result = (
            supabase.table("investments")
            .select("id")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not check_result.data:
            raise HTTPException(status_code=404, detail="Investment not found")

        # Only fields the client actually supplied are written.
        supplied = {"quantity": update_data.quantity, "buy_price": update_data.buy_price}
        update_dict = {field: value for field, value in supplied.items() if value is not None}
        if not update_dict:
            raise HTTPException(status_code=400, detail="No valid fields to update")

        # The user_id filter is repeated on the write itself.
        (
            supabase.table("investments")
            .update(update_dict)
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        return InvestmentResponse(
            id=investment_id,
            message="Investment updated successfully"
        )
    except HTTPException:
        # Without this clause the 404/400 above are caught below and turned into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update investment: {str(e)}") from e
@app.delete("/investments/{investment_id}")
async def delete_investment(investment_id: int, current_user_data=Depends(get_current_user)):
    """Delete one of the authenticated user's investments.

    Raises:
        HTTPException: 404 when no matching row belongs to the user; 500 on
            any other failure.
    """
    try:
        current_user, _token = current_user_data

        # Check that the investment exists and belongs to the user.
        check_result = (
            supabase.table("investments")
            .select("id")
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        if not check_result.data:
            raise HTTPException(status_code=404, detail="Investment not found")

        # The user_id filter is repeated on the delete itself.
        (
            supabase.table("investments")
            .delete()
            .eq("id", investment_id)
            .eq("user_id", current_user.id)
            .execute()
        )
        return InvestmentResponse(
            id=investment_id,
            message="Investment deleted successfully"
        )
    except HTTPException:
        # Without this clause the 404 above is caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to delete investment: {str(e)}") from e
@app.get("/summary/")
async def get_portfolio_summary(current_user_data=Depends(get_current_user)):
    """Return invested total, current value and gain/loss for the user's portfolio.

    An empty portfolio yields an all-zero summary.

    Raises:
        HTTPException: 500 when the query or a calculation fails.
    """
    try:
        current_user, _token = current_user_data
        # Rows are filtered explicitly by the authenticated user's id.
        result = supabase.table("investments").select("*").eq("user_id", current_user.id).execute()
        if not result.data:
            return PortfolioSummary(
                total_invested=0.0,
                current_value=0.0,
                gain_loss=0.0,
                gain_loss_pct=0.0
            )

        total_invested = 0.0
        total_current = 0.0
        for holding in result.data:
            total_invested += holding["quantity"] * holding["buy_price"]

            # 'Stock' uses the stock lookup; any other asset type uses the crypto one.
            if holding['asset_type'] == 'Stock':
                current_price = get_stock_price(holding['ticker'])
            else:
                current_price = get_crypto_price(holding['ticker'])

            # NOTE(review): a holding with no available price adds to
            # total_invested but nothing to total_current, so it is reported
            # as a complete loss; confirm this is the intended treatment.
            if current_price is not None:
                total_current += holding["quantity"] * current_price

        gain_loss = total_current - total_invested
        gain_loss_pct = (gain_loss / total_invested * 100) if total_invested > 0 else 0
        return PortfolioSummary(
            total_invested=round(total_invested, 2),
            current_value=round(total_current, 2),
            gain_loss=round(gain_loss, 2),
            gain_loss_pct=round(gain_loss_pct, 2)
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to calculate portfolio summary: {str(e)}") from e
@app.get("/price/stock/{ticker}")
async def get_stock_price_endpoint(ticker: str):
    """Look up a stock price; ``price`` is None when no price is available.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        symbol = ticker.upper()
        return PriceResponse(
            ticker=symbol,
            price=get_stock_price(symbol),
            timestamp=datetime.now().isoformat()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch stock price: {str(e)}") from e
@app.get("/price/crypto/{symbol}")
async def get_crypto_price_endpoint(symbol: str):
    """Look up a crypto price; ``price`` is None when no price is available.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        upper_symbol = symbol.upper()
        return PriceResponse(
            ticker=upper_symbol,
            price=get_crypto_price(upper_symbol),
            timestamp=datetime.now().isoformat()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch crypto price: {str(e)}") from e
# Registered on the app so the handler is actually reachable; without a
# route decorator it is never served.
@app.get("/health")
async def health_check():
    """Report a static healthy status with the current server timestamp."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}
| import gradio as gr | |
| # Create a simple Gradio interface that shows API info | |
def api_info():
    """Return the Markdown overview shown in the Gradio interface."""
    overview = """
# Investment Portfolio Tracker API
Your FastAPI backend is running!
## API Documentation
Access the interactive API docs at: `/docs`
## Health Check
Check API status at: `/health`
## Available Endpoints:
- **Authentication**: /auth/signup, /auth/signin, /auth/signout
- **Investments**: /investments/ (GET, POST, PUT, DELETE)
- **Analytics**: /summary/
- **Market Data**: /price/stock/{ticker}, /price/crypto/{symbol}
"""
    return overview
| # Create Gradio interface | |
# Input-less Gradio interface that renders api_info()'s Markdown.
demo = gr.Interface(
    fn=api_info,
    inputs=[],
    outputs=gr.Markdown(),
    title="Investment Portfolio Tracker API",
    description="FastAPI Backend Service - Access API docs at /docs endpoint",
)
| # Mount the Gradio app | |
# Serve the Gradio UI under /gradio and rebind ``app`` to the returned app.
app = gr.mount_gradio_app(app, demo, path="/gradio")

# Run a development server when executed as a script.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)