Spaces:
Sleeping
Sleeping
Scott Hiett
commited on
Commit
·
bb41a4c
1
Parent(s):
691cebf
Better error handling for failure to connect to Redis server
Browse files- lib/srh/auth/token_resolver.ex +20 -18
- lib/srh/http/base_router.ex +8 -2
- lib/srh/http/command_handler.ex +47 -29
- lib/srh/redis/client_worker.ex +1 -1
lib/srh/auth/token_resolver.ex
CHANGED
|
@@ -70,11 +70,13 @@ defmodule Srh.Auth.TokenResolver do
|
|
| 70 |
{srh_max_connections, ""} = Integer.parse(System.get_env("SRH_MAX_CONNECTIONS", "3"))
|
| 71 |
|
| 72 |
# Create a config-file-like structure that the ETS layout expects, with just one entry
|
| 73 |
-
config_file_data =
|
| 74 |
-
|
| 75 |
-
|
| 76 |
-
|
| 77 |
-
|
|
|
|
|
|
|
| 78 |
|
| 79 |
IO.puts("Loaded config from env. #{map_size(config_file_data)} entries.")
|
| 80 |
# Load this into ETS
|
|
@@ -98,17 +100,17 @@ defmodule Srh.Auth.TokenResolver do
|
|
| 98 |
# The env strategy uses the same ETS table as the file strategy, so we can fall back on that
|
| 99 |
defp do_resolve("env", token), do: do_resolve("file", token)
|
| 100 |
|
| 101 |
-
defp do_resolve("redis", _token) do
|
| 102 |
-
{
|
| 103 |
-
:ok,
|
| 104 |
-
# This is done to replicate what will eventually be API endpoints, so they keys are not atoms
|
| 105 |
-
Jason.decode!(
|
| 106 |
-
Jason.encode!(%{
|
| 107 |
-
srh_id: "1000",
|
| 108 |
-
connection_string: "redis://localhost:6379",
|
| 109 |
-
max_connections: 10
|
| 110 |
-
})
|
| 111 |
-
)
|
| 112 |
-
}
|
| 113 |
-
end
|
| 114 |
end
|
|
|
|
| 70 |
{srh_max_connections, ""} = Integer.parse(System.get_env("SRH_MAX_CONNECTIONS", "3"))
|
| 71 |
|
| 72 |
# Create a config-file-like structure that the ETS layout expects, with just one entry
|
| 73 |
+
config_file_data =
|
| 74 |
+
Map.put(%{}, srh_token, %{
|
| 75 |
+
# Jason.parse! expects these keys to be strings, not atoms, so we need to replicate that setup
|
| 76 |
+
"srh_id" => "env_config_connection",
|
| 77 |
+
"connection_string" => srh_connection_string,
|
| 78 |
+
"max_connections" => srh_max_connections
|
| 79 |
+
})
|
| 80 |
|
| 81 |
IO.puts("Loaded config from env. #{map_size(config_file_data)} entries.")
|
| 82 |
# Load this into ETS
|
|
|
|
| 100 |
# The env strategy uses the same ETS table as the file strategy, so we can fall back on that
|
| 101 |
defp do_resolve("env", token), do: do_resolve("file", token)
|
| 102 |
|
| 103 |
+
# defp do_resolve("redis", _token) do
|
| 104 |
+
# {
|
| 105 |
+
# :ok,
|
| 106 |
+
# # This is done to replicate what will eventually be API endpoints, so they keys are not atoms
|
| 107 |
+
# Jason.decode!(
|
| 108 |
+
# Jason.encode!(%{
|
| 109 |
+
# srh_id: "1000",
|
| 110 |
+
# connection_string: "redis://localhost:6379",
|
| 111 |
+
# max_connections: 10
|
| 112 |
+
# })
|
| 113 |
+
# )
|
| 114 |
+
# }
|
| 115 |
+
# end
|
| 116 |
end
|
lib/srh/http/base_router.ex
CHANGED
|
@@ -54,7 +54,8 @@ defmodule Srh.Http.BaseRouter do
|
|
| 54 |
|> get_req_header("upstash-encoding")
|
| 55 |
|> RequestValidator.validate_encoding_header() do
|
| 56 |
{:ok, _encoding_enabled} -> true
|
| 57 |
-
|
|
|
|
| 58 |
end
|
| 59 |
end
|
| 60 |
|
|
@@ -63,7 +64,9 @@ defmodule Srh.Http.BaseRouter do
|
|
| 63 |
true ->
|
| 64 |
# We need to use the encoder to
|
| 65 |
ResultEncoder.encode_response(response)
|
| 66 |
-
|
|
|
|
|
|
|
| 67 |
end
|
| 68 |
end
|
| 69 |
|
|
@@ -85,6 +88,9 @@ defmodule Srh.Http.BaseRouter do
|
|
| 85 |
{:not_authorized, message} ->
|
| 86 |
%{code: 401, message: message, json: false}
|
| 87 |
|
|
|
|
|
|
|
|
|
|
| 88 |
{:server_error, _} ->
|
| 89 |
%{code: 500, message: "An error occurred internally", json: false}
|
| 90 |
|
|
|
|
| 54 |
|> get_req_header("upstash-encoding")
|
| 55 |
|> RequestValidator.validate_encoding_header() do
|
| 56 |
{:ok, _encoding_enabled} -> true
|
| 57 |
+
# it's not required to be present
|
| 58 |
+
{:error, _} -> false
|
| 59 |
end
|
| 60 |
end
|
| 61 |
|
|
|
|
| 64 |
true ->
|
| 65 |
# We need to use the encoder to
|
| 66 |
ResultEncoder.encode_response(response)
|
| 67 |
+
|
| 68 |
+
false ->
|
| 69 |
+
response
|
| 70 |
end
|
| 71 |
end
|
| 72 |
|
|
|
|
| 88 |
{:not_authorized, message} ->
|
| 89 |
%{code: 401, message: message, json: false}
|
| 90 |
|
| 91 |
+
{:connection_error, message} ->
|
| 92 |
+
%{code: 500, message: message, json: false}
|
| 93 |
+
|
| 94 |
{:server_error, _} ->
|
| 95 |
%{code: 500, message: "An error occurred internally", json: false}
|
| 96 |
|
lib/srh/http/command_handler.ex
CHANGED
|
@@ -99,39 +99,43 @@ defmodule Srh.Http.CommandHandler do
|
|
| 99 |
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
|
| 100 |
|
| 101 |
# Now manually run the EXEC - this is what contains the information to form the response, not the above
|
| 102 |
-
result =
|
| 103 |
-
|
| 104 |
-
{
|
| 105 |
-
|
| 106 |
-
|
| 107 |
-
|
| 108 |
-
|
| 109 |
-
|
| 110 |
|
| 111 |
-
|
| 112 |
-
|
| 113 |
-
|
| 114 |
|
| 115 |
Client.return_worker(client_pid, worker_pid)
|
| 116 |
|
| 117 |
result
|
|
|
|
| 118 |
{:error, msg} ->
|
| 119 |
{:server_error, msg}
|
| 120 |
end
|
| 121 |
end
|
| 122 |
|
| 123 |
-
defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses)
|
| 124 |
-
|
| 125 |
-
|
| 126 |
-
|
| 127 |
-
|
| 128 |
-
|
| 129 |
-
|
| 130 |
-
|
| 131 |
-
|
| 132 |
-
|
| 133 |
-
|
| 134 |
-
|
|
|
|
|
|
|
|
|
|
| 135 |
|
| 136 |
do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
|
| 137 |
end
|
|
@@ -154,16 +158,30 @@ defmodule Srh.Http.CommandHandler do
|
|
| 154 |
{:ok, %{result: res}}
|
| 155 |
|
| 156 |
{:error, error} ->
|
| 157 |
-
|
| 158 |
-
:redis_error,
|
| 159 |
-
%{
|
| 160 |
-
error: error.message
|
| 161 |
-
}
|
| 162 |
-
}
|
| 163 |
end
|
| 164 |
|
| 165 |
{:error, msg} ->
|
| 166 |
{:server_error, msg}
|
| 167 |
end
|
| 168 |
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 169 |
end
|
|
|
|
| 99 |
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
|
| 100 |
|
| 101 |
# Now manually run the EXEC - this is what contains the information to form the response, not the above
|
| 102 |
+
result =
|
| 103 |
+
case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
|
| 104 |
+
{:ok, res} ->
|
| 105 |
+
{
|
| 106 |
+
:ok,
|
| 107 |
+
res
|
| 108 |
+
|> Enum.map(&%{result: &1})
|
| 109 |
+
}
|
| 110 |
|
| 111 |
+
{:error, error} ->
|
| 112 |
+
decode_error(error)
|
| 113 |
+
end
|
| 114 |
|
| 115 |
Client.return_worker(client_pid, worker_pid)
|
| 116 |
|
| 117 |
result
|
| 118 |
+
|
| 119 |
{:error, msg} ->
|
| 120 |
{:server_error, msg}
|
| 121 |
end
|
| 122 |
end
|
| 123 |
|
| 124 |
+
defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses)
|
| 125 |
+
when is_pid(worker_pid) do
|
| 126 |
+
updated_responses =
|
| 127 |
+
case ClientWorker.redis_command(worker_pid, current) do
|
| 128 |
+
{:ok, res} ->
|
| 129 |
+
[%{result: res} | responses]
|
| 130 |
+
|
| 131 |
+
{:error, error} ->
|
| 132 |
+
[
|
| 133 |
+
%{
|
| 134 |
+
error: error.message
|
| 135 |
+
}
|
| 136 |
+
| responses
|
| 137 |
+
]
|
| 138 |
+
end
|
| 139 |
|
| 140 |
do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
|
| 141 |
end
|
|
|
|
| 158 |
{:ok, %{result: res}}
|
| 159 |
|
| 160 |
{:error, error} ->
|
| 161 |
+
decode_error(error)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 162 |
end
|
| 163 |
|
| 164 |
{:error, msg} ->
|
| 165 |
{:server_error, msg}
|
| 166 |
end
|
| 167 |
end
|
| 168 |
+
|
| 169 |
+
# Figure out if it's an actual Redis error or a Redix error
|
| 170 |
+
defp decode_error(error) do
|
| 171 |
+
case error do
|
| 172 |
+
%{reason: :closed} ->
|
| 173 |
+
{
|
| 174 |
+
:connection_error,
|
| 175 |
+
"Unable to connect to the Redis server"
|
| 176 |
+
}
|
| 177 |
+
|
| 178 |
+
_ ->
|
| 179 |
+
{
|
| 180 |
+
:redis_error,
|
| 181 |
+
%{
|
| 182 |
+
error: error.message
|
| 183 |
+
}
|
| 184 |
+
}
|
| 185 |
+
end
|
| 186 |
+
end
|
| 187 |
end
|
lib/srh/redis/client_worker.ex
CHANGED
|
@@ -52,7 +52,6 @@ defmodule Srh.Redis.ClientWorker do
|
|
| 52 |
{:noreply, state}
|
| 53 |
end
|
| 54 |
|
| 55 |
-
# TODO: Handle host / port connections
|
| 56 |
def handle_info(
|
| 57 |
:create_connection,
|
| 58 |
%{
|
|
@@ -62,6 +61,7 @@ defmodule Srh.Redis.ClientWorker do
|
|
| 62 |
} = state
|
| 63 |
)
|
| 64 |
when is_binary(connection_string) do
|
|
|
|
| 65 |
{:ok, pid} = Redix.start_link(connection_string)
|
| 66 |
{:noreply, %{state | redix_pid: pid}}
|
| 67 |
end
|
|
|
|
| 52 |
{:noreply, state}
|
| 53 |
end
|
| 54 |
|
|
|
|
| 55 |
def handle_info(
|
| 56 |
:create_connection,
|
| 57 |
%{
|
|
|
|
| 61 |
} = state
|
| 62 |
)
|
| 63 |
when is_binary(connection_string) do
|
| 64 |
+
# Will cause a crash for this genserver if the connection fails
|
| 65 |
{:ok, pid} = Redix.start_link(connection_string)
|
| 66 |
{:noreply, %{state | redix_pid: pid}}
|
| 67 |
end
|