# Residue from the hosting site's file viewer (not part of the program):
# "Spaces: Sleeping" / "File size: 7,172 Bytes"
import requests
from flask import Flask, request, jsonify
from datetime import datetime
import json
# Flask application object; the route below is registered on it.
app = Flask(__name__)
# URLs
# Endpoint fetched by the token helpers in this file; they read a JSON
# document with a "success" flag and a "data" list of token entries from it.
SEPTIMIUS_JSON_URL = "https://corvo-ai-septimius.hf.space/read_Septimius_json"
# Brain Inc API Details
# Endpoint that generate_ai_response() POSTs the prompt payload to.
BRAININC_API_URL = "https://api.braininc.net/stream/bas-demo-v4/nlp/completions_generation"
# Sent verbatim as the "Authorization" header value.
# NOTE(review): this is a credential hard-coded in source; consider loading it
# from the environment or a secret store and rotating the committed value.
BRAININC_API_TOKEN = "token 72ec00483379076f580eb8126f29da802a5140c3" # Replace if needed
def is_token_valid(token):
    """
    Check whether ``token`` is listed in Septimius.json and not yet expired.

    Fetches the JSON document at ``SEPTIMIUS_JSON_URL`` and looks for the
    first entry in its ``"data"`` list whose ``"token"`` equals ``token``.

    Args:
        token: The token value supplied by the client.

    Returns:
        True only when the matching entry carries an ``"expired_date"`` that
        parses as an ISO timestamp later than the current time. False when
        the request fails, the status is not 200, the payload is malformed or
        reports no success, the token is absent, or the entry is expired or
        has no usable expiry date.
    """
    try:
        # A timeout keeps a stalled upstream from blocking this request forever.
        response = requests.get(SEPTIMIUS_JSON_URL, timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to Septimius API: {e}")
        return False

    if response.status_code != 200:
        print(f"Error fetching Septimius.json (Status Code: {response.status_code})")
        return False

    try:
        json_data = response.json()
    except ValueError as e:
        # Body was not valid JSON.
        print(f"Error decoding Septimius.json: {e}")
        return False

    if not isinstance(json_data, dict) or not json_data.get("success"):
        return False

    entries = json_data.get("data")
    if not isinstance(entries, list):
        return False

    for entry in entries:
        if not isinstance(entry, dict) or entry.get("token") != token:
            continue

        # First matching entry decides the outcome.
        expired_date = entry.get("expired_date")
        if not expired_date:
            return False  # No expiry recorded: treated as not valid
        try:
            expires_at = datetime.fromisoformat(expired_date)
        except (TypeError, ValueError) as e:
            print(f"Error parsing expired_date {expired_date!r}: {e}")
            return False
        # Use a "now" with the same awareness as the stored value so that a
        # timestamp carrying a UTC offset does not raise on comparison.
        return expires_at > datetime.now(expires_at.tzinfo)

    return False  # Token not found
def generate_ai_response(message):
    """
    Generate an AI reply for ``message`` using the Brain Inc API.

    The persona prompt with ``message`` appended is sent under the three
    variable names "Input", "User Input" and "input". The response body is
    read as server-sent-event style lines: every line starting with
    ``"data: "`` is parsed as JSON and the ``choices[0].delta.content``
    fragments are concatenated in order.

    Args:
        message: The user's chat message.

    Returns:
        The concatenated reply with newlines replaced by ``<br>``, or the
        sentinel string ``"Error generating response"`` when the HTTP request
        fails.
    """
    headers = {
        "Authorization": BRAININC_API_TOKEN,
        "Content-Type": "application/json;charset=UTF-8",
    }
    # Built once: the same prompt text is sent under all three variable names.
    prompt = f"you are AI Model (ALEX-9) Created By Libyan Team\n\nBoss : MR Omar Nuwara\n\nyour task is chat with user and help him in many tasks and NOTE: you are not from open ai you are from Libyan Team and your name (ALEX-9)\n\nNow Here the chat:{message}"
    payload = {
        "identifier_type": "object_id",
        "identifier_value": "4775260",
        "pubsub_topic": "/studios/516104/wsuid_4647155_new-edge-112_nodeid_editor-111/textGenStream/1722445975926",
        "stream": True,
        "variables": {
            "Input": prompt,
            "User Input": prompt,
            "input": prompt,
        },
    }

    try:
        # A timeout keeps a stalled upstream from blocking this request forever.
        response = requests.post(
            BRAININC_API_URL, headers=headers, json=payload, timeout=60
        )
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to Brain Inc API: {e}")
        return "Error generating response"

    fragments = []
    for line in response.text.splitlines():
        if not line.startswith("data: "):
            continue
        try:
            data = json.loads(line[6:].strip())
        except json.JSONDecodeError:
            # Non-JSON data lines (e.g. end-of-stream markers) are skipped.
            continue
        try:
            content = data["choices"][0]["delta"].get("content")
        except (AttributeError, IndexError, KeyError, TypeError):
            # Chunk without a choices/delta structure carries no text.
            continue
        if isinstance(content, str):
            fragments.append(content)

    return "".join(fragments).replace("\n", "<br>")
@app.route('/chat', methods=['POST'])
def chat():
    """
    Handle POST /chat: validate the token, then return an AI reply.

    Expects a JSON object body with "token" and "message" keys.

    Returns:
        A JSON response:
        - 400 when the token or message is missing (including when the body
          is absent, is not valid JSON, or is not a JSON object);
        - 403 when the token is expired, not found, or otherwise invalid;
        - 500 when response generation fails or an unexpected error occurs;
        - 200 with {"response": ...} on success.
    """
    try:
        # silent=True yields None for a non-JSON or malformed body instead of
        # raising, so bad input is reported as a 400 below rather than a 500.
        request_data = request.get_json(silent=True)
        if not isinstance(request_data, dict):
            request_data = {}
        token = request_data.get("token")
        message = request_data.get("message")

        # Error Handling for Missing Parameters
        if not token:
            return jsonify({"error": "Token Not Provided", "details": "Please provide a valid token in the request body."}), 400
        if not message:
            return jsonify({"error": "Message Not Provided", "details": "Please provide a message in the request body."}), 400

        # Check if the token is valid; on failure, pick the most specific reason
        if not is_token_valid(token):
            if token_expired(token):
                return jsonify({"error": "Token Expired", "details": "Your token has expired. Please renew your subscription to continue."}), 403
            if token_not_found(token):
                return jsonify({"error": "Token Not Found", "details": "The provided token does not exist. Please check your token or subscribe to the service."}), 403
            return jsonify({"error": "Invalid Token", "details": "The provided token is invalid. Please check your token or contact support."}), 403

        # Generate AI response using Brain Inc API
        ai_response = generate_ai_response(message)
        if ai_response == "Error generating response":
            return jsonify({"error": "AI Response Generation Failed", "details": "Failed to generate a response from the AI service. Please try again later."}), 500
        return jsonify({"response": ai_response})
    except requests.exceptions.RequestException as e:
        # Catch API Request Exceptions
        return jsonify({"error": "API Request Error", "details": str(e)}), 500
    except Exception as e:
        # Catch Any Unexpected Errors
        return jsonify({"error": "An Unexpected Error Occurred", "details": str(e)}), 500
# Helper functions for more specific token error handling
def token_expired(token):
    """
    Report whether ``token`` is listed in Septimius.json with a past expiry.

    Args:
        token: The token value supplied by the client.

    Returns:
        True when some entry whose ``"token"`` equals ``token`` has an
        ``"expired_date"`` at or before the current time. False otherwise,
        including when the lookup fails for any reason (the error is printed).
    """
    try:
        # A timeout keeps a stalled upstream from blocking this request forever.
        response = requests.get(SEPTIMIUS_JSON_URL, timeout=10)
        if response.status_code != 200:
            return False
        json_data = response.json()
        if not json_data.get("success"):
            return False
        for entry in json_data["data"]:
            if entry.get("token") != token:
                continue
            expired_date = entry.get("expired_date")
            if not expired_date:
                continue
            expires_at = datetime.fromisoformat(expired_date)
            # Match the awareness of the stored value so a timestamp with a
            # UTC offset compares instead of raising.
            if expires_at <= datetime.now(expires_at.tzinfo):
                return True
    except Exception as e:
        # Best-effort helper: any failure is logged and reported as "not expired".
        print(f"Error checking token expiration: {e}")
    return False
def token_not_found(token):
    """
    Report whether ``token`` is absent from Septimius.json.

    Args:
        token: The token value supplied by the client.

    Returns:
        False only when the document was fetched successfully and contains an
        entry whose ``"token"`` equals ``token``. True in every other case,
        including a failed or unsuccessful lookup (the error is printed).
    """
    try:
        # A timeout keeps a stalled upstream from blocking this request forever.
        response = requests.get(SEPTIMIUS_JSON_URL, timeout=10)
        if response.status_code == 200:
            json_data = response.json()
            if json_data.get("success"):
                return not any(
                    entry.get("token") == token for entry in json_data["data"]
                )
    except Exception as e:
        print(f"Error checking token existence: {e}")
    return True  # Assume not found if the lookup could not confirm the token
if __name__ == "__main__":
    # Start the Flask server on all interfaces, port 7860, when run as a script.
    app.run(host="0.0.0.0", port=7860)