Spaces:
Paused
Paused
| import time | |
| import requests | |
| import random | |
| import os | |
| import tempfile | |
| import aiohttp | |
| import asyncio | |
| from bs4 import BeautifulSoup | |
| import io | |
# Webhook URL is read from the 'web' environment variable at import time.
# NOTE: os.environ[...] raises KeyError if the variable is not set, so the
# script fails immediately on startup when it is missing.
WEBHOOK_URL = os.environ['web']
# Prefix of a track page URL; the numeric track ID is appended to it.
BASE_URL = "https://tidal.com/browse/track/"
# Configuration - edit these values as needed
START_TRACK_ID = 30878722 # First track ID to check; IDs are then scanned upwards one by one
TRACK_CHECK_INTERVAL = 2.1 # Pause between two consecutive checks, in seconds (lower = faster)
async def send_webhook_message(track_link, track_info):
    """Post a two-line text message to the webhook at ``WEBHOOK_URL``.

    Args:
        track_link: First line of the message (a track URL or a status line).
        track_info: Second line of the message.

    Returns:
        None. A non-success HTTP status is reported on stdout only.

    Raises:
        Connection-level errors from aiohttp are not caught here and
        propagate to the caller.
    """
    payload = {"content": f"{track_link}\n{track_info}"}
    async with aiohttp.ClientSession() as session:
        async with session.post(WEBHOOK_URL, json=payload) as response:
            # Accept the whole 2xx range instead of only 204: a webhook that
            # answers with a body replies 200 on success (presumably the case
            # for Discord webhooks called with ?wait=true - confirm against
            # the configured URL), and that must not be logged as a failure.
            if not 200 <= response.status < 300:
                print(f"Failed to send webhook: {response.status}")
def _parse_track_page(html):
    """Extract ``(title, artist)`` from a track page's HTML.

    Returns ``None`` when the title heading is not present. The artist falls
    back to ``"Unknown Artist"`` when no artist link is found.
    """
    soup = BeautifulSoup(html, 'html.parser')
    title_element = soup.find('h1', class_="font-size-regular mt-0 mb-0 ellipsis-double overflow-hidden")
    if title_element is None:
        return None
    # A single-class lookup matches every tag that carries this class, whatever
    # other classes it has, so no second lookup with the full class string is
    # needed: anything it could match is already matched here.
    artist_element = soup.find('a', class_="artist-list-link")
    artist_name = artist_element.text.strip() if artist_element else "Unknown Artist"
    # Strip the title like the artist: it ends up in the webhook message and
    # in the uploaded file name, where stray whitespace/newlines are unwanted.
    return title_element.text.strip(), artist_name


async def check_tidal_track(track_id):
    """Check whether a track page exists for ``track_id`` and scrape its metadata.

    Args:
        track_id: Numeric track ID appended to ``BASE_URL``.

    Returns:
        A 4-tuple ``(exists, track_info, title, artist)``.
        * HTTP 200 with a title heading: ``(True, "<title>\\n<artist>", title, artist)``.
        * HTTP 200 without a title heading:
          ``(True, "Unknown Track", "Unknown Track", "Unknown Artist")``.
        * Any other status, or any exception (which is printed):
          ``(False, None, None, None)``.
    """
    url = f"{BASE_URL}{track_id}/u"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
    }
    try:
        # Use an explicit ClientTimeout object; passing a bare number is the
        # legacy form of the same 5-second total timeout.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, timeout=timeout) as response:
                # Only a 200 counts as "exists"; every other status is treated as missing.
                if response.status != 200:
                    return False, None, None, None
                content = await response.text()

        parsed = _parse_track_page(content)
        if parsed is None:
            # Got a 200 but no recognisable title: still consider the track valid.
            return True, "Unknown Track", "Unknown Track", "Unknown Artist"
        title, artist_name = parsed
        return True, f"{title}\n{artist_name}", title, artist_name
    except Exception as e:
        print(f"Error checking track {track_id}: {e}")
        return False, None, None, None
async def tidal_downloader(track_id, song_name, artist_name):
    """Ask the link-resolver API for a direct download URL of ``track_id``.

    Args:
        track_id: Track ID sent as ``trackId`` in the request payload.
        song_name: Accepted for call compatibility; not used here.
        artist_name: Accepted for call compatibility; not used here.

    Returns:
        The ``OriginalTrackUrl`` value taken from the third element of the
        JSON list response, or ``None`` when the status is not 200, the
        response has a different shape, the URL is empty, or any exception
        occurs (the reason is printed).
    """
    endpoint = "https://hifi.kratosgodofwar6076.workers.dev/getSongLink"
    request_body = {"trackId": track_id, "quality": "3"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, json=request_body) as response:
                if response.status != 200:
                    print(f"Error getting download URL: {response.status}")
                    return None

                body = await response.json()
                # The URL is only looked up when the response is a list with
                # at least three entries; otherwise there is no link.
                has_entry = isinstance(body, list) and len(body) > 2
                link = body[2].get('OriginalTrackUrl') if has_entry else None

                if not link:
                    print(f"No download URL found for track {track_id}")
                    return None

                print(f"Found download URL for track {track_id}")
                return link
    except Exception as e:
        print(f"Error in tidal_downloader: {str(e)}")
        return None
async def download_file(url):
    """Fetch ``url`` and return the whole response body as bytes.

    The body is read fully into memory. Returns ``None`` (after printing the
    reason) when the status is not 200 or when any exception occurs.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    print(f"Failed to download file: {response.status}")
                    return None
                # Read the complete payload in one go.
                payload = await response.read()
                return payload
    except Exception as e:
        print(f"Error downloading file: {e}")
        return None
async def upload_to_service(file_content, song_name, artist_name):
    """Upload audio bytes to the file-hosting endpoint as a multipart form.

    Args:
        file_content: Raw bytes sent as the ``file`` field.
        song_name: Used for the file name (``"<song_name>.flac"``) and sent
            as the ``song_name`` form field.
        artist_name: Sent as the ``artist`` form field.

    Returns:
        The ``url`` value of the JSON response on HTTP 200, otherwise
        ``None``. Failures (non-200 status or any exception) are printed.
    """
    upload_url = "https://jerrrycans-file.hf.space/upload"
    try:
        # Build the multipart form with the file and its metadata fields.
        form = aiohttp.FormData()
        form.add_field('file',
                       file_content,
                       filename=f"{song_name}.flac",
                       content_type='audio/flac')
        # The server might be expecting these fields.
        form.add_field('song_name', song_name)
        form.add_field('artist', artist_name)

        # Use an explicit ClientTimeout object; passing a bare number is the
        # legacy form of the same 60-second total timeout.
        timeout = aiohttp.ClientTimeout(total=60)
        async with aiohttp.ClientSession() as session:
            async with session.post(upload_url, data=form, timeout=timeout) as response:
                if response.status == 200:
                    result = await response.json()
                    print(f"Upload successful: {result}")
                    return result.get('url')
                error_text = await response.text()
                print(f"Upload failed: {response.status} - {error_text}")
                return None
    except Exception as e:
        print(f"Error uploading file: {e}")
        return None
async def process_track(track_id):
    """Run the full pipeline for one track ID.

    Steps: check that the track page exists, announce it on the webhook,
    resolve a download URL, download the file, upload it, and announce the
    uploaded URL. Each later step runs only if the previous one produced a
    result.

    Returns:
        The existence flag from ``check_tidal_track``; ``False`` if any
        exception is raised along the way (the error is printed).
    """
    try:
        exists, info, title, artist = await check_tidal_track(track_id)
        if not exists:
            print(f"Track {track_id} does not exist")
            return exists

        page_url = f"{BASE_URL}{track_id}/u"
        print(f"Found valid track: {page_url} - {info}")
        await send_webhook_message(page_url, info)

        # Resolve the direct download link; stop quietly if there is none.
        source_url = await tidal_downloader(track_id, title, artist)
        if not source_url:
            return exists

        # Pull the audio bytes into memory.
        audio_bytes = await download_file(source_url)
        if not audio_bytes:
            return exists

        # Push the bytes to the hosting service and report the resulting URL.
        hosted_url = await upload_to_service(audio_bytes, title, artist)
        if hosted_url:
            await send_webhook_message(f"Uploaded: {title} - {artist}", f"URL: {hosted_url}")
        return exists
    except Exception as e:
        print(f"Error processing track {track_id}: {e}")
        return False
async def main():
    """Scan track IDs upwards from ``START_TRACK_ID`` forever.

    Each iteration processes one track ID, then moves to the next ID and
    sleeps for ``TRACK_CHECK_INTERVAL`` seconds. An exception raised while
    processing a track is printed and does not stop the loop.
    """
    current_track_id = START_TRACK_ID
    print(f"Starting from track ID: {current_track_id}")
    while True:
        try:
            await process_track(current_track_id)
        except Exception as e:
            print(f"Error in main loop: {e}")
        # Advance and pause exactly once per iteration, whether or not the
        # track was processed successfully, so no ID is retried or skipped.
        current_track_id += 1
        await asyncio.sleep(TRACK_CHECK_INTERVAL)
# Start the scan loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())