Make it work

RSocket examples – Java, JavaScript and Spring WebFlux

Overview

RSocket is a binary protocol for asynchronous message passing over a single connection. It uses Reactive Streams, provides symmetric APIs for server and client, and supports TCP, WebSockets, and Aeron. It’s not widely used, but it’s worth a closer look, especially if you like reactive programming or build Spring WebFlux applications. RSocket examples are hard to find online, which is why I’m going to show you how to establish a client-server connection with Java, JavaScript and Spring WebFlux.
As always, remember you can see the full source code in this GitHub project.

Java server - Java client

Let’s start with a simple Java application that makes it easy to understand the different interaction models provided by RSocket: fire-and-forget, request/response, request/stream, and channel. Let’s create a Maven project with the following dependencies:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kojotdev.blog.rsocket</groupId>
    <artifactId>rsocket-examples</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-core</artifactId>
            <version>1.0.0-RC3</version>
        </dependency>
        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-transport-netty</artifactId>
            <version>1.0.0-RC3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.0.pr3</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
    
</project>

We added the RSocket dependencies, Jackson for mapping JSON objects, and Logback for logging. We also added the Spring Milestones repository because the current version needs it. Otherwise, Maven fails with the error Could not find artifact io.projectreactor:reactor-bom:pom:Dysprosium-M3 in central (https://repo.maven.apache.org/maven2).
OK, it’s time to add our domain Message class and a rather ugly MessageMapper 😉

Message.java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Message {

    public final String message;

    @JsonCreator
    public Message(@JsonProperty("message") String message) {
        this.message = message;
    }
}
MessageMapper.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class MessageMapper {
    private static ObjectMapper jsonMapper = new ObjectMapper();

    public static String messageToJson(Message msg) {
        try {
            return jsonMapper.writeValueAsString(msg);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static Message jsonToMessage(String json) {
        try {
            return jsonMapper.readValue(json, Message.class);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }

    }
}

The Server class is listed below:

Server.java
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class Server {

    private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

    static {
        log.setLevel(Level.INFO);
    }

    public static void main(String[] args) {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new DefaultSimpleService()))
                .transport(WebsocketServerTransport.create(8801))
                .start()
                .block()
                .onClose()
                .block();
    }

    private static final class DefaultSimpleService extends AbstractRSocket {

        @Override
        public Mono<Void> fireAndForget(Payload payload) {
            log.info("got fireAndForget in Server");
            log.info(payload.getDataUtf8());
            return Mono.empty();
        }

        @Override
        public Mono<Payload> requestResponse(Payload payload) {
            log.info("got requestResponse in Server");
            log.info(payload.getDataUtf8());
            return Mono.just(payload.getDataUtf8())
                    .map(payloadString -> MessageMapper.jsonToMessage(payloadString))
                    .map(message -> message.message + " | requestResponse from Server #1")
                    .map(responseText -> new Message(responseText))
                    .map(responseMessage -> MessageMapper.messageToJson(responseMessage))
                    .map(responseJson -> DefaultPayload.create(responseJson));
        }

        @Override
        public Flux<Payload> requestStream(Payload payload) {
            log.info("got requestStream in Server");
            log.info(payload.getDataUtf8());
            return Mono.just(payload.getDataUtf8())
                    .map(payloadString -> MessageMapper.jsonToMessage(payloadString))
                    .flatMapMany(msg -> Flux.range(0, 5)
                            .map(count -> msg.message + " | requestStream from Server #" + count)
                            .map(responseText -> new Message(responseText))
                            .map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
                    .map(message -> DefaultPayload.create(message));
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            log.info("got requestChannel in Server");
            return Flux.from(payloads)
                    .map(payload -> payload.getDataUtf8())
                    .map(payloadString -> {
                        log.info(payloadString);
                        return MessageMapper.jsonToMessage(payloadString);
                    })
                    .flatMap(msg -> Flux.range(0, 2)
                            .map(count -> msg.message + " | requestChannel from Server #" + count)
                            .map(responseText -> new Message(responseText))
                            .map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
                    .map(message -> DefaultPayload.create(message));

        }
    }

}

In the static main method, we start our RSocket server. It uses WebSocket on port 8801 and handles the interaction models implemented by the DefaultSimpleService class. All of these methods are very similar: they take the payload, change its message and (except for fireAndForget) return it to the client. The difference lies in the argument and return types.

    • fireAndForget just gets the payload and returns nothing (you don’t want to wait for the server response)
    • requestResponse gets the payload and returns a single response (like with the standard HTTP/REST application)
    • requestStream gets the payload and returns a stream (no need to implement SSE, it’s built-in)
    • requestChannel gets a stream as the request and returns a stream as the response (bi-directional streams)
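
To make the difference in cardinality concrete, here is a minimal sketch of the four shapes in plain, synchronous Java. It is only an analogy: it does not use the RSocket API, it swaps Mono and Flux for String and List, and the class name InteractionShapes and its method signatures are hypothetical. The response texts copy what the server above appends.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy of the four interaction models (NOT the RSocket API).
final class InteractionShapes {

    // fire-and-forget: one message in, nothing comes back to the caller.
    // The serverLog parameter is a hypothetical stand-in for the server's logger.
    static void fireAndForget(String message, List<String> serverLog) {
        serverLog.add(message);
    }

    // request/response: one message in, exactly one message out.
    static String requestResponse(String message) {
        return message + " | requestResponse from Server #1";
    }

    // request/stream: one message in, many out (the server above emits five).
    static List<String> requestStream(String message) {
        List<String> out = new ArrayList<>();
        for (int count = 0; count < 5; count++) {
            out.add(message + " | requestStream from Server #" + count);
        }
        return out;
    }

    // channel: many messages in, many out (two responses per incoming message).
    static List<String> requestChannel(List<String> messages) {
        List<String> out = new ArrayList<>();
        for (String message : messages) {
            for (int count = 0; count < 2; count++) {
                out.add(message + " | requestChannel from Server #" + count);
            }
        }
        return out;
    }
}
```

In the real server the same shapes appear as Mono<Void>, Mono<Payload>, Flux<Payload> and Publisher<Payload> to Flux<Payload>, and the elements arrive asynchronously instead of as a finished list.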

The client class looks very similar to the server class:

Client.java
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class Client {

    private static Logger log = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

    static {
        log.setLevel(Level.INFO);
    }

    public static void main(String[] args) {
        final Client client = new Client();
        final RSocket rSocket = client.connect();
        client.fireAndForget(rSocket);
        client.requestResponse(rSocket);
        client.requestStream(rSocket);
        client.requestChannel(rSocket);
    }

    private RSocket connect() {
        return RSocketFactory.connect()
                .transport(WebsocketClientTransport.create(8801))
                .start()
                .block();
    }

    private void fireAndForget(RSocket rSocket) {
        log.info("sending fire and forget from client");
        Flux.just(new Message("fire and forget JAVA client!"))
                .map(msg -> MessageMapper.messageToJson(msg))
                .map(json -> DefaultPayload.create(json))
                .flatMap(message -> rSocket.fireAndForget(message))
                .blockLast();
    }

    private void requestResponse(RSocket rSocket) {
        log.info("sending request-response from client");
        Flux.just(new Message("requestResponse from JAVA client!"))
                .map(msg -> MessageMapper.messageToJson(msg))
                .map(json -> DefaultPayload.create(json))
                .flatMap(message -> rSocket.requestResponse(message))
                .map(payload -> payload.getDataUtf8())
                .doOnNext(payloadString -> {
                    log.info("got response in JAVA client");
                    log.info(payloadString);
                })
                .blockLast();
    }

    private void requestStream(RSocket rSocket) {
        log.info("sending request-stream from client");
        Flux.just(new Message("requestStream from JAVA client!"))
                .map(msg -> MessageMapper.messageToJson(msg))
                .map(json -> DefaultPayload.create(json))
                .flatMap(message -> rSocket.requestStream(message))
                .map(payload -> payload.getDataUtf8())
                .doOnNext(payloadString -> log.info(payloadString))
                .blockLast();
    }

    private void requestChannel(RSocket rSocket) {
        log.info("sending request-channel from client");
        final Flux<Payload> requestPayload = Flux.range(0, 5)
                .map(count -> new Message("requestChannel from JAVA client! #" + count))
                .map(msg -> {
                    log.info("sending message: {}", msg.message);
                    return MessageMapper.messageToJson(msg);
                })
                .map(json -> DefaultPayload.create(json));

        rSocket
                .requestChannel(requestPayload)
                .map(payload -> payload.getDataUtf8())
                .doOnNext(payloadString -> log.info(payloadString))
                .blockLast();
    }
}

We still use RSocketFactory, but this time with the connect() method instead of receive(). As you can see, I use WebsocketClientTransport, but you can easily switch to TCP (the same goes for the port and host). After establishing the connection, we call the appropriate method and handle the response.

Java server - JavaScript client

With rsocket-js we can easily write our JavaScript client. As usual, I’ve created a Vue project, but I’m pretty sure you will understand the code even if you have never used Vue before. To make it work, we need to install the following packages:

    • npm i rsocket-core
    • npm i rsocket-websocket-client
    • npm i rsocket-flowable

Let’s connect with the backend:

Home.vue
import { RSocketClient, JsonSerializers } from "rsocket-core";
import RSocketWebSocketClient from "rsocket-websocket-client";
...
connect() {
      console.log("connecting with RSocket...");
      const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8801"
      });
      const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: JsonSerializers,
        setup: {
          // ms btw sending keepalive to server
          keepAlive: 60000,
          // ms timeout if no keepalive response
          lifetime: 180000,
          // format of `data`
          dataMimeType: "application/json",
          // format of `metadata`
          metadataMimeType: "application/json"
        },
        transport
      });
      client.connect().subscribe({
        onComplete: socket => {
          this.socket = socket;
        },
        onError: error => {
          console.log("got connection error");
          console.error(error);
        },
        onSubscribe: cancel => {
          /* call cancel() to abort */
        }
      });

The signature and implementation of each method are presented below.

fireAndForget(payload: Payload): void

FireAndForget.vue
if (this.socket) {
  const message = { message: "fire and forget from JavaScript!" };
  this.socket.fireAndForget({
    data: message,
    metadata: ""
  });
  this.sent.push(message);
} else {
  console.log("not connected...");
}
requestResponse(payload: Payload): Single
RequestResponse.vue
  if (this.socket) {
	const message = { message: "requestResponse from JavaScript!" };
	this.socket
	  .requestResponse({
		data: message,
		metadata: ""
	  })
	  .subscribe({
		onComplete: data => {
		  console.log("got response with requestResponse");
		  this.received.push(data.data);
		},
		onError: error => {
		  console.log("got error with requestResponse");
		  console.error(error);
		},
		onSubscribe: cancel => {
		  this.sent.push(message);
		  /* call cancel() to stop onComplete/onError */
		}
	  });
  } else {
	console.log("not connected...");
  }
requestStream(payload: Payload): Flowable
RequestStream.vue
  if (this.socket) {
	const message = { message: "requestStream from JavaScript!" };
	this.socket
	  .requestStream({
		data: message,
		metadata: ""
	  })
	  .subscribe({
		onComplete: () => {
		  console.log("requestStream done");
		  this.received.push("requestStream done");
		},
		onError: error => {
		  console.log("got error with requestStream");
		  console.error(error);
		},
		onNext: value => {
		  // console.log("got next value in requestStream..");
		  this.received.push(value.data);
		},
		// Nothing happens until `request(n)` is called
		onSubscribe: sub => {
		  console.log("subscribe request Stream!");
		  sub.request(7);
		  this.sent.push(message);
		}
	  });
  } else {
	console.log("not connected...");
  }
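
The comment "Nothing happens until `request(n)` is called" in the snippet above refers to Reactive Streams demand: the subscriber decides how many items it is ready to receive. The sketch below illustrates that idea in Java rather than JavaScript, using only the JDK's java.util.concurrent.Flow interfaces. It is not rsocket-js or RSocket code, and DemandDemo, range and run are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustration of request(n) demand with the JDK Flow interfaces (not RSocket code).
final class DemandDemo {

    // A tiny synchronous publisher of 0..count-1 that emits only what was requested.
    // Simplification: non-positive request sizes are ignored instead of signalling onError.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, and never more than the source has left.
                for (long i = 0; i < n && next < count && !done; i++) {
                    subscriber.onNext(next++);
                }
                // Signal completion once the source is exhausted.
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes to range(available), requests `requested` items once, records every signal.
    static List<String> run(int available, long requested) {
        List<String> events = new ArrayList<>();
        range(available).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("subscribed");
                subscription.request(requested);
            }

            @Override
            public void onNext(Integer item) {
                events.add("next " + item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error");
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        return events;
    }
}
```

Under these assumptions, DemandDemo.run(5, 7) records the subscription, five items and a completion signal, which mirrors the client above requesting 7 items from a server stream that only has 5. DemandDemo.run(5, 3) stops after three items and never completes, because the subscriber never asks for more.
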
requestChannel(payload: Flowable): Flowable
RequestChannel.vue
  if (this.socket) {
	const flowablePayload = new Flowable(subscriber => {
	  subscriber.onSubscribe({
		cancel: () => {},
		request: n => {
		  for (let index = 0; index < n; index++) {
			const message = {
			  message: "requestChannel from JavaScript! #" + index
			};
			subscriber.onNext(message);
		  }
		  subscriber.onComplete();
		}
	  });
	});

	// test flowable payload
	// flowablePayload.subscribe({
	//   onComplete: () => console.log("done"),
	//   onError: error => console.error(error),
	//   onNext: value => {
	//     console.log("got onNext value ");
	//     console.log(value);
	//   },
	//   // Nothing happens until `request(n)` is called
	//   onSubscribe: sub => {
	//     sub.request(5);
	//   }
	// });

	this.socket
	  .requestChannel({
		data: flowablePayload,
		metadata: ""
	  })
	  .subscribe({
		onComplete: () => {
		  console.log("requestChannel done");
		  this.received.push("requestChannel done");
		},
		onError: error => {
		  console.log("got error with requestChannel");
		  console.error(error);
		},
		onNext: value => {
		  // console.log("got next value in requestChannel..");
		  this.received.push(value.data);
		},
		// Nothing happens until `request(n)` is called
		onSubscribe: sub => {
		  console.log("subscribe request Channel!");
		  sub.request(7);
		  this.sent.push(message);
		}
	  });
  } else {
	console.log("not connected...");
  }

At the time of writing, the requestChannel method doesn’t work because it is not yet implemented in rsocket-js.

WebFlux server - WebFlux client

The latest version of Spring Boot (2.2.0 M6) supports RSocket, which means you can configure the server and client very easily. I’m not going to implement all of the interaction models; they differ only in the method arguments and return type. A simple server that handles the requestResponse model requires the following configuration:

ServerController.java
import com.kojotdev.blog.webfluxrsocketexample.domain.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Controller
public class ServerController {
    private static final Logger log = LoggerFactory.getLogger(ServerController.class);

    @MessageMapping("hello")
    public Mono<Message> helloServer(Message message) {
        log.info("got message in Server {}", message.message);
        return Mono.just(message)
                .map(msg -> new Message(msg.message + " | Server says hello!"));
    }
}
application.properties
#spring.rsocket.server.port=7000
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
#logging.level.root=DEBUG
pom.xml
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
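
To make the earlier remark about method arguments and return types concrete, here is a minimal sketch of how a handler signature is commonly mapped to an interaction model. It only restates, in plain Java, the input/output cardinality table from the Spring Framework reference. `InteractionModels`, `Cardinality`, and `resolve` are hypothetical names invented for this illustration; they are not part of Spring or RSocket.

```java
// Hypothetical helper for illustration only; Spring performs this mapping internally.
final class InteractionModels {

    // How many values a handler consumes or produces:
    // NONE ~ void / Mono<Void>, ONE ~ T / Mono<T>, MANY ~ Flux<T>
    enum Cardinality { NONE, ONE, MANY }

    // Returns the interaction model(s) a handler with the given shape can serve.
    static String resolve(Cardinality input, Cardinality output) {
        // A stream of inputs always means a channel, whatever the output is.
        if (input == Cardinality.MANY) {
            return "request-channel";
        }
        switch (output) {
            case NONE:
                // e.g. Mono<Void> handle(Message m)
                return "fire-and-forget or request-response";
            case ONE:
                // e.g. Mono<Message> handle(Message m)
                return "request-response";
            default:
                // e.g. Flux<Message> handle(Message m)
                return "request-stream";
        }
    }

    public static void main(String[] args) {
        for (Cardinality in : Cardinality.values()) {
            for (Cardinality out : Cardinality.values()) {
                System.out.println(in + " -> " + out + " : " + resolve(in, out));
            }
        }
    }
}
```

In the ServerController above, helloServer takes a single Message and returns a Mono<Message>, so it falls into the one-in, one-out case, which is request-response.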

The client is a separate Spring WebFlux application.

ClientConfiguration.java
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

import java.net.URI;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        final URI uri = URI.create("ws://localhost:8080/rsocket");
        return RSocketRequester
                .builder()
                .rsocketStrategies(rSocketStrategies)
                .connect(WebsocketClientTransport.create(uri))
                .block();
    }
}
MessageController.java
import com.kojotdev.blog.webfluxrsocketexample.domain.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class MessageController {

    private static final Logger log = LoggerFactory.getLogger(MessageController.class);

    public final RSocketRequester rSocketRequester;

    public MessageController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @PostMapping("/message")
    public Mono<String> message(@RequestBody Message message) {
        log.info("got message in the rest request {}", message.message);
        return rSocketRequester
                .route("hello")
                .data(message)
                .retrieveMono(Message.class)
                .map(serverMessage -> serverMessage.message)
                .doOnNext(serverMsg -> log.info("server response in the client: {}", serverMsg));
    }
}

We declared an RSocketRequester bean to make a connection with the server. With this bean, we can easily send and retrieve messages. The flow of our application looks like this:

    • post the message with the REST endpoint “/message”
    • send it to the server with the injected RSocketRequester bean
    • retrieve the response 
    • log the response and return it with the REST endpoint

Spring WebFlux doesn’t support WebSocket with STOMP, so RSocket looks very promising for solving the message mapping problem. The documentation is still sparse on this topic, but everything seems to be heading in the right direction.

WebFlux server - JavaScript client

It would be very nice to push messages between a JavaScript client and the Spring WebFlux server. Unfortunately, I couldn’t figure out how to do this. The problem lies in the metadata and routing. I found this issue in the GitHub project and hope they’re going to solve it soon.
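
For context on what “metadata and routing” involves: the RSocket routing metadata extension (MIME type message/x.rsocket.routing.v0) carries a route as a tag, written as one unsigned length byte followed by the UTF-8 bytes of the route. The sketch below only illustrates that byte layout in plain Java. `RoutingMetadata` and `encodeRoute` are hypothetical names, and it is an assumption that the server is configured to read routes from this metadata type; whether this resolves the issue mentioned above is not something this post confirms.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of rsocket-java or Spring.
final class RoutingMetadata {

    // Encodes a single route as a length-prefixed UTF-8 tag.
    static byte[] encodeRoute(String route) {
        byte[] tag = route.getBytes(StandardCharsets.UTF_8);
        // The length prefix is a single unsigned byte, so a tag is at most 255 bytes.
        // Rejecting empty routes is a conservative choice made for this sketch.
        if (tag.length == 0 || tag.length > 255) {
            throw new IllegalArgumentException("route must be 1..255 bytes when UTF-8 encoded");
        }
        byte[] out = new byte[tag.length + 1];
        out[0] = (byte) tag.length;                   // length prefix
        System.arraycopy(tag, 0, out, 1, tag.length); // route bytes
        return out;
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeRoute("hello")) {
            hex.append(String.format("%02x ", b & 0xff));
        }
        // prints "05 68 65 6c 6c 6f"
        System.out.println(hex.toString().trim());
    }
}
```

A client that sends a plain string as metadata without this length prefix would, under the assumption above, not match the "hello" mapping on the server.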
