1proxy / app /routers /admin.py
paijo77's picture
update app/routers/admin.py
396dcd5 verified
from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.database import get_db
from app.db_models import Proxy, User, CandidateSource, ProxySource
from app.models.candidate import CandidateResponse
from app.dependencies import require_admin
from app.hunter.service import HunterService
from app.db_storage import db_storage
from typing import List, Optional
from pydantic import BaseModel
# All admin endpoints require admin role
router = APIRouter(
prefix="/api/v1/admin", tags=["admin"], dependencies=[Depends(require_admin)]
)
# Access limiter from app state
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
class UserUpdateRole(BaseModel):
role: str
class UserResponse(BaseModel):
id: int
email: str
username: str
role: str
created_at: Optional[str]
class Config:
from_attributes = True
@router.get("/users", response_model=dict)
@limiter.limit("30/minute")
async def list_users(
request: Request,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
session: AsyncSession = Depends(get_db),
):
users, total = await db_storage.get_users(session, limit=limit, offset=offset)
return {
"total": total,
"count": len(users),
"offset": offset,
"limit": limit,
"users": [
{
"id": u.id,
"email": u.email,
"username": u.username,
"role": u.role,
"created_at": u.created_at.isoformat() if u.created_at else None,
}
for u in users
],
}
@router.get("/users/{user_id}", response_model=UserResponse)
@limiter.limit("60/minute")
async def get_user_details(
request: Request, user_id: int, session: AsyncSession = Depends(get_db)
):
user = await db_storage.get_user_by_id(session, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.put("/users/{user_id}/role", response_model=UserResponse)
@limiter.limit("10/minute")
async def update_user_role(
request: Request,
user_id: int,
role_data: UserUpdateRole,
session: AsyncSession = Depends(get_db),
):
if role_data.role not in ["user", "admin"]:
raise HTTPException(status_code=400, detail="Invalid role")
user = await db_storage.update_user_role(session, user_id, role_data.role)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.delete("/users/{user_id}")
@limiter.limit("5/minute")
async def delete_user(
request: Request, user_id: int, session: AsyncSession = Depends(get_db)
):
# Prevent self-deletion if current user is the target
# This would require current_user from dependency, but we'll stick to basic admin check for now
success = await db_storage.delete_user(session, user_id)
if not success:
raise HTTPException(status_code=404, detail="User not found")
return {"message": "User deleted successfully"}
@router.post("/hunter/trigger")
@limiter.limit("5/minute")
async def trigger_hunt(request: Request, background_tasks: BackgroundTasks):
"""
Manually trigger the Hunter Protocol to find new proxy sources.
"""
service = HunterService()
background_tasks.add_task(service.run_hunt)
return {"status": "Hunter Protocol initiated", "message": "Check logs for progress"}
@router.get("/candidates", response_model=List[CandidateResponse])
@limiter.limit("30/minute")
async def list_candidates(
request: Request,
status: str = "pending",
limit: int = 50,
offset: int = 0,
session: AsyncSession = Depends(get_db),
):
"""
List discovered candidate sources.
"""
stmt = (
select(CandidateSource)
.where(CandidateSource.status == status)
.order_by(CandidateSource.confidence_score.desc())
.limit(limit)
.offset(offset)
)
result = await session.execute(stmt)
return result.scalars().all()
@router.post("/candidates/{id}/approve")
@limiter.limit("10/minute")
async def approve_candidate(
request: Request, id: int, session: AsyncSession = Depends(get_db)
):
"""
Approve a candidate source and promote it to a real ProxySource.
"""
# Get candidate
stmt = select(CandidateSource).where(CandidateSource.id == id)
result = await session.execute(stmt)
candidate = result.scalar_one_or_none()
if not candidate:
raise HTTPException(status_code=404, detail="Candidate not found")
if candidate.status == "approved":
raise HTTPException(status_code=400, detail="Candidate already approved")
# Check if URL already exists in sources (double check)
stmt_source = select(ProxySource).where(ProxySource.url == candidate.url)
result_source = await session.execute(stmt_source)
if result_source.scalar_one_or_none():
# Just mark as approved/duplicate
candidate.status = "approved"
await session.commit()
return {"status": "merged", "message": "Source already existed"}
# Create new ProxySource
# We assume it's a public list found on the web
new_source = ProxySource(
url=candidate.url,
type="public", # or "url" depending on your convention
name=f"Hunter: {candidate.domain}",
description=f"Auto-discovered via {candidate.discovery_method}",
enabled=True,
is_admin_source=True,
)
session.add(new_source)
# Update candidate status
candidate.status = "approved"
await session.commit()
return {"status": "approved", "source_id": new_source.id}
@router.get("/validation-stats")
@limiter.limit("60/minute")
async def get_validation_stats(
request: Request, session: AsyncSession = Depends(get_db)
):
result = await session.execute(
select(
Proxy.validation_status,
func.count(Proxy.id).label("count"),
func.avg(Proxy.quality_score).label("avg_quality"),
func.avg(Proxy.latency_ms).label("avg_latency"),
).group_by(Proxy.validation_status)
)
stats_by_status = {}
for row in result.all():
stats_by_status[row.validation_status] = {
"count": row.count,
"avg_quality": round(row.avg_quality, 2) if row.avg_quality else None,
"avg_latency": round(row.avg_latency, 2) if row.avg_latency else None,
}
total_result = await session.execute(select(func.count()).select_from(Proxy))
total = total_result.scalar()
validated_count = stats_by_status.get("validated", {}).get("count", 0)
pending_count = stats_by_status.get("pending", {}).get("count", 0)
failed_count = stats_by_status.get("failed", {}).get("count", 0)
validation_rate = round((validated_count / total) * 100, 2) if total > 0 else 0
return {
"total_proxies": total,
"by_status": stats_by_status,
"summary": {
"validated": validated_count,
"pending": pending_count,
"failed": failed_count,
"validation_rate_percent": validation_rate,
},
}
@router.get("/quality-distribution")
@limiter.limit("60/minute")
async def get_quality_distribution(
request: Request, session: AsyncSession = Depends(get_db)
):
result = await session.execute(
select(Proxy.quality_score, func.count(Proxy.id).label("count"))
.where(Proxy.validation_status == "validated")
.group_by(Proxy.quality_score)
.order_by(Proxy.quality_score.desc())
)
distribution = {
"excellent": 0,
"good": 0,
"fair": 0,
"poor": 0,
}
for row in result.all():
if row.quality_score:
if row.quality_score >= 80:
distribution["excellent"] += row.count
elif row.quality_score >= 60:
distribution["good"] += row.count
elif row.quality_score >= 40:
distribution["fair"] += row.count
else:
distribution["poor"] += row.count
return distribution
@router.get("/recent-validations")
@limiter.limit("60/minute")
async def get_recent_validations(
request: Request, limit: int = 20, session: AsyncSession = Depends(get_db)
):
result = await session.execute(
select(Proxy)
.where(Proxy.last_validated.isnot(None))
.order_by(Proxy.last_validated.desc())
.limit(limit)
)
proxies = result.scalars().all()
return {
"recent_validations": [
{
"url": p.url,
"validation_status": p.validation_status,
"quality_score": p.quality_score,
"latency_ms": p.latency_ms,
"country_code": p.country_code,
"anonymity": p.anonymity,
"last_validated": p.last_validated.isoformat()
if p.last_validated
else None,
}
for p in proxies
]
}