File size: 4,329 Bytes
359d9e4 ba694bf 359d9e4 c99920e ba694bf ea47c47 ba694bf c99920e ba694bf c99920e ba694bf 6dcf96c c99920e ba694bf c99920e ba694bf 6dcf96c ba694bf 6dcf96c c99920e 6dcf96c ba694bf 6dcf96c c99920e 6dcf96c c99920e ba694bf 6dcf96c ba694bf c99920e ba694bf c99920e ba694bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import requests
# API key sent as the "key" query parameter on every YouTube Data API request
# made in this module. Read once at import time; None when the YT_API_KEY
# environment variable is not set.
YT_API_KEY = os.environ.get("YT_API_KEY")
def _iter_api_pages(url, params, timeout):
    """Yield every decoded JSON page of a paginated YouTube Data API list call.

    Follows ``nextPageToken`` until the response no longer carries one.
    ``params`` is copied, so the caller's dict is never mutated. Because this
    is a generator, a caller that stops iterating early triggers no further
    HTTP requests.
    """
    query = dict(params)
    while True:
        response = requests.get(url, params=query, timeout=timeout)
        response.raise_for_status()
        page = response.json()
        yield page
        next_token = page.get("nextPageToken")
        if not next_token:
            return
        query["pageToken"] = next_token


def _find_playlist_id_by_title(channel_id, device_name, timeout):
    """Return the id of the first channel playlist whose title contains
    ``device_name`` (case-insensitive), or None when no playlist matches."""
    needle = device_name.lower()
    params = {
        "part": "snippet",
        "channelId": channel_id,
        "maxResults": 50,
        "key": YT_API_KEY,
    }
    pages = _iter_api_pages(
        "https://www.googleapis.com/youtube/v3/playlists", params, timeout
    )
    # Walk every page: a channel may own more than 50 playlists, and the
    # matching one is not guaranteed to be on the first page.
    for page in pages:
        for playlist in page.get("items", []):
            if needle in playlist["snippet"]["title"].lower():
                return playlist["id"]
    return None


def _newest_playlist_video_id(playlist_id, timeout):
    """Return the videoId of the playlist item with the greatest
    ``snippet.publishedAt``, or None when no item has both a timestamp and a
    video id."""
    params = {
        "part": "snippet,contentDetails",
        "playlistId": playlist_id,
        "maxResults": 50,  # API max
        "key": YT_API_KEY,
    }
    pages = _iter_api_pages(
        "https://www.googleapis.com/youtube/v3/playlistItems", params, timeout
    )
    newest_video_id = None
    newest_published_at = None
    for page in pages:
        for item in page.get("items", []):
            snippet = item.get("snippet") or {}
            published_at = snippet.get("publishedAt")
            if not published_at:
                continue
            # Prefer contentDetails.videoId; fall back to
            # snippet.resourceId.videoId when contentDetails is absent.
            video_id = (item.get("contentDetails") or {}).get("videoId")
            if not video_id:
                video_id = (snippet.get("resourceId") or {}).get("videoId")
            if not video_id:
                continue
            # Timestamps are compared as strings; this orders correctly as
            # long as every value uses the same ISO-8601 layout.
            if newest_published_at is None or published_at > newest_published_at:
                newest_published_at = published_at
                newest_video_id = video_id
    return newest_video_id


def get_latest_video_id(channel_id, device_name=None, playlist_id=None, *, timeout=10):
    """Return the videoId of the newest item in a playlist of a channel.

    The playlist is either given directly via ``playlist_id`` or resolved by
    searching the channel's playlists for a title containing ``device_name``
    (case-insensitive substring match, first match wins). The function does
    not rely on playlist ordering: it walks every page of the playlist and
    picks the item with the greatest ``snippet.publishedAt``.

    NOTE(review): per the YouTube Data API reference, ``snippet.publishedAt``
    on a playlistItem appears to be the time the item was added to the
    playlist, while ``contentDetails.videoPublishedAt`` is the video's own
    publish time. Confirm which one callers actually want.

    Args:
        channel_id: Channel whose playlists are searched when ``playlist_id``
            is not given; also used by the channel search fallback.
        device_name: Text to look for in playlist titles. Ignored (with a
            printed notice) when ``playlist_id`` is also given.
        playlist_id: Playlist to inspect. A falsy value other than None
            (e.g. ``""``) skips the playlist walk and falls back to a
            newest-first search over the whole channel.
        timeout: Seconds passed to every ``requests.get`` call so a stalled
            connection cannot hang the caller indefinitely.

    Returns:
        The video id as a string, or None when the playlist (or the channel
        search) yields no usable item.

    Raises:
        ValueError: If neither ``device_name`` nor ``playlist_id`` is given,
            or if ``device_name`` is blank while ``playlist_id`` is None (a
            blank name would match every playlist title).
        LookupError: If no playlist title contains ``device_name``.
        requests.RequestException: On HTTP error status (via
            ``raise_for_status``), timeout, or connection failure.
    """
    if device_name is None and playlist_id is None:
        raise ValueError("Must specify either device_name or playlist_id")

    if device_name is not None and playlist_id is not None:
        # Keep existing behavior
        print("Both device_name and playlist_id entered.. device_name will be ignored.")

    # ------------------------------------------------------------------
    # 1) If we only know the device_name, resolve playlist_id from channel
    # ------------------------------------------------------------------
    if playlist_id is None:
        if isinstance(device_name, str) and not device_name.strip():
            # "" is a substring of every title, so a blank name would
            # silently select whichever playlist happens to come first.
            raise ValueError("device_name must not be blank when playlist_id is not given")
        playlist_id = _find_playlist_id_by_title(channel_id, device_name, timeout)
        if not playlist_id:
            raise LookupError(f"No playlist found matching device name '{device_name}'")

    # ------------------------------------------------------------------
    # 2) If we have a playlist_id, walk ALL pages and find newest item
    # ------------------------------------------------------------------
    if playlist_id:
        return _newest_playlist_video_id(playlist_id, timeout)

    # ------------------------------------------------------------------
    # 3) Fallback: only reached when playlist_id was passed as a falsy,
    #    non-None value; get the latest video on the channel via search
    # ------------------------------------------------------------------
    search_params = {
        "part": "snippet",
        "channelId": channel_id,
        "maxResults": 1,
        "order": "date",  # newest first
        "type": "video",
        "key": YT_API_KEY,
    }
    res = requests.get(
        "https://www.googleapis.com/youtube/v3/search",
        params=search_params,
        timeout=timeout,
    )
    res.raise_for_status()
    items = res.json().get("items", [])
    if not items:
        return None
    return items[0]["id"]["videoId"]
|