# mlstocks/backend/app/api/market.py
# Deployed via GitHub Actions ("Deploy to Hugging Face Space", commit abf702c)
from fastapi import APIRouter, Query, HTTPException, Depends
from app.services.yahoo_finance import yahoo_finance_service
from app.models.market import MarketIndicesResponse, SearchResponse
from app.models.database import User, UserAgenticScan
from app.db.session import get_db
from app.core.auth import get_user_or_demo
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
import json
from typing import Optional, List, Dict, Any
router = APIRouter()


@router.get("/market-indices", response_model=MarketIndicesResponse)
async def get_market_indices():
    """Return the current values of the tracked market indices."""
    indices = await yahoo_finance_service.get_market_indices()
    return {"results": indices}
@router.get("/search", response_model=SearchResponse)
async def search_ticker(q: str = Query(..., min_length=1)):
results = await yahoo_finance_service.search_tickers(q)
return {"results": results}
class OptionsAnalysisRequest(BaseModel):
    """Request body for POST /options-analysis."""
    symbol: str  # ticker symbol to analyze (passed verbatim into the agent task)
    provider: str = "openai"  # LLM provider key; remapped to a concrete model in the route
class AgentLog(BaseModel):
    """A single chat-log entry produced during the multi-agent run."""
    source: str  # agent name (e.g. "MarketAnalyst", "RiskManager", "System", "User")
    content: str  # message text (tool-call payloads are stringified)
    avatar: str  # icon shown next to the message in the UI
class OptionsAnalysisResponse(BaseModel):
    """Response for POST /options-analysis: full agent transcript + final verdict."""
    logs: List[AgentLog]  # ordered transcript of all agent messages
    result: Optional[Dict[str, Any]] = None  # JSON extracted from RiskManager, or fallback stub
@router.post("/options-analysis", response_model=OptionsAnalysisResponse)
async def analyze_options_strategy(
request: OptionsAnalysisRequest,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_user_or_demo)
):
"""
Runs multi-agent analysis for option strategy recommendations.
Uses the local agentic_analyst module.
"""
print(f"[DEBUG] POST /api/options-analysis for symbol: {request.symbol} (provider: {request.provider})")
try:
# Import required modules from local agentic_analyst
from app.core.model_factory import AutoGenModelFactory
from app.agentic_analyst.team import get_trading_team, extract_json
# Agent icon mapping
AGENT_ICONS = {
"MarketAnalyst": "πŸ“Š",
"SentimentAnalyst": "πŸ“°",
"StrategyAdvisor": "🧠",
"RiskManager": "πŸ›‘οΈ",
"System": "πŸ€–",
"User": "πŸ•΅οΈβ€β™‚οΈ"
}
# Setup model based on provider
if request.provider == "gemini" or request.provider == "google" or request.provider == "openai":
# Revert to 2.0-flash as it was confirmed working previously
model_name = "gemini-2.0-flash"
family = "gemini"
request.provider = "google"
elif request.provider == "groq":
model_name = "llama-3.3-70b-versatile"
family = "groq"
elif request.provider == "ollama":
model_name = "llama3.2:3b"
family = "llama"
else:
# Default to Gemini
model_name = "gemini-2.0-flash"
family = "gemini"
request.provider = "google"
print(f"[DEBUG] Using provider: {request.provider}, model: {model_name}")
try:
model_client = AutoGenModelFactory.get_model(
provider=request.provider,
model_name=model_name,
temperature=0.2,
model_info={
"family": family,
"function_calling": True,
}
)
except Exception as e:
print(f"[DEBUG] Failed to initialize model client: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error initializing model: {str(e)}")
# Get trading team
print("[DEBUG] Getting trading team...")
team = get_trading_team(model_client)
# Construct task
task = f"""
Perform a real-time trade analysis for {request.symbol}.
1. MarketAnalyst: detailed technicals.
2. SentimentAnalyst: news sentiment (Top 5 stories).
3. StrategyAdvisor: recommend a spread with >70% confidence.
4. RiskManager: validate. Output JSON with "final_decision" (TRADE/WAIT), "confidence", and "actionable_recommendation".
"""
logs = []
final_output = {}
# Add user request log
logs.append({
"source": "User",
"content": f"Analyze {request.symbol} ({request.provider})",
"avatar": AGENT_ICONS["User"]
})
print(f"[DEBUG] Starting team stream for {request.symbol}...")
print(f"[DEBUG] Task being sent to team: {task[:100]}...")
try:
# Run team and collect messages
async for message in team.run_stream(task=task):
raw_source = getattr(message, 'source', 'System')
source = raw_source
# Normalize source
if source.lower() == 'user':
source = 'User'
# Skip echoing the task prompt
if source == 'User' and "Perform a real-time trade analysis" in getattr(message, 'content', ''):
continue
# Progress tracking for tool calls
message_type = type(message).__name__
# print(f"[DEBUG] Processing message from {source} (Type: {message_type})")
# Check for tool calls specifically to show progress
if "ToolCall" in message_type:
print(f"[DEBUG] {source} is executing tools...")
content = getattr(message, 'content', '')
# Handle non-string content (like tool calls)
if not isinstance(content, str):
if isinstance(content, list):
content = "\n".join([str(c) for c in content])
else:
content = str(content)
if not content:
continue
print(f"[DEBUG] Agent Message -> Source: {source}, Length: {len(content)}")
# Add to logs
avatar_icon = AGENT_ICONS.get(source, "πŸ€–")
logs.append({
"source": source,
"content": content,
"avatar": avatar_icon
})
# Extract final output from RiskManager
if source == "RiskManager":
print("[DEBUG] RiskManager produced content, attempting extraction...")
parsed = extract_json(content)
if parsed:
print(f"[DEBUG] Successfully extracted final decision: {parsed.get('final_decision')}")
final_output = parsed
except Exception as stream_err:
print(f"[ERROR] Stream error during analysis: {stream_err}")
# Log the error but continue to return whatever we have
logs.append({
"source": "System",
"content": f"ERROR: Analysis interrupted. {str(stream_err)}",
"avatar": "⚠️"
})
# Prepare final output logic with fallback
if not final_output and logs:
print("[DEBUG] No structured final output found. Attempting fallback save.")
# Try to find any message that looks like a conclusion or just save as incomplete
final_output = {
"final_decision": "INCOMPLETE",
"confidence": 0.0,
"actionable_recommendation": "Analysis terminated early. Check logs for partial details.",
"entry_price": "N/A",
"risk_warning": "Abrupt termination detected."
}
if final_output:
try:
# Save to database
db_scan = UserAgenticScan(
user_id=user.id,
symbol=request.symbol,
decision=final_output.get("final_decision", "WAIT"),
confidence=float(final_output.get("confidence", 0.0)),
recommendation=final_output.get("actionable_recommendation", "Analysis complete"),
entry_price=str(final_output.get("entry_price", "")),
raw_logs=json.dumps([{"source": l["source"], "content": l["content"]} for l in logs]),
risk_warning=final_output.get("risk_warning", "")
)
db.add(db_scan)
await db.commit()
print(f"[DEBUG] Saved Agentic Scan to DB: ID {db_scan.id}")
except Exception as db_err:
print(f"[ERROR] Failed to save scan to DB: {db_err}")
print(f"[DEBUG] Analysis results ready for {request.symbol}. Logs: {len(logs)}")
return {
"logs": logs,
"result": final_output if final_output else None
}
except ImportError as e:
raise HTTPException(
status_code=500,
detail=f"Market analyst modules not available: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Analysis failed: {str(e)}"
)
from app.models.database import HistoricalData
from sqlalchemy import select, desc
class FeatureDataPoint(BaseModel):
    """One historical price bar plus any precomputed technical indicators."""
    date: str  # formatted "YYYY-MM-DD"
    close: float
    volume: float
    indicators: Optional[Dict[str, Any]] = None  # parsed from HistoricalData.indicators JSON; {} if unparsable
class AgenticScanSummary(BaseModel):
    """Condensed view of a stored UserAgenticScan row."""
    id: int
    decision: str  # e.g. "TRADE" / "WAIT" / "INCOMPLETE"
    confidence: float
    recommendation: str
    created_at: str  # ISO-8601 timestamp
    risk_warning: Optional[str] = None
    entry_price: Optional[str] = None  # stored as text by the scan route
class TickerDetailResponse(BaseModel):
    """Response for GET /ticker/{symbol}: live quote, history, latest scan."""
    symbol: str  # upper-cased ticker symbol
    price: float  # live price; 0.0 when the quote is unavailable
    change: float  # live change; 0.0 when the quote is unavailable
    features: List[FeatureDataPoint]  # ordered oldest -> newest
    latest_strategy: Optional[AgenticScanSummary] = None  # None if the user has no scan for this symbol
@router.get("/ticker/{symbol}", response_model=TickerDetailResponse)
async def get_ticker_details(
symbol: str,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_user_or_demo)
):
# 1. Get Live Quote
raw_symbol = symbol.upper()
try:
quotes = await yahoo_finance_service.get_quotes([raw_symbol])
quote = quotes[0] if quotes else None
price_val = 0.0
change_val = 0.0
if quote:
# Handle string formatting like "+1.23", "1,234.56"
p = quote.price.replace(',', '')
if p != "N/A": price_val = float(p)
c = quote.change.replace('+', '').replace(',', '')
if c != "N/A": change_val = float(c)
except Exception:
price_val = 0.0
change_val = 0.0
# 2. Get Historical Data (All records for full trend)
# Note: We need to import HistoricalData from database models first
hist_result = await db.execute(
select(HistoricalData)
.where(HistoricalData.symbol == raw_symbol)
.order_by(desc(HistoricalData.date))
# .limit(30) removed to show all history
)
hist_rows = hist_result.scalars().all()
features = []
for row in hist_rows:
indicators = {}
if row.indicators:
try:
indicators = json.loads(row.indicators)
except:
pass
features.append({
"date": row.date.strftime("%Y-%m-%d"),
"close": row.close,
"volume": row.volume,
"indicators": indicators
})
features.reverse() # Oldest to Newest for charting
# 3. Get Latest Strategy
scan_result = await db.execute(
select(UserAgenticScan)
.where(UserAgenticScan.user_id == user.id)
.where(UserAgenticScan.symbol == raw_symbol)
.order_by(desc(UserAgenticScan.created_at))
.limit(1)
)
latest_scan = scan_result.scalar_one_or_none()
scan_summary = None
if latest_scan:
scan_summary = {
"id": latest_scan.id,
"decision": latest_scan.decision,
"confidence": latest_scan.confidence,
"recommendation": latest_scan.recommendation,
"created_at": latest_scan.created_at.isoformat(),
"risk_warning": latest_scan.risk_warning,
"entry_price": latest_scan.entry_price
}
return {
"symbol": raw_symbol,
"price": price_val,
"change": change_val,
"features": features,
"latest_strategy": scan_summary
}