Summary of changes in this PR:
  * Added a new global `set_connection_limit!` function for controlling the global connection limit that is applied to all requests.
    This is one way to resolve #1033. Passing `connection_limit` to individual requests now emits a deprecation warning; the
    supported usage is to call `HTTP.set_connection_limit!`, which updates the global value each time it is called.
  * Added a try-finally in `keepalive!` around our global IO lock usage, just for good housekeeping
  * Refactored `try_with_timeout` to use a `Channel` instead of the non-threaded `@async`; it's much simpler, cleaner,
    and avoids the use of `@async` when not needed. Note, however, that StreamRequest.jl now wraps all the actual
    write/read IO operations in `fetch(@async dostuff())`, because this currently prevents code in that task from
    migrating across threads, which is important for OpenSSL usage where error handling is done per-thread. Not an ideal
    solution, but it seems ok for now.
  * I refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether in memory or written to
    an IO, so we can log them and compute bit rates in verbose logging
  * Ok, the big one: I rewrote the internal implementation of ConnectionPool.ConnectionPools.Pod `acquire`/`release` functions; under really
    heavy workloads, there was a ton of contention on the Pod lock. I also observed at least one "hang" where GDB backtraces seemed to indicate
    that somehow a task failed/died/hung while trying to make a new connection _while holding the Pod lock_, which then meant that no other
    requests could ever make progress. The new implementation includes a lock-free "fastpath" where an existing connection that can be re-used
    doesn't require any lock-taking. It uses a lock-free concurrent Stack implementation copied from JuliaConcurrent/ConcurrentCollections.jl (
    doesn't seem actively maintained and it's not much code, so just copied). The rest of the `acquire`/`release` code is now modeled after
    Base.Event in how releasing always acquires the lock and slow-path acquires also take the lock to ensure fairness and no deadlocks.
    I've included some benchmark results on a variety of heavy workloads [here](https://everlasting-mahogany-a5f.notion.site/Issue-heavy-load-perf-degradation-1cd275c75037481a9cd6378b8303cfb3)
    that show some great improvements, the bulk of which are attributable to reduced contention when acquiring/releasing connections during requests.
    The other key change included in this rewrite is that we ensure we _do not_ hold any locks while _making new connections_ to avoid the
    possibility of the lock ever getting "stuck", and because it's not necessary: the pod is in charge of just keeping track of numbers and
    doesn't need to worry about whether the connection was actually made yet or not (if it fails, it will be immediately released back and retried).
    Overall, the code is also _much_ simpler, which I think is a huge win, because the old code was always pretty scary to have to dig into.
  * Added a new `logerrors::Bool=false` keyword arg that enables `@error` logs for errors that would otherwise be "swallowed" by retries;
    it can be helpful to at least see what kinds of errors are occurring
  * Added lots of metrics tracking time spent in each layer, read vs. write durations, etc.
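The lock-free stack mentioned above follows the classic Treiber design: push and pop retry a compare-and-swap on the head pointer until it succeeds. This is a minimal standalone sketch of the idea, not the vendored ConcurrentCollections.jl code (type and function names here are illustrative):

```julia
# Treiber (lock-free) stack sketch: push!/trypop! retry an atomic
# compare-and-swap on `head` until no other task raced us.
mutable struct Node{T}
    value::T
    next::Union{Node{T}, Nothing}
end

mutable struct TreiberStack{T}
    @atomic head::Union{Node{T}, Nothing}
end
TreiberStack{T}() where {T} = TreiberStack{T}(nothing)

function Base.push!(s::TreiberStack{T}, v::T) where {T}
    node = Node{T}(v, nothing)
    while true
        old = @atomic s.head
        node.next = old
        # try to swing head from `old` to `node`; retry on contention
        _, ok = @atomicreplace s.head old => node
        ok && return s
    end
end

function trypop!(s::TreiberStack)
    while true
        old = @atomic s.head
        old === nothing && return nothing  # empty stack
        _, ok = @atomicreplace s.head old => old.next
        ok && return old.value
    end
end
```

Because neither operation ever holds a lock, a task that dies mid-operation cannot wedge other tasks, which is exactly the failure mode described above for the old Pod lock.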
quinnj committed Apr 19, 2023
1 parent 75c1b25 commit 51e7160
Showing 25 changed files with 543 additions and 320 deletions.
12 changes: 11 additions & 1 deletion docs/src/client.md
@@ -97,7 +97,7 @@ Many remote web services/APIs have rate limits or throttling in place to avoid b

#### `readtimeout`

After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Note that for chunked or streaming responses, each chunk/packet of bytes received causes the timeout to reset. Passing `readtimeout = 0` disables any timeout checking and is the default.
After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Passing `readtimeout = 0` disables any timeout checking and is the default.

### `status_exception`

@@ -150,6 +150,16 @@ Controls the total number of retries that will be attempted. Can also disable al

By default, this keyword argument is `false`, which controls whether non-idempotent requests will be retried (POST or PATCH requests).

#### `retry_delays`

Allows providing a custom `ExponentialBackOff` object to control the delay between retries.
Default is `ExponentialBackOff(n = retries)`.
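For example, `ExponentialBackOff` comes from Julia's `Base` and can be constructed directly; the parameter values below are arbitrary:

```julia
# An iterator yielding (up to) 4 retry delays in seconds, starting
# near 0.25s and capped at 10s.
delays = ExponentialBackOff(n = 4, first_delay = 0.25, max_delay = 10.0)
collect(delays)  # a Vector of 4 delay values in seconds
```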

#### `retry_check`

Allows providing a custom function to control whether a retry should be attempted.
The function should accept 5 arguments: the delay state, the exception, the request, the response (an `HTTP.Response` object *if* a request was successfully made, otherwise `nothing`), and the response body (`nothing` if there is no response yet, otherwise a `Vector{UInt8}`), and return `true` if a retry should be attempted. In traditional nomenclature, the function has the form `f(s, ex, req, resp, resp_body) -> Bool`.
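For illustration, a custom retry predicate might look like the following; the function name and the particular status-code set are assumptions for the example, not HTTP.jl's default policy:

```julia
# `s` is the delay state, `ex` the exception (or nothing), `req` the request,
# `resp` the HTTP.Response or nothing, `resp_body` the body bytes or nothing.
function my_retry_check(s, ex, req, resp, resp_body)
    # No response at all means a transport-level failure: worth retrying.
    resp === nothing && return true
    # Otherwise retry only on a few typically-transient status codes.
    return resp.status in (408, 429, 500, 502, 503, 504)
end

# Hypothetical usage:
# HTTP.get(url; retry_check = my_retry_check)
```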

### Redirect Arguments

#### `redirect`
107 changes: 66 additions & 41 deletions src/ConnectionPool.jl
@@ -19,22 +19,21 @@ remotely closed, a connection will be reused.
"""
module ConnectionPool

export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!
export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!, set_connection_limit!

using Sockets, LoggingExtras, NetworkOptions
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
using MbedTLS, OpenSSL
using ..IOExtras, ..Conditions, ..Exceptions

const default_connection_limit = Ref(8)
const nolimit = typemax(Int)

set_default_connection_limit!(n) = default_connection_limit[] = n

taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4)

include("connectionpools.jl")
using .ConnectionPools
set_default_connection_limit!(n) = ConnectionPools.connection_limit[] = n
set_connection_limit!(n) = ConnectionPools.connection_limit[] = n

"""
Connection
@@ -364,35 +363,41 @@ or create a new `Connection` if required.
function newconnection(::Type{T},
host::AbstractString,
port::AbstractString;
connection_limit=default_connection_limit[],
connection_limit=nothing,
forcenew::Bool=false,
idle_timeout=typemax(Int),
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
kw...) where {T <: IO}
return acquire(
getpool(T),
(T, host, port, require_ssl_verification, true);
max_concurrent_connections=Int(connection_limit),
max_concurrent_connections=connection_limit,
forcenew=forcenew,
idle_timeout=Int(idle_timeout)) do
Connection(host, port,
idle_timeout, require_ssl_verification,
getconnection(T, host, port;
require_ssl_verification=require_ssl_verification, kw...)
require_ssl_verification=require_ssl_verification, metrics=metrics, kw...)
)
end
end

releaseconnection(c::Connection{T}, reuse) where {T} =
function releaseconnection(c::Connection{T}, reuse) where {T}
c.timestamp = time()
release(getpool(T), connectionkey(c), c; return_for_reuse=reuse)
end

function keepalive!(tcp)
Base.iolock_begin()
Base.check_open(tcp)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
tcp.handle, 1, 1)
Base.uv_error("failed to set keepalive on tcp socket", err)
Base.iolock_end()
try
Base.check_open(tcp)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
tcp.handle, 1, 1)
Base.uv_error("failed to set keepalive on tcp socket", err)
finally
Base.iolock_end()
end
return
end
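The acquire/try/finally/release shape used in `keepalive!` above generalizes to any lock. A minimal standalone sketch (note that Julia's `Base.lock(f, lk)` already provides exactly this behavior, so this is purely illustrative):

```julia
# Run `f` while holding `lk`, releasing the lock even if `f` throws.
function with_lock(f, lk::ReentrantLock)
    lock(lk)
    try
        return f()
    finally
        unlock(lk)  # always runs, error or not
    end
end
```

Without the `finally`, an exception between `lock` and `unlock` would leave the lock held forever, which is the hang scenario this commit guards against.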

@@ -418,32 +423,38 @@ function getconnection(::Type{TCPSocket},
keepalive::Bool=true,
connect_timeout::Int=10,
readtimeout::Int=0,
metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
kw...)::TCPSocket

p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)
@debugv 2 "TCP connect: $host:$p..."
addrs = Sockets.getalladdrinfo(host)
connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
lasterr = ErrorException("unknown connection error")

start_time = time()
for addr in addrs
try
return if connect_timeout > 0
if connect_timeout > 0
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
try
try_with_timeout(connect_timeout) do
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
catch
close(tcp)
rethrow()
end
return tcp
else
tcp = Sockets.connect(addr, p)
keepalive && keepalive!(tcp)
tcp
end
return tcp
catch e
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
continue
finally
metrics.connect_duration_ms[] = (time() - start_time) * 1000
end
end
# If no connetion could be set up, to any address, throw last error
@@ -498,7 +509,6 @@ function getconnection(::Type{SSLStream},
host::AbstractString,
port::AbstractString;
kw...)::SSLStream

port = isempty(port) ? "443" : port
@debugv 2 "SSL connect: $host:$port..."
tcp = getconnection(TCPSocket, host, port; kw...)
@@ -508,32 +518,42 @@ end
function sslconnection(::Type{SSLStream}, tcp::TCPSocket, host::AbstractString;
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
sslconfig::OpenSSL.SSLContext=nosslcontext[],
metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
kw...)::SSLStream
if sslconfig === nosslcontext[]
sslconfig = global_sslcontext()
start_time = time()
try
if sslconfig === nosslcontext[]
sslconfig = global_sslcontext()
end
# Create SSL stream.
ssl_stream = SSLStream(sslconfig, tcp)
OpenSSL.hostname!(ssl_stream, host)
OpenSSL.connect(ssl_stream; require_ssl_verification)
return ssl_stream
finally
metrics.ssl_connect_duration_ms[] = (time() - start_time) * 1000
end
# Create SSL stream.
ssl_stream = SSLStream(sslconfig, tcp)
OpenSSL.hostname!(ssl_stream, host)
OpenSSL.connect(ssl_stream; require_ssl_verification)
return ssl_stream
end

function sslconnection(::Type{SSLContext}, tcp::TCPSocket, host::AbstractString;
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
sslconfig::SSLConfig=nosslconfig,
metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
kw...)::SSLContext

if sslconfig === nosslconfig
sslconfig = global_sslconfig(require_ssl_verification)
start_time = time()
try
if sslconfig === nosslconfig
sslconfig = global_sslconfig(require_ssl_verification)
end
io = SSLContext()
setup!(io, sslconfig)
associate!(io, tcp)
hostname!(io, host)
handshake!(io)
return io
finally
metrics.ssl_connect_duration_ms[] = (time() - start_time) * 1000
end

io = SSLContext()
setup!(io, sslconfig)
associate!(io, tcp)
hostname!(io, host)
handshake!(io)
return io
end

function sslupgrade(::Type{IOType}, c::Connection{T},
@@ -545,8 +565,13 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
try
try_with_timeout(readtimeout) do
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
end
catch
close(c)
rethrow()
end
else
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
43 changes: 12 additions & 31 deletions src/Exceptions.jl
@@ -25,41 +25,22 @@ macro $(:try)(exes...)
end
end # @eval

function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing)
@assert delay > 0
cond = Condition()
# execute f async
t = @async try
notify(cond, f())
catch e
@debugv 1 "error executing f in try_with_timeout"
isopen(timer) && notify(cond, e, error = true)
end
# start a timer
timer = Timer(delay; interval=delay / 10) do tm
function try_with_timeout(f, timeout)
ch = Channel(0)
timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
Threads.@spawn begin
try
if shouldtimeout()
@debugv 1 "❗️ Timeout: $delay"
close(tm)
iftimeout()
notify(cond, TimeoutError(delay), error = true)
end
put!(ch, $f())
catch e
@debugv 1 "callback error in try_with_timeout"
close(tm)
notify(cond, e, error = true)
if !(e isa HTTPError)
e = CapturedException(e, catch_backtrace())
end
close(ch, e)
finally
close(timer)
end
end
try
res = wait(cond)
@debugv 1 "try_with_timeout finished with: $res"
res
catch e
@debugv 1 "try_with_timeout failed with: $e"
rethrow()
finally
close(timer)
end
return take!(ch)
end

abstract type HTTPError <: Exception end
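The Channel-plus-Timer pattern in the new `try_with_timeout` can be sketched standalone. This is a simplified illustration with a local `TimeoutError` stand-in rather than HTTP.jl's actual exception and logging machinery:

```julia
struct TimeoutError <: Exception
    timeout::Float64
end

function try_with_timeout(f, timeout)
    ch = Channel(0)  # unbuffered rendezvous channel
    # If the timer fires first, closing the channel with an exception
    # makes the pending take! below throw TimeoutError.
    timer = Timer(_ -> close(ch, TimeoutError(timeout)), timeout)
    Threads.@spawn begin
        try
            put!(ch, f())    # blocks until the caller take!s the result
        catch e
            close(ch, e)     # propagate the real error to the caller
        finally
            close(timer)     # cancel the timer once we're done either way
        end
    end
    return take!(ch)         # result of f(), or throws on timeout/error
end
```

The appeal over the old `Condition`-based version is that the `Channel` carries both the value and the failure in one place, so there is no separate notify/timer bookkeeping to keep consistent.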
40 changes: 33 additions & 7 deletions src/HTTP.jl
@@ -42,6 +42,32 @@ include("StatusCodes.jl") ;using .StatusCodes
include("Messages.jl") ;using .Messages
include("cookies.jl") ;using .Cookies
include("Streams.jl") ;using .Streams

function observe_layer(f, cntnm, durnm, req, args...; kw...)
start_time = time()
req.context[cntnm] = Base.get(req.context, cntnm, 0) + 1
try
return f(args...; kw...)
finally
req.context[durnm] = Base.get(req.context, durnm, 0) + (time() - start_time) * 1000
# @info "observed layer = $f, count = $(req.context[cntnm]), duration = $(req.context[durnm])"
end
end

function observe_request_layer(f)
nm = nameof(f)
cntnm = Symbol(nm, "_count")
durnm = Symbol(nm, "_duration_ms")
return (req::Request; kw...) -> observe_layer(f, cntnm, durnm, req, req; kw...)
end

function observe_stream_layer(f)
nm = nameof(f)
cntnm = Symbol(nm, "_count")
durnm = Symbol(nm, "_duration_ms")
return (stream::Stream; kw...) -> observe_layer(f, cntnm, durnm, stream.message.request, stream; kw...)
end
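The `observe_*` wrappers above are decorators that count calls and accumulate wall-clock duration in the request context. A standalone sketch of the same idea, using a plain `Dict` in place of `req.context`:

```julia
# Wrap `f` so each call increments <name>_count and accumulates
# <name>_duration_ms in the `ctx` dictionary.
function observed(f, ctx::Dict{Symbol,Any})
    cnt = Symbol(nameof(f), :_count)
    dur = Symbol(nameof(f), :_duration_ms)
    return (args...; kw...) -> begin
        start = time()
        ctx[cnt] = get(ctx, cnt, 0) + 1
        try
            return f(args...; kw...)
        finally
            # the finally block runs even if `f` throws, so failed
            # calls are still counted and timed
            ctx[dur] = get(ctx, dur, 0.0) + (time() - start) * 1000
        end
    end
end

ctx = Dict{Symbol,Any}()
slow_square(x) = (sleep(0.01); x^2)
timed_square = observed(slow_square, ctx)
timed_square(3)  # ctx now holds :slow_square_count and :slow_square_duration_ms
```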

include("clientlayers/MessageRequest.jl"); using .MessageRequest
include("clientlayers/RedirectRequest.jl"); using .RedirectRequest
include("clientlayers/DefaultHeadersRequest.jl"); using .DefaultHeadersRequest
@@ -120,8 +146,7 @@ Supported optional keyword arguments:
- `connect_timeout = 10`, close the connection after this many seconds if it
is still attempting to connect. Use `connect_timeout = 0` to disable.
- `connection_limit = 8`, number of concurrent connections allowed to each host:port.
- `readtimeout = 0`, close the connection if no data is received for this many
seconds. Use `readtimeout = 0` to disable.
- `readtimeout = 0`, abort a request after this many seconds. Will trigger retries if applicable. Use `readtimeout = 0` to disable.
- `status_exception = true`, throw `HTTP.StatusError` for response status >= 300.
- Basic authentication is detected automatically from the provided url's `userinfo` (in the form `scheme://user:password@host`)
and adds the `Authorization: Basic` header; this can be disabled by passing `basicauth=false`
@@ -139,7 +164,7 @@ Retry arguments:
- `retry = true`, retry idempotent requests in case of error.
- `retries = 4`, number of times to retry.
- `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST.
- `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
- `retry_delays = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
- `retry_check = (s, ex, req, resp, resp_body) -> Bool`, provide a custom function to control whether a retry should be attempted.
The function should accept 5 arguments: the delay state, exception, request, response (an `HTTP.Response` object *if* a request was
successfully made, otherwise `nothing`), and `resp_body` response body (which may be `nothing` if there is no response yet, otherwise
@@ -286,6 +311,7 @@ function request(method, url, h=nothing, b=nobody;
return request(HTTP.stack(), method, url, headers, body, query; kw...)
end

# layers are applied from left to right, i.e. the first layer is the outermost that is called first, which then calls into the second layer, etc.
const STREAM_LAYERS = [timeoutlayer, exceptionlayer]
const REQUEST_LAYERS = [redirectlayer, defaultheaderslayer, basicauthlayer, contenttypedetectionlayer, cookielayer, retrylayer, canonicalizelayer]

@@ -398,10 +424,10 @@ function stack(
inner_stream_layers = streamlayers
outer_stream_layers = ()
end
layers = foldr((x, y) -> x(y), inner_stream_layers, init=streamlayer)
layers2 = foldr((x, y) -> x(y), STREAM_LAYERS, init=layers)
layers = foldr((x, y) -> observe_stream_layer(x(y)), inner_stream_layers, init=observe_stream_layer(streamlayer))
layers2 = foldr((x, y) -> observe_stream_layer(x(y)), STREAM_LAYERS, init=layers)
if !isempty(outer_stream_layers)
layers2 = foldr((x, y) -> x(y), outer_stream_layers, init=layers2)
layers2 = foldr((x, y) -> observe_stream_layer(x(y)), outer_stream_layers, init=layers2)
end
# request layers
# messagelayer must be the 1st/outermost layer to convert initial args to Request
@@ -413,7 +439,7 @@
inner_request_layers = requestlayers
outer_request_layers = ()
end
layers3 = foldr((x, y) -> x(y), inner_request_layers; init=connectionlayer(layers2))
layers3 = foldr((x, y) -> x(y), inner_request_layers; init=observe_request_layer(connectionlayer(layers2)))
layers4 = foldr((x, y) -> x(y), REQUEST_LAYERS; init=layers3)
if !isempty(outer_request_layers)
layers4 = foldr((x, y) -> x(y), outer_request_layers, init=layers4)
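The `foldr` composition used in `stack` builds an onion of handlers in which the first layer in the list becomes the outermost, matching the left-to-right comment above. A toy standalone illustration (using a `Vector{String}` as a stand-in "request" to record call order):

```julia
# Each "layer" takes the next handler and returns a new handler.
loglayer(tag) = handler -> req -> begin
    push!(req, "enter $tag")
    resp = handler(req)
    push!(req, "exit $tag")
    return resp
end

innermost = req -> (push!(req, "handle"); :ok)
layers = [loglayer("A"), loglayer("B")]

# foldr applies right-to-left, so layers[1] ("A") ends up outermost:
# pipeline == loglayer("A")(loglayer("B")(innermost))
pipeline = foldr((l, h) -> l(h), layers; init = innermost)

trace = String[]
pipeline(trace)
# trace is ["enter A", "enter B", "handle", "exit B", "exit A"]
```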
