| from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
|
| from sqlalchemy.ext.asyncio import AsyncSession
|
| from sqlalchemy import select, and_
|
| from typing import List, Optional
|
| from pydantic import BaseModel, HttpUrl
|
|
|
| from app.database import get_db
|
| from app.dependencies import get_current_user, require_user, require_admin
|
| from app.db_models import User, ProxySource
|
| from app.models import SourceType
|
| from app.source_validator import source_validator, SourceValidationResult
|
| from app.models import SourceConfig
|
|
|
# All endpoints declared in this module are registered on this router and
# are served under the /api/v1 prefix.
router = APIRouter(
    prefix="/api/v1",
    tags=["sources"],
)
|
|
|
|
|
| from slowapi import Limiter
|
| from slowapi.util import get_remote_address
|
|
|
# Rate limiter applied through the @limiter.limit(...) decorators in this
# module; request buckets are keyed by get_remote_address.
limiter = Limiter(
    key_func=get_remote_address,
)
|
|
|
|
|
class SourceCreate(BaseModel):
    """Request body for creating a new source via POST /my-sources."""

    # Location of the source; pydantic validates it as an HTTP(S) URL.
    url: HttpUrl

    # Source kind, restricted to the SourceType values.
    type: SourceType

    # Optional display name; create_source derives one from the URL when
    # this is missing or empty.
    name: Optional[str] = None

    # Optional free-form description.
    description: Optional[str] = None

    # Paid-source flag; False unless the client says otherwise.
    is_paid: bool = False
|
|
|
|
|
class SourceUpdate(BaseModel):
    """Request body for PUT /my-sources/{source_id}.

    Every field is optional and defaults to None; update_source leaves a
    column untouched when the matching field here is None.
    """

    name: Optional[str] = None

    description: Optional[str] = None

    enabled: Optional[bool] = None

    is_paid: Optional[bool] = None
|
|
|
|
|
class SourceResponse(BaseModel):
    """Source representation returned by the endpoints in this module."""

    id: int
    url: str
    type: str

    # Declared without defaults: the value may be None, but the key itself
    # has to be supplied when the model is built.
    name: Optional[str]
    description: Optional[str]

    is_paid: bool
    enabled: bool
    validated: bool
    validation_error: Optional[str]

    total_scraped: int
    success_rate: float

    is_admin_source: bool

    # Not taken from the stored source: each endpoint sets it explicitly
    # (True on the /my-sources routes, user_id == current user on the admin
    # routes). Falls back to False when not provided.
    is_owner: bool = False

    class Config:
        # NOTE(review): the endpoints in this module build the model from
        # keyword arguments, so this attribute-loading setting is not
        # exercised here -- confirm whether other callers rely on it.
        from_attributes = True
|
|
|
|
|
class UserStats(BaseModel):
    """Aggregate figures returned by GET /my-stats for the current user."""

    # Number of sources owned by the user.
    total_sources: int

    # Number of those sources whose `enabled` flag is truthy.
    active_sources: int

    # Sum of `total_scraped` over the user's sources.
    total_proxies_contributed: int

    # Mean of `success_rate` over the user's sources; 0.0 when there are none.
    avg_success_rate: float
|
|
|
|
|
@router.get("/my-stats", response_model=UserStats)
@limiter.limit("60/minute")
async def get_my_stats(
    request: Request,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Return aggregate statistics over the caller's own sources.

    All sources whose ``user_id`` matches the authenticated user are loaded
    and the figures are computed in Python.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        current_user: Authenticated user resolved by ``require_user``.
        session: Database session resolved by ``get_db``.

    Returns:
        A ``UserStats`` instance. ``avg_success_rate`` is 0.0 when the user
        has no sources.
    """
    owned_query = select(ProxySource).where(ProxySource.user_id == current_user.id)
    owned = (await session.execute(owned_query)).scalars().all()

    source_count = len(owned)
    enabled_count = sum(1 for item in owned if item.enabled)
    scraped_total = sum(item.total_scraped for item in owned)

    # Guard the division so an account without sources reports 0.0.
    mean_rate = (
        sum(item.success_rate for item in owned) / source_count
        if source_count > 0
        else 0.0
    )

    return UserStats(
        total_sources=source_count,
        active_sources=enabled_count,
        total_proxies_contributed=scraped_total,
        avg_success_rate=mean_rate,
    )
|
|
|
|
|
@router.get("/my-sources", response_model=List[SourceResponse])
@limiter.limit("60/minute")
async def get_my_sources(
    request: Request,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """List every source owned by the authenticated user.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        current_user: Authenticated user resolved by ``require_user``.
        session: Database session resolved by ``get_db``.

    Returns:
        A list of ``SourceResponse`` objects, each with ``is_owner=True``.
    """
    owned_query = select(ProxySource).where(ProxySource.user_id == current_user.id)
    owned = (await session.execute(owned_query)).scalars().all()

    # NOTE(review): the response is built from the instance's raw __dict__,
    # so it only sees attributes currently present on the object (and passes
    # along any non-column entries stored there) -- confirm this is intended.
    return [
        SourceResponse(**dict(row.__dict__, is_owner=True))
        for row in owned
    ]
|
|
|
|
|
def _default_source_name(url: str) -> str:
    """Return a fallback display name derived from ``url``.

    The name is the text after the last "/" once trailing slashes have been
    stripped. "https://host/lists/http.txt" yields "http.txt" exactly as
    before, while "https://host/" and "https://host/lists/" now yield
    "host" and "lists" instead of an empty string.
    """
    return url.rstrip("/").rsplit("/", 1)[-1]


@router.post("/my-sources", response_model=dict, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/hour")
async def create_source(
    request: Request,
    source_data: SourceCreate,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Validate and store a new source owned by the authenticated user.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        source_data: Parsed request body describing the source.
        current_user: Authenticated user resolved by ``require_user``.
        session: Database session resolved by ``get_db``.

    Returns:
        A dict with a confirmation ``message``, the new ``source_id`` and a
        ``validation`` section carrying ``proxy_count`` and
        ``sample_proxies`` from the validator result.

    Raises:
        HTTPException: 409 when a source with the same URL is already
            stored; 400 when ``source_validator`` reports the source as
            invalid.
    """
    # Stringify once; the same text is used for the lookup, the stored
    # column and the fallback name.
    url = str(source_data.url)

    # Only existence matters, so fetch at most one row. This also keeps the
    # check from raising if several rows already share the URL.
    # NOTE(review): check-then-insert is not atomic; two concurrent requests
    # can both pass this check -- confirm whether the table enforces
    # uniqueness on url and how a violation should be reported.
    lookup = await session.execute(
        select(ProxySource).where(ProxySource.url == url).limit(1)
    )
    if lookup.scalars().first() is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="This source URL already exists in the database",
        )

    source_config = SourceConfig(
        url=source_data.url, type=source_data.type, enabled=True
    )
    validation_result: SourceValidationResult = await source_validator.validate_source(
        source_config
    )

    if not validation_result.valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Source validation failed",
                "reason": validation_result.error_message,
            },
        )

    new_source = ProxySource(
        user_id=current_user.id,
        url=url,
        type=source_data.type.value,
        # A missing or empty name falls back to a name derived from the URL.
        name=source_data.name or _default_source_name(url),
        description=source_data.description,
        is_paid=source_data.is_paid,
        enabled=True,
        validated=True,
        is_admin_source=False,
    )

    session.add(new_source)
    await session.commit()
    # Reload the row so the generated primary key is available below.
    await session.refresh(new_source)

    return {
        "message": "Source created successfully",
        "source_id": new_source.id,
        "validation": {
            "proxy_count": validation_result.proxy_count,
            "sample_proxies": validation_result.sample_proxies,
        },
    }
|
|
|
|
|
@router.put("/my-sources/{source_id}", response_model=SourceResponse)
@limiter.limit("30/minute")
async def update_source(
    request: Request,
    source_id: int,
    update_data: SourceUpdate,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Apply a partial update to one of the caller's own sources.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        source_id: Primary key of the source to edit.
        update_data: Fields to change; fields left as None are not touched.
        current_user: Authenticated user resolved by ``require_user``.
        session: Database session resolved by ``get_db``.

    Returns:
        The refreshed source as a ``SourceResponse`` with ``is_owner=True``.

    Raises:
        HTTPException: 404 when no source with this id belongs to the
            caller; 403 when the source is admin-protected and the caller's
            role is not "admin".
    """
    # The ownership filter is part of the query, so a source owned by
    # someone else is indistinguishable from a missing one.
    owned_match = and_(
        ProxySource.id == source_id,
        ProxySource.user_id == current_user.id,
    )
    lookup = await session.execute(select(ProxySource).where(owned_match))
    source = lookup.scalar_one_or_none()

    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Source not found or you don't have permission to edit it",
        )

    if source.is_admin_source and current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot edit admin-protected sources",
        )

    # Copy over only the fields that were actually supplied.
    for field_name in ("name", "description", "enabled", "is_paid"):
        new_value = getattr(update_data, field_name)
        if new_value is not None:
            setattr(source, field_name, new_value)

    await session.commit()
    await session.refresh(source)

    return SourceResponse(**dict(source.__dict__, is_owner=True))
|
|
|
|
|
@router.delete("/my-sources/{source_id}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
async def delete_source(
    request: Request,
    source_id: int,
    current_user: User = Depends(require_user),
    session: AsyncSession = Depends(get_db),
):
    """Delete one of the caller's own sources.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        source_id: Primary key of the source to remove.
        current_user: Authenticated user resolved by ``require_user``.
        session: Database session resolved by ``get_db``.

    Returns:
        None; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 when no source with this id belongs to the
            caller; 403 when the source is admin-protected and the caller's
            role is not "admin".
    """
    owned_match = and_(
        ProxySource.id == source_id,
        ProxySource.user_id == current_user.id,
    )
    lookup = await session.execute(select(ProxySource).where(owned_match))
    source = lookup.scalar_one_or_none()

    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Source not found or you don't have permission to delete it",
        )

    # Admin-protected sources may only be removed by an admin.
    if source.is_admin_source and current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can delete admin-protected sources",
        )

    await session.delete(source)
    await session.commit()
|
|
|
|
|
@router.get("/admin/sources", response_model=dict)
@limiter.limit("30/minute")
async def admin_get_all_sources(
    request: Request,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    current_user: User = Depends(require_admin),
    session: AsyncSession = Depends(get_db),
):
    """Return one page of all stored sources, newest first.

    Args:
        request: Incoming request; not read in the body, but part of the
            signature because the endpoint is wrapped by the rate limiter.
        limit: Page size, between 1 and 200 (default 50).
        offset: Number of rows to skip, 0 or greater (default 0).
        current_user: User resolved by the ``require_admin`` dependency.
        session: Database session resolved by ``get_db``.

    Returns:
        A dict with ``total`` (row count of the whole table), ``count``
        (rows in this page), the echoed ``offset`` and ``limit``, and
        ``sources`` as a list of ``SourceResponse`` objects whose
        ``is_owner`` tells whether the row belongs to the calling user.
    """
    from sqlalchemy import func

    count_query = select(func.count()).select_from(ProxySource)
    total = (await session.execute(count_query)).scalar() or 0

    page_query = (
        select(ProxySource)
        .order_by(ProxySource.created_at.desc())
        .limit(limit)
        .offset(offset)
    )
    page = (await session.execute(page_query)).scalars().all()

    serialized = [
        SourceResponse(
            **dict(row.__dict__, is_owner=row.user_id == current_user.id)
        )
        for row in page
    ]

    return {
        "total": total,
        "count": len(page),
        "offset": offset,
        "limit": limit,
        "sources": serialized,
    }
|
|
|
|
|
# NOTE(review): unlike the other endpoints in this module, this route has no
# @limiter.limit decorator and takes no Request argument -- confirm whether
# leaving it unthrottled is intentional.
@router.post("/admin/sources/{source_id}/protect", response_model=SourceResponse)
async def admin_protect_source(
    source_id: int,
    current_user: User = Depends(require_admin),
    session: AsyncSession = Depends(get_db),
):
    """Mark a source as admin-protected.

    Args:
        source_id: Primary key of the source to protect.
        current_user: User resolved by the ``require_admin`` dependency.
        session: Database session resolved by ``get_db``.

    Returns:
        The refreshed source as a ``SourceResponse``; ``is_owner`` tells
        whether the source belongs to the calling user.

    Raises:
        HTTPException: 404 when no source with this id exists.
    """
    lookup = await session.execute(
        select(ProxySource).where(ProxySource.id == source_id)
    )
    source = lookup.scalar_one_or_none()

    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Source not found"
        )

    source.is_admin_source = True
    await session.commit()
    await session.refresh(source)

    owned_by_caller = source.user_id == current_user.id
    return SourceResponse(**dict(source.__dict__, is_owner=owned_by_caller))
|
|
|