File size: 7,638 Bytes
836f75b 0b7f1e4 fb99609 836f75b 0fe31f5 fb99609 836f75b 7bb63b8 836f75b 7bb63b8 836f75b 0fe31f5 7bb63b8 0fe31f5 7bb63b8 fb99609 7bb63b8 7658a25 7bb63b8 0fe31f5 7bb63b8 0fe31f5 0b7f1e4 7bb63b8 836f75b 7bb63b8 836f75b 7bb63b8 836f75b 7bb63b8 836f75b 7bb63b8 836f75b 97c0fdb 836f75b 7bb63b8 836f75b 7bb63b8 836f75b 7bb63b8 321dbb9 7bb63b8 fb99609 7bb63b8 321dbb9 7bb63b8 321dbb9 ca6954c 7bb63b8 321dbb9 7bb63b8 321dbb9 7bb63b8 321dbb9 7bb63b8 96e3d15 321dbb9 7bb63b8 0fe31f5 0cc274a fb99609 0fe31f5 0cc274a fb99609 7bb63b8 fb99609 0fe31f5 7bb63b8 836f75b 7bb63b8 0b7f1e4 7bb63b8 0b7f1e4 7bb63b8 0b7f1e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# tvdb.py
import os
import requests
import urllib.parse
from datetime import datetime, timedelta
from dotenv import load_dotenv
import json
import asyncio
import aiofiles
from tvdbApiClient import fetch_and_cache_seasons, save_to_json
from services import RecentList, GenreList
# Runs before the os.getenv() reads below so that anything load_dotenv()
# puts into the environment is visible to them.
load_dotenv()
# Connection settings read from the environment. os.getenv() returns None for
# an unset variable, so each of these may be None at runtime.
THETVDB_API_KEY = os.getenv("THETVDB_API_KEY")
THETVDB_API_URL = os.getenv("THETVDB_API_URL")
CACHE_DIR = os.getenv("CACHE_DIR")
# Cached login state. Both start empty and are written by
# authenticate_thetvdb(); get_thetvdb_token() reads them to decide whether a
# fresh login is needed.
TOKEN_EXPIRY = None
THETVDB_TOKEN = None
# Shared collections (from the project's `services` module) that
# fetch_and_cache_json() adds an entry to for every title it processes.
recent_list = RecentList()
genre_list = GenreList()
def authenticate_thetvdb():
    """Log in to TheTVDB and store the bearer token in the module globals.

    Posts the API key to ``{THETVDB_API_URL}/login``. On success
    ``THETVDB_TOKEN`` holds the returned token and ``TOKEN_EXPIRY`` is set to
    30 days from now (the token lifetime this module assumes). On any
    failure -- transport error, HTTP error status, or a response body that
    does not contain ``data.token`` -- both globals are reset to ``None`` so
    that ``get_thetvdb_token`` attempts a new login on its next call.

    Returns:
        None. The outcome is communicated through the module globals.
    """
    global THETVDB_TOKEN, TOKEN_EXPIRY
    auth_url = f"{THETVDB_API_URL}/login"
    auth_data = {
        "apikey": THETVDB_API_KEY
    }
    try:
        print("Authenticating with TheTVDB API...")
        # requests waits forever unless a timeout is given; bound the call so
        # an unresponsive server cannot hang the whole script.
        response = requests.post(auth_url, json=auth_data, timeout=30)
        response.raise_for_status()
        response_data = response.json()
        THETVDB_TOKEN = response_data['data']['token']
        TOKEN_EXPIRY = datetime.now() + timedelta(days=30)
        print("Authentication successful.")
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON. KeyError/TypeError: JSON without the
        # expected {"data": {"token": ...}} shape. Treat all of them as a
        # failed login instead of letting them escape to the caller.
        print(f"Authentication failed: {e}")
        THETVDB_TOKEN = None
        TOKEN_EXPIRY = None
def get_thetvdb_token():
    """Return the cached TheTVDB token, logging in first when required.

    A login is triggered when no token is cached or when the current time has
    reached ``TOKEN_EXPIRY``. The value returned is whatever
    ``THETVDB_TOKEN`` holds afterwards, which is ``None`` if the login failed.
    """
    # The expiry comparison is only evaluated when a token exists, so a
    # missing expiry never gets compared against a datetime.
    token_is_usable = bool(THETVDB_TOKEN) and datetime.now() < TOKEN_EXPIRY
    if not token_is_usable:
        authenticate_thetvdb()
    return THETVDB_TOKEN
def clean_data(data):
    """Return a copy of *data* reduced to the whitelisted ``data`` fields.

    Only the top-level ``"data"`` section is considered. Within it, the
    fields listed below are copied (by reference, in the listed order) when
    present; every other field is dropped. The input is not modified.

    Args:
        data: Decoded JSON response, expected to be a mapping with a
            ``"data"`` mapping inside it.

    Returns:
        ``{"data": {...}}`` containing the kept fields, ``{"data": {}}`` when
        the ``"data"`` section exists but is not a mapping (e.g. ``null``),
        or ``{}`` when *data* has no ``"data"`` section or is not a mapping.
    """
    from collections.abc import Mapping

    print("Cleaning data...")
    # Whitelist per top-level section. A tuple keeps the output key order
    # stable and cannot silently contain a duplicated field name.
    fields_to_keep = {
        "data": (
            'id',
            'name',
            'image',
            'score',
            'runtime',
            'releases',
            'year',
            'contentRatings',
            'originalCountry',
            'originalLanguage',
            'translations',
            'artworks',
            'genres',
            'characters',
            'spoken_languages',
            'trailers',
        ),
    }
    source = data if isinstance(data, Mapping) else {}
    cleaned_data = {}
    for key, fields in fields_to_keep.items():
        if key not in source:
            continue
        section = source[key]
        if not isinstance(section, Mapping):
            # A null/scalar section has no fields to copy; keep the key so
            # callers can still index cleaned_data[key].
            section = {}
        cleaned_data[key] = {
            field: section[field] for field in fields if field in section
        }
    print("Data cleaned successfully.")
    return cleaned_data
# Upper bound, in seconds, for each HTTP call made by fetch_and_cache_json.
# requests has no default timeout, so without this a stalled server would
# block the script indefinitely.
_REQUEST_TIMEOUT_SECONDS = 30


def _english_overview(details):
    """Return the overview text of the first 'eng' translation, else None."""
    translations = details.get('translations') or {}
    for overview in translations.get('overviewTranslations') or []:
        # .get(): one translation without a language tag should be skipped,
        # not abort the whole title.
        if overview.get('language') == 'eng':
            return overview.get('overview')
    return None


def _first_artwork_thumbnail(details, artwork_types=(15, 3)):
    """Return the thumbnail of the first artwork whose type is accepted."""
    # NOTE(review): 15 and 3 are taken verbatim from the original code;
    # presumably TheTVDB artwork type ids -- confirm which kinds they denote.
    for artwork in details.get('artworks') or []:
        if artwork.get('type') in artwork_types:
            return artwork.get('thumbnail')
    return None


async def fetch_and_cache_json(original_title, title, media_type, year=None):
    """Search TheTVDB for a title, record it, and cache its extended data.

    Steps, in order: search for *title*; take the first search hit; for a
    series, await ``fetch_and_cache_seasons``; download the extended record;
    reduce it with ``clean_data``; add an entry to ``recent_list`` and
    ``genre_list``; write the cleaned record to
    ``CACHE_DIR/<type>/<quoted original_title>.json`` via ``save_to_json``.

    The HTTP calls use the synchronous ``requests`` library, so this
    coroutine blocks the event loop while they are in flight.

    Args:
        original_title: Display title; used for the list entries, the log
            messages and the cache file name.
        title: Text sent as the search query.
        media_type: Value for the search ``type`` filter. After the search,
            the type reported by the first hit is what drives the rest of
            the function; only ``'movie'`` and ``'series'`` are handled.
        year: Optional year filter; omitted from the query when falsy.

    Returns:
        None. Failures are printed, not raised: request errors and any other
        exception inside the fetch/caching steps are caught and reported.
    """
    print(f"Fetching data for: {original_title}")
    search_url = f"{THETVDB_API_URL}/search?query={urllib.parse.quote(title)}&type={media_type}"
    if year:
        search_url += f"&year={year}"
    token = get_thetvdb_token()
    if not token:
        print("Authentication token not available.")
        return
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
    }
    try:
        print(f"Sending search request to: {search_url}")
        response = requests.get(search_url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
        print(f"Search response status code: {response.status_code}")
        response.raise_for_status()
        data = response.json()
        if not ('data' in data and data['data']):
            print(f"No data found for {original_title} in search results.")
            return

        first_result = data['data'][0]
        tvdb_id = first_result.get('tvdb_id')
        # From here on the type reported by TheTVDB replaces the requested one.
        result_type = first_result.get('type')
        print(f"Found TVDB ID: {tvdb_id} with media type: {result_type}")
        if not tvdb_id:
            print("TVDB ID not found in the search results")
            return

        if result_type == 'movie':
            extended_url = f"{THETVDB_API_URL}/movies/{tvdb_id}/extended?meta=translations"
        elif result_type == 'series':
            extended_url = f"{THETVDB_API_URL}/series/{tvdb_id}/extended?meta=translations"
            await fetch_and_cache_seasons(tvdb_id, original_title)
        else:
            print(f"Unsupported media type: {result_type}")
            return

        response = requests.get(extended_url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
        print(f"Extended data response status code: {response.status_code}")
        response.raise_for_status()
        extended_data = response.json()
        cleaned_data = clean_data(extended_data)
        print(f"cleaning.. {original_title}")

        details = cleaned_data.get('data')
        if details is None:
            # Without a 'data' section there is nothing to list or cache;
            # say so explicitly instead of failing later with a bare KeyError.
            print(f"No extended data found for {original_title}.")
            return

        genres = details.get('genres') or None
        print(f"genres extracted: {genres}")
        description = _english_overview(details)
        print(f"Description extracted: {description}")
        image_link = _first_artwork_thumbnail(details)
        print(f"Image link extracted: {image_link}")

        # Strict lookup on purpose: a record without a year is not added to
        # the lists; the KeyError is reported by the generic handler below.
        release_year = details['year']
        # recent_list labels movies 'film' while genre_list uses the TVDB
        # type name; both labels are kept exactly as the lists received them.
        recent_label = 'film' if result_type == 'movie' else 'series'
        recent_list.add_entry(original_title, release_year, description, image_link, genres, recent_label)
        genre_list.add_entry(genres, original_title, release_year, description, image_link, result_type)
        print(f"adding.. {original_title}")

        # Create the full directory path if it doesn't exist
        full_dir_path = os.path.join(CACHE_DIR, result_type)
        os.makedirs(full_dir_path, exist_ok=True)
        # NOTE(review): quote() leaves '/' unescaped by default, so a title
        # containing '/' maps to a nested path. Left unchanged because other
        # modules may derive the same file name -- confirm before tightening.
        json_cache_path = os.path.join(full_dir_path, f"{urllib.parse.quote(original_title)}.json")
        await save_to_json(cleaned_data, json_cache_path)
        print(f"Data saved to JSON at: {json_cache_path}")
    except requests.RequestException as e:
        print(f"Error fetching data: {e}")
    except Exception as e:
        # Boundary handler: one bad title must not stop the remaining ones.
        print(f"An unexpected error occurred: {e}")
def main():
    """Process a fixed sample list of movies through ``process_entries``.

    Each sample becomes an entry dict whose ``original_title`` is the title
    followed by the year in parentheses, with ``media_type`` set to
    ``"movie"``. The coroutine is driven to completion with ``asyncio.run``.
    """
    samples = (
        ("Funky Monkey", 2004),
        ("My Spy", 2020),
        ("My Spy", 2024),
        ("Yaariayan", 2014),
        ("Yaariyan 2", 2023),
    )
    entries = [
        {
            "original_title": f"{name} ({released})",
            "title": name,
            "media_type": "movie",
            "year": released,
        }
        for name, released in samples
    ]
    asyncio.run(process_entries(entries))
async def process_entries(entries):
    """Fetch and cache every entry, strictly one after another.

    Each entry must be a mapping with the keys ``original_title``, ``title``,
    ``media_type`` and ``year``; a missing key raises ``KeyError``.
    """
    for item in entries:
        original_title = item['original_title']
        title = item['title']
        media_type = item['media_type']
        year = item['year']
        await fetch_and_cache_json(original_title, title, media_type, year)
# Script entry point: run the sample lookups only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|