Spaces:
Sleeping
Sleeping
| defmodule Srh.Http.CommandHandler do | |
| alias Srh.Http.RequestValidator | |
| alias Srh.Auth.TokenResolver | |
| alias Srh.Redis.Client | |
| alias Srh.Redis.ClientWorker | |
| def handle_command(conn, token) do | |
| case RequestValidator.validate_redis_body(conn.body_params) do | |
| {:ok, command_array} -> | |
| do_handle_command(command_array, token) | |
| {:error, error_message} -> | |
| {:malformed_data, error_message} | |
| end | |
| end | |
| def handle_command_array(conn, token) do | |
| case RequestValidator.validate_pipeline_redis_body(conn.body_params) do | |
| {:ok, array_of_command_arrays} -> | |
| do_handle_command_array(array_of_command_arrays, token) | |
| {:error, error_message} -> | |
| {:malformed_data, error_message} | |
| end | |
| end | |
| def handle_command_transaction_array(conn, token) do | |
| # Transactions use the same body format as pipelines, so we can use the same validator | |
| case RequestValidator.validate_pipeline_redis_body(conn.body_params) do | |
| {:ok, array_of_command_arrays} -> | |
| do_handle_command_transaction_array(array_of_command_arrays, token) | |
| {:error, error_message} -> | |
| {:malformed_data, error_message} | |
| end | |
| end | |
| defp do_handle_command(command_array, token) do | |
| case TokenResolver.resolve(token) do | |
| {:ok, connection_info} -> | |
| dispatch_command(command_array, connection_info) | |
| {:error, msg} -> | |
| {:not_authorized, msg} | |
| end | |
| end | |
| defp do_handle_command_array(array_of_command_arrays, token) do | |
| case TokenResolver.resolve(token) do | |
| {:ok, connection_info} -> | |
| dispatch_command_array(array_of_command_arrays, connection_info) | |
| {:error, msg} -> | |
| {:not_authorized, msg} | |
| end | |
| end | |
| defp do_handle_command_transaction_array(array_of_command_arrays, token) do | |
| case TokenResolver.resolve(token) do | |
| {:ok, connection_info} -> | |
| dispatch_command_transaction_array(array_of_command_arrays, connection_info) | |
| {:error, msg} -> | |
| {:not_authorized, msg} | |
| end | |
| end | |
| defp dispatch_command_array(_arr, _connection_info, responses \\ []) | |
| defp dispatch_command_array([current | rest], connection_info, responses) do | |
| updated_responses = | |
| case dispatch_command(current, connection_info) do | |
| {:ok, result_map} -> | |
| [result_map | responses] | |
| {:connection_error, result} -> | |
| {:connection_error, result} | |
| {:redis_error, result} -> | |
| [result | responses] | |
| end | |
| case updated_responses do | |
| {:connection_error, result} -> | |
| {:connection_error, result} | |
| _ -> | |
| dispatch_command_array(rest, connection_info, updated_responses) | |
| end | |
| end | |
| defp dispatch_command_array([], _connection_info, responses) do | |
| # The responses will be in reverse order, as we're adding them to the list with the faster method of putting them at head. | |
| {:ok, Enum.reverse(responses)} | |
| end | |
| defp dispatch_command_transaction_array( | |
| command_array, | |
| %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info, | |
| responses \\ [] | |
| ) do | |
| case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do | |
| {:ok, client_pid} -> | |
| # Borrow a client, then run all of the commands (wrapped in MULTI and EXEC) | |
| worker_pid = Client.borrow_worker(client_pid) | |
| # We are manually going to invoke the MULTI, because there might be a connection error to the Redis server. | |
| # In that case, we don't want the error to be wound up in the array of errors, | |
| # we instead want to return the error immediately. | |
| case ClientWorker.redis_command(worker_pid, ["MULTI"]) do | |
| {:ok, _} -> | |
| do_dispatch_command_transaction_array(command_array, worker_pid, responses) | |
| # Now manually run the EXEC - this is what contains the information to form the response, not the above | |
| result = | |
| case ClientWorker.redis_command(worker_pid, ["EXEC"]) do | |
| {:ok, res} -> | |
| { | |
| :ok, | |
| res | |
| |> Enum.map(&%{result: &1}) | |
| } | |
| {:error, error} -> | |
| decode_error(error, srh_id) | |
| end | |
| Client.return_worker(client_pid, worker_pid) | |
| # Fire back the result here, because the initial Multi was successful | |
| result | |
| {:error, %{reason: :closed} = error} -> | |
| # Ensure that this pool is killed, but still pass the error up the chain for the response | |
| Client.destroy_workers(client_pid) | |
| decode_error(error, srh_id) | |
| {:error, error} -> | |
| decode_error(error, srh_id) | |
| end | |
| {:error, msg} -> | |
| {:server_error, msg} | |
| end | |
| end | |
| defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) | |
| when is_pid(worker_pid) do | |
| updated_responses = | |
| case ClientWorker.redis_command(worker_pid, current) do | |
| {:ok, res} -> | |
| [%{result: res} | responses] | |
| {:error, error} -> | |
| [ | |
| %{ | |
| error: error.message | |
| } | |
| | responses | |
| ] | |
| end | |
| do_dispatch_command_transaction_array(rest, worker_pid, updated_responses) | |
| end | |
| defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do | |
| {:ok, Enum.reverse(responses)} | |
| end | |
| defp dispatch_command( | |
| command_array, | |
| %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info | |
| ) | |
| when is_number(max_connections) do | |
| case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do | |
| {:ok, pid} -> | |
| # Run the command | |
| case Client.find_worker(pid) | |
| |> ClientWorker.redis_command(command_array) do | |
| {:ok, res} -> | |
| {:ok, %{result: res}} | |
| # Jedix connection error | |
| {:error, %{reason: :closed} = error} -> | |
| # Ensure that this pool is killed, but still pass the error up the chain for the response | |
| Client.destroy_workers(pid) | |
| decode_error(error, srh_id) | |
| {:error, error} -> | |
| decode_error(error, srh_id) | |
| end | |
| {:error, msg} -> | |
| {:server_error, msg} | |
| end | |
| end | |
| # Figure out if it's an actual Redis error or a Redix error | |
| defp decode_error(error, srh_id) do | |
| case error do | |
| %{reason: :closed} -> | |
| IO.puts( | |
| "WARNING: SRH was unable to connect to the Redis server. Please make sure it is running, and the connection information is correct. SRH ID: #{srh_id}" | |
| ) | |
| { | |
| :connection_error, | |
| "SRH: Unable to connect to the Redis server. See SRH logs for more information." | |
| } | |
| _ -> | |
| { | |
| :redis_error, | |
| %{ | |
| error: error.message | |
| } | |
| } | |
| end | |
| end | |
| end | |