Make it work

Spring WebFlux WebSocket with Vue.js

Overview

In the previous article, I showed you how to build a Spring application with WebSocket (STOMP) and Vue.js. This time, we are going to use Spring WebFlux and native WebSocket, but the application will look and behave the same. As usual, you can find the full source code in the GitHub project.

Newer versions of Spring also support RSocket, so you may be interested in the article about it as well.

Spring WebFlux WebSocket - backend
To initialize the project, let’s go to Spring Initializr and select the Spring Reactive Web dependency. Please don’t pick the spring-boot-starter-websocket dependency here, because it brings in the embedded Tomcat server, which isn’t desirable when you build a reactive app.

To enable WebSocket in our WebFlux project, we need to add these two classes:

ReactiveWebSocketConfiguration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ReactiveWebSocketConfiguration {

    private WebSocketHandler webSocketHandler;

    public ReactiveWebSocketConfiguration(ReactiveWebSocketHandler webSocketHandler) {
        this.webSocketHandler = webSocketHandler;
    }

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/greetings", webSocketHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

}
ReactiveWebSocketHandler
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        final Flux<WebSocketMessage> message = webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(name -> "Hello, " + name + "!")
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}

That’s it! The first class maps our WebSocketHandler to the /greetings endpoint and the second one handles the incoming WebSocket messages. Now let’s start the application and check it with the Apic Chrome extension.
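If you would rather test the endpoint from code than from a browser extension, the JDK's own WebSocket client (Java 11+) can do the job. This is only a sketch: the class name `GreetingsClientSketch`, the listener, and the URL you pass in (for example `ws://localhost:8080/greetings`, assuming the app runs locally on the default port) are my assumptions and not part of the project.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

class GreetingsClientSketch {

    // Joins partial text frames and completes `reply` with the first full message.
    static final class FirstMessageListener implements WebSocket.Listener {
        final CompletableFuture<String> reply = new CompletableFuture<>();
        private final StringBuilder buffer = new StringBuilder();

        // Returns the whole message once the last frame arrived, otherwise null.
        String append(CharSequence data, boolean last) {
            buffer.append(data);
            if (!last) {
                return null;
            }
            String message = buffer.toString();
            buffer.setLength(0);
            return message;
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            String message = append(data, last);
            if (message != null) {
                reply.complete(message);
            }
            webSocket.request(1); // ask for the next frame
            return null;
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            reply.completeExceptionally(error);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: GreetingsClientSketch <ws-url> [name]");
            return;
        }
        String name = args.length > 1 ? args[1] : "kojot";
        FirstMessageListener listener = new FirstMessageListener();
        WebSocket socket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create(args[0]), listener)
                .get(5, TimeUnit.SECONDS);
        socket.sendText(name, true);
        // Print whatever the server sends back first.
        System.out.println(listener.reply.get(5, TimeUnit.SECONDS));
        socket.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
    }
}
```

Start the Spring application first, then run the client with the endpoint URL as the first argument. The timeouts are arbitrary values picked for the sketch.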

It works as expected. In our ReactiveWebSocketHandler we receive the message, read its payload as plain text, and send it back with a "Hello, " prefix. However, instead of a simple String, we would like to exchange JSON objects. In a regular Spring application this works out of the box, but with WebFlux… well, we need to do the mapping ourselves with Jackson's ObjectMapper.

Mapping JSON request and response

To handle the request message, we need to add the HelloMessage class. It’s a simple POJO with two annotations, @JsonCreator and @JsonProperty, which let Jackson map the fields properly. For the response, we are going to use the Greeting class.

HelloMessage
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class HelloMessage {
    
    public final String name;

    @JsonCreator
    public HelloMessage(
            @JsonProperty("name") String name) {
        this.name = name;
    }
}
Greeting
public class Greeting {

    public final String content;

    public Greeting(String content) {
        this.content = content;
    }

    public static Greeting from(HelloMessage helloMessage) {
        return new Greeting("Hello, " + helloMessage.name + "!");
    }
}

The GreetingsService is responsible for the conversion JSON -> HelloMessage -> Greeting -> JSON.

GreetingsService
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GreetingsService {

    private static final Logger log = LoggerFactory.getLogger(GreetingsService.class);

    private ObjectMapper jsonMapper = new ObjectMapper();

    public String greeting(String message) {
        return Try.of(() -> {
//            Thread.sleep(1000); //simulate delay
            final HelloMessage helloMessage = jsonMapper.readValue(message, HelloMessage.class);
            final Greeting greeting = Greeting.from(helloMessage);
            return jsonMapper.writeValueAsString(greeting);
        })
                .onFailure(parserException -> log.error("Could not parse JSON object", parserException))
                .getOrElse("");
    }

}

As you can see, I use Vavr (version 0.10.1) to handle the parser exception. It is a great functional library for Java which provides immutable collections, so it suits a reactive WebFlux application pretty well. If parsing fails, the greeting() method returns an empty string.
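If you haven't used Vavr before, the Try chain in greeting() behaves roughly like a try/catch that logs the problem and falls back to a default value. Here is a plain-Java sketch of that idea, without Vavr or Jackson. The names `TryFallbackSketch`, `orEmpty` and the toy `greet` step are hypothetical stand-ins, and the sketch only catches Exception, which is narrower than what Try.of captures.

```java
import java.util.concurrent.Callable;

class TryFallbackSketch {

    // Runs the computation; on any Exception it logs and returns "".
    static String orEmpty(Callable<String> computation) {
        try {
            return computation.call();
        } catch (Exception ex) {
            System.err.println("Could not parse JSON object: " + ex);
            return "";
        }
    }

    // Hypothetical stand-in for the parse-and-convert step.
    static String greet(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("name is missing");
        }
        return "Hello, " + name + "!";
    }

    public static void main(String[] args) {
        System.out.println(orEmpty(() -> greet("kojot"))); // success path
        System.out.println(orEmpty(() -> greet("")).isEmpty()); // fallback path
    }
}
```

Keep in mind that an empty string is still a value: a handler that maps it to a text message will send an empty frame to the client, so filtering out empty strings before sending is an option worth considering.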

The last step is to change our ReactiveWebSocketHandler.

ReactiveWebSocketHandler
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    private final GreetingsService greetingsService = new GreetingsService();

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        final Flux<WebSocketMessage> message = webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(helloMessage -> greetingsService.greeting(helloMessage))
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}
Let’s test it. Fire up the Apic tool, connect to the WebSocket, send a JSON message like {"name":"kojot"} and check the response: {"content":"Hello, kojot!"}. It looks like it's working fine, but there is one little problem here. If we open Apic in another tab, we can see that our message isn’t broadcast to the other sessions. We need to fix that.

Sharing messages with WebSocket sessions

In a typical Spring Boot application, this is quite simple: we can enable a built-in message broker with the @EnableWebSocketMessageBroker annotation. Spring WebFlux doesn’t have this option, so we need to implement it ourselves. The idea is to have one Flux which is shared across all sessions and which is fed with every new incoming message. This solution is based on Matt Raible's great article.

First, we need to change our ReactiveWebSocketHandler.

ReactiveWebSocketHandler
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    private final GreetingsService greetingsService = new GreetingsService();
    private final GreetingsPublisher greetingsPublisher;
    private final Flux<String> publisher;

    public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
        this.publisher = Flux.create(greetingsPublisher).share();
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(helloMessage -> greetingsService.greeting(helloMessage))
                .doOnNext(greetings -> greetingsPublisher.push(greetings))
                .subscribe();

        final Flux<WebSocketMessage> message = publisher
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}

After receiving a message, we push it to the greetings publisher. Note that we call Flux.share() so that all sessions subscribe to the same stream and every currently connected subscriber gets the data.
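To get a feeling for what "shared" means here, the sketch below reproduces the behaviour with the JDK's java.util.concurrent.SubmissionPublisher (Java 9+). It is an analogy and not Reactor code: `SharedStreamSketch` and `Session` are hypothetical names, and each `Session` just stands in for one WebSocket session subscribed to the stream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SharedStreamSketch {

    // Collects every item it sees; stands in for one WebSocket session.
    static final class Session implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        // Waits until the publisher has completed this subscriber.
        void awaitDone() {
            try {
                done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<List<String>> demo() {
        Session first = new Session();
        Session second = new Session();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.submit("one");  // only `first` is subscribed at this point
            publisher.subscribe(second);
            publisher.submit("two");  // both sessions are subscribed now
        } // close() completes the subscribers after pending items are delivered
        first.awaitDone();
        second.awaitDone();
        return List.of(first.received, second.received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The first session sees both messages, while the second one sees only the message sent after it subscribed. A shared stream behaves the same way for our WebSocket sessions: a client that connects later does not get the greetings that were broadcast before it joined.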

GreetingsPublisher
import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.FluxSink;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));
            }
        });
    }
}

The GreetingsPublisher emits the data to our Flux publisher with the sink.next(greeting) method as soon as a greeting is pushed to the queue. This class is a bean, so you can inject it in other places to broadcast messages through the WebSocket. Let’s write the Vue frontend to test it across browsers.
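The handoff from the queue to the sink can also be tried outside of Spring. Below is a plain-JDK sketch of the same pattern, where a `Consumer<String>` stands in for the sink's next() call. The class `QueueBridgeSketch` and its `start`/`stop` methods are hypothetical; the stoppable loop is an addition of this sketch and not something the project's publisher does.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class QueueBridgeSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Producers call this from any thread; offer() does not block on an unbounded queue.
    boolean push(String greeting) {
        return queue.offer(greeting);
    }

    // Starts the drain loop; `sink` stands in for the sink's next() call.
    void start(Consumer<String> sink) {
        executor.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    sink.accept(queue.take()); // blocks until an item arrives
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // leave the loop on shutdown
            }
        });
    }

    // Interrupts the worker thread, which ends the blocked take().
    void stop() {
        executor.shutdownNow();
    }

    static List<String> demo() {
        QueueBridgeSketch bridge = new QueueBridgeSketch();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(2);
        bridge.start(item -> { seen.add(item); latch.countDown(); });
        bridge.push("Hello, kojot!");
        bridge.push("Hello, world!");
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            bridge.stop();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A single worker thread drains the queue, so items reach the sink in the order they were pushed, no matter which thread pushed them.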

Vue.js - frontend

After creating the project with Vue CLI, we don’t have to install anything else, because browsers support WebSocket natively.




That’s it! We have a fully reactive web application with Spring WebFlux, Vue and WebSocket. The source code of this project is available on GitHub here.
Please feel free to comment below 😉

Bug with Spring Boot 2.1.7

There is a bug in Spring Boot 2.1.7. If we reconnect to the WebSocket multiple times, new messages stop arriving. With 2.1.6 everything works fine. I will update this post if I find the reason why it fails.
