Spaces:
Sleeping
Sleeping
File size: 3,088 Bytes
f80e9b3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import httpx
import os
from typing import List, Dict, Any, Optional
from app.schemas import CanvaBrandTemplate, CanvaAutofillRequest, CanvaAutofillResponse
CANVA_API_BASE = "https://api.canva.com/rest/v1"
class CanvaService:
def __init__(self, access_token: str):
self.access_token = access_token
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
async def get_brand_templates(self, dataset: str = "non_empty") -> List[CanvaBrandTemplate]:
"""Get list of Canva brand templates"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{CANVA_API_BASE}/brand-templates",
params={"dataset": dataset},
headers=self.headers,
timeout=30.0
)
response.raise_for_status()
data = response.json()
templates = []
for item in data.get("items", []):
templates.append(CanvaBrandTemplate(
id=item["id"],
title=item["title"],
view_url=item.get("view_url", ""),
create_url=item.get("create_url", ""),
thumbnail=item.get("thumbnail")
))
return templates
async def get_brand_template_dataset(self, template_id: str) -> Dict[str, Any]:
"""Get dataset for a specific brand template"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{CANVA_API_BASE}/brand-templates/{template_id}/dataset",
headers=self.headers,
timeout=30.0
)
response.raise_for_status()
return response.json()
async def create_autofill_job(self, request: CanvaAutofillRequest) -> CanvaAutofillResponse:
"""Create an autofill job for a brand template"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{CANVA_API_BASE}/autofills",
headers=self.headers,
json={
"brand_template_id": request.brand_template_id,
"title": request.title,
"data": request.data
},
timeout=30.0
)
response.raise_for_status()
data = response.json()
return CanvaAutofillResponse(
job_id=data["job"]["id"],
status=data["job"]["status"]
)
async def get_autofill_job_status(self, job_id: str) -> Dict[str, Any]:
"""Get status of an autofill job"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{CANVA_API_BASE}/autofills/{job_id}",
headers=self.headers,
timeout=30.0
)
response.raise_for_status()
return response.json()
|