Service concurrency limiting with RSocket request leases

April 9, 2020
RSocket java load-balancer

[rsocket-requests-lease on github]

This is the first post in the series on advanced applications of RSocket. Today's topic is a lesser-known yet essential protocol feature: request leasing.

We know that a single RSocket stream is naturally bounded by Reactive Streams semantics: there are never more in-flight messages than the receiver has demanded through its Subscription.

However, limiting outstanding messages per request is of little use without also limiting request concurrency.

For services, unbounded growth in requests exhausts their backing resources and leads to a latency surge.

For proxies, messages queue up on both the sender and receiver sides, and the network link hits its capacity limit. Both lead to a gradual increase in memory use and latency.

Relying solely on a latency-aware load balancer on the Requester side is not enough. Historical stats are not meaningful if the service is about to be overwhelmed by a spike of requests and latencies grow exponentially. Such an event makes the service unusable: all in-flight requests are affected and are likely to time out.

The Requester-side load balancer needs to be paired with a Responder-side mechanism which ensures that request volume does not exceed the capacity of the service, so that it can serve responses in a timely manner.

RSocket provides this concurrency limiting mechanism with Requests Lease. It is based on the simple idea of expiring request permits that the Responder periodically sends to its peer.

How it works

The lease rejection policy is implemented on the Responder side only: requests over the leased capacity are short-circuited before they hit the request handler. Requests are not rejected on the Requester side, because the Responder should be aware of the actual request volume.

The Requester RSocket does not expose leased requests directly. Instead, it provides availability as the ratio of remaining requests to leased requests. An expired lease has availability 0.0.

Lease is primarily intended for server-side use cases, notably for reverse proxies in front of a set of services. In the simplest case the proxy selects the service RSocket with the highest availability for the next request, and thus implements a least-loaded balancing strategy driven solely by the Responder side. It works well in homogeneous networks, since services can reliably estimate their own capacity for a given target latency and aggressively drop whatever exceeds it.

This contrasts with a least-loaded load balancer applied alone on the Requester side, which would produce suboptimal results because it effectively treats all service instances as having the same capacity, even if they are backed by different hardware.

The service Responder gathers stats to estimate allowed requests: the number of received and completed requests, response errors, and response latencies aggregated by logical service call name. For response streams, latency is defined as the interval between the moment the request is received and the moment the first signal of the response is produced.
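To make the per-call aggregation more concrete, here is a minimal sketch of a stats holder keyed by call name. The class `CallStats`, its methods and the nearest-rank percentile are assumptions made for illustration; this is not the library's `StatsRecorder` or the demo's `ServiceStatsRecorder`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-call stats holder, for illustration only. */
final class CallStats {
    private final Map<String, List<Long>> latenciesMillis = new HashMap<>();
    private final Map<String, Integer> errors = new HashMap<>();

    /** Records time between request arrival and the first response signal. */
    synchronized void onResponseStarted(String call, long latencyMillis) {
        latenciesMillis.computeIfAbsent(call, k -> new ArrayList<>()).add(latencyMillis);
    }

    synchronized void onResponseError(String call) {
        errors.merge(call, 1, Integer::sum);
    }

    synchronized int errorCount(String call) {
        return errors.getOrDefault(call, 0);
    }

    /** Nearest-rank percentile (0 < percentile <= 100); -1 if the call has no samples. */
    synchronized long latencyPercentile(String call, double percentile) {
        List<Long> samples = latenciesMillis.get(call);
        if (samples == null || samples.isEmpty()) {
            return -1;
        }
        List<Long> sorted = new ArrayList<>(samples);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(percentile * sorted.size() / 100.0);
        return sorted.get(Math.max(rank, 1) - 1);
    }
}
```

A lease sender could read such a holder periodically, for example `stats.latencyPercentile("Service/response", 99)`, and compare the result with its target latency.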

Proxies may use round-trip time as a latency estimate. It is provided by the RSocket keep-alive frame, which can carry a data payload. RTT includes time spent on the network and in the incoming and outgoing queues of both peers. The Responder's allowed requests should be decreased once RTT substantially exceeds the expected latency value.

(A bit of trivia) The official RSocket-java repo started treating keep-alive frames as prioritized, so RTT there includes only time spent on the network and not in RSocket queues. Under some unfortunate circumstances one could witness round trips of a few milliseconds while requests are dropped by multi-second timeouts.

Building blocks

Request leasing is enabled on RSocketFactory for both server and client of jauntsdn/rsocket.

interface ClientConfigurer {

    Leases<?> configure(Flux<Long> rtt, Scheduler scheduler);
}

interface ServerConfigurer {

    Leases<?> configure(Scheduler scheduler);
}

Configurers are provided with the RSocket scheduler; the client one additionally receives a Flux of RTTs measured by RSocket keep-alives. Configurers are optional for both server and client. When a configurer is omitted, the Responder will not accept any requests, as no leases are sent.

Leases is a holder for 2 central components:

StatsRecorder<T> contains a set of callbacks tied to the lifecycle of a request and its response on the Responder side. The <T> parameter denotes the logical name of the call, e.g. for RSocket-RPC this can be a String of the form service/method, and it is provided in every callback.

interface StatsRecorder<T> {

    T onRequestStarted(FrameType requestType, ByteBuf metadata);

    void onResponseStarted(FrameType requestType, T request,
                           Signal<Void> firstSignal, long latencyNanos);

    void onResponseTerminated(FrameType requestType, T request,
                              Signal<Void> lastSignal);
}

StatsRecorder is handed to the lease sender, a Function<Optional<StatsRecorder>, Flux<Lease>>, which uses the stats to estimate the allowed requests and expiration time of each lease. Leases produced by the Flux<Lease> are sent to the peer Requester and used by the Responder to enforce the lease policy: rejected requests are terminated with an RSocket REJECTED error containing either a lease_expired or a lease_exhausted message.
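The Responder-side check described here can be sketched roughly as follows. `ResponderLeaseGate` and its methods are hypothetical names chosen for illustration, and timestamps are passed in explicitly to keep the example deterministic. Only the two rejection messages come from the text above.

```java
import java.util.Optional;

/** Hypothetical Responder-side lease check, for illustration only. */
final class ResponderLeaseGate {
    private int remaining;
    private long expiresAtMillis;

    /** Called whenever a new lease is sent to the peer Requester. */
    synchronized void onLeaseSent(int allowedRequests, long ttlMillis, long nowMillis) {
        remaining = allowedRequests;
        expiresAtMillis = nowMillis + ttlMillis;
    }

    /** Returns empty if the request may reach the handler, else the rejection message. */
    synchronized Optional<String> tryAccept(long nowMillis) {
        if (nowMillis >= expiresAtMillis) {
            return Optional.of("lease_expired");
        }
        if (remaining <= 0) {
            return Optional.of("lease_exhausted");
        }
        remaining--;
        return Optional.empty();
    }
}
```

In this sketch, a gate that has never sent a lease reports lease_expired for every request, which is one way to model a Responder that accepts nothing until a lease is issued.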

On the Requester RSocket, allowed requests are expressed by RSocket.availability() as the ratio between remaining and initially allowed requests. For example, if the Requester receives a lease of 10 requests over 1000 millis, the initial availability is 10/10 = 1.0. Once one request is sent, availability is 9/10 = 0.9. An expired lease has availability 0.0.
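The availability arithmetic from this example can be reproduced with a small sketch. `LeaseView` is a hypothetical class written for illustration and is not part of jauntsdn/rsocket; the current time is passed in explicitly so the numbers are reproducible.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical Requester-side view of a received lease, for illustration only. */
final class LeaseView {
    private final int allowedRequests;
    private final long expiresAtMillis;
    private final AtomicInteger remaining;

    LeaseView(int allowedRequests, long ttlMillis, long nowMillis) {
        this.allowedRequests = allowedRequests;
        this.expiresAtMillis = nowMillis + ttlMillis;
        this.remaining = new AtomicInteger(allowedRequests);
    }

    /** Consumes one permit; returns false if the lease is expired or exhausted. */
    boolean tryUse(long nowMillis) {
        if (nowMillis >= expiresAtMillis) {
            return false;
        }
        while (true) {
            int current = remaining.get();
            if (current <= 0) {
                return false;
            }
            if (remaining.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    /** Remaining / initially allowed requests, or 0.0 once the lease has expired. */
    double availability(long nowMillis) {
        if (allowedRequests <= 0 || nowMillis >= expiresAtMillis) {
            return 0.0;
        }
        return remaining.get() / (double) allowedRequests;
    }
}
```

With `new LeaseView(10, 1000, 0)`, availability starts at 1.0, drops to 0.9 after one `tryUse`, and is 0.0 from timestamp 1000 onwards.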


Let's compose a demo application to evaluate the feature in practice.

It consists of three parts: a set of servers hosting a simple RSocket-RPC service, a reverse proxy fronting the servers, and an RSocket-RPC client. Leasing is enabled on the servers and the proxy, and disabled on the client.

The service is implemented as a saturable one, with response times proportional to the number of concurrent requests in a given time window. It roughly models a real-world service that is either IO bound (backed by a database) or CPU bound (involving heavy computations).

Response time delays are controlled with the CONCURRENCY_DELAY property, which contains a string of the form 10 => 2; 50 => 5; 120 => 20; => 5000. It means that for concurrency up to 10, latency is no more than 2 millis; for concurrency 11 - 50, latency is 2 - 5 millis; for concurrency 51 - 120, latency is 5 - 20 millis; anything above is served with a 20 - 5000 millis delay.
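As an illustration of how such a string maps concurrency to a delay bound, here is a minimal parser sketch. `ConcurrencyDelay` is a hypothetical class and not the demo's actual code. It assumes thresholds are inclusive upper bounds and that the entry without a threshold is the catch-all, and it returns only the upper bound of the delay range.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical parser for the CONCURRENCY_DELAY format, for illustration only. */
final class ConcurrencyDelay {
    private final TreeMap<Integer, Integer> maxDelayByConcurrency = new TreeMap<>();

    ConcurrencyDelay(String spec) {
        for (String entry : spec.split(";")) {
            String[] parts = entry.split("=>", -1);
            if (parts.length != 2) {
                throw new IllegalArgumentException("bad entry: " + entry);
            }
            String threshold = parts[0].trim();
            int delayMillis = Integer.parseInt(parts[1].trim());
            // An empty threshold marks the catch-all entry (assumption).
            int key = threshold.isEmpty() ? Integer.MAX_VALUE : Integer.parseInt(threshold);
            maxDelayByConcurrency.put(key, delayMillis);
        }
    }

    /** Upper bound of the response delay for the given number of concurrent requests. */
    int maxDelayMillis(int concurrency) {
        Map.Entry<Integer, Integer> entry = maxDelayByConcurrency.ceilingEntry(concurrency);
        if (entry == null) {
            throw new IllegalStateException("no catch-all entry configured");
        }
        return entry.getValue();
    }
}
```

For the string above, `maxDelayMillis(10)` yields 2, `maxDelayMillis(11)` yields 5, and anything beyond 120 yields 5000.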

Stats are gathered by ServiceStatsRecorder, which records the number of accepted and rejected requests and the latencies per RPC call. Leases are produced by StaticLeaseSender, which allows a constant number of requests (the ALLOWED_REQUESTS property) every second.

Both are wired up on the ServerRSocketFactory:

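Leases.ServerConfigurer leases =
        scheduler ->
            // Assumption: the builder call below is reconstructed, as the
            // published listing has an unbalanced parenthesis at this point.
            Leases.<ServiceStatsRecorder>create()
                .sender(
                    new StaticLeaseSender(
                        scheduler, address, leaseTimeToLive, leaseAllowedRequests))
                .stats(new ServiceStatsRecorder());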


The proxy relies on LeastLoadedBalancerRSocket, which connects to the backend servers listed in the SERVERS property. The property contains a set of addresses in the form localhost:8309,localhost:8310,localhost:8311. LeastLoadedBalancerRSocket selects the RSocket with the highest availability to serve the next request, and switches to round-robin once the availability of all RSockets is exhausted.
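The selection rule can be sketched as below. This is not the source of LeastLoadedBalancerRSocket: `LeastLoadedSelector` is a hypothetical class that takes availability as a plain function, picks the target with the highest non-zero availability, and falls back to round-robin when every target reports 0.0.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToDoubleFunction;

/** Hypothetical least-loaded selection with round-robin fallback, for illustration only. */
final class LeastLoadedSelector<T> {
    private final AtomicInteger roundRobin = new AtomicInteger();

    T select(List<T> targets, ToDoubleFunction<T> availability) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("no targets");
        }
        T best = null;
        double bestAvailability = 0.0;
        for (T target : targets) {
            double current = availability.applyAsDouble(target);
            if (current > bestAvailability) {
                best = target;
                bestAvailability = current;
            }
        }
        if (best != null) {
            return best;
        }
        // All leases are exhausted or expired: rotate through the targets.
        int index = Math.floorMod(roundRobin.getAndIncrement(), targets.size());
        return targets.get(index);
    }
}
```

Because rejection happens on the Responder side, the fallback still forwards requests to the servers, which keeps them aware of the actual request volume.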

The client connects to the proxy and sends 1000 requests per second to the RSocket-RPC service.

Unbounded leased requests: 9999

We start testing with an effectively unbounded number of allowed requests (9999) on 3 services: localhost:8309, localhost:8310 and localhost:8311.

./ localhost:8309 9999
./ localhost:8310 9999
./ localhost:8311 9999


./ localhost:8308 localhost:8309,localhost:8310,localhost:8311


`./ localhost:8308`

On the client we see no rejected requests, as each of the 3 services provides 9999 allowed requests, which is more than enough to fulfill the client's 1000 rps. Response p99 latency is, as expected, poor: around 5000 ms.

15:16:26.068 parallel-1 ================================================================================
15:16:26.068 parallel-1 Responses from localhost:8310: 322
15:16:26.068 parallel-1 Responses from localhost:8309: 317
15:16:26.068 parallel-1 Responses from localhost:8311: 335
15:16:26.068 parallel-1 Responses p99 latency millis: 4877
15:16:26.069 parallel-1 Rejected requests: 0

Similar stats are reported by the services themselves:

15:17:47.101 reactor-tcp-epoll-2 service localhost:8311 accepted 329 requests
15:17:47.101 reactor-tcp-epoll-2 service call Service/response latency is 4898 millis
15:17:47.101 reactor-tcp-epoll-2 responder sends new lease, allowed requests is 9999, time-to-live is 1000 millis

Moderate leased requests: 100

On the client we see 300 accepted and 700 rejected requests. Latency is just around 20 millis.

15:25:44.430 parallel-1 ================================================================================
15:25:44.430 parallel-1 Responses from localhost:8309: 100
15:25:44.430 parallel-1 Responses from localhost:8310: 100
15:25:44.430 parallel-1 Responses from localhost:8311: 100
15:25:44.430 parallel-1 Responses p99 latency millis: 19
15:25:44.430 parallel-1 Rejected requests: 700

Service stats correspond to the client numbers: each service accepted the 100 requests it leased and rejected about 233, with the same latency of around 20 millis.

15:25:44.087 reactor-tcp-epoll-2 ================================================================================
15:25:44.087 reactor-tcp-epoll-2 service localhost:8310 accepted 100 requests
15:25:44.087 reactor-tcp-epoll-2 service localhost:8310 rejected 233 requests
15:25:44.087 reactor-tcp-epoll-2 service call Service/response latency is 19 millis
15:25:44.087 reactor-tcp-epoll-2 responder sends new lease, allowed requests is 100, time-to-live is 1000 millis

Small leased requests: 30

Now the client reports an acceptable latency of 4 millis, with 90 accepted and 910 rejected requests:

15:32:00.926 parallel-1 ================================================================================
15:32:00.926 parallel-1 Responses from localhost:8310: 30
15:32:00.926 parallel-1 Responses from localhost:8309: 30
15:32:00.926 parallel-1 Responses from localhost:8311: 30
15:32:00.927 parallel-1 Responses p99 latency millis: 4
15:32:00.927 parallel-1 Rejected requests: 910

Service stats correlate:

15:33:13.725 reactor-tcp-epoll-2 ================================================================================
15:33:13.725 reactor-tcp-epoll-2 service localhost:8310 accepted 30 requests
15:33:13.725 reactor-tcp-epoll-2 service localhost:8310 rejected 304 requests
15:33:13.725 reactor-tcp-epoll-2 service call Service/response latency is 4 millis
15:33:13.725 reactor-tcp-epoll-2 responder sends new lease, allowed requests is 30, time-to-live is 1000 millis

Results are summarized below:

Leased requests per service    Response p99 latency (ms)    Accepted requests    Rejected requests
9999                           4877                         974                  0
100                            19                           300                  700
30                             4                            90                   910
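The accepted and rejected counts in the bounded runs follow directly from the lease size. As a rough check, assuming 3 servers, a 1 second lease and 1000 requests offered per second as in the demo, the arithmetic can be sketched like this (`LeaseCapacityModel` is a hypothetical helper, not part of the demo):

```java
/** Hypothetical back-of-the-envelope model of the demo, for illustration only. */
final class LeaseCapacityModel {

    /** Upper bound on requests accepted per lease interval across all servers. */
    static long accepted(long offeredRequests, int servers, int allowedPerServer) {
        long leasedCapacity = (long) servers * allowedPerServer;
        return Math.min(offeredRequests, leasedCapacity);
    }

    /** Requests that exceed the leased capacity and are rejected. */
    static long rejected(long offeredRequests, int servers, int allowedPerServer) {
        return offeredRequests - accepted(offeredRequests, servers, allowedPerServer);
    }
}
```

For leases of 100 and 30 the model gives 300/700 and 90/910, matching the logs. For 9999 it gives an upper bound of 1000 accepted requests, while the client log for that run counted 974 responses in the reporting interval.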

RSocket request leasing is a central protocol feature which enables concurrency and latency control without guesswork on the caller side. It helps applications remain stable and can greatly reduce timeout-related errors. It is transparent for RSocket services: there are no leasing-related references in service API definitions or implementations. The static lease sender can be substituted with a smarter one that estimates allowed requests based on Responder statistics and the target service call latency. Algorithms are known and described at concurrency-limits; we will see them in action soon on the pages of this blog.
