Spaces:
Paused
Paused
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio  # Asynchronous constructs for the streaming coroutine
import json  # JSON handling for request and response bodies
import uuid  # Unique identifiers for tracking or sessions
from typing import Dict, List  # Type hints for function signatures

import aiohttp  # Fallback HTTP client (used here for its exception types)
import httpx  # Primary HTTP client (used here for its exception types)

from config import *  # Configuration values such as server lists and tokens
from src.core.transport.aiohttp import aiohttp_transport  # Fallback streaming transport
from src.core.transport.httpx import httpx_transport  # Primary streaming transport
from src.utils.helper import mark  # Mark servers as failed for retry/logging
from src.utils.instruction import set_instructions  # Build system instructions from mode and time
from src.utils.ip_generator import generate_ip  # Random IP addresses for request headers
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Wrap reasoning text with tags
from src.utils.session_mapping import get_host  # Map a session ID to a server host
from src.utils.time import get_time  # Current date and time in the required format
# Main asynchronous entry point: stream AI responses with per-server transport fallback
async def jarvis(
    session_id: str,  # Session identifier used to route the request to a server
    model: str,  # Model name or identifier to request from the backend
    history: List[Dict[str, str]],  # Prior conversation messages, kept for context
    user_message: str,  # Latest user input to send to the model
    mode: str,  # Behavior mode (e.g. toggles reasoning output downstream)
    files=None,  # Optional attachments to include with the user message
    temperature: float = 0.6,  # Sampling temperature controlling randomness
    top_k: int = 20,  # Restrict sampling to the top-k most probable tokens
    min_p: float = 0.0,  # Minimum probability threshold for token sampling
    top_p: float = 0.95,  # Nucleus-sampling cumulative probability cutoff
    repetition_penalty: float = 1.0,  # Penalty applied to repeated tokens
):
    """
    Stream an AI response for one user turn, failing over across servers.

    Builds the chat payload (system instructions + history + new user
    message), then tries each configured server in turn: first via the
    httpx transport, then via the aiohttp fallback against the same
    server. Response chunks are yielded to the caller as they arrive.

    Yields:
        Streamed response chunks; every dict chunk is guaranteed to
        carry a "content" key (added as "" when missing) so downstream
        consumers can rely on a uniform shape.

    Raises:
        RuntimeError: When every configured server has been tried via
            both transports and none produced a successful stream.
    """
    def _normalize(chunk):
        # Ensure every dict chunk exposes a "content" key so consumers
        # never need to guard against its absence.
        if isinstance(chunk, dict) and "content" not in chunk:
            chunk["content"] = ""
        return chunk

    tried = set()  # Servers already attempted; bounds the retry loop.
    while len(tried) < len(auth):  # Stop once every configured server was tried.
        # Resolve the server assigned to this session.
        setup = get_host(session_id)
        server = setup["jarvis"]   # Server identifier (for mark() and dedup).
        host = setup["endpoint"]   # Endpoint URL for the request.
        token = setup["token"]     # Bearer token for this server.
        error = setup["error"]     # Status code meaning "retry on another server".
        tried.add(server)
        # Build the system instruction from the mode and the current time.
        instructions = set_instructions(mode, get_time())
        # Shallow-copy history so the caller's list is never mutated.
        messages = history.copy()
        messages.insert(0, {"role": "system", "content": instructions})
        msg = {"role": "user", "content": user_message}
        if files:  # Attach files only when the caller supplied any.
            msg["files"] = files
        messages.append(msg)
        headers = {
            "Authorization": f"Bearer {token}",  # Authenticate with the server.
            "Content-Type": "application/json",  # JSON-formatted request body.
            "X-Forwarded-For": generate_ip(),  # Randomized client-origin IP.
        }
        payload = {
            "model": model,
            "messages": messages,
            "stream": True,  # Receive partial response chunks progressively.
            "temperature": temperature,
            "top_k": top_k,
            "min_p": min_p,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
        }
        # Primary transport: httpx streaming.
        try:
            async for chunk in httpx_transport(host, headers, payload, mode):
                if chunk is None:
                    continue  # Skip empty/no-op chunks without processing.
                yield _normalize(chunk)
            return  # Stream finished cleanly; nothing more to do.
        except httpx.HTTPStatusError as e:
            if e.response.status_code == error:
                continue  # Retryable: move straight on to the next server.
            mark(server)  # Non-retryable failure; record it.
        except Exception:
            mark(server)  # Unexpected transport failure; record it.
        # Fallback transport: aiohttp streaming against the same server.
        try:
            async for chunk in aiohttp_transport(host, headers, payload, mode):
                if chunk is None:
                    continue  # Skip empty/no-op chunks without processing.
                yield _normalize(chunk)
            return  # Fallback stream finished cleanly.
        except aiohttp.ClientResponseError as e:
            if e.status == error:
                continue  # Retryable: move on to the next server.
            mark(server)  # Non-retryable failure on the fallback path.
        except Exception:
            mark(server)  # Unexpected fallback failure; record it.
    # Every configured server was tried via both transports and failed.
    raise RuntimeError("All configured AI servers failed to stream a response")