# PostGen/backend/app/services/canva_service.py
# Seth - update - f80e9b3
import os
from typing import List, Dict, Any, Optional
from urllib.parse import quote

import httpx

from app.schemas import CanvaBrandTemplate, CanvaAutofillRequest, CanvaAutofillResponse
# Base URL that every Canva REST endpoint path in this module is appended to.
CANVA_API_BASE = "https://api.canva.com/rest/v1"
class CanvaService:
    """Async client for the Canva REST endpoints used by this application.

    Every request carries the bearer token given at construction time.
    Each public method opens its own short-lived ``httpx.AsyncClient`` and
    raises ``httpx.HTTPStatusError`` when Canva answers with an error status.
    """

    # Timeout, in seconds, applied to every request sent to Canva.
    _TIMEOUT_SECONDS = 30.0

    def __init__(self, access_token: str):
        """Remember the token and build the headers sent with each request.

        Args:
            access_token: Canva API access token, sent as a ``Bearer``
                credential in the ``Authorization`` header.
        """
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

    async def _request(self, client, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Send one authenticated request and return the decoded JSON body.

        Args:
            client: An open ``httpx.AsyncClient`` used to send the request.
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path beginning with ``/``; appended to
                ``CANVA_API_BASE``.
            **kwargs: Extra request options forwarded to httpx
                (``params=`` or ``json=``).

        Returns:
            The response body parsed as JSON.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = await client.request(
            method,
            f"{CANVA_API_BASE}{path}",
            headers=self.headers,
            timeout=self._TIMEOUT_SECONDS,
            **kwargs,
        )
        response.raise_for_status()
        return response.json()

    async def get_brand_templates(self, dataset: str = "non_empty") -> List[CanvaBrandTemplate]:
        """Get the list of Canva brand templates.

        The listing endpoint is paginated: a response may include a
        ``continuation`` token pointing at the next page. All pages are
        followed so the returned list is complete rather than truncated to
        the first page.

        Args:
            dataset: Value sent as the ``dataset`` query parameter.

        Returns:
            One ``CanvaBrandTemplate`` per item, in the order returned.

        Raises:
            httpx.HTTPStatusError: If any page request has an error status.
            KeyError: If an item lacks ``"id"`` or ``"title"``.
        """
        params: Dict[str, str] = {"dataset": dataset}
        templates: List[CanvaBrandTemplate] = []
        seen_tokens = set()

        async with httpx.AsyncClient() as client:
            while True:
                page = await self._request(
                    client, "GET", "/brand-templates", params=params
                )
                templates.extend(
                    CanvaBrandTemplate(
                        id=item["id"],
                        title=item["title"],
                        view_url=item.get("view_url", ""),
                        create_url=item.get("create_url", ""),
                        thumbnail=item.get("thumbnail"),
                    )
                    for item in page.get("items", [])
                )

                token = page.get("continuation")
                # Stop on the last page; also stop if a token repeats so a
                # misbehaving response can never cause an endless loop.
                if not token or token in seen_tokens:
                    break
                seen_tokens.add(token)
                params = {**params, "continuation": token}

        return templates

    async def get_brand_template_dataset(self, template_id: str) -> Dict[str, Any]:
        """Get the dataset for a specific brand template.

        Args:
            template_id: Identifier of the brand template. It is
                percent-encoded before being placed in the URL path so that
                characters such as ``/`` or ``?`` cannot alter the endpoint.

        Returns:
            The JSON body of the response, unmodified.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        path = f"/brand-templates/{quote(str(template_id), safe='')}/dataset"
        async with httpx.AsyncClient() as client:
            return await self._request(client, "GET", path)

    async def create_autofill_job(self, request: CanvaAutofillRequest) -> CanvaAutofillResponse:
        """Create an autofill job for a brand template.

        Args:
            request: Supplies ``brand_template_id``, ``title`` and ``data``,
                which are posted as the JSON body.

        Returns:
            A ``CanvaAutofillResponse`` built from the ``id`` and ``status``
            of the ``job`` object in the response.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
            KeyError: If the response lacks ``"job"``, or the job lacks
                ``"id"`` or ``"status"``.
        """
        payload = {
            "brand_template_id": request.brand_template_id,
            "title": request.title,
            "data": request.data,
        }
        async with httpx.AsyncClient() as client:
            body = await self._request(client, "POST", "/autofills", json=payload)

        job = body["job"]
        return CanvaAutofillResponse(job_id=job["id"], status=job["status"])

    async def get_autofill_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get the status of an autofill job.

        Args:
            job_id: Identifier of the autofill job. It is percent-encoded
                before being placed in the URL path so that characters such
                as ``/`` or ``?`` cannot alter the endpoint.

        Returns:
            The JSON body of the response, unmodified.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        path = f"/autofills/{quote(str(job_id), safe='')}"
        async with httpx.AsyncClient() as client:
            return await self._request(client, "GET", path)