RSocket examples – Java, JavaScript and Spring WebFlux
Overview
RSocket is a binary protocol that allows asynchronous message passing over a single connection. It uses Reactive Streams, provides symmetric API’s for server and client and supports TCP, WebSockets, and Aeron. It’s not widely used, but it’s worth taking a closer look, especially if you like reactive programming or build Spring WebFlux application. The Internet lacks RSocket examples and that’s the reason why I’m going to show you how to establish a client-server connection with Java, JavaScript and Spring WebFlux.
As always, remember you can see the full source code in this GitHub project.
Java server - Java client
Let’s start with a simple Java application to easy understand different interaction models provided by RSocket: fire-and-forget, request/response, request/stream, and channel. Let’s create a Maven Project with the following dependencies:
pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.kojotdev.blog.rsocket</groupId>
- <artifactId>rsocket-examples</artifactId>
- <version>1.0-SNAPSHOT</version>
- <dependencies>
- <dependency>
- <groupId>io.rsocket</groupId>
- <artifactId>rsocket-core</artifactId>
- <version>1.0.0-RC3</version>
- </dependency>
- <dependency>
- <groupId>io.rsocket</groupId>
- <artifactId>rsocket-transport-netty</artifactId>
- <version>1.0.0-RC3</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.10.0.pr3</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.2.3</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>1.2.3</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.25</version>
- </dependency>
- </dependencies>
- <repositories>
- <repository>
- <id>spring-milestones</id>
- <name>Spring Milestones</name>
- <url>https://repo.spring.io/libs-milestone</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.1</version>
- <configuration>
- <release>11</release>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.kojotdev.blog.rsocket</groupId> <artifactId>rsocket-examples</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-core</artifactId> <version>1.0.0-RC3</version> </dependency> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>1.0.0-RC3</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.0.pr3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> </dependencies> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <release>11</release> </configuration> </plugin> </plugins> </build> </project>
We added RSocket dependencies, Jackson for mapping JSON objects, and a logback for logger. We also added a Spring Milestones repository because it’s needed with the current version. Otherwise, Maven will throw an exception Could not find artifact io.projectreactor:reactor-bom:pom:Dysprosium-M3 in central (https://repo.maven.apache.org/maven2)
OK, it’s time to add our domain Message class and so ugly MessageMapper 😉
Message.java
- import com.fasterxml.jackson.annotation.JsonCreator;
- import com.fasterxml.jackson.annotation.JsonProperty;
- public class Message {
- public final String message;
- @JsonCreator
- public Message(@JsonProperty("message") String message) {
- this.message = message;
- }
- }
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; public class Message { public final String message; @JsonCreator public Message(@JsonProperty("message") String message) { this.message = message; } }
MessageMapper.java
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import java.io.IOException;
- public class MessageMapper {
- private static ObjectMapper jsonMapper = new ObjectMapper();
- public static String messageToJson(Message msg) {
- try {
- return jsonMapper.writeValueAsString(msg);
- } catch (JsonProcessingException e) {
- throw new IllegalStateException(e);
- }
- }
- public static Message jsonToMessage(String json) {
- try {
- return jsonMapper.readValue(json, Message.class);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- }
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; public class MessageMapper { private static ObjectMapper jsonMapper = new ObjectMapper(); public static String messageToJson(Message msg) { try { return jsonMapper.writeValueAsString(msg); } catch (JsonProcessingException e) { throw new IllegalStateException(e); } } public static Message jsonToMessage(String json) { try { return jsonMapper.readValue(json, Message.class); } catch (IOException e) { throw new IllegalStateException(e); } } }
The Server class is listed below:
Server.java
- import ch.qos.logback.classic.Level;
- import ch.qos.logback.classic.Logger;
- import io.rsocket.AbstractRSocket;
- import io.rsocket.Payload;
- import io.rsocket.RSocketFactory;
- import io.rsocket.transport.netty.server.WebsocketServerTransport;
- import io.rsocket.util.DefaultPayload;
- import org.reactivestreams.Publisher;
- import org.slf4j.LoggerFactory;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- public final class Server {
- private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
- static {
- log.setLevel(Level.INFO);
- }
- public static void main(String[] args) {
- RSocketFactory.receive()
- .acceptor((setup, sendingSocket) -> Mono.just(new DefaultSimpleService()))
- .transport(WebsocketServerTransport.create(8801))
- .start()
- .block()
- .onClose()
- .block();
- }
- private static final class DefaultSimpleService extends AbstractRSocket {
- @Override
- public Mono<Void> fireAndForget(Payload payload) {
- log.info("got fireAndForget in Server");
- log.info(payload.getDataUtf8());
- return Mono.empty();
- }
- @Override
- public Mono<Payload> requestResponse(Payload payload) {
- log.info("got requestResponse in Server");
- log.info(payload.getDataUtf8());
- return Mono.just(payload.getDataUtf8())
- .map(payloadString -> MessageMapper.jsonToMessage(payloadString))
- .map(message -> message.message + " | requestReponse from Server #1")
- .map(responseText -> new Message(responseText))
- .map(responseMessage -> MessageMapper.messageToJson(responseMessage))
- .map(responseJson -> DefaultPayload.create(responseJson));
- }
- @Override
- public Flux<Payload> requestStream(Payload payload) {
- log.info("got requestStream in Server");
- log.info(payload.getDataUtf8());
- return Mono.just(payload.getDataUtf8())
- .map(payloadString -> MessageMapper.jsonToMessage(payloadString))
- .flatMapMany(msg -> Flux.range(0, 5)
- .map(count -> msg.message + " | requestStream from Server #" + count)
- .map(responseText -> new Message(responseText))
- .map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
- .map(message -> DefaultPayload.create(message));
- }
- @Override
- public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
- log.info("got requestChannel in Server");
- return Flux.from(payloads)
- .map(payload -> payload.getDataUtf8())
- .map(payloadString -> {
- log.info(payloadString);
- return MessageMapper.jsonToMessage(payloadString);
- })
- .flatMap(msg -> Flux.range(0, 2)
- .map(count -> msg.message + " | requestChannel from Server #" + count)
- .map(responseText -> new Message(responseText))
- .map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
- .map(message -> DefaultPayload.create(message));
- }
- }
- }
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import org.reactivestreams.Publisher; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public final class Server { private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); static { log.setLevel(Level.INFO); } public static void main(String[] args) { RSocketFactory.receive() .acceptor((setup, sendingSocket) -> Mono.just(new DefaultSimpleService())) .transport(WebsocketServerTransport.create(8801)) .start() .block() .onClose() .block(); } private static final class DefaultSimpleService extends AbstractRSocket { @Override public Mono<Void> fireAndForget(Payload payload) { log.info("got fireAndForget in Server"); log.info(payload.getDataUtf8()); return Mono.empty(); } @Override public Mono<Payload> requestResponse(Payload payload) { log.info("got requestResponse in Server"); log.info(payload.getDataUtf8()); return Mono.just(payload.getDataUtf8()) .map(payloadString -> MessageMapper.jsonToMessage(payloadString)) .map(message -> message.message + " | requestReponse from Server #1") .map(responseText -> new Message(responseText)) .map(responseMessage -> MessageMapper.messageToJson(responseMessage)) .map(responseJson -> DefaultPayload.create(responseJson)); } @Override public Flux<Payload> requestStream(Payload payload) { log.info("got requestStream in Server"); log.info(payload.getDataUtf8()); return Mono.just(payload.getDataUtf8()) .map(payloadString -> MessageMapper.jsonToMessage(payloadString)) .flatMapMany(msg -> Flux.range(0, 5) .map(count -> msg.message + " | requestStream from Server #" + count) .map(responseText -> new Message(responseText)) .map(responseMessage -> MessageMapper.messageToJson(responseMessage))) .map(message -> DefaultPayload.create(message)); } @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { log.info("got requestChannel in Server"); return Flux.from(payloads) .map(payload -> payload.getDataUtf8()) .map(payloadString -> { log.info(payloadString); return MessageMapper.jsonToMessage(payloadString); }) .flatMap(msg -> Flux.range(0, 2) .map(count -> msg.message + " | requestChannel from Server #" + count) .map(responseText -> new Message(responseText)) .map(responseMessage -> MessageMapper.messageToJson(responseMessage))) .map(message -> DefaultPayload.create(message)); } } }
In the main static method, we start our RSocket server. It uses WebSocket at 8801
port and accepts interaction models implemented by the DefaultSimpleService
class. All of these methods are very similar. They just get the payload, change its message and return it to the client. You can see the difference in the arguments and return types.Â
- fireAndForget just gets the payload and returns nothing (you don’t want to wait for the server response)
- requestResponse gets the payload and returns a single response (like with the standard HTTP/REST application)
- requestStream gets the payload and returns stream (no need to implement the SSE, it’s built-in)
- channel – stream as a request and stream as a response (bi-directional streams)
The client class looks very similar to the server class:
Client.java
- import ch.qos.logback.classic.Level;
- import ch.qos.logback.classic.Logger;
- import io.rsocket.Payload;
- import io.rsocket.RSocket;
- import io.rsocket.RSocketFactory;
- import io.rsocket.transport.netty.client.WebsocketClientTransport;
- import io.rsocket.util.DefaultPayload;
- import org.slf4j.LoggerFactory;
- import reactor.core.publisher.Flux;
- public class Client {
- private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
- static {
- log.setLevel(Level.INFO);
- }
- public static void main(String[] args) {
- final Client client = new Client();
- final RSocket rSocket = client.connect();
- client.fireAndForget(rSocket);
- client.requestResponse(rSocket);
- client.requestStream(rSocket);
- client.requestChannel(rSocket);
- }
- private RSocket connect() {
- return RSocketFactory.connect()
- .transport(WebsocketClientTransport.create(8801))
- .start()
- .block();
- }
- private void fireAndForget(RSocket rSocket) {
- log.info("sending fire and forget from client");
- Flux.just(new Message("fire and forget JAVA client!"))
- .map(msg -> MessageMapper.messageToJson(msg))
- .map(json -> DefaultPayload.create(json))
- .flatMap(message -> rSocket.fireAndForget(message))
- .blockLast();
- }
- private void requestResponse(RSocket rSocket) {
- log.info("sending request-response from client");
- Flux.just(new Message("requestResponse from JAVA client!"))
- .map(msg -> MessageMapper.messageToJson(msg))
- .map(json -> DefaultPayload.create(json))
- .flatMap(message -> rSocket.requestResponse(message))
- .map(payload -> payload.getDataUtf8())
- .doOnNext(payloadString -> {
- log.info("got response in JAVA client");
- log.info(payloadString);
- })
- .blockLast();
- }
- private void requestStream(RSocket rSocket) {
- log.info("sending request-stream from client");
- Flux.just(new Message("requestStream from JAVA client!"))
- .map(msg -> MessageMapper.messageToJson(msg))
- .map(json -> DefaultPayload.create(json))
- .flatMap(message -> rSocket.requestStream(message))
- .map(payload -> payload.getDataUtf8())
- .doOnNext(payloadString -> log.info(payloadString))
- .blockLast();
- }
- private void requestChannel(RSocket rSocket) {
- log.info("sending request-channel from client");
- final Flux<Payload> requestPayload = Flux.range(0, 5)
- .map(count -> new Message("requestChannel from JAVA client! #" + count))
- .map(msg -> {
- log.info("sending message: {}", msg.message);
- return MessageMapper.messageToJson(msg);
- })
- .map(json -> DefaultPayload.create(json));
- rSocket
- .requestChannel(requestPayload)
- .map(payload -> payload.getDataUtf8())
- .doOnNext(payloadString -> log.info(payloadString))
- .blockLast();
- }
- }
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.util.DefaultPayload; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; public class Client { private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); static { log.setLevel(Level.INFO); } public static void main(String[] args) { final Client client = new Client(); final RSocket rSocket = client.connect(); client.fireAndForget(rSocket); client.requestResponse(rSocket); client.requestStream(rSocket); client.requestChannel(rSocket); } private RSocket connect() { return RSocketFactory.connect() .transport(WebsocketClientTransport.create(8801)) .start() .block(); } private void fireAndForget(RSocket rSocket) { log.info("sending fire and forget from client"); Flux.just(new Message("fire and forget JAVA client!")) .map(msg -> MessageMapper.messageToJson(msg)) .map(json -> DefaultPayload.create(json)) .flatMap(message -> rSocket.fireAndForget(message)) .blockLast(); } private void requestResponse(RSocket rSocket) { log.info("sending request-response from client"); Flux.just(new Message("requestResponse from JAVA client!")) .map(msg -> MessageMapper.messageToJson(msg)) .map(json -> DefaultPayload.create(json)) .flatMap(message -> rSocket.requestResponse(message)) .map(payload -> payload.getDataUtf8()) .doOnNext(payloadString -> { log.info("got response in JAVA client"); log.info(payloadString); }) .blockLast(); } private void requestStream(RSocket rSocket) { log.info("sending request-stream from client"); Flux.just(new Message("requestStream from JAVA client!")) .map(msg -> MessageMapper.messageToJson(msg)) .map(json -> DefaultPayload.create(json)) .flatMap(message -> rSocket.requestStream(message)) .map(payload -> payload.getDataUtf8()) .doOnNext(payloadString -> log.info(payloadString)) .blockLast(); } private void requestChannel(RSocket rSocket) { log.info("sending request-channel from client"); final Flux<Payload> requestPayload = Flux.range(0, 5) .map(count -> new Message("requestChannel from JAVA client! #" + count)) .map(msg -> { log.info("sending message: {}", msg.message); return MessageMapper.messageToJson(msg); }) .map(json -> DefaultPayload.create(json)); rSocket .requestChannel(requestPayload) .map(payload -> payload.getDataUtf8()) .doOnNext(payloadString -> log.info(payloadString)) .blockLast(); } }
We still use the RSocketFactory
, but this time with the connect()
method instead of receive()
. As you can see I use WebsocketServerTransport
, but you can easily change it to the TCP (same with the port and host). After establishing the connection, we call the appropriate method and handle the response.
Java server - JavaScript client
With rsocket-js we can easily write our JavaScript client. As usual, I’ve created the Vue project, but I’m pretty sure you will understand the code even if you never used the Vue before. To make it work, we need to install the following packages:
npm i rsocket-core
npm i rsocket-websocket-client
npm i rsocket-flowable

Let’s connect with the backend:
Home.vue
- import { RSocketClient, JsonSerializers } from "rsocket-core";
- import RSocketWebSocketClient from "rsocket-websocket-client";
- ...
- connect() {
- console.log("connecting with RSocket...");
- const transport = new RSocketWebSocketClient({
- url: "ws://localhost:8801"
- });
- const client = new RSocketClient({
- // send/receive JSON objects instead of strings/buffers
- serializers: JsonSerializers,
- setup: {
- // ms btw sending keepalive to server
- keepAlive: 60000,
- // ms timeout if no keepalive response
- lifetime: 180000,
- // format of `data`
- dataMimeType: "application/json",
- // format of `metadata`
- metadataMimeType: "application/json"
- },
- transport
- });
- client.connect().subscribe({
- onComplete: socket => {
- this.socket = socket;
- },
- onError: error => {
- console.log("got connection error");
- console.error(error);
- },
- onSubscribe: cancel => {
- /* call cancel() to abort */
- }
- });
import { RSocketClient, JsonSerializers } from "rsocket-core"; import RSocketWebSocketClient from "rsocket-websocket-client"; ... connect() { console.log("connecting with RSocket..."); const transport = new RSocketWebSocketClient({ url: "ws://localhost:8801" }); const client = new RSocketClient({ // send/receive JSON objects instead of strings/buffers serializers: JsonSerializers, setup: { // ms btw sending keepalive to server keepAlive: 60000, // ms timeout if no keepalive response lifetime: 180000, // format of `data` dataMimeType: "application/json", // format of `metadata` metadataMimeType: "application/json" }, transport }); client.connect().subscribe({ onComplete: socket => { this.socket = socket; }, onError: error => { console.log("got connection error"); console.error(error); }, onSubscribe: cancel => { /* call cancel() to abort */ } });
The signature and implementation of each method are presented below.
fireAndForget(payload: Payload): void
FireAndForget.vue
- if (this.socket) {
- const message = { message: "fire and forget from JavaScript!" };
- this.socket.fireAndForget({
- data: message,
- metadata: ""
- });
- this.sent.push(message);
- } else {
- console.log("not connected...");
- }
if (this.socket) { const message = { message: "fire and forget from JavaScript!" }; this.socket.fireAndForget({ data: message, metadata: "" }); this.sent.push(message); } else { console.log("not connected..."); }
requestResponse(payload: Payload): Single
RequestResponse.vue
- if (this.socket) {
- const message = { message: "requestResponse from JavaScript!" };
- this.socket
- .requestResponse({
- data: message,
- metadata: ""
- })
- .subscribe({
- onComplete: data => {
- console.log("got response with requestResponse");
- this.received.push(data.data);
- },
- onError: error => {
- console.log("got error with requestResponse");
- console.error(error);
- },
- onSubscribe: cancel => {
- this.sent.push(message);
- /* call cancel() to stop onComplete/onError */
- }
- });
- } else {
- console.log("not connected...");
- }
if (this.socket) { const message = { message: "requestResponse from JavaScript!" }; this.socket .requestResponse({ data: message, metadata: "" }) .subscribe({ onComplete: data => { console.log("got response with requestResponse"); this.received.push(data.data); }, onError: error => { console.log("got error with requestResponse"); console.error(error); }, onSubscribe: cancel => { this.sent.push(message); /* call cancel() to stop onComplete/onError */ } }); } else { console.log("not connected..."); }
requestStream(payload: Payload): Flowable
RequestStream.vue
- if (this.socket) {
- const message = { message: "requestStream from JavaScript!" };
- this.socket
- .requestStream({
- data: message,
- metadata: ""
- })
- .subscribe({
- onComplete: () => {
- console.log("requestStream done");
- this.received.push("requestStream done");
- },
- onError: error => {
- console.log("got error with requestStream");
- console.error(error);
- },
- onNext: value => {
- // console.log("got next value in requestStream..");
- this.received.push(value.data);
- },
- // Nothing happens until `request(n)` is called
- onSubscribe: sub => {
- console.log("subscribe request Stream!");
- sub.request(7);
- this.sent.push(message);
- }
- });
- } else {
- console.log("not connected...");
- }
if (this.socket) { const message = { message: "requestStream from JavaScript!" }; this.socket .requestStream({ data: message, metadata: "" }) .subscribe({ onComplete: () => { console.log("requestStream done"); this.received.push("requestStream done"); }, onError: error => { console.log("got error with requestStream"); console.error(error); }, onNext: value => { // console.log("got next value in requestStream.."); this.received.push(value.data); }, // Nothing happens until `request(n)` is called onSubscribe: sub => { console.log("subscribe request Stream!"); sub.request(7); this.sent.push(message); } }); } else { console.log("not connected..."); }
requestChannel(payload: Flowable): Flowable
RequestChannel.vue
- if (this.socket) {
- const flowablePayload = new Flowable(subscriber => {
- subscriber.onSubscribe({
- cancel: () => {},
- request: n => {
- for (let index = 0; index < n; index++) {
- const message = {
- message: "requestChannel from JavaScript! #" + index
- };
- subscriber.onNext(message);
- }
- subscriber.onComplete();
- }
- });
- });
- // test flowable payload
- // flowablePayload.subscribe({
- // onComplete: () => console.log("done"),
- // onError: error => console.error(error),
- // onNext: value => {
- // console.log("got onNext value ");
- // console.log(value);
- // },
- // // Nothing happens until `request(n)` is called
- // onSubscribe: sub => {
- // sub.request(5);
- // }
- // });
- this.socket
- .requestChannel({
- data: flowablePayload,
- metadata: ""
- })
- .subscribe({
- onComplete: () => {
- console.log("requestChannel done");
- this.received.push("requestChannel done");
- },
- onError: error => {
- console.log("got error with requestChannel");
- console.error(error);
- },
- onNext: value => {
- // console.log("got next value in requestChannel..");
- this.received.push(value.data);
- },
- // Nothing happens until `request(n)` is called
- onSubscribe: sub => {
- console.log("subscribe request Channel!");
- sub.request(7);
- this.sent.push(message);
- }
- });
- } else {
- console.log("not connected...");
- }
if (this.socket) { const flowablePayload = new Flowable(subscriber => { subscriber.onSubscribe({ cancel: () => {}, request: n => { for (let index = 0; index < n; index++) { const message = { message: "requestChannel from JavaScript! #" + index }; subscriber.onNext(message); } subscriber.onComplete(); } }); }); // test flowable payload // flowablePayload.subscribe({ // onComplete: () => console.log("done"), // onError: error => console.error(error), // onNext: value => { // console.log("got onNext value "); // console.log(value); // }, // // Nothing happens until `request(n)` is called // onSubscribe: sub => { // sub.request(5); // } // }); this.socket .requestChannel({ data: flowablePayload, metadata: "" }) .subscribe({ onComplete: () => { console.log("requestChannel done"); this.received.push("requestChannel done"); }, onError: error => { console.log("got error with requestChannel"); console.error(error); }, onNext: value => { // console.log("got next value in requestChannel.."); this.received.push(value.data); }, // Nothing happens until `request(n)` is called onSubscribe: sub => { console.log("subscribe request Channel!"); sub.request(7); this.sent.push(message); } }); } else { console.log("not connected..."); }
At this moment, the requestChannel
method doesn’t work, because it is not yet implemented in the rsocket-js.
WebFlux server - WebFlux client
The latest version of Spring Boot (2.2.0 M6) supports RSocket. It means that you can very easily configure the server and client. I’m not going to implement all of the interaction models, they just depend on the method arguments and return type. Simple server that handles requestResponse
model requires below configuration:
ServerController.java
- import com.kojotdev.blog.webfluxrsocketexample.domain.Message;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.messaging.handler.annotation.MessageMapping;
- import org.springframework.stereotype.Controller;
- import reactor.core.publisher.Mono;
- @Controller
- public class ServerController {
- private static final Logger log = LoggerFactory.getLogger(ServerController.class);
- @MessageMapping("hello")
- public Mono<Message> helloServer(Message message) {
- log.info("got message in Server {}", message.message);
- return Mono.just(message)
- .map(msg -> new Message(msg.message + " | Server says hello!"));
- }
- }
import com.kojotdev.blog.webfluxrsocketexample.domain.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.stereotype.Controller; import reactor.core.publisher.Mono; @Controller public class ServerController { private static final Logger log = LoggerFactory.getLogger(ServerController.class); @MessageMapping("hello") public Mono<Message> helloServer(Message message) { log.info("got message in Server {}", message.message); return Mono.just(message) .map(msg -> new Message(msg.message + " | Server says hello!")); } }
application.properties
pom.xml
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-rsocket</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-webflux</artifactId>
- </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
The client is a separate Spring Webflux application.
ClientConfiguration.java
- import io.rsocket.transport.netty.client.WebsocketClientTransport;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.messaging.rsocket.RSocketRequester;
- import org.springframework.messaging.rsocket.RSocketStrategies;
- import java.net.URI;
- @Configuration
- public class ClientConfiguration {
- @Bean
- public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
- final URI uri = URI.create("ws://localhost:8080/rsocket");
- return RSocketRequester
- .builder()
- .rsocketStrategies(rSocketStrategies)
- .connect(WebsocketClientTransport.create(uri))
- .block();
- }
- }
import io.rsocket.transport.netty.client.WebsocketClientTransport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import java.net.URI; @Configuration public class ClientConfiguration { @Bean public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) { final URI uri = URI.create("ws://localhost:8080/rsocket"); return RSocketRequester .builder() .rsocketStrategies(rSocketStrategies) .connect(WebsocketClientTransport.create(uri)) .block(); } }
MessageController.java
- import com.kojotdev.blog.webfluxrsocketexample.domain.Message;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.messaging.rsocket.RSocketRequester;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RestController;
- import reactor.core.publisher.Mono;
- @RestController
- public class MessageController {
- private static final Logger log = LoggerFactory.getLogger(MessageController.class);
- public final RSocketRequester rSocketRequester;
- public MessageController(RSocketRequester rSocketRequester) {
- this.rSocketRequester = rSocketRequester;
- }
- @PostMapping("/message")
- public Mono<String> message(@RequestBody Message message) {
- log.info("got message in the rest request {}", message.message);
- return rSocketRequester
- .route("hello")
- .data(message)
- .retrieveMono(Message.class)
- .map(serverMessage -> serverMessage.message)
- .doOnNext(serverMsg -> log.info("server response in the client: {}", serverMsg));
- }
- }
import com.kojotdev.blog.webfluxrsocketexample.domain.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @RestController public class MessageController { private static final Logger log = LoggerFactory.getLogger(MessageController.class); public final RSocketRequester rSocketRequester; public MessageController(RSocketRequester rSocketRequester) { this.rSocketRequester = rSocketRequester; } @PostMapping("/message") public Mono<String> message(@RequestBody Message message) { log.info("got message in the rest request {}", message.message); return rSocketRequester .route("hello") .data(message) .retrieveMono(Message.class) .map(serverMessage -> serverMessage.message) .doOnNext(serverMsg -> log.info("server response in the client: {}", serverMsg)); } }
We declared an RSocketRequester
bean to make a connection with the server. With this bean, we can easily send and retrieve the messages. The flow of our application looks like this:
- post the message with the REST endpoint “/message”
- send it to the server with the injected
RSocketRequester
bean - retrieve the responseÂ
- log the response and return it with the REST endpoint
Spring WebFlux doesn’t support WebSocket with STOMP, so the RSocket sounds very promising for solving the message mapping problem. The documentation is still very modest about this, but it looks like everything goes in the right direction.
WebFlux server - JavaScript client
It would be very nice to push the messages with the JavaScript client and the Spring WebFlux server. Unfortunately, I couldn’t figure it out how to do this. The problem lies in the metadata and routing. I found this issue in the GitHub project and hope they’re going to solve this soon.
2 Comments
Anonymous
very informative. Thank you
Michael
Hi,
to use rsocket-js with webflux server, you need to configure the mapping.
In your example, you use hello entrypoint on server: @MessageMapping(“hello”).
The same thing should be done on client side:
1. Change connection endpoint:
const transport = new RSocketWebSocketClient({
url: “ws://localhost:8801/rsocket”
});
2. change serializers & metadataMimeType in RSocketClient:
const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
// other properties….
// format of `data`
dataMimeType: ‘application/json’,
// format of `metadata`
metadataMimeType: ‘message/x.rsocket.routing.v0’,
},
transport
});
3. Finally, set the same MessageMapping for a method:
this.socket
.requestResponse({
data: message,
metadata: String.fromCharCode(‘hello’.length) + ‘hello’
})
That’s all, now your web client can connect to webflux server.