transcript / app.py
CORVO-AI's picture
Update app.py
00d3596 verified
raw
history blame
7.17 kB
import requests
from flask import Flask, request, jsonify
from datetime import datetime
import json
app = Flask(__name__)
# URLs
SEPTIMIUS_JSON_URL = "https://corvo-ai-septimius.hf.space/read_Septimius_json"
# Brain Inc API Details
BRAININC_API_URL = "https://api.braininc.net/stream/bas-demo-v4/nlp/completions_generation"
BRAININC_API_TOKEN = "token 72ec00483379076f580eb8126f29da802a5140c3" # Replace if needed
def is_token_valid(token):
"""
Check if the token is valid and not expired by reading from Septimius.json.
"""
try:
# Fetch the current Septimius.json data
response = requests.get(SEPTIMIUS_JSON_URL)
if response.status_code == 200:
json_data = response.json()
if json_data.get("success"):
# Check if the token exists and is not expired
for entry in json_data["data"]:
if entry.get("token") == token:
# Parse the expiration date and compare it with the current date
expired_date = entry.get("expired_date")
if expired_date and datetime.fromisoformat(expired_date) > datetime.now():
return True # Token is valid
else:
return False # Token is expired
return False # Token not found
else:
print(f"Error fetching Septimius.json (Status Code: {response.status_code})")
return False
except requests.exceptions.RequestException as e:
print(f"Error connecting to Septimius API: {e}")
return False
def generate_ai_response(message):
"""
Generate AI response using Brain Inc API.
"""
headers = {
"Authorization": f"{BRAININC_API_TOKEN}",
"Content-Type": "application/json;charset=UTF-8",
}
payload = {
"identifier_type": "object_id",
"identifier_value": "4775260",
"pubsub_topic": "/studios/516104/wsuid_4647155_new-edge-112_nodeid_editor-111/textGenStream/1722445975926",
"stream": True,
"variables": {
"Input": f"you are AI Model (ALEX-9) Created By Libyan Team\n\nBoss : MR Omar Nuwara\n\nyour task is chat with user and help him in many tasks and NOTE: you are not from open ai you are from Libyan Team and your name (ALEX-9)\n\nNow Here the chat:{message}",
"User Input":f"you are AI Model (ALEX-9) Created By Libyan Team\n\nBoss : MR Omar Nuwara\n\nyour task is chat with user and help him in many tasks and NOTE: you are not from open ai you are from Libyan Team and your name (ALEX-9)\n\nNow Here the chat:{message} ,
"input":f"you are AI Model (ALEX-9) Created By Libyan Team\n\nBoss : MR Omar Nuwara\n\nyour task is chat with user and help him in many tasks and NOTE: you are not from open ai you are from Libyan Team and your name (ALEX-9)\n\nNow Here the chat:{message} ,
}
}
try:
response = requests.post(BRAININC_API_URL, headers=headers, json=payload)
combined_content = ""
for line in response.text.splitlines():
if line.startswith('data: '):
json_data = line[6:].strip()
try:
data = json.loads(json_data)
delta_content = data.get('choices')[0].get('delta').get('content', '')
combined_content += delta_content
except json.JSONDecodeError:
continue
combined_content = combined_content.replace("\n", "<br>")
return combined_content
except requests.exceptions.RequestException as e:
print(f"Error connecting to Brain Inc API: {e}")
return "Error generating response"
@app.route('/chat', methods=['POST'])
def chat():
try:
# Ensure the request contains JSON and the required keys
request_data = request.json if request.is_json else None
token = request_data.get("token") if request_data else None
message = request_data.get("message") if request_data else None
# Error Handling for Missing Parameters
if not token:
return jsonify({"error": "Token Not Provided", "details": "Please provide a valid token in the request body."}), 400
if not message:
return jsonify({"error": "Message Not Provided", "details": "Please provide a message in the request body."}), 400
# Check if the token is valid
if not is_token_valid(token):
# Enhanced Error Response for Token Validation
if token_expired(token):
return jsonify({"error": "Token Expired", "details": "Your token has expired. Please renew your subscription to continue."}), 403
elif token_not_found(token):
return jsonify({"error": "Token Not Found", "details": "The provided token does not exist. Please check your token or subscribe to the service."}), 403
else:
return jsonify({"error": "Invalid Token", "details": "The provided token is invalid. Please check your token or contact support."}), 403
# Generate AI response using Brain Inc API
ai_response = generate_ai_response(message)
if ai_response == "Error generating response":
return jsonify({"error": "AI Response Generation Failed", "details": "Failed to generate a response from the AI service. Please try again later."}), 500
else:
return jsonify({"response": ai_response})
except requests.exceptions.RequestException as e:
# Catch API Request Exceptions
return jsonify({"error": "API Request Error", "details": str(e)}), 500
except Exception as e:
# Catch Any Unexpected Errors
return jsonify({"error": "An Unexpected Error Occurred", "details": str(e)}), 500
# Helper functions for more specific token error handling
def token_expired(token):
try:
response = requests.get(SEPTIMIUS_JSON_URL)
if response.status_code == 200:
json_data = response.json()
if json_data.get("success"):
for entry in json_data["data"]:
if entry.get("token") == token:
expired_date = entry.get("expired_date")
if expired_date and datetime.fromisoformat(expired_date) <= datetime.now():
return True
except Exception as e:
print(f"Error checking token expiration: {e}")
return False
def token_not_found(token):
try:
response = requests.get(SEPTIMIUS_JSON_URL)
if response.status_code == 200:
json_data = response.json()
if json_data.get("success"):
for entry in json_data["data"]:
if entry.get("token") == token:
return False # Token found
return True # Token not found in the loop
except Exception as e:
print(f"Error checking token existence: {e}")
return True # Assume not found if an error occurs
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)