RSocketRequester
RSocketRequester
provides a fluent API to perform RSocket requests, accepting and
returning objects for data and metadata instead of low level data buffers. It can be used
symmetrically, to make requests from clients and to make requests from servers.
Client Requester
Obtaining an RSocketRequester on the client side requires connecting to a server and
preparing and sending the initial RSocket SETUP frame. RSocketRequester provides a
builder for that. Internally, the builder uses RSocket Java’s RSocketFactory.
This is the most basic way to connect with default settings:
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectTcp("localhost", 7000);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectWebSocket(URI.create("https://example.org:8080/rsocket"));
import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait
val requester = RSocketRequester.builder()
.connectTcpAndAwait("localhost", 7000)
val requester = RSocketRequester.builder()
.connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))
The above is deferred. To actually connect and use the requester:
// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
.subscribe(requester -> {
// ...
});
// Or block
RSocketRequester requester = RSocketRequester.builder()
.connectTcp("localhost", 7000)
.block(Duration.ofSeconds(5));
// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private var requester: RSocketRequester? = null
private suspend fun requester() = requester ?:
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }
suspend fun doSomething() = requester().route(...)
}
// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private val requester = runBlocking {
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
}
suspend fun doSomething() = requester.route(...)
}
Connection Setup
RSocketRequester.Builder
provides the following to customize the initial SETUP
frame:
- dataMimeType(MimeType): set the mime type for data on the connection.
- metadataMimeType(MimeType): set the mime type for metadata on the connection.
- setupData(Object): data to include in the SETUP.
- setupRoute(String, Object…): route in the metadata to include in the SETUP.
- setupMetadata(Object, MimeType): other metadata to include in the SETUP.
For data, the default mime type is derived from the first configured Decoder
. For
metadata, the default mime type is
composite metadata which allows multiple
metadata value and mime type pairs per request. Typically both don’t need to be changed.
Data and metadata in the SETUP frame are optional. On the server side,
@ConnectMapping methods can be used to handle the start of a
connection and the content of the SETUP frame. Metadata may be used for connection
level security.
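To make the idea of "multiple metadata value and mime type pairs" concrete, the following is a minimal sketch of how one composite metadata entry is laid out on the wire, based on the RSocket composite metadata extension as commonly described. It is not what Spring or RSocket Java executes; the class and method names are hypothetical, and the sketch covers only explicit mime type strings, not the compact well-known mime type IDs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only; the framework performs this encoding for you. */
final class CompositeMetadataSketch {

    /** Encodes one entry as [mime length - 1][mime ASCII][24-bit payload length][payload]. */
    static byte[] encodeEntry(String mimeType, byte[] payload) {
        byte[] mime = mimeType.getBytes(StandardCharsets.US_ASCII);
        if (mime.length < 1 || mime.length > 128) {
            throw new IllegalArgumentException("mime type must be 1..128 ASCII bytes");
        }
        if (payload.length > 0xFFFFFF) {
            throw new IllegalArgumentException("payload exceeds 24-bit length");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(mime.length - 1);               // high bit 0: explicit mime string follows
        out.write(mime, 0, mime.length);
        out.write((payload.length >> 16) & 0xFF); // payload length, big-endian, 3 bytes
        out.write((payload.length >> 8) & 0xFF);
        out.write(payload.length & 0xFF);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Two value/mime type pairs are simply concatenated in one metadata block.
        byte[] first = encodeEntry("text/plain", "hi".getBytes(StandardCharsets.UTF_8));
        byte[] second = encodeEntry("application/json", "{}".getBytes(StandardCharsets.UTF_8));
        System.out.println(first.length + second.length); // prints 38
    }
}
```

Because each entry carries its own mime type and length, a receiver can skip entries it does not understand, which is why extra metadata values can travel next to the route.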
Strategies
RSocketRequester.Builder
accepts RSocketStrategies
to configure the requester.
You’ll need to use this to provide encoders and decoders for (de)-serialization of data and
metadata values. By default only the basic codecs from spring-core
for String
,
byte[]
, and ByteBuffer
are registered. Adding spring-web
provides access to more that
can be registered as follows:
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcp("localhost", 7000);
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcpAndAwait("localhost", 7000)
RSocketStrategies
is designed for re-use. In some scenarios, e.g. client and server in
the same application, it may be preferable to declare it in Spring configuration.
Client Responders
RSocketRequester.Builder
can be used to configure responders to requests from the
server.
You can use annotated handlers for client-side responding based on the same infrastructure that’s used on a server, but registered programmatically as follows:
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
ClientHandler handler = new ClientHandler(); (2)
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) (3)
.connectTcp("localhost", 7000);
1 | Use PathPatternRouteMatcher, if spring-web is present, for efficient
route matching. |
2 | Create a responder that contains @MessageMapping or @ConnectMapping methods. |
3 | Use the static factory method in RSocketMessageHandler to register one or more responders. |
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val handler = ClientHandler() (2)
val requester = RSocketRequester.builder()
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) (3)
.connectTcpAndAwait("localhost", 7000)
1 | Use PathPatternRouteMatcher, if spring-web is present, for efficient
route matching. |
2 | Create a responder that contains @MessageMapping or @ConnectMapping methods. |
3 | Use the static factory method in RSocketMessageHandler to register one or more responders. |
Note that the above is only a shortcut designed for programmatic registration of client
responders. For alternative scenarios, where client responders are in Spring configuration,
you can still declare RSocketMessageHandler
as a Spring bean and then apply it as follows:
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketFactory(factory -> factory.acceptor(handler.responder()))
.connectTcp("localhost", 7000);
import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketFactory { it.acceptor(handler.responder()) }
.connectTcpAndAwait("localhost", 7000)
For the above you may also need to use setHandlerPredicate
in RSocketMessageHandler
to
switch to a different strategy for detecting client responders, e.g. based on a custom
annotation such as @RSocketClientResponder
vs the default @Controller
. This
is necessary in scenarios with client and server, or multiple clients in the same
application.
See also Annotated Responders for more on the programming model.
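The handler predicate mentioned above can be pictured as a plain test on a class. The following is a minimal sketch, assuming that setHandlerPredicate accepts a Predicate<Class<?>>; the @RSocketClientResponder annotation and both handler classes are hypothetical and defined here only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Predicate;

/** Hypothetical illustration of detecting client responders by a custom annotation. */
final class HandlerPredicateSketch {

    /** Hypothetical marker annotation; it must be retained at runtime to be detectable. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface RSocketClientResponder {}

    @RSocketClientResponder
    static class ClientHandler {}

    /** Stands in for a regular server-side handler without the marker. */
    static class ServerController {}

    /** Matches only classes that carry the marker annotation directly. */
    static final Predicate<Class<?>> CLIENT_RESPONDERS =
            type -> type.isAnnotationPresent(RSocketClientResponder.class);

    public static void main(String[] args) {
        System.out.println(CLIENT_RESPONDERS.test(ClientHandler.class));    // prints true
        System.out.println(CLIENT_RESPONDERS.test(ServerController.class)); // prints false
    }
}
```

A predicate of this shape keeps client responders and server handlers apart when both live in the same application. The plain reflection check used here does not see meta-annotations, so a real setup may prefer Spring's own annotation utilities.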
Advanced
RSocketRequester.Builder
provides a callback to expose the underlying
ClientRSocketFactory
from RSocket Java for further configuration options for
keepalive intervals, session resumption, interceptors, and more. You can configure options
at that level as follows:
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.rsocketFactory(factory -> {
// ...
})
.connectTcp("localhost", 7000);
import org.springframework.messaging.rsocket.connectTcpAndAwait
val requester = RSocketRequester.builder()
.rsocketFactory {
//...
}.connectTcpAndAwait("localhost", 7000)
Server Requester
Making requests from a server to connected clients is a matter of obtaining the requester for the connected client from the server.
In Annotated Responders, @ConnectMapping
and @MessageMapping
methods support an
RSocketRequester
argument. Use it to access the requester for the connection. Keep in
mind that @ConnectMapping
methods are essentially handlers of the SETUP
frame which
must be handled before requests can begin. Therefore, requests at the very start must be
decoupled from handling. For example:
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | Start the request asynchronously, independent from handling. |
2 | Perform handling and return completion Mono<Void> . |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
// ... (2)
}
1 | Start the request asynchronously, independent from handling. |
2 | Perform handling in the suspending function. |
Requests
Once you have a client or server requester, you can make requests as follows:
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | Specify a route to include in the metadata of the request message. |
2 | Provide data for the request message. |
3 | Declare the expected response. |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | Specify a route to include in the metadata of the request message. |
2 | Provide data for the request message. |
3 | Declare the expected response. |
The interaction type is determined implicitly from the cardinality of the input and
output. The above example is a Request-Stream
because one value is sent and a stream
of values is received. For the most part you don’t need to think about this as long as the
choice of input and output matches an RSocket interaction type and the types of input and
output expected by the responder. The only example of an invalid combination is many-to-one.
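The mapping from input and output cardinality to an interaction type can be written down as a small lookup. The following is a minimal sketch of that rule, not the framework's implementation; all type and method names are hypothetical. It also treats a multi-value input with no response as unsupported, which is an assumption of this sketch rather than something stated above.

```java
import java.util.Optional;

/** Hypothetical model of how cardinality selects an RSocket interaction type. */
final class InteractionModelSketch {

    /** How many values flow in one direction. */
    enum Cardinality { NONE, ONE, MANY }

    /** The four RSocket interaction types. */
    enum Interaction { FIRE_AND_FORGET, REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL }

    /**
     * Resolves the interaction for a request. An input of NONE is treated like ONE
     * because a request without data still sends a single payload.
     * Returns empty when no interaction type fits the combination.
     */
    static Optional<Interaction> resolve(Cardinality input, Cardinality output) {
        boolean manyIn = (input == Cardinality.MANY);
        switch (output) {
            case NONE:
                // Assumption of this sketch: many-to-none is rejected as well.
                return manyIn ? Optional.empty() : Optional.of(Interaction.FIRE_AND_FORGET);
            case ONE:
                // Many-to-one is the invalid combination named in the text.
                return manyIn ? Optional.empty() : Optional.of(Interaction.REQUEST_RESPONSE);
            default:
                return Optional.of(manyIn ? Interaction.REQUEST_CHANNEL : Interaction.REQUEST_STREAM);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Cardinality.ONE, Cardinality.MANY));  // prints Optional[REQUEST_STREAM]
        System.out.println(resolve(Cardinality.MANY, Cardinality.ONE));  // prints Optional.empty
    }
}
```

In terms of the fluent API, the input side corresponds to what is passed to data and the output side to the terminal method that is called, such as retrieveFlux or retrieveMono.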
The data(Object)
method also accepts any Reactive Streams Publisher
, including
Flux
and Mono
, as well as any other producer of value(s) that is registered in the
ReactiveAdapterRegistry
. For a multi-value Publisher
such as Flux
which produces the
same types of values, consider using one of the overloaded data
methods to avoid having
type checks and Encoder
lookup on every element:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
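The benefit of declaring the element type up front can be illustrated with a toy encoder registry. The following is a conceptual sketch only: the registry, its methods, and the lookup counter are hypothetical and do not reflect how Spring resolves an Encoder internally. It merely contrasts one lookup per element with a single lookup for the whole stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry that counts how often an encoder is looked up. */
final class EncoderLookupSketch {

    private final Map<Class<?>, Function<Object, String>> encoders = new LinkedHashMap<>();
    int lookups = 0;

    void register(Class<?> type, Function<Object, String> encoder) {
        encoders.put(type, encoder);
    }

    /** Finds the first registered encoder whose type accepts the given class. */
    Function<Object, String> find(Class<?> type) {
        lookups++;
        for (Map.Entry<Class<?>, Function<Object, String>> e : encoders.entrySet()) {
            if (e.getKey().isAssignableFrom(type)) {
                return e.getValue();
            }
        }
        throw new IllegalArgumentException("No encoder for " + type);
    }

    /** Element type unknown: inspect each value and look up an encoder every time. */
    List<String> encodeEach(List<?> values) {
        List<String> out = new ArrayList<>();
        for (Object value : values) {
            out.add(find(value.getClass()).apply(value));
        }
        return out;
    }

    /** Element type declared: look up the encoder once and reuse it. */
    List<String> encodeAll(List<?> values, Class<?> elementClass) {
        Function<Object, String> encoder = find(elementClass);
        List<String> out = new ArrayList<>();
        for (Object value : values) {
            out.add(encoder.apply(value));
        }
        return out;
    }

    public static void main(String[] args) {
        EncoderLookupSketch registry = new EncoderLookupSketch();
        registry.register(Integer.class, v -> "int:" + v);
        registry.encodeEach(Arrays.asList(1, 2, 3));
        System.out.println(registry.lookups); // prints 3
        registry.lookups = 0;
        registry.encodeAll(Arrays.asList(1, 2, 3), Integer.class);
        System.out.println(registry.lookups); // prints 1
    }
}
```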
The data(Object)
step is optional. Skip it for requests that don’t send data:
Mono<AirportLocation> location = requester.route("find.radar.EWR")
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
Extra metadata values can be added if using
composite metadata (the default) and if the
values are supported by a registered Encoder
. For example:
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
For Fire-and-Forget
use the send()
method that returns Mono<Void>
. Note that the Mono
indicates only that the message was successfully sent, and not that it was handled.