RSocketRequester

RSocketRequester provides a fluent API to perform RSocket requests, accepting and returning objects for data and metadata instead of low-level data buffers. It can be used symmetrically, to make requests from clients and to make requests from servers.

Client Requester

Obtaining an RSocketRequester on the client side requires connecting to a server, which involves preparing and sending the initial RSocket SETUP frame. RSocketRequester provides a builder for that. Internally, the builder uses RSocket Java’s RSocketFactory.

This is the most basic way to connect with default settings:

Java
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.connectTcp("localhost", 7000);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.connectWebSocket(URI.create("https://example.org:8080/rsocket"));
Kotlin
import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait

val requester = RSocketRequester.builder()
		.connectTcpAndAwait("localhost", 7000)

val requester = RSocketRequester.builder()
		.connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))

The above is deferred. To actually connect and use the requester:

Java
// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
	.subscribe(requester -> {
		// ...
	});

// Or block
RSocketRequester requester = RSocketRequester.builder()
	.connectTcp("localhost", 7000)
	.block(Duration.ofSeconds(5));
Kotlin
// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

	private var requester: RSocketRequester? = null

	private suspend fun requester() = requester ?:
		RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }

	suspend fun doSomething() = requester().route(...)
}

// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait

class MyService {

	private val requester = runBlocking {
		RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
	}

	suspend fun doSomething() = requester.route(...)
}

Connection Setup

RSocketRequester.Builder provides the following to customize the initial SETUP frame:

  • dataMimeType(MimeType) — set the mime type for data on the connection.

  • metadataMimeType(MimeType) — set the mime type for metadata on the connection.

  • setupData(Object) — data to include in the SETUP.

  • setupRoute(String, Object…​) — route in the metadata to include in the SETUP.

  • setupMetadata(Object, MimeType) — other metadata to include in the SETUP.

For data, the default mime type is derived from the first configured Decoder. For metadata, the default mime type is composite metadata which allows multiple metadata value and mime type pairs per request. Typically both don’t need to be changed.
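
To make the idea of multiple metadata value and mime type pairs more concrete, here is a minimal sketch of how such pairs could be laid out in one buffer. It is modeled on the RSocket Composite Metadata extension as we understand it: a one-byte header carrying the mime type length minus one, the mime type in ASCII, a 24-bit value length, then the value. CompositeMetadataSketch and its methods are hypothetical names used only for illustration. They are not part of Spring or RSocket Java, and the sketch leaves out the well-known mime type ids that the real format also supports.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; in practice the library performs this encoding.
final class CompositeMetadataSketch {

	private final ByteArrayOutputStream out = new ByteArrayOutputStream();

	// Append one (mime type, value) entry that uses a custom mime type string.
	CompositeMetadataSketch add(String mimeType, byte[] value) {
		byte[] mime = mimeType.getBytes(StandardCharsets.US_ASCII);
		if (mime.length < 1 || mime.length > 128) {
			throw new IllegalArgumentException("Mime type must be 1 to 128 ASCII bytes");
		}
		if (value.length > 0xFFFFFF) {
			throw new IllegalArgumentException("Value does not fit a 24-bit length");
		}
		out.write(mime.length - 1);              // high bit 0, low 7 bits = length - 1
		out.write(mime, 0, mime.length);         // mime type in ASCII
		out.write((value.length >> 16) & 0xFF);  // 24-bit value length, big-endian
		out.write((value.length >> 8) & 0xFF);
		out.write(value.length & 0xFF);
		out.write(value, 0, value.length);       // the value itself
		return this;
	}

	byte[] toBytes() {
		return out.toByteArray();
	}

	// Read entries back as "mimeType=value" strings, decoding values as UTF-8.
	static List<String> decode(byte[] bytes) {
		List<String> entries = new ArrayList<>();
		int pos = 0;
		while (pos < bytes.length) {
			int mimeLength = (bytes[pos++] & 0x7F) + 1;
			String mime = new String(bytes, pos, mimeLength, StandardCharsets.US_ASCII);
			pos += mimeLength;
			int valueLength = ((bytes[pos] & 0xFF) << 16)
					| ((bytes[pos + 1] & 0xFF) << 8)
					| (bytes[pos + 2] & 0xFF);
			pos += 3;
			entries.add(mime + "=" + new String(bytes, pos, valueLength, StandardCharsets.UTF_8));
			pos += valueLength;
		}
		return entries;
	}
}
```

Because every entry carries its own mime type, a route and, say, a security token can travel side by side in the metadata of a single frame.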

Data and metadata in the SETUP frame are optional. On the server side, rsocket-annot-connectmapping methods can be used to handle the start of a connection and the content of the SETUP frame. Metadata may be used for connection level security.

Strategies

RSocketRequester.Builder accepts RSocketStrategies to configure the requester. You’ll need to use this to provide encoders and decoders for (de)-serialization of data and metadata values. By default only the basic codecs from spring-core for String, byte[], and ByteBuffer are registered. Adding spring-web provides access to more that can be registered as follows:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.connectTcp("localhost", 7000);
Kotlin
import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.connectTcpAndAwait("localhost", 7000)

RSocketStrategies is designed for re-use. In some scenarios, e.g. client and server in the same application, it may be preferable to declare it in Spring configuration.

Client Responders

RSocketRequester.Builder can be used to configure responders to requests from the server.

You can use annotated handlers for client-side responding based on the same infrastructure that’s used on a server, but registered programmatically as follows:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

ClientHandler handler = new ClientHandler(); (2)

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) (3)
	.connectTcp("localhost", 7000);
1 Use PathPatternRouteMatcher, if spring-web is present, for efficient route matching.
2 Create responder that contains @MessageMapping or @ConnectMapping methods.
3 Use static factory method in RSocketMessageHandler to register one or more responders.
Kotlin
import org.springframework.messaging.rsocket.connectTcpAndAwait

val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val handler = ClientHandler() (2)

val requester = RSocketRequester.builder()
		.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) (3)
		.connectTcpAndAwait("localhost", 7000)
1 Use PathPatternRouteMatcher, if spring-web is present, for efficient route matching.
2 Create responder that contains @MessageMapping or @ConnectMapping methods.
3 Use static factory method in RSocketMessageHandler to register one or more responders.

Note the above is only a shortcut designed for programmatic registration of client responders. For alternative scenarios, where client responders are in Spring configuration, you can still declare RSocketMessageHandler as a Spring bean and then apply as follows:

Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.rsocketFactory(factory -> factory.acceptor(handler.responder()))
	.connectTcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketFactory { it.acceptor(handler.responder()) }
		.connectTcpAndAwait("localhost", 7000)

For the above you may also need to use setHandlerPredicate in RSocketMessageHandler to switch to a different strategy for detecting client responders, e.g. based on a custom annotation such as @RSocketClientResponder vs the default @Controller. This is necessary in scenarios with client and server, or multiple clients in the same application.

See also rsocket-annot-responders, for more on the programming model.

Advanced

RSocketRequester.Builder provides a callback to expose the underlying ClientRSocketFactory from RSocket Java for further configuration options for keepalive intervals, session resumption, interceptors, and more. You can configure options at that level as follows:

Java
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
	.rsocketFactory(factory -> {
		// ...
	})
	.connectTcp("localhost", 7000);
Kotlin
import org.springframework.messaging.rsocket.connectTcpAndAwait

val requester = RSocketRequester.builder()
		.rsocketFactory {
			//...
		}.connectTcpAndAwait("localhost", 7000)

Server Requester

Making requests from a server to connected clients is a matter of obtaining the requester for the connected client from the server.

In rsocket-annot-responders, @ConnectMapping and @MessageMapping methods support an RSocketRequester argument. Use it to access the requester for the connection. Keep in mind that @ConnectMapping methods are essentially handlers of the SETUP frame which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling. For example:

Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 Start the request asynchronously, independent from handling.
2 Perform handling and return completion Mono<Void>.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	// ... (2)
}
1 Start the request asynchronously, independent from handling.
2 Perform handling in the suspending function.

Requests

Once you have a client or server requester, you can make requests as follows:

Java
ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 Specify a route to include in the metadata of the request message.
2 Provide data for the request message.
3 Declare the expected response.
Kotlin
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 Specify a route to include in the metadata of the request message.
2 Provide data for the request message.
3 Declare the expected response.

The interaction type is determined implicitly from the cardinality of the input and output. The above example is a Request-Stream because one value is sent and a stream of values is received. For the most part you don’t need to think about this as long as the choice of input and output matches an RSocket interaction type and the types of input and output expected by the responder. The only example of an invalid combination is many-to-one.
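
The mapping from cardinality to interaction type described above can be sketched as a small lookup. This is a hypothetical model for illustration, not Spring API: InteractionTypeSketch, Cardinality and resolve are invented names. Treating many-to-none the same as many-to-one is an assumption of this sketch.

```java
// Hypothetical model, not Spring API: derive the interaction type from cardinality.
final class InteractionTypeSketch {

	enum Cardinality { NONE, ONE, MANY }

	enum InteractionType { FIRE_AND_FORGET, REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL }

	static InteractionType resolve(Cardinality input, Cardinality output) {
		if (input == Cardinality.MANY) {
			// A stream of inputs only pairs with a stream of outputs.
			// Rejecting many-to-none as well is an assumption of this sketch.
			if (output != Cardinality.MANY) {
				throw new IllegalArgumentException("No interaction type for many inputs and " + output + " outputs");
			}
			return InteractionType.REQUEST_CHANNEL;
		}
		// Zero or one input value: the expected output decides.
		if (output == Cardinality.NONE) {
			return InteractionType.FIRE_AND_FORGET;
		}
		if (output == Cardinality.ONE) {
			return InteractionType.REQUEST_RESPONSE;
		}
		return InteractionType.REQUEST_STREAM;
	}
}
```

In these terms, the "locate.radars.within" request corresponds to resolve(ONE, MANY), which yields REQUEST_STREAM.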

The data(Object) method also accepts any Reactive Streams Publisher, including Flux and Mono, as well as any other producer of value(s) that is registered in the ReactiveAdapterRegistry. For a multi-value Publisher such as Flux which produces the same types of values, consider using one of the overloaded data methods to avoid having type checks and Encoder lookup on every element:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
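
To illustrate why declaring the element type up front can help, the following toy model counts how often an encoder has to be resolved. EncoderLookupSketch, its registry, and its methods are hypothetical and deliberately simplified. They do not reflect how Spring's Encoder lookup is implemented, only the general trade-off.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy model, not Spring's implementation.
final class EncoderLookupSketch {

	private final Map<Class<?>, Function<Object, String>> encoders = new LinkedHashMap<>();
	private int lookups;

	void register(Class<?> type, Function<Object, String> encoder) {
		encoders.put(type, encoder);
	}

	int lookups() {
		return lookups;
	}

	// Resolve an encoder by checking each registered type in turn.
	private Function<Object, String> find(Class<?> type) {
		lookups++;
		for (Map.Entry<Class<?>, Function<Object, String>> entry : encoders.entrySet()) {
			if (entry.getKey().isAssignableFrom(type)) {
				return entry.getValue();
			}
		}
		throw new IllegalArgumentException("No encoder for " + type.getName());
	}

	// Element type unknown: the encoder is resolved again for every element.
	List<String> encodeEach(List<?> values) {
		List<String> result = new ArrayList<>();
		for (Object value : values) {
			result.add(find(value.getClass()).apply(value));
		}
		return result;
	}

	// Element type declared up front: the encoder is resolved once and reused.
	List<String> encodeAll(List<?> values, Class<?> elementClass) {
		Function<Object, String> encoder = find(elementClass);
		List<String> result = new ArrayList<>();
		for (Object value : values) {
			result.add(encoder.apply(value));
		}
		return result;
	}
}
```

Both methods produce the same output. The difference is only in how many lookups happen along the way, which grows with the number of elements in the first case and stays at one in the second.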

The data(Object) step is optional. Skip it for requests that don’t send data:

Java
Mono<AirportLocation> location = requester.route("find.radar.EWR")
	.retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

Extra metadata values can be added if using composite metadata (the default) and if the values are supported by a registered Encoder. For example:

Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

For Fire-and-Forget use the send() method that returns Mono<Void>. Note that the Mono indicates only that the message was successfully sent, and not that it was handled.