Spaces:
Paused
Paused
import base64
import hmac
import json
import os
from datetime import datetime

import requests
from flask import Flask, request, jsonify
# WSGI application object; view functions in this module are attached to it.
app = Flask(import_name=__name__)
# PythonAnywhere API endpoint that validated chat messages are forwarded to.
PYTHONANYWHERE_URL = "https://omarnuwara.pythonanywhere.com/get-response"

# GitHub API and file information (for token validation).
# SECURITY: the personal access token is read from the GITHUB_TOKEN
# environment variable instead of being embedded in the source. A credential
# that has ever been committed to a file must be considered leaked and revoked.
# An empty string means "no token configured".
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
REPO_OWNER = "omarnuwrar"
REPO_NAME = "api"
FILE_PATH = "user.json"
# Helper function to fetch user data from GitHub
def fetch_user_data(timeout=10):
    """Download and decode the user list stored in the GitHub repository.

    The file at ``FILE_PATH`` is requested through the GitHub "contents"
    endpoint, whose JSON answer carries the file body base64-encoded under
    the ``"content"`` key. That body is decoded as UTF-8 and parsed as JSON.

    Args:
        timeout: Seconds to wait for GitHub before giving up. Without a
            timeout ``requests`` can block the calling worker indefinitely.

    Returns:
        The parsed JSON document, or ``None`` when the request fails, GitHub
        answers with a non-200 status, or the payload cannot be decoded.
    """
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}"
    headers = {"Accept": "application/vnd.github.v3+json"}
    if GITHUB_TOKEN:
        # Only send credentials when a token is configured; an empty
        # "token " header would be rejected outright.
        headers["Authorization"] = f"token {GITHUB_TOKEN}"

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException:
        # Network failure, DNS error, timeout, ... -> same contract as a
        # non-200 answer.
        return None

    if response.status_code != 200:
        return None

    try:
        encoded = response.json()["content"]
        decoded = base64.b64decode(encoded).decode("utf-8")
        return json.loads(decoded)
    except (KeyError, TypeError, ValueError):
        # Missing "content" key, unexpected payload shape, invalid base64,
        # invalid UTF-8 or invalid JSON (the latter three are ValueErrors).
        return None
def _tokens_match(stored, supplied):
    """Return True when both values are identical strings (constant-time compare)."""
    if not isinstance(stored, str) or not isinstance(supplied, str):
        return False
    # compare_digest only accepts ASCII str or bytes, so compare the UTF-8 bytes.
    return hmac.compare_digest(stored.encode("utf-8"), supplied.encode("utf-8"))


# Check if a given token is valid and unexpired
def validate_token(token):
    """Check ``token`` against the user records fetched from GitHub.

    Args:
        token: The token supplied by the client.

    Returns:
        A ``(is_valid, message)`` tuple. ``is_valid`` is True only when a
        user record carries the same token and its
        ``token_expiration_time`` (ISO 8601) has not passed yet.
    """
    user_data = fetch_user_data()
    if not isinstance(user_data, list):
        # None (fetch failed) or a document that is not a list of records.
        return False, "Unable to fetch user data."

    for user in user_data:
        # Skip malformed entries instead of raising KeyError/TypeError.
        if not isinstance(user, dict) or not _tokens_match(user.get("token"), token):
            continue

        try:
            expires_at = datetime.fromisoformat(user["token_expiration_time"])
        except (KeyError, TypeError, ValueError):
            # Fail closed: a matching record without a usable expiry is not valid.
            return False, "Unable to verify token expiration."

        # Use a "now" of the same kind as the stored value: comparing a naive
        # and a timezone-aware datetime raises TypeError. tz=None yields the
        # naive local time.
        now = datetime.now(tz=expires_at.tzinfo)
        if now <= expires_at:
            first_name = user.get("first_name", "")
            last_name = user.get("last_name", "")
            return True, f"Valid token for {first_name} {last_name}."
        return False, "Token expired. Please subscribe to continue."

    return False, "Invalid token."
# NOTE(review): no route registration for this view is visible in the file, so
# without a decorator the app would serve nothing. The "/chat" path and the
# POST-only method are assumptions -- confirm against the existing clients.
@app.route("/chat", methods=["POST"])
def chat():
    """Validate the caller's token and proxy the message to PythonAnywhere.

    Expects a JSON object with the keys ``"message"`` and ``"token"``.

    Returns:
        A ``(response, status)`` pair:
        * 400 - body is not a JSON object or a required key is missing/empty
        * 401 - token rejected by ``validate_token``
        * 200 - ``{"response": ...}`` taken from the upstream answer
        * upstream status - upstream answered with a non-200 status
        * 502 - upstream unreachable or returned a non-JSON body
        * 500 - any other unexpected error
    """
    try:
        # Ensure the request contains JSON, a "message" key, and a "token" key.
        # silent=True turns a malformed body into None instead of raising.
        data = request.get_json(silent=True) if request.is_json else None
        if data is None:
            return jsonify({"error": "Request must contain JSON."}), 400
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object."}), 400

        user_input = data.get("message")
        token = data.get("token")
        if not user_input or not token:
            return jsonify({"error": "Both 'message' and 'token' are required."}), 400

        # Validate the token
        is_valid, validation_message = validate_token(token)
        if not is_valid:
            return jsonify({"error": validation_message}), 401

        # Forward the request to PythonAnywhere if the token is valid
        try:
            upstream = requests.post(
                PYTHONANYWHERE_URL,
                json={"message": user_input},
                timeout=30,  # never block a worker forever on a dead upstream
            )
        except requests.RequestException:
            app.logger.exception("Request to PythonAnywhere failed")
            return jsonify({
                "error": "Error from PythonAnywhere",
                "details": "Upstream request failed.",
            }), 502

        if upstream.status_code != 200:
            return jsonify({
                "error": "Error from PythonAnywhere",
                "details": upstream.text,
            }), upstream.status_code

        try:
            body = upstream.json()
        except ValueError:
            body = None
        if not isinstance(body, dict):
            return jsonify({
                "error": "Error from PythonAnywhere",
                "details": "Upstream returned an invalid response.",
            }), 502

        ai_response = body.get("response", "No response available.")
        return jsonify({"response": ai_response}), 200
    except Exception:
        # Top-level boundary: log the traceback server-side, but do not leak
        # exception text (paths, URLs, internals) to the client.
        app.logger.exception("Unexpected error while handling chat request")
        return jsonify({
            "error": "An error occurred",
            "details": "Internal server error.",
        }), 500
def main():
    """Start the Flask server, listening on every interface at port 7860."""
    app.run(port=7860, host="0.0.0.0")


if __name__ == "__main__":
    main()