"""Flask chat gateway.

Exposes ``POST /chat``. The handler checks the caller's token against the
Septimius token list and, when the token is accepted, forwards the message to
the Brain Inc completion API and returns the concatenated streamed text.
"""

import json
import logging
import os
from datetime import datetime

import requests
from flask import Flask, request, jsonify

app = Flask(__name__)
logger = logging.getLogger(__name__)

# URLs
SEPTIMIUS_JSON_URL = "https://corvo-ai-septimius.hf.space/read_Septimius_json"

# Brain Inc API Details
BRAININC_API_URL = "https://api.braininc.net/stream/bas-demo-v4/nlp/completions_generation"
# SECURITY: this credential is committed in source. It can be overridden with
# the BRAININC_API_TOKEN environment variable; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
BRAININC_API_TOKEN = os.environ.get(
    "BRAININC_API_TOKEN",
    "token 72ec00483379076f580eb8126f29da802a5140c3",
)

# (connect, read) timeouts in seconds. Without a timeout, requests waits
# forever on a stalled upstream and blocks the worker.
SEPTIMIUS_TIMEOUT = (5, 15)
BRAININC_TIMEOUT = (5, 120)

# Sentinel returned by generate_ai_response() on failure; chat() checks for it.
AI_ERROR_RESPONSE = "Error generating response"

# Instruction text sent in front of every user message.
_SYSTEM_PROMPT = (
    "you are AI Model (ALEX-9) Created By Libyan Team\n\n"
    "Boss : MR Omar Nuwara\n\n"
    "your task is chat with user and help him in many tasks and NOTE: "
    "you are not from open ai you are from Libyan Team and your name (ALEX-9)"
    "\n\nNow Here the chat:"
)


def _fetch_token_entries():
    """Download the Septimius token list.

    Returns:
        A list of entry dicts, or ``None`` when the list could not be
        obtained (connection error, non-200 status, invalid JSON, a falsy
        ``success`` flag, or a ``data`` field that is not a list).
    """
    try:
        response = requests.get(SEPTIMIUS_JSON_URL, timeout=SEPTIMIUS_TIMEOUT)
    except requests.exceptions.RequestException as e:
        logger.error("Error connecting to Septimius API: %s", e)
        return None

    if response.status_code != 200:
        logger.error(
            "Error fetching Septimius.json (Status Code: %s)",
            response.status_code,
        )
        return None

    try:
        json_data = response.json()
    except ValueError as e:  # every JSON decode error requests raises is a ValueError
        logger.error("Septimius API returned invalid JSON: %s", e)
        return None

    if not isinstance(json_data, dict) or not json_data.get("success"):
        return None

    entries = json_data.get("data")
    if not isinstance(entries, list):
        return None
    # Skip malformed rows instead of failing on entry.get(...).
    return [entry for entry in entries if isinstance(entry, dict)]


def _matching_entries(entries, token):
    """Return the entries whose ``token`` field equals *token* (order kept)."""
    if not entries:
        return []
    return [entry for entry in entries if entry.get("token") == token]


def _parse_expiry(raw):
    """Parse an ISO-8601 ``expired_date``; return ``None`` if missing or invalid."""
    if not raw or not isinstance(raw, str):
        return None
    try:
        return datetime.fromisoformat(raw)
    except ValueError:
        logger.warning("Unparseable expired_date: %r", raw)
        return None


def _now_for(moment):
    """Return the current time, comparable with *moment*.

    Comparing an offset-aware datetime with a naive one raises TypeError, so
    an aware *moment* is compared against an aware "now" in the same tzinfo.
    """
    if moment.tzinfo is not None and moment.utcoffset() is not None:
        return datetime.now(moment.tzinfo)
    return datetime.now()


def _first_match_is_valid(matches):
    """Return True if the first matching entry has an expiry in the future."""
    if not matches:
        return False
    expiry = _parse_expiry(matches[0].get("expired_date"))
    return expiry is not None and expiry > _now_for(expiry)


def _any_match_expired(matches):
    """Return True if any matching entry has an expiry at or before now."""
    for entry in matches:
        expiry = _parse_expiry(entry.get("expired_date"))
        if expiry is not None and expiry <= _now_for(expiry):
            return True
    return False


def is_token_valid(token):
    """Check whether *token* is listed in Septimius.json and not expired.

    The first entry whose ``token`` matches decides the result.

    Returns:
        ``True`` if that entry's ``expired_date`` lies in the future;
        ``False`` if the token is missing, expired, has no usable
        ``expired_date``, or the list could not be fetched.
    """
    entries = _fetch_token_entries()
    return _first_match_is_valid(_matching_entries(entries, token))


def _delta_content(event):
    """Extract ``choices[0].delta.content`` from one stream event, or ``""``."""
    if not isinstance(event, dict):
        return ""
    choices = event.get("choices")
    if not isinstance(choices, list) or not choices:
        return ""
    first = choices[0]
    delta = first.get("delta") if isinstance(first, dict) else None
    content = delta.get("content") if isinstance(delta, dict) else None
    return content if isinstance(content, str) else ""


def _collect_stream_text(body):
    """Concatenate the text deltas found in the ``data: `` lines of *body*."""
    pieces = []
    for line in body.splitlines():
        if not line.startswith("data: "):
            continue
        try:
            event = json.loads(line[6:].strip())
        except json.JSONDecodeError:
            # Non-JSON data lines are ignored, as in the original code.
            continue
        pieces.append(_delta_content(event))
    return "".join(pieces)


def generate_ai_response(message):
    """Generate an AI response using the Brain Inc API.

    Args:
        message: The user's chat message; appended to the fixed prompt.

    Returns:
        The concatenated text of the streamed completion, or the sentinel
        string ``"Error generating response"`` when the request fails or the
        API answers with an error status.
    """
    headers = {
        "Authorization": BRAININC_API_TOKEN,
        "Content-Type": "application/json;charset=UTF-8",
    }
    prompt = f"{_SYSTEM_PROMPT}{message}"
    payload = {
        "identifier_type": "object_id",
        "identifier_value": "4775260",
        "pubsub_topic": "/studios/516104/wsuid_4647155_new-edge-112_nodeid_editor-111/textGenStream/1722445975926",
        "stream": True,
        # The same prompt is supplied under all three variable names.
        "variables": {
            "Input": prompt,
            "User Input": prompt,
            "input": prompt,
        },
    }

    try:
        response = requests.post(
            BRAININC_API_URL,
            headers=headers,
            json=payload,
            timeout=BRAININC_TIMEOUT,
        )
    except requests.exceptions.RequestException as e:
        logger.error("Error connecting to Brain Inc API: %s", e)
        return AI_ERROR_RESPONSE

    if not response.ok:
        # Do not turn an upstream failure into an empty "successful" reply.
        logger.error("Brain Inc API returned status %s", response.status_code)
        return AI_ERROR_RESPONSE

    # Without a declared charset, requests assumes ISO-8859-1 for text/*
    # bodies, which garbles non-ASCII output; default to UTF-8 instead.
    if "charset" not in response.headers.get("Content-Type", "").lower():
        response.encoding = "utf-8"

    # NOTE(review): the original followed this with
    # combined_content.replace("\n", <a literal line break>), which is a
    # no-op as written (the replacement text may have been lost, e.g. an
    # HTML "<br>"). The text is returned unchanged here - confirm intent.
    return _collect_stream_text(response.text)


@app.route('/chat', methods=['POST'])
def chat():
    """Handle ``POST /chat``.

    Expects a JSON object with ``token`` and ``message``.

    Returns:
        * 200 ``{"response": ...}`` on success.
        * 400 when ``token`` or ``message`` is missing.
        * 403 when the token is expired, not found, or otherwise invalid.
        * 500 when the AI call fails or an unexpected error occurs.
    """
    try:
        # silent=True: a malformed body is treated as "no parameters" (400)
        # instead of raising and being reported as a 500.
        request_data = request.get_json(silent=True) if request.is_json else None
        if not isinstance(request_data, dict):
            request_data = {}
        token = request_data.get("token")
        message = request_data.get("message")

        # Error Handling for Missing Parameters
        if not token:
            return jsonify({"error": "Token Not Provided", "details": "Please provide a valid token in the request body."}), 400
        if not message:
            return jsonify({"error": "Message Not Provided", "details": "Please provide a message in the request body."}), 400

        # Fetch the token list once and derive every verdict from it.
        entries = _fetch_token_entries()
        matches = _matching_entries(entries, token)

        if not _first_match_is_valid(matches):
            if _any_match_expired(matches):
                return jsonify({"error": "Token Expired", "details": "Your token has expired. Please renew your subscription to continue."}), 403
            elif not matches:
                # Also covers an unavailable token list (entries is None):
                # the original code assumed "not found" in that case.
                return jsonify({"error": "Token Not Found", "details": "The provided token does not exist. Please check your token or subscribe to the service."}), 403
            else:
                return jsonify({"error": "Invalid Token", "details": "The provided token is invalid. Please check your token or contact support."}), 403

        # Generate AI response using Brain Inc API
        ai_response = generate_ai_response(message)
        if ai_response == AI_ERROR_RESPONSE:
            return jsonify({"error": "AI Response Generation Failed", "details": "Failed to generate a response from the AI service. Please try again later."}), 500
        return jsonify({"response": ai_response})

    except requests.exceptions.RequestException as e:
        # Catch API Request Exceptions
        logger.exception("API request error while handling /chat")
        return jsonify({"error": "API Request Error", "details": str(e)}), 500
    except Exception as e:
        # Top-level boundary: report any unexpected error as a 500.
        logger.exception("Unexpected error while handling /chat")
        return jsonify({"error": "An Unexpected Error Occurred", "details": str(e)}), 500


# Helper functions for more specific token error handling
def token_expired(token):
    """Return True if any entry for *token* has an expiry at or before now.

    Returns False when the token is absent, has no usable ``expired_date``,
    or the token list could not be fetched.
    """
    entries = _fetch_token_entries()
    return _any_match_expired(_matching_entries(entries, token))


def token_not_found(token):
    """Return True if no entry for *token* exists.

    Also returns True when the token list could not be fetched (the token is
    assumed not found if an error occurs).
    """
    entries = _fetch_token_entries()
    return not _matching_entries(entries, token)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)