# Spaces: Paused (web-page capture residue; not part of the program)
import os
import random
import re
import string
import tempfile
import time
from urllib.parse import urlsplit

import cloudscraper
import requests
from bs4 import BeautifulSoup as bs
from flask import Flask, request, jsonify, send_file
# The Flask application object used by this module and started in the
# ``__main__`` guard at the bottom of the file.
app = Flask(__name__)
# MAX_CONTENT_LENGTH is Flask's cap on incoming request bodies: 500 MiB here.
app.config.update(MAX_CONTENT_LENGTH=500 * 1024 * 1024)
class NekopoiGet:
    """Resolve and download the video embedded in a page.

    The workflow is three steps, each a best-effort call that reports
    failure through its return value instead of raising:

    1. ``getUrl`` fetches a page and returns the ``src`` of its first iframe.
    2. ``getVidDirec`` fetches that iframe page, finds the ``/pass_md5/...``
       path in its inline scripts, requests it, and builds the final video
       URL from the response.
    3. ``download_video`` streams the video URL to a local file.
    """

    # Matches the pass_md5 path inside an inline script. The path stops at
    # the closing quote of the string literal, whichever quote style is used.
    _PASS_MD5_RE = re.compile(r"/pass_md5/[^'\"]+")

    # Default per-request timeout in seconds. Kept as a class attribute so
    # instances created without ``__init__`` still have a value.
    timeout = 30

    def __init__(self, timeout=30):
        """Create the scraper session.

        Args:
            timeout: Seconds to wait on each HTTP request before giving up.
                Without a timeout a stalled upstream would block the caller
                indefinitely.
        """
        self.scraper = cloudscraper.create_scraper()
        self.timeout = timeout

    def getUrl(self, url):
        """Return the ``src`` of the first ``<iframe>`` found at *url*.

        Returns:
            The iframe ``src`` value, or ``None`` when the request fails, the
            status is not 200, no iframe exists, or it has no ``src``.
        """
        try:
            res = self.scraper.get(url, timeout=self.timeout)
            if res.status_code != 200:
                return None
            iframe = bs(res.content, 'html.parser').find("iframe")
            if iframe is None:
                return None
            return iframe.get("src")
        except Exception as e:
            # Best-effort contract: callers only distinguish "got a URL"
            # from "did not", so every failure is logged and mapped to None.
            print(f"Error getUrl: {e}")
            return None

    def getVidDirec(self, url):
        """Build the direct video URL for the embed page at *url*.

        Args:
            url: Embed page URL; everything before its first ``/e/`` is used
                as the base for the pass_md5 request.

        Returns:
            The final video URL, or ``None`` when the pass_md5 path or its
            token cannot be found, the pass_md5 request does not return 200
            with a non-empty body, or any request fails.
        """
        try:
            res = self.scraper.get(url, timeout=self.timeout)
            soup = bs(res.content, 'html.parser')

            pass_md5_path = None
            for script in soup.find_all('script'):
                if not script.string:
                    continue
                match = self._PASS_MD5_RE.search(script.string)
                if match:
                    pass_md5_path = match.group(0)
                    break
            if not pass_md5_path:
                return None

            # The token is the last path segment. Taking it with rsplit
            # (rather than an alphanumeric-only regex) avoids ending up with
            # no token and emitting the literal text "token=None" in the URL.
            token = pass_md5_path.rsplit('/', 1)[-1]
            if not token:
                return None

            domain = url.split('/e/')[0]
            pass_md5_url = domain + pass_md5_path

            try:
                md5_response = self.scraper.get(pass_md5_url, timeout=self.timeout)
                if md5_response.status_code == 200:
                    video_base_url = md5_response.text.strip()
                    if not video_base_url:
                        # An empty body cannot produce a usable URL.
                        return None
                    random_str = ''.join(
                        random.choices(string.ascii_letters + string.digits, k=10)
                    )
                    expiry = int(time.time() * 1000)
                    return f"{video_base_url}{random_str}?token={token}&expiry={expiry}"
            except Exception as e:
                print(f"Error md5: {e}")
                return None
            return None
        except Exception as e:
            print(f"Error getVidDirec: {e}")
            return None

    def download_video(self, video_url, referer_url, filename="video.mp4"):
        """Stream *video_url* into *filename*.

        Args:
            video_url: Direct URL of the video to fetch.
            referer_url: Value sent as the ``Referer`` header.
            filename: Destination path; it is overwritten if it exists.

        Returns:
            ``True`` when the server answered 200 and the whole body was
            written; ``False`` otherwise. A partially written file is removed
            on failure.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': '*/*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Referer': referer_url,
            'Origin': 'https://dsvplay.com',
            'Sec-Fetch-Dest': 'video',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'cross-site',
        }
        file_opened = False
        try:
            # The context manager releases the streamed connection on every
            # path, including a non-200 answer or an error mid-transfer.
            with requests.get(
                video_url, headers=headers, stream=True, timeout=self.timeout
            ) as response:
                if response.status_code != 200:
                    return False
                with open(filename, 'wb') as f:
                    file_opened = True
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return True
        except Exception as e:
            print(f"Error download: {e}")
            # Only remove the file if this call opened (and so truncated) it;
            # a file that existed before a connection error is left alone.
            if file_opened:
                try:
                    os.remove(filename)
                except OSError:
                    pass
            return False
# Single module-level scraper instance used by the request handlers below.
neko: NekopoiGet = NekopoiGet()
# NOTE(review): the handler had no route attached, so Flask never served it.
# The root path is assumed here; confirm it matches the intended deployment.
@app.route('/')
def index():
    """Return a JSON description of the API with example requests."""
    return jsonify({
        'message': 'Nekopoi Scraper API',
        'usage': {
            'get_url': 'GET /api/get-url?url=https://nekopoi.care/xxx',
            'download': 'GET /api/download?url=https://nekopoi.care/xxx',
        },
        'example': 'GET /api/get-url?url=https://nekopoi.care/example-video',
    })
# The handler had no route attached; the path is the one advertised by the
# usage text returned from ``index``.
@app.route('/api/get-url')
def get_url():
    """Resolve the iframe URL and direct video URL for a page.

    Reads the ``url`` query parameter, which must be an ``https`` URL whose
    host is exactly ``nekopoi.care``.

    Returns:
        JSON with ``success``, ``iframe_url`` and ``video_url`` on success;
        a JSON ``error`` with status 400 for a missing or invalid URL, or
        status 500 when either resolution step fails or raises.
    """
    try:
        url = request.args.get('url')
        if not url:
            return jsonify({'error': 'URL parameter is required'}), 400

        # Compare the parsed scheme and host instead of a string prefix:
        # a prefix check also accepts "https://nekopoi.care.evil.tld/" and
        # "https://nekopoi.care@evil.tld/", letting callers make this server
        # fetch arbitrary hosts.
        try:
            target = urlsplit(url)
            is_allowed = target.scheme == 'https' and target.hostname == 'nekopoi.care'
        except ValueError:
            # urlsplit rejects some malformed URLs outright.
            is_allowed = False
        if not is_allowed:
            return jsonify({'error': 'Invalid URL. Must start with https://nekopoi.care'}), 400

        iframe_url = neko.getUrl(url)
        if not iframe_url:
            return jsonify({'error': 'Failed to get iframe URL'}), 500

        video_url = neko.getVidDirec(iframe_url)
        if not video_url:
            return jsonify({'error': 'Failed to get video URL'}), 500

        return jsonify({
            'success': True,
            'iframe_url': iframe_url,
            'video_url': video_url,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# The handler had no route attached; the path is the one advertised by the
# usage text returned from ``index``.
@app.route('/api/download')
def download():
    """Download the video behind a page and send it as an attachment.

    Reads the ``url`` query parameter, which must be an ``https`` URL whose
    host is exactly ``nekopoi.care``.

    Returns:
        The video as ``nekopoi_video.mp4`` on success; a JSON ``error`` with
        status 400 for a missing or invalid URL, or status 500 when any
        resolution or download step fails or raises.
    """
    try:
        url = request.args.get('url')
        if not url:
            return jsonify({'error': 'URL parameter is required'}), 400

        # Compare the parsed scheme and host instead of a string prefix:
        # a prefix check also accepts "https://nekopoi.care.evil.tld/" and
        # "https://nekopoi.care@evil.tld/", letting callers make this server
        # fetch arbitrary hosts.
        try:
            target = urlsplit(url)
            is_allowed = target.scheme == 'https' and target.hostname == 'nekopoi.care'
        except ValueError:
            # urlsplit rejects some malformed URLs outright.
            is_allowed = False
        if not is_allowed:
            return jsonify({'error': 'Invalid URL'}), 400

        iframe_url = neko.getUrl(url)
        if not iframe_url:
            return jsonify({'error': 'Failed to get iframe URL'}), 500

        video_url = neko.getVidDirec(iframe_url)
        if not video_url:
            return jsonify({'error': 'Failed to get video URL'}), 500

        # mkstemp gives every request its own file; a name based on the
        # current second let concurrent requests overwrite each other.
        fd, filepath = tempfile.mkstemp(prefix='video_', suffix='.mp4', dir='/tmp')
        os.close(fd)
        try:
            # NOTE(review): the page URL is passed as the Referer while
            # download_video sends a fixed dsvplay.com Origin; confirm the
            # video host does not expect the iframe URL here instead.
            if not neko.download_video(video_url, url, filepath):
                return jsonify({'error': 'Failed to download video'}), 500
            return send_file(
                filepath,
                as_attachment=True,
                download_name='nekopoi_video.mp4',
                mimetype='video/mp4',
            )
        finally:
            # Remove the temp file so /tmp does not fill up. On POSIX the
            # data stays readable through the handle the response holds.
            # NOTE(review): assumes send_file has already opened the file
            # when it returns and that X-Sendfile is not enabled — confirm.
            try:
                os.remove(filepath)
            except OSError:
                pass
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# NOTE(review): the handler had no route attached, so Flask never served it.
# The "/health" path is assumed from the function name; confirm it matches
# whatever probes this service.
@app.route('/health')
def health():
    """Return a fixed JSON payload reporting the service as healthy."""
    return jsonify({'status': 'healthy'})
if __name__ == '__main__':
    # Listen on all interfaces; the port comes from the PORT environment
    # variable and falls back to 7860 when it is unset.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)), debug=False)