1proxy / app /routers /sources.py
paijo77's picture
update app/routers/sources.py
0711d6f verified
from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_
from typing import List, Optional
from pydantic import BaseModel, HttpUrl
from app.database import get_db
from app.dependencies import get_current_user, require_user, require_admin
from app.db_models import User, ProxySource
from app.models import SourceType
from app.source_validator import source_validator, SourceValidationResult
from app.models import SourceConfig
# Router for the source-management endpoints; every route declared in this
# module is served under the /api/v1 prefix and tagged "sources".
router = APIRouter(prefix="/api/v1", tags=["sources"])
# Rate limiter used by the @limiter.limit(...) decorators below, keyed on the
# client's remote address.
# NOTE(review): this constructs a new module-local Limiter; it is not read
# from app state. Confirm whether it should be the application's shared one.
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
class SourceCreate(BaseModel):
    """Request body for registering a new source via ``POST /my-sources``."""

    # Address of the source; must parse as an HTTP(S) URL.
    url: HttpUrl
    # A ``SourceType`` member; its ``.value`` is what the create handler stores.
    type: SourceType
    # Optional label; the create handler derives one from the URL when omitted.
    name: Optional[str] = None
    description: Optional[str] = None
    is_paid: bool = False
class SourceUpdate(BaseModel):
    """Partial-update payload for ``PUT /my-sources/{source_id}``.

    Every field is optional. The update handler only writes fields whose
    value is not ``None``, so omitted fields leave the stored value untouched.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    enabled: Optional[bool] = None
    is_paid: Optional[bool] = None
class SourceResponse(BaseModel):
    """Serialized view of a source as returned by the source endpoints."""

    id: int
    url: str
    type: str
    name: Optional[str]
    description: Optional[str]
    is_paid: bool
    enabled: bool
    validated: bool
    validation_error: Optional[str]
    total_scraped: int
    success_rate: float
    is_admin_source: bool
    # Set by the handlers on every response: True when the source belongs to
    # the requesting user.
    is_owner: bool = False

    class Config:
        # Pydantic setting that permits populating the model from the
        # attributes of an arbitrary object.
        from_attributes = True
class UserStats(BaseModel):
    """Aggregate figures over the caller's own sources (``GET /my-stats``)."""

    # Number of sources owned by the user.
    total_sources: int
    # How many of those sources are currently enabled.
    active_sources: int
    # Sum of ``total_scraped`` across the user's sources.
    total_proxies_contributed: int
    # Arithmetic mean of ``success_rate``; 0.0 when the user has no sources.
    avg_success_rate: float
@router.get("/my-stats", response_model=UserStats)
@limiter.limit("60/minute")
async def get_my_stats(
    request: Request,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Summarize the authenticated user's sources.

    Returns the number of sources, how many are enabled, the summed
    ``total_scraped`` and the mean ``success_rate`` (``0.0`` when the user
    has no sources).
    """
    # ``request`` is not used in the body; slowapi's limit decorator needs it
    # to be present in the endpoint signature.
    owned_query = select(ProxySource).where(ProxySource.user_id == current_user.id)
    owned = (await session.execute(owned_query)).scalars().all()

    source_count = len(owned)
    enabled_count = len([src for src in owned if src.enabled])
    scraped_sum = sum(src.total_scraped for src in owned)
    # Guard the division so a user without sources gets 0.0 instead of an error.
    mean_rate = (
        sum(src.success_rate for src in owned) / source_count
        if source_count > 0
        else 0.0
    )

    return UserStats(
        total_sources=source_count,
        active_sources=enabled_count,
        total_proxies_contributed=scraped_sum,
        avg_success_rate=mean_rate,
    )
@router.get("/my-sources", response_model=List[SourceResponse])
@limiter.limit("60/minute")
async def get_my_sources(
    request: Request,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """List every source owned by the authenticated user."""
    owned_query = select(ProxySource).where(ProxySource.user_id == current_user.id)
    rows = (await session.execute(owned_query)).scalars().all()

    # The query is already restricted to the caller's rows, so ``is_owner`` is
    # always True here. The response is built from the row's ``__dict__``.
    return [SourceResponse(**dict(row.__dict__, is_owner=True)) for row in rows]
@router.post("/my-sources", response_model=dict, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/hour")
async def create_source(
    request: Request,
    source_data: SourceCreate,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Validate and register a new source for the authenticated user.

    The URL must not already be stored, and the source must pass
    ``source_validator.validate_source`` before it is saved.

    Returns a dict with a confirmation message, the new source id and the
    validator's proxy count and sample proxies.

    Raises:
        HTTPException: 409 when the URL already exists, 400 when validation
            fails.
    """
    # Stringify once; the same text is used for the duplicate check, the
    # stored URL and the fallback name.
    url_text = str(source_data.url)

    lookup = await session.execute(
        select(ProxySource).where(ProxySource.url == url_text)
    )
    if lookup.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="This source URL already exists in the database",
        )

    source_config = SourceConfig(
        url=source_data.url, type=source_data.type, enabled=True
    )
    validation_result: SourceValidationResult = await source_validator.validate_source(
        source_config
    )
    if not validation_result.valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Source validation failed",
                "reason": validation_result.error_message,
            },
        )

    # Fall back to the last non-empty URL segment. Trailing slashes are
    # stripped first: "https://host/" or "https://host/dir/" would otherwise
    # produce an empty name.
    fallback_name = url_text.rstrip("/").rsplit("/", 1)[-1]

    new_source = ProxySource(
        user_id=current_user.id,
        url=url_text,
        type=source_data.type.value,
        name=source_data.name or fallback_name,
        description=source_data.description,
        is_paid=source_data.is_paid,
        enabled=True,
        validated=True,
        is_admin_source=False,
    )
    session.add(new_source)
    await session.commit()
    # Reload the row so the database-assigned id is available for the response.
    await session.refresh(new_source)

    return {
        "message": "Source created successfully",
        "source_id": new_source.id,
        "validation": {
            "proxy_count": validation_result.proxy_count,
            "sample_proxies": validation_result.sample_proxies,
        },
    }
@router.put("/my-sources/{source_id}", response_model=SourceResponse)
@limiter.limit("30/minute")
async def update_source(
    request: Request,
    source_id: int,
    update_data: SourceUpdate,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Apply a partial update to one of the authenticated user's sources.

    Only fields of the payload that are not ``None`` are written.

    Raises:
        HTTPException: 404 when no source with this id belongs to the caller,
            403 when the source is admin-protected and the caller is not an
            admin.
    """
    # Ownership is part of the lookup, so a source owned by someone else is
    # indistinguishable from a missing one.
    owned_query = select(ProxySource).where(
        ProxySource.id == source_id, ProxySource.user_id == current_user.id
    )
    source = (await session.execute(owned_query)).scalar_one_or_none()
    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Source not found or you don't have permission to edit it",
        )

    if source.is_admin_source and current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot edit admin-protected sources",
        )

    # Copy over only the fields the client actually supplied.
    for field_name in ("name", "description", "enabled", "is_paid"):
        new_value = getattr(update_data, field_name)
        if new_value is not None:
            setattr(source, field_name, new_value)

    await session.commit()
    await session.refresh(source)

    return SourceResponse(**dict(source.__dict__, is_owner=True))
@router.delete("/my-sources/{source_id}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
async def delete_source(
    request: Request,
    source_id: int,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Delete one of the authenticated user's sources.

    Raises:
        HTTPException: 404 when no source with this id belongs to the caller,
            403 when the source is admin-protected and the caller is not an
            admin.
    """
    # Ownership is part of the lookup, so a source owned by someone else is
    # indistinguishable from a missing one.
    owned_query = select(ProxySource).where(
        ProxySource.id == source_id, ProxySource.user_id == current_user.id
    )
    source = (await session.execute(owned_query)).scalar_one_or_none()
    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Source not found or you don't have permission to delete it",
        )

    if source.is_admin_source and current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can delete admin-protected sources",
        )

    await session.delete(source)
    await session.commit()
    return None
@router.get("/admin/sources", response_model=dict)
@limiter.limit("30/minute")
async def admin_get_all_sources(
    request: Request,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    current_user: User = Depends(require_admin),
    session: AsyncSession = Depends(get_db),
):
    """Return one page of all sources, newest first (admin only).

    The response carries the overall row count (``total``), the size of this
    page (``count``), the paging parameters that were applied and the page's
    sources.
    """
    from sqlalchemy import func

    # Overall number of rows, independent of the requested page.
    count_query = select(func.count()).select_from(ProxySource)
    total = (await session.execute(count_query)).scalar() or 0

    page_query = (
        select(ProxySource)
        .order_by(ProxySource.created_at.desc())
        .offset(offset)
        .limit(limit)
    )
    page = (await session.execute(page_query)).scalars().all()

    # An admin sees everyone's sources, so ownership is computed per row.
    serialized = [
        SourceResponse(
            **dict(source.__dict__, is_owner=source.user_id == current_user.id)
        )
        for source in page
    ]

    return {
        "total": total,
        "count": len(page),
        "offset": offset,
        "limit": limit,
        "sources": serialized,
    }
# NOTE(review): unlike the other routes in this module, this one has no
# @limiter.limit decorator (and no ``request`` parameter) -- confirm that the
# missing rate limit is intentional.
@router.post("/admin/sources/{source_id}/protect", response_model=SourceResponse)
async def admin_protect_source(
    source_id: int,
    current_user: User = Depends(require_admin),
    session: AsyncSession = Depends(get_db),
):
    """Mark a source as admin-protected (admin only).

    Raises:
        HTTPException: 404 when no source with this id exists.
    """
    # No ownership filter here: an admin may protect any user's source.
    lookup = await session.execute(
        select(ProxySource).where(ProxySource.id == source_id)
    )
    source = lookup.scalar_one_or_none()
    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Source not found"
        )

    source.is_admin_source = True
    await session.commit()
    await session.refresh(source)

    owned_by_caller = source.user_id == current_user.id
    return SourceResponse(**dict(source.__dict__, is_owner=owned_by_caller))