Spaces:
Sleeping
Sleeping
File size: 8,418 Bytes
89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 89c7106 923fc35 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | from fastapi import FastAPI, Query
from pytrends.request import TrendReq
from fastapi.responses import JSONResponse
from typing import List, Optional
import pandas as pd
import json
import time
from functools import lru_cache
app = FastAPI(
title="PyTrends API",
description="HTTP API for Google Trends data",
version="1.0.0"
)
# Initialize pytrends
pytrends = TrendReq(hl="en-US", tz=360)
# Helper function for retry logic
def retry_with_backoff(func, max_retries=3, initial_delay=1, backoff_factor=2):
"""Retry function with exponential backoff for handling rate limits"""
for attempt in range(max_retries):
try:
return func()
except Exception as e:
error_str = str(e)
# Check if it's a rate limit error (429)
if "429" in error_str or "rate" in error_str.lower():
if attempt < max_retries - 1:
wait_time = initial_delay * (backoff_factor ** attempt)
print(f"Rate limited. Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
continue
else:
return {"error": f"Service temporarily unavailable. Please try again in a few moments. Details: {error_str}"}
else:
raise
return {"error": "Max retries exceeded"}
@app.get("/")
def root():
return {
"message": "PyTrends API",
"endpoints": {
"/health": "Health check endpoint",
"/interest_over_time": "Get interest over time for keywords",
"/interest_by_region": "Get interest by region for keywords",
"/trending_searches": "Get trending searches for a country",
"/related_queries": "Get related queries for keywords"
},
"note": "If you get a 429 error, Google Trends is rate limiting. Wait a few minutes and try again."
}
@app.get("/health")
def health_check():
return {"status": "healthy"}
@app.get("/interest_over_time")
def get_interest_over_time(
kw: List[str] = Query(..., description="Keywords to search"),
timeframe: str = Query("today 5-y", description="Timeframe for search"),
geo: str = Query("", description="Geographic location"),
gprop: str = Query("", description="Google property (images, news, youtube, etc.)"),
cat: int = Query(0, description="Category")
):
try:
def fetch_data():
pytrends.build_payload(
kw_list=kw,
timeframe=timeframe,
geo=geo,
gprop=gprop,
cat=cat
)
time.sleep(0.5) # Small delay to avoid rate limits
df = pytrends.interest_over_time()
if df is None or df.empty:
return {"error": "No data available for this query"}
df = df.drop('isPartial', axis=1)
data = df.reset_index().to_dict('records')
for record in data:
if 'date' in record:
record['date'] = str(record['date'])
return {
"data": data,
"keywords": kw,
"timeframe": timeframe,
"geo": geo
}
result = retry_with_backoff(fetch_data)
if isinstance(result, dict) and "error" in result:
return JSONResponse(status_code=429, content=result)
return result
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "rate" in error_msg.lower():
return JSONResponse(
status_code=429,
content={"error": "Too many requests to Google Trends. Please wait a few minutes before trying again."}
)
return JSONResponse(
status_code=400,
content={"error": error_msg}
)
@app.get("/interest_by_region")
def get_interest_by_region(
kw: List[str] = Query(..., description="Keywords to search"),
timeframe: str = Query("today 5-y", description="Timeframe for search"),
geo: str = Query("", description="Geographic location"),
gprop: str = Query("", description="Google property"),
resolution: str = Query("country", description="Resolution (country, region, metro, city)")
):
try:
def fetch_data():
pytrends.build_payload(
kw_list=kw,
timeframe=timeframe,
geo=geo,
gprop=gprop
)
time.sleep(0.5)
df = pytrends.interest_by_region(resolution=resolution)
if df is None or df.empty:
return {"error": "No regional data available"}
data = df.reset_index().to_dict('records')
return {
"data": data,
"keywords": kw,
"resolution": resolution
}
result = retry_with_backoff(fetch_data)
if isinstance(result, dict) and "error" in result:
return JSONResponse(status_code=429, content=result)
return result
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "rate" in error_msg.lower():
return JSONResponse(
status_code=429,
content={"error": "Too many requests to Google Trends. Please wait a few minutes before trying again."}
)
return JSONResponse(
status_code=400,
content={"error": error_msg}
)
@app.get("/trending_searches")
def get_trending_searches(
country: str = Query("united_states", description="Country code")
):
try:
def fetch_data():
time.sleep(0.5)
df = pytrends.trending_searches(pn=country)
if df is None or df.empty:
return {"error": "No trending data available for this country"}
data = df.values.tolist()
return {
"trending": data,
"country": country
}
result = retry_with_backoff(fetch_data)
if isinstance(result, dict) and "error" in result:
return JSONResponse(status_code=429, content=result)
return result
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "rate" in error_msg.lower():
return JSONResponse(
status_code=429,
content={"error": "Too many requests to Google Trends. Please wait a few minutes before trying again."}
)
return JSONResponse(
status_code=400,
content={"error": error_msg}
)
@app.get("/related_queries")
def get_related_queries(
kw: List[str] = Query(..., description="Keywords to search"),
timeframe: str = Query("today 5-y", description="Timeframe for search"),
geo: str = Query("", description="Geographic location")
):
try:
def fetch_data():
pytrends.build_payload(
kw_list=kw,
timeframe=timeframe,
geo=geo
)
time.sleep(0.5)
related = pytrends.related_queries()
result = {}
for kw_item in kw:
if kw_item in related:
top_queries = related[kw_item]['top']
rising_queries = related[kw_item]['rising']
result[kw_item] = {
"top": top_queries.reset_index().to_dict('records') if top_queries is not None else [],
"rising": rising_queries.reset_index().to_dict('records') if rising_queries is not None else []
}
return {
"related_queries": result,
"keywords": kw
}
result = retry_with_backoff(fetch_data)
if isinstance(result, dict) and "error" in result:
return JSONResponse(status_code=429, content=result)
return result
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "rate" in error_msg.lower():
return JSONResponse(
status_code=429,
content={"error": "Too many requests to Google Trends. Please wait a few minutes before trying again."}
)
return JSONResponse(
status_code=400,
content={"error": error_msg}
) |