transcript / app.py
CORVO-AI's picture
Update app.py
2f62045 verified
raw
history blame
4.08 kB
import requests
from flask import Flask, request, jsonify
from datetime import datetime
import base64
import json
app = Flask(__name__)
# PythonAnywhere API endpoint
PYTHONANYWHERE_URL = "https://omarnuwara.pythonanywhere.com/get-response"
# GitHub API and file information
GITHUB_TOKEN = "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" # Store this in .env in production
REPO_OWNER = "omarnuwrar"
REPO_NAME = "api"
FILE_PATH = "user.json"
def fetch_user_data():
"""Fetch user data from GitHub repo."""
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}"
headers = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
content = response.json()
file_content = content["content"]
file_decoded = base64.b64decode(file_content).decode("utf-8")
return json.loads(file_decoded)
else:
print(f"Failed to fetch file: {response.status_code}")
print(response.json())
return None
def is_token_valid(token):
"""
Check if the provided token is valid and unexpired.
"""
user_data = fetch_user_data()
if not user_data:
return False, "Unable to fetch user data."
for user in user_data:
if user["token"] == token:
# Parse token expiration time
expiration_time = datetime.fromisoformat(user["token_expiration_time"])
if datetime.now() <= expiration_time:
return True, None
return False, "Your token has expired. Please subscribe to continue."
return False, "Invalid token. Please log in or create an account."
@app.route('/chat', methods=['POST'])
def chat():
"""
Chat endpoint. Requires a valid token.
"""
try:
# Ensure the request contains JSON
if not request.is_json:
return jsonify({"error": "Invalid request format. JSON expected."}), 400
# Extract message and token from the request body
user_input = request.json.get("message", "")
user_token = request.json.get("token", "")
if not user_input:
return jsonify({"error": "No message provided."}), 400
if not user_token:
return jsonify({"error": "No token provided."}), 400
# Validate the token
is_valid, error_message = is_token_valid(user_token)
if not is_valid:
return jsonify({"error": error_message}), 403
# Forward the request to PythonAnywhere
data = {"message": user_input}
response = requests.post(PYTHONANYWHERE_URL, json=data)
if response.status_code == 200:
response_json = response.json()
ai_response = response_json.get("response", "No 'response' key found in the JSON.")
ai_response = ai_response.encode('latin1').decode('utf-8', 'ignore')
return jsonify({"response": ai_response})
else:
return jsonify({
"error": "Error from PythonAnywhere",
"details": response.text
}), response.status_code
except Exception as e:
return jsonify({"error": "An error occurred", "details": str(e)}), 500
@app.route('/get-payload', methods=['GET'])
def get_payload():
"""
Get payload from the request body. Requires a valid token.
"""
try:
# Extract token from query parameters
user_token = request.args.get("token", "")
if not user_token:
return jsonify({"error": "No token provided."}), 400
# Validate the token
is_valid, error_message = is_token_valid(user_token)
if not is_valid:
return jsonify({"error": error_message}), 403
return jsonify({"payload": "This is a demo payload response."})
except Exception as e:
return jsonify({"error": "An error occurred", "details": str(e)}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)