Spaces:
Paused
Paused
| import os | |
| import requests | |
| import gzip | |
| import shutil | |
| import logging | |
| from flask import Flask, send_file, jsonify | |
| import xml.etree.ElementTree as ET | |
| from datetime import datetime | |
| import pytz # New import for timezone handling | |
# Logging: root logger at INFO level plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object.
app = Flask(__name__)

# Directory that receives the decompressed guide data; created when absent.
extract_to_dir = 'extracted'
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir, exist_ok=True)
# Function to download the file
def download_file(url, output_path, timeout=30):
    """Download ``url`` and write the response body to ``output_path``.

    Failures are logged instead of raised, so a bad download never takes the
    application down; callers that care can inspect the return value.

    Args:
        url: Address to fetch with an HTTP GET.
        output_path: Destination file, overwritten on success.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout unless one is passed, so without it a stalled server
            would block the caller forever.

    Returns:
        True when the body was written to ``output_path``, False otherwise.
    """
    logger.info(f"Downloading from {url}...")
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logger.error(f"Failed to download file. Status code: {response.status_code}")
            return False
        with open(output_path, 'wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded file to {output_path} with size {len(response.content)} bytes")
        return True
    except PermissionError as e:
        logger.error(f"Permission error occurred while downloading the file: {e}")
    except Exception as e:
        # Deliberately broad: this is a best-effort step (network errors,
        # timeouts, disk errors) and must not crash startup or a request.
        logger.error(f"An error occurred while downloading the file: {e}")
    return False
# Function to extract .gz files
def extract_gz_file(gz_path, extract_to_dir):
    """Decompress ``gz_path`` into ``<extract_to_dir>/it_wltv_full.xml``.

    The data is first written to a sibling ``.part`` file and only moved over
    the real target once decompression has finished. A corrupt or truncated
    archive therefore leaves the previously extracted file untouched, and
    readers never observe a half-written XML file.

    Args:
        gz_path: Path of the gzip archive to read.
        extract_to_dir: Existing directory that receives the XML file.

    Returns:
        None. Problems are logged, not raised.
    """
    logger.info(f"Extracting {gz_path} to {extract_to_dir}...")
    if not os.path.exists(gz_path):
        logger.error(f"GZ file does not exist: {gz_path}")
        return

    extracted_file_path = os.path.join(extract_to_dir, 'it_wltv_full.xml')
    partial_path = extracted_file_path + '.part'
    try:
        with gzip.open(gz_path, 'rb') as f_in, open(partial_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        # os.replace overwrites the destination in a single step.
        os.replace(partial_path, extracted_file_path)
        logger.info(f"Extracted GZ file to {extracted_file_path}")
    except Exception as e:
        logger.error(f"An error occurred while extracting the file: {e}")
        # Do not leave the incomplete temporary file behind.
        try:
            os.remove(partial_path)
        except OSError:
            pass
# Helper: turn one XMLTV timestamp into a timezone-aware datetime
def _parse_xmltv_time(raw, default_tz):
    """Parse a timestamp such as ``20240131203000 +0100``.

    Args:
        raw: Attribute value from the XML, or None when the attribute is
            missing.
        default_tz: pytz timezone applied when ``raw`` carries no usable
            UTC offset of its own.

    Returns:
        A timezone-aware datetime, or None when ``raw`` cannot be parsed.
    """
    if not raw:
        return None
    raw = raw.strip()
    stamp, offset = raw[:14], raw[14:].strip()
    if offset:
        # Honour the offset written in the feed instead of assuming that
        # every timestamp is already expressed in the default timezone.
        try:
            return datetime.strptime(stamp + offset, "%Y%m%d%H%M%S%z")
        except ValueError:
            pass  # not a numeric offset; fall back to the default timezone
    try:
        naive = datetime.strptime(stamp, "%Y%m%d%H%M%S")
    except ValueError:
        return None
    return default_tz.localize(naive)


# Function to get the current program for a channel
def get_current_program(channel_id):
    """Return the programme currently airing on ``channel_id``.

    Reads ``it_wltv_full.xml`` from ``extract_to_dir`` and returns the first
    ``programme`` element for the channel whose start/stop interval contains
    the current time.

    Args:
        channel_id: Value matched against each programme's ``channel``
            attribute.

    Returns:
        ``{"title": ..., "description": ...}`` for the running programme, or
        None when the file is missing, cannot be read, or nothing matches.
    """
    xml_file = os.path.join(extract_to_dir, 'it_wltv_full.xml')
    if not os.path.exists(xml_file):
        logger.error("XML file does not exist for program data")
        return None
    try:
        root = ET.parse(xml_file).getroot()
        # Timestamps without an explicit offset are interpreted as Italian time.
        italy_tz = pytz.timezone('Europe/Rome')
        current_time = datetime.now(italy_tz)
        logger.info(f"Current Italy time: {current_time}")
        for programme in root.findall('programme'):
            if programme.get('channel') != channel_id:
                continue
            start_time = _parse_xmltv_time(programme.get('start'), italy_tz)
            stop_time = _parse_xmltv_time(programme.get('stop'), italy_tz)
            if start_time is None or stop_time is None:
                # One malformed entry must not abort the search for the rest.
                logger.warning(f"Skipping programme with unusable start/stop time on {channel_id}")
                continue
            # Resolve the title before logging so a programme without a
            # <title> element cannot raise here.
            title_el = programme.find('title')
            title = title_el.text if title_el is not None else "No title"
            logger.info(f"Checking program: {title} ({start_time} - {stop_time})")
            # Half-open interval: a programme stops being current at its stop time.
            if start_time <= current_time < stop_time:
                desc_el = programme.find('desc')
                desc = desc_el.text if desc_el is not None else "No description"
                return {"title": title, "description": desc}
        logger.info(f"No current program found for {channel_id}.")
        return None
    except Exception as e:
        # Broad on purpose: the caller treats any failure as "no data".
        logger.error(f"Failed to parse XML file: {e}")
        return None
# Flask route to serve the extracted file
# NOTE(review): no @app.route decorator is visible on this function in this
# copy of the file - confirm how it is registered with the app.
def serve_extracted_file():
    """Send ``it_wltv_full.xml`` from the extraction directory.

    The directory contents are logged first. Returns a 500 response when the
    directory cannot be listed, a 404 response when the XML file is absent,
    and otherwise the file itself served inline as ``text/plain``.
    """
    try:
        dir_entries = os.listdir(extract_to_dir)
        logger.info(f"Files in 'extracted' directory: {dir_entries}")
        for entry in dir_entries:
            extension = os.path.splitext(entry)[1]
            logger.info(f"File: {entry}, Extension: {extension}")
    except Exception as err:
        logger.error(f"Error listing files in 'extracted' directory: {err}")
        return "Error listing files", 500

    target_path = os.path.join(extract_to_dir, 'it_wltv_full.xml')
    if not os.path.exists(target_path):
        logger.error("File not found")
        return "File not found", 404

    logger.info(f"Serving file from {target_path}")
    return send_file(target_path, as_attachment=False, mimetype='text/plain')
# Route to fetch logs
# NOTE(review): nothing visible in this file configures logging to write to
# 'app.log' - confirm that the file is produced elsewhere.
def get_logs():
    """Return the text of ``app.log`` with status 200.

    When the file cannot be read, the failure is logged and a JSON error
    payload is returned with status 500.
    """
    try:
        with open('app.log', 'r') as log_file:
            return log_file.read(), 200
    except Exception as err:
        logger.error(f"Failed to read logs: {err}")
        return jsonify({"error": "Failed to read logs"}), 500
# Route to update (download and extract again)
def update_files():
    """Fetch the guide archive again and re-extract it.

    Always answers ``"Files updated"`` with status 200; the outcome of the
    download and extraction steps is not inspected here.
    """
    source_url = 'http://epg-guide.com/wltv.gz'
    archive_path = '/tmp/it_wltv_full.gz'

    # Refresh the archive, then unpack it into the extraction directory.
    download_file(source_url, archive_path)
    extract_gz_file(archive_path, extract_to_dir)
    return "Files updated", 200
# Route to serve the current program for a specific channel
def current_program(channel_id):
    """Return the running programme for ``channel_id`` as JSON.

    Answers with a plain-text 404 when the lookup yields nothing.
    """
    details = get_current_program(channel_id)
    if not details:
        return "No current program found for this channel.", 404
    return jsonify(details)
if __name__ == '__main__':
    # Location of the compressed guide and where the download is stored.
    file_url, file_path = 'http://epg-guide.com/wltv.gz', '/tmp/it_wltv_full.gz'

    # Fetch and unpack the guide once before the server starts.
    download_file(file_url, file_path)
    extract_gz_file(file_path, extract_to_dir)

    # Listen on all interfaces, port 8080.
    app.run(host='0.0.0.0', port=8080)