Spaces:
Sleeping
Sleeping
| import requests | |
| from flask import Flask, request, jsonify | |
| from datetime import datetime | |
| import base64 | |
| import json | |
| app = Flask(__name__) | |
| # PythonAnywhere API endpoint | |
| PYTHONANYWHERE_URL = "https://omarnuwara.pythonanywhere.com/get-response" | |
| # GitHub API and file information | |
| GITHUB_TOKEN = "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" # Store this in .env in production | |
| REPO_OWNER = "omarnuwrar" | |
| REPO_NAME = "api" | |
| FILE_PATH = "user.json" | |
| def fetch_user_data(): | |
| """Fetch user data from GitHub repo.""" | |
| url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}" | |
| headers = { | |
| "Authorization": f"token {GITHUB_TOKEN}", | |
| "Accept": "application/vnd.github.v3+json", | |
| } | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| content = response.json() | |
| file_content = content["content"] | |
| file_decoded = base64.b64decode(file_content).decode("utf-8") | |
| return json.loads(file_decoded) | |
| else: | |
| print(f"Failed to fetch file: {response.status_code}") | |
| print(response.json()) | |
| return None | |
| def is_token_valid(token): | |
| """ | |
| Check if the provided token is valid and unexpired. | |
| """ | |
| user_data = fetch_user_data() | |
| if not user_data: | |
| return False, "Unable to fetch user data." | |
| for user in user_data: | |
| if user["token"] == token: | |
| # Parse token expiration time | |
| expiration_time = datetime.fromisoformat(user["token_expiration_time"]) | |
| if datetime.now() <= expiration_time: | |
| return True, None | |
| return False, "Your token has expired. Please subscribe to continue." | |
| return False, "Invalid token. Please log in or create an account." | |
| def chat(): | |
| """ | |
| Chat endpoint. Requires a valid token. | |
| """ | |
| try: | |
| # Ensure the request contains JSON | |
| if not request.is_json: | |
| return jsonify({"error": "Invalid request format. JSON expected."}), 400 | |
| # Extract message and token from the request body | |
| user_input = request.json.get("message", "") | |
| user_token = request.json.get("token", "") | |
| if not user_input: | |
| return jsonify({"error": "No message provided."}), 400 | |
| if not user_token: | |
| return jsonify({"error": "No token provided."}), 400 | |
| # Validate the token | |
| is_valid, error_message = is_token_valid(user_token) | |
| if not is_valid: | |
| return jsonify({"error": error_message}), 403 | |
| # Forward the request to PythonAnywhere | |
| data = {"message": user_input} | |
| response = requests.post(PYTHONANYWHERE_URL, json=data) | |
| if response.status_code == 200: | |
| response_json = response.json() | |
| ai_response = response_json.get("response", "No 'response' key found in the JSON.") | |
| ai_response = ai_response.encode('latin1').decode('utf-8', 'ignore') | |
| return jsonify({"response": ai_response}) | |
| else: | |
| return jsonify({ | |
| "error": "Error from PythonAnywhere", | |
| "details": response.text | |
| }), response.status_code | |
| except Exception as e: | |
| return jsonify({"error": "An error occurred", "details": str(e)}), 500 | |
| def get_payload(): | |
| """ | |
| Get payload from the request body. Requires a valid token. | |
| """ | |
| try: | |
| # Extract token from query parameters | |
| user_token = request.args.get("token", "") | |
| if not user_token: | |
| return jsonify({"error": "No token provided."}), 400 | |
| # Validate the token | |
| is_valid, error_message = is_token_valid(user_token) | |
| if not is_valid: | |
| return jsonify({"error": error_message}), 403 | |
| return jsonify({"payload": "This is a demo payload response."}) | |
| except Exception as e: | |
| return jsonify({"error": "An error occurred", "details": str(e)}), 500 | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) |