nekocare / app.py
maylinejix's picture
Update app.py
4013f6f verified
from flask import Flask, request, jsonify, send_file
import cloudscraper
import requests
from bs4 import BeautifulSoup as bs
import random
import string
import re
import time
import os
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024
class NekopoiGet:
def __init__(self):
self.scraper = cloudscraper.create_scraper()
def getUrl(self, url):
try:
res = self.scraper.get(url)
if res.status_code == 200:
soup = bs(res.content, 'html.parser')
getsrc = soup.find("iframe")
if getsrc:
src = getsrc.get("src")
return src
return None
else:
return None
except Exception as e:
print(f"Error getUrl: {e}")
return None
def getVidDirec(self, url):
try:
res = self.scraper.get(url)
soup = bs(res.content, 'html.parser')
scripts = soup.find_all('script')
pass_md5_url = None
token = None
for script in scripts:
if script.string:
match = re.search(r"/pass_md5/([^']+)", script.string)
if match:
pass_md5_path = match.group(0)
token_match = re.search(r"/([a-zA-Z0-9]+)$", pass_md5_path)
if token_match:
token = token_match.group(1)
domain = url.split('/e/')[0]
pass_md5_url = domain + pass_md5_path
break
if not pass_md5_url:
return None
try:
md5_response = self.scraper.get(pass_md5_url)
if md5_response.status_code == 200:
video_base_url = md5_response.text.strip()
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
final_url = f"{video_base_url}{random_str}?token={token}&expiry={int(time.time() * 1000)}"
return final_url
except Exception as e:
print(f"Error md5: {e}")
return None
return None
except Exception as e:
print(f"Error getVidDirec: {e}")
return None
def download_video(self, video_url, referer_url, filename="video.mp4"):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Referer': referer_url,
'Origin': 'https://dsvplay.com',
'Sec-Fetch-Dest': 'video',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site'
}
try:
response = requests.get(video_url, headers=headers, stream=True, timeout=30)
if response.status_code == 200:
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True
else:
return False
except Exception as e:
print(f"Error download: {e}")
return False
neko = NekopoiGet()
@app.route('/', methods=['GET'])
def index():
return jsonify({
'message': 'Nekopoi Scraper API',
'usage': {
'get_url': 'GET /api/get-url?url=https://nekopoi.care/xxx',
'download': 'GET /api/download?url=https://nekopoi.care/xxx'
},
'example': 'GET /api/get-url?url=https://nekopoi.care/example-video'
})
@app.route('/api/get-url', methods=['GET'])
def get_url():
try:
url = request.args.get('url')
if not url:
return jsonify({'error': 'URL parameter is required'}), 400
if not url.startswith("https://nekopoi.care"):
return jsonify({'error': 'Invalid URL. Must start with https://nekopoi.care'}), 400
iframe_url = neko.getUrl(url)
if not iframe_url:
return jsonify({'error': 'Failed to get iframe URL'}), 500
video_url = neko.getVidDirec(iframe_url)
if not video_url:
return jsonify({'error': 'Failed to get video URL'}), 500
return jsonify({
'success': True,
'iframe_url': iframe_url,
'video_url': video_url
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/download', methods=['GET'])
def download():
try:
url = request.args.get('url')
if not url:
return jsonify({'error': 'URL parameter is required'}), 400
if not url.startswith("https://nekopoi.care"):
return jsonify({'error': 'Invalid URL'}), 400
iframe_url = neko.getUrl(url)
if not iframe_url:
return jsonify({'error': 'Failed to get iframe URL'}), 500
video_url = neko.getVidDirec(iframe_url)
if not video_url:
return jsonify({'error': 'Failed to get video URL'}), 500
filename = f"video_{int(time.time())}.mp4"
filepath = os.path.join('/tmp', filename)
success = neko.download_video(video_url, url, filepath)
if success:
return send_file(
filepath,
as_attachment=True,
download_name='nekopoi_video.mp4',
mimetype='video/mp4'
)
else:
return jsonify({'error': 'Failed to download video'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy'})
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port, debug=False)