This is the first post in the series on advanced applications of RSocket, and today's topic is a lesser-known yet essential protocol feature: request leasing.
We know that a single RSocket stream is naturally bounded by Reactive Streams semantics: there are never more in-flight messages than the receiver has demanded through its Subscription.
However, limiting outstanding messages per request is of little use without also limiting the number of concurrent requests.
For services, unbounded growth of requests exhausts their backing resources and leads to a latency surge.
For proxies, messages queue up on both the sender and the receiver side and the network link hits its capacity limit; both lead to a gradual increase in memory use and latency.
Relying solely on a latency-aware load balancer on the Requester side is not enough. Historical stats are not meaningful if the service is about to be overwhelmed by a spike of requests and latencies grow exponentially. Such an event makes the service unusable: all in-flight requests are affected and are likely to time out.
The Requester-side load balancer needs to be paired with a Responder-side mechanism which ensures that the request volume does not exceed the capacity of the service, so that it can serve responses in a timely manner.
RSocket provides this concurrency limiting mechanism with Requests Lease. It is based on the simple idea of expiring request permits that the Responder periodically sends to its peer.
How it works
The lease rejection policy is enforced on the Responder side only: requests over the leased capacity are short-circuited before they reach the request handler. The Requester does not reject requests itself, because the Responder should be aware of the actual request volume.
The Requester RSocket does not expose leased requests directly; instead it provides availability as the ratio of remaining requests to leased requests. An expired lease has availability 0.0.
Lease is primarily intended for server-side use cases, notably for reverse proxies in front of a set of services.
In the simplest case the proxy selects the service RSocket with the highest availability for the next request, and thus implements a least-loaded balancing strategy driven solely by the Responder side. It works well in homogeneous networks, since services can reliably estimate their own capacity for a given target latency and aggressively drop whatever is above it.
This contrasts with least-loaded load balancing applied on the requester side alone, which produces suboptimal results because it effectively treats all service instances as having the same capacity, even if they are backed by different hardware.
The service Responder gathers stats to estimate allowed requests: the number of received and completed requests, response errors, and response latencies aggregated by logical service call name. For response streams, latency is defined as the interval between the moment the request is received and the moment the first signal of the response is produced.
Proxies may use round-trip time as a latency estimate. It is measured with the RSocket keep-alive frame, which is able to carry a data payload. RTT includes the time spent on the network and in the incoming and outgoing queues of both peers. The Responder's allowed requests should be decreased once RTT substantially exceeds the expected latency value.
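To make the RTT rule concrete, here is a minimal sketch of how allowed requests could react to RTT samples. The class name `RttLeaseEstimator`, the tolerance factor, and the halve-on-overload / add-one-otherwise rule (a basic additive-increase, multiplicative-decrease scheme) are assumptions of this sketch; the post only states that allowed requests should decrease when RTT substantially exceeds the expected latency.

```java
/** Hypothetical estimator: shrinks allowed requests when RTT is far above the expected latency. */
final class RttLeaseEstimator {
  private final long expectedLatencyMicros;
  private final double tolerance; // assumed meaning: RTT above expected * tolerance counts as overload
  private final int minAllowed;
  private final int maxAllowed;
  private int allowedRequests;

  RttLeaseEstimator(
      long expectedLatencyMicros, double tolerance, int minAllowed, int maxAllowed, int initial) {
    if (minAllowed < 0 || maxAllowed < minAllowed) {
      throw new IllegalArgumentException("expected 0 <= minAllowed <= maxAllowed");
    }
    this.expectedLatencyMicros = expectedLatencyMicros;
    this.tolerance = tolerance;
    this.minAllowed = minAllowed;
    this.maxAllowed = maxAllowed;
    this.allowedRequests = Math.max(minAllowed, Math.min(maxAllowed, initial));
  }

  /** Feeds one RTT sample and returns the allowed requests to use for the next lease. */
  synchronized int onRtt(long rttMicros) {
    if (rttMicros > expectedLatencyMicros * tolerance) {
      // Overloaded: back off quickly (multiplicative decrease).
      allowedRequests = Math.max(minAllowed, allowedRequests / 2);
    } else {
      // Healthy: probe for more capacity slowly (additive increase).
      allowedRequests = Math.min(maxAllowed, allowedRequests + 1);
    }
    return allowedRequests;
  }
}
```

The value returned by `onRtt` would then be passed as the allowed requests of the next lease.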
(Bit of trivia) The official RSocket-java repo started treating keep-alive frames as prioritized, so there RTT only includes the time spent on the network and not in RSocket queues. Under some unfortunate circumstances one could witness round trips of a few milliseconds while requests are dropped by multi-second timeouts.
Building blocks
Request leasing is enabled on RSocketFactory for both the server and the client of jauntsdn/rsocket.
RSocketFactory.lease(Lease.Configurer);

interface Configurer {
  Optional<Lease.StatsRecorder<?>> configure(Lease.Controller leaseController);
}
Lease.Configurer is a handle for 2 components:
- Lease.StatsRecorder gathers request statistics.
- Lease.Controller issues the request leases that are sent to the peer Requester and used by the Responder.

interface Controller {
  void allow(int timeToLiveMillis, int allowedRequests);
}
StatsRecorder<T> contains a set of callbacks tied to the lifecycle of request and response on the Responder side. The <T> parameter denotes the logical name of the call and is passed back to the response callbacks; e.g. for RSocket-RPC it can be a String of the form service/method.
interface StatsRecorder<T> {

  T onRequestStarted(Interaction.Type requestType, ByteBuf metadata);

  void onResponseStarted(
      Interaction.Type requestType,
      T request,
      Interaction.StreamSignal firstSignal,
      long latencyMicros);

  void onResponseTerminated(
      Interaction.Type requestType,
      T request,
      Interaction.StreamSignal lastSignal,
      long responseDurationMicros);

  void onRtt(long rttMicros);

  void onOpen();

  void onClose(long graceTimeoutMillis);

  void onError(Interaction.Type requestType, Throwable err);
}
- onRequestStarted is called once the Responder receives the request; it returns the logical name of the call based on request type and metadata.
- onResponseStarted is called once the first signal of the response (either data or termination) is received. latencyMicros is the interval between the start of the request and the first signal of the response.
- onResponseTerminated is called once the response is terminated.
- onRtt is called when round-trip time is measured with a keep-alive frame.
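The callbacks above can be used to aggregate stats per logical call name. Below is a minimal, self-contained sketch of that idea. It deliberately uses plain `String` and `long` arguments instead of the library's `Interaction.Type`, `ByteBuf` and `StreamSignal` types, so `CallStats` and its method shapes are hypothetical simplifications and not an implementation of `Lease.StatsRecorder`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-call aggregation, loosely modeled on the StatsRecorder callbacks. */
final class CallStats {

  static final class Entry {
    final LongAdder started = new LongAdder();
    final LongAdder responded = new LongAdder();
    final LongAdder latencyMicrosSum = new LongAdder();

    /** Mean latency of the first response signal, or 0 if nothing was recorded yet. */
    long meanLatencyMicros() {
      long count = responded.sum();
      return count == 0 ? 0 : latencyMicrosSum.sum() / count;
    }
  }

  private final Map<String, Entry> byCall = new ConcurrentHashMap<>();

  /** Counts a received request and returns the logical call name, here "service/method". */
  String onRequestStarted(String service, String method) {
    String call = service + "/" + method;
    entry(call).started.increment();
    return call;
  }

  /** Records the latency between request start and the first response signal. */
  void onResponseStarted(String call, long latencyMicros) {
    Entry e = entry(call);
    e.responded.increment();
    e.latencyMicrosSum.add(latencyMicros);
  }

  Entry entry(String call) {
    return byCall.computeIfAbsent(call, k -> new Entry());
  }
}
```

A lease estimator could read `entry(call).meanLatencyMicros()` periodically and compare it with the target latency of that call.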
Lease.Controller is handed to StatsRecorder, which uses stats to estimate allowed requests and expiration time. Produced leases are sent to the peer requester and are used by the Responder to enforce the lease policy: rejected requests are terminated with an RSocket REJECTED error containing either a lease_expired or a lease_exhausted message.
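The Responder-side policy can be pictured as a small gate in front of the request handler. The sketch below is an illustration under assumptions: `ResponderLeaseGate` is a hypothetical name, time is passed in explicitly to keep the example deterministic, and only the two rejection messages are taken from the text above. It is not the library's actual implementation.

```java
import java.util.Optional;

/** Hypothetical responder-side lease gate: accepts a request or reports why it is rejected. */
final class ResponderLeaseGate {
  private long expiresAtMillis; // 0 until the first lease is granted
  private int remainingRequests;

  /** Same idea as Lease.Controller.allow: a new lease replaces the current one. */
  synchronized void allow(int timeToLiveMillis, int allowedRequests, long nowMillis) {
    expiresAtMillis = nowMillis + timeToLiveMillis;
    remainingRequests = allowedRequests;
  }

  /** Returns empty if the request may proceed, otherwise the rejection message. */
  synchronized Optional<String> tryAccept(long nowMillis) {
    if (nowMillis >= expiresAtMillis) {
      return Optional.of("lease_expired");
    }
    if (remainingRequests <= 0) {
      return Optional.of("lease_exhausted");
    }
    remainingRequests--;
    return Optional.empty();
  }
}
```

A rejected request would be answered with a REJECTED error carrying the returned message, without invoking the request handler.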
On the requester RSocket, allowed requests are expressed by RSocket.availability() as the ratio between remaining and initially allowed requests. For example, if the requester receives a lease of 10 requests over 1000 millis, the initial availability is 10/10 = 1.0. Once one request is sent, availability is 9/10 = 0.9. An expired lease has availability 0.0.
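The arithmetic from this example can be written down as a small sketch. `LeaseAvailability` is a hypothetical class used only for illustration; it mirrors the described ratio and is not how `RSocket.availability()` is implemented in the library.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical requester-side view of one received lease. */
final class LeaseAvailability {
  private final int allowedRequests;
  private final long expiresAtMillis;
  private final AtomicInteger remaining;

  LeaseAvailability(int allowedRequests, int timeToLiveMillis, long nowMillis) {
    this.allowedRequests = allowedRequests;
    this.expiresAtMillis = nowMillis + timeToLiveMillis;
    this.remaining = new AtomicInteger(allowedRequests);
  }

  /** Consumes one permit if the lease is valid and not exhausted. */
  boolean tryUse(long nowMillis) {
    if (nowMillis >= expiresAtMillis) {
      return false;
    }
    while (true) {
      int current = remaining.get();
      if (current <= 0) {
        return false;
      }
      if (remaining.compareAndSet(current, current - 1)) {
        return true;
      }
    }
  }

  /** Remaining / initially allowed requests; 0.0 for an expired or empty lease. */
  double availability(long nowMillis) {
    if (allowedRequests <= 0 || nowMillis >= expiresAtMillis) {
      return 0.0;
    }
    return remaining.get() / (double) allowedRequests;
  }
}
```

With a lease of 10 requests over 1000 millis, `availability` is 1.0 before any request, 0.9 after one `tryUse`, and 0.0 once 1000 millis have passed.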
Showcase
Let's compose a demo application to evaluate the feature in practice.
It consists of three parts: a set of servers hosting a simple RSocket-RPC service, a reverse proxy fronting the servers, and an RSocket-RPC client. Leasing is enabled on the servers and the proxy, and disabled on the client.
The service is implemented as a saturable one, with response times proportional to the number of concurrent requests in a given time window. It roughly models a real-world service that is either IO bound (backed by a database) or CPU bound (involving heavy computations).
Response time delays are controlled with the CONCURRENCY_DELAY property, which contains a string of the form 10 => 2; 50 => 5; 120 => 20; => 5000.
It means that for concurrency up to 10, latency will be no more than 2 millis; for concurrency 11 - 50 latency is 2 - 5 millis; for concurrency 51 - 120 latency is 5 - 20 millis; anything above is served with a 20 - 5000 millis delay.
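As an illustration of how such a string maps concurrency to a delay range, here is a minimal parsing sketch. The class `ConcurrencyDelay` is hypothetical and is not the demo's actual code; it assumes thresholds are inclusive upper bounds and that the entry without a threshold is the catch-all.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for strings like "10 => 2; 50 => 5; 120 => 20; => 5000". */
final class ConcurrencyDelay {
  // Each step is {maxConcurrency (inclusive), maxDelayMillis}, in declaration order.
  private final List<int[]> steps = new ArrayList<>();

  static ConcurrencyDelay parse(String spec) {
    ConcurrencyDelay result = new ConcurrencyDelay();
    for (String entry : spec.split(";")) {
      String[] parts = entry.split("=>", -1);
      if (parts.length != 2) {
        throw new IllegalArgumentException("Malformed entry: " + entry);
      }
      String threshold = parts[0].trim();
      // An empty threshold is treated as the catch-all for any higher concurrency.
      int maxConcurrency = threshold.isEmpty() ? Integer.MAX_VALUE : Integer.parseInt(threshold);
      int maxDelayMillis = Integer.parseInt(parts[1].trim());
      result.steps.add(new int[] {maxConcurrency, maxDelayMillis});
    }
    return result;
  }

  /** Returns {minDelayMillis, maxDelayMillis} for the given concurrency. */
  int[] delayRangeMillis(int concurrency) {
    int minDelay = 0;
    for (int[] step : steps) {
      if (concurrency <= step[0]) {
        return new int[] {minDelay, step[1]};
      }
      minDelay = step[1]; // the next range starts where this one ends
    }
    throw new IllegalArgumentException("No catch-all entry for concurrency " + concurrency);
  }
}
```

For the string above, `delayRangeMillis(80)` yields the 5 - 20 millis range and `delayRangeMillis(500)` yields 20 - 5000 millis.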
Stats are gathered by ServiceStatsRecorder, which records the number of accepted and rejected requests and the latencies per RPC call.
Leases are produced by StaticLeaseSender, which allows a constant number of requests (the ALLOWED_REQUESTS property) every second.
Both are wired up on ServerRSocketFactory:
Lease.Configurer leases =
    leaseController -> {
      ConstantLeaseController constantLeaseController =
          new ConstantLeaseController(
              leaseController,
              inetSocketAddress,
              leaseTimeToLive,
              leaseAllowedRequests);
      return Optional.of(constantLeaseController);
    };

RSocketFactory
    .receive()
    .lease(leases)
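The demo's lease-producing class is not listed here, so the following is only a sketch of what a constant lease sender might look like. `LeaseControllerSketch` is a local stand-in with the same `allow` signature as `Lease.Controller` so that the example compiles on its own, and `ConstantLeaseSender` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Local stand-in mirroring the allow(...) signature of Lease.Controller. */
interface LeaseControllerSketch {
  void allow(int timeToLiveMillis, int allowedRequests);
}

/** Hypothetical sender that grants the same lease once per time-to-live period. */
final class ConstantLeaseSender implements Runnable, AutoCloseable {
  private final LeaseControllerSketch controller;
  private final int timeToLiveMillis;
  private final int allowedRequests;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  ConstantLeaseSender(
      LeaseControllerSketch controller, int timeToLiveMillis, int allowedRequests) {
    this.controller = controller;
    this.timeToLiveMillis = timeToLiveMillis;
    this.allowedRequests = allowedRequests;
  }

  /** Sends the first lease immediately, then a fresh one every time-to-live period. */
  void start() {
    scheduler.scheduleAtFixedRate(this, 0, timeToLiveMillis, TimeUnit.MILLISECONDS);
  }

  /** Grants a single lease; invoked by the scheduler on every tick. */
  @Override
  public void run() {
    controller.allow(timeToLiveMillis, allowedRequests);
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

With a time-to-live of 1000 millis and 100 allowed requests, this grants 100 requests every second, which corresponds to the behaviour described for the ALLOWED_REQUESTS property.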
The proxy relies on LeastLoadedBalancerRSocket, which connects to the backend servers listed in the SERVERS property. The property contains a set of addresses in the form localhost:8309,localhost:8310,localhost:8311. LeastLoadedBalancerRSocket selects the RSocket with the highest availability to serve the next request, and switches to round-robin once the availability of all RSockets is exhausted.
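The selection rule can be sketched independently of RSocket. In the example below, `LeastLoadedSelector` is a hypothetical class and the availability of each target is supplied by a function, so this illustrates the strategy rather than the code of LeastLoadedBalancerRSocket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToDoubleFunction;

/** Hypothetical selector: highest availability wins, round-robin when all are exhausted. */
final class LeastLoadedSelector<T> {
  private final List<T> targets;
  private final ToDoubleFunction<T> availability;
  private final AtomicInteger roundRobin = new AtomicInteger();

  LeastLoadedSelector(List<T> targets, ToDoubleFunction<T> availability) {
    if (targets.isEmpty()) {
      throw new IllegalArgumentException("targets must not be empty");
    }
    this.targets = new ArrayList<>(targets);
    this.availability = availability;
  }

  T select() {
    T best = null;
    double bestAvailability = 0.0;
    for (T target : targets) {
      double current = availability.applyAsDouble(target);
      if (current > bestAvailability) {
        best = target;
        bestAvailability = current;
      }
    }
    if (best != null) {
      return best;
    }
    // Every target reports availability 0.0: fall back to round-robin.
    int index = Math.floorMod(roundRobin.getAndIncrement(), targets.size());
    return targets.get(index);
  }
}
```

In the proxy, the availability function would simply be the `availability()` of each backend RSocket.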
The client connects to the proxy and sends 1000 requests per second to the RSocket-RPC service.
Unbounded leased requests: 9999
We start testing with effectively unbounded allowed requests (9999) on 3 services: localhost:8309, localhost:8310 and localhost:8311.
./lease_server.sh localhost:8309 9999
./lease_server.sh localhost:8310 9999
./lease_server.sh localhost:8311 9999
Proxy
./lease_proxy.sh localhost:8308 localhost:8309,localhost:8310,localhost:8311
Client
./lease_client.sh localhost:8308
On the client we see that there are no rejected requests, as each of the 3 services provides 9999 allowed requests, which is more than enough to fulfill the client's 1000 rps. Response p99 latency is expectedly poor: around 5000 ms.
15:16:26.068 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client ================================================================================
15:16:26.068 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8310: 322
15:16:26.068 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8309: 317
15:16:26.068 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8311: 335
15:16:26.068 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses p99 latency millis: 4877
15:16:26.069 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Rejected requests: 0
Similar stats are reported by the services themselves:
15:17:47.101 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service localhost:8311 accepted 329 requests
15:17:47.101 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service call Service/response latency is 4898 millis
15:17:47.101 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server responder sends new lease, allowed requests is 9999, time-to-live is 1000 millis
Moderate leased requests: 100
On the client we see 300 accepted and 700 rejected requests. Latency is just around 20 millis.
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client ================================================================================
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8309: 100
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8310: 100
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8311: 100
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses p99 latency millis: 19
15:25:44.430 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Rejected requests: 700
Service stats correspond to the client numbers: each service accepted the 100 requests it leased and rejected 233, with the same latency of around 20 millis.
15:25:44.087 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server ================================================================================
15:25:44.087 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service localhost:8310 accepted 100 requests
15:25:44.087 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service localhost:8310 rejected 233 requests
15:25:44.087 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service call Service/response latency is 19 millis
15:25:44.087 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server responder sends new lease, allowed requests is 100, time-to-live is 1000 millis
Small leased requests: 30
Now the client reports an acceptable latency of 4 millis, with 90 accepted and 910 rejected requests.
15:32:00.926 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client ================================================================================
15:32:00.926 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8310: 30
15:32:00.926 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8309: 30
15:32:00.926 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses from localhost:8311: 30
15:32:00.927 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Responses p99 latency millis: 4
15:32:00.927 parallel-1 com.jauntsdn.rsocket.lease.showcase.Client Rejected requests: 910
Service stats correlate:
15:33:13.725 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server ================================================================================
15:33:13.725 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service localhost:8310 accepted 30 requests
15:33:13.725 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service localhost:8310 rejected 304 requests
15:33:13.725 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server service call Service/response latency is 4 millis
15:33:13.725 reactor-tcp-epoll-2 com.jauntsdn.rsocket.lease.showcase.Server responder sends new lease, allowed requests is 30, time-to-live is 1000 millis
Results are summarized below:
| Leased requests (per service) | Response p99 latency, millis | Accepted requests | Rejected requests |
|---|---|---|---|
| 9999 | 4877 | 974 | 0 |
| 100 | 19 | 300 | 700 |
| 30 | 4 | 90 | 910 |
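The accepted and rejected counts of the two bounded runs follow directly from the lease size. A quick check, assuming 3 servers, a client sending 1000 requests per second, and a lease window of one second (the 1000 millis time-to-live seen in the logs); the class `LeaseThroughput` is just a helper for this calculation:

```java
/** Hypothetical helper reproducing the accepted/rejected arithmetic of the bounded runs. */
final class LeaseThroughput {

  /** Requests accepted per lease window across all servers, capped by what the client sends. */
  static int accepted(int servers, int leasedPerServer, int sentRequests) {
    long capacity = (long) servers * leasedPerServer;
    return (int) Math.min(capacity, sentRequests);
  }

  /** Requests rejected per lease window: everything sent beyond the leased capacity. */
  static int rejected(int servers, int leasedPerServer, int sentRequests) {
    return sentRequests - accepted(servers, leasedPerServer, sentRequests);
  }

  public static void main(String[] args) {
    int servers = 3;
    int sentRequests = 1000;
    for (int leased : new int[] {100, 30}) {
      System.out.printf(
          "leased=%d accepted=%d rejected=%d%n",
          leased,
          accepted(servers, leased, sentRequests),
          rejected(servers, leased, sentRequests));
    }
    // prints:
    // leased=100 accepted=300 rejected=700
    // leased=30 accepted=90 rejected=910
  }
}
```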
RSocket request leasing is a central protocol feature which enables concurrency and latency control without guesswork on the caller side.
It helps applications remain stable and can greatly reduce timeout-related errors. It is transparent for RSocket services: there are no leasing-related references in service API definitions or implementations. The static lease sender can be substituted with a smarter one that estimates allowed requests based on Responder statistics and target service call latency. Such algorithms are known and described at concurrency-limits; we will soon see them in action on the pages of this blog.