Spring WebFlux WebSocket with Vue.js
Overview
In the previous article, I showed you how to build a Spring application with WebSocket (STOMP) and Vue.js. This time, we are going to use Spring WebFlux and native WebSocket, but our application will look and behave the same. As usual, remember that you can see the full source code in the GitHub project.
Newer versions of Spring support RSocket, so you may also be interested in the article about it.

Spring WebFlux WebSocket - backend
We don’t add the spring-boot-starter-websocket dependency to the project here, because it would bring in the embedded Tomcat server, which isn’t desirable when you build a reactive app.
To enable WebSocket in our WebFlux project, we need to add these two classes:
ReactiveWebSocketConfiguration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ReactiveWebSocketConfiguration {

    private WebSocketHandler webSocketHandler;

    public ReactiveWebSocketConfiguration(ReactiveWebSocketHandler webSocketHandler) {
        this.webSocketHandler = webSocketHandler;
    }

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/greetings", webSocketHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
ReactiveWebSocketHandler
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        final Flux<WebSocketMessage> message = webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(name -> "Hello, " + name + "!")
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}
That’s it! The first class maps our WebSocketHandler to the /greetings endpoint and the second handles our WebSocket messages. Now let’s start the application and check it with the Apic Chrome extension.

It works as expected. In our ReactiveWebSocketHandler we receive the message, get the payload as plain text, and then return it with a Hello prefix. However, instead of a simple String, we would like to send a JSON data object. In a regular Spring application this is possible out of the box, but with WebFlux… well, we need to do the mapping ourselves with Jackson’s ObjectMapper.
Mapping JSON request and response
To handle the request message we need to add the HelloMessage class. It’s a simple POJO with two annotations, @JsonCreator and @JsonProperty, which let Jackson map the fields properly. For the response, we are going to use the Greeting class.
HelloMessage
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class HelloMessage {

    public final String name;

    @JsonCreator
    public HelloMessage(@JsonProperty("name") String name) {
        this.name = name;
    }
}
Greeting
public class Greeting {

    public final String content;

    public Greeting(String content) {
        this.content = content;
    }

    public static Greeting from(HelloMessage helloMessage) {
        return new Greeting("Hello, " + helloMessage.name + "!");
    }
}
The GreetingsService will be responsible for the conversion JSON -> HelloMessage -> Greeting -> JSON.
GreetingsService
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GreetingsService {

    private static final Logger log = LoggerFactory.getLogger(GreetingsService.class);

    private ObjectMapper jsonMapper = new ObjectMapper();

    public String greeting(String message) {
        return Try.of(() -> {
            // Thread.sleep(1000); //simulate delay
            final HelloMessage helloMessage = jsonMapper.readValue(message, HelloMessage.class);
            final Greeting greeting = Greeting.from(helloMessage);
            return jsonMapper.writeValueAsString(greeting);
        })
        .onFailure(parserException -> log.error("Could not parse JSON object", parserException))
        .getOrElse("");
    }
}
As you can see, I use Vavr (version 0.10.1) to handle the parser exception. It is a great functional library for Java, which provides immutable collections, so it suits a reactive WebFlux application pretty well. If the parser fails, the greeting() method returns an empty string.
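To make the failure path concrete, here is a minimal plain-Java sketch of the same "try, log on failure, fall back to a default" pattern. It does not use Vavr or Jackson: FallbackSketch, attempt() and describe() are hypothetical names made up for this illustration, and integer parsing merely stands in for JSON parsing.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical helper, not part of Vavr: mimics Try.of(...).onFailure(...).getOrElse(...).
final class FallbackSketch {

    // Runs the action; on any exception, reports it and returns the fallback value.
    static <T> T attempt(Callable<T> action, Consumer<Exception> onFailure, T fallback) {
        try {
            return action.call();
        } catch (Exception ex) {
            onFailure.accept(ex);
            return fallback;
        }
    }

    // Stand-in for greeting(): parsing may fail, but the caller gets "" instead of an exception.
    static String describe(String input) {
        return attempt(
                () -> "Parsed " + Integer.parseInt(input.trim()),
                ex -> System.err.println("Could not parse input: " + ex.getMessage()),
                "");
    }

    public static void main(String[] args) {
        System.out.println("[" + describe("42") + "]");
        System.out.println("[" + describe("not a number") + "]");
    }
}
```

Keep in mind that an empty string returned this way is still a regular value, so whatever consumes the result has to decide whether to pass it on or skip it.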
The last step is to change our ReactiveWebSocketHandler.
ReactiveWebSocketHandler
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    private final GreetingsService greetingsService = new GreetingsService();

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        final Flux<WebSocketMessage> message = webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(helloMessage -> greetingsService.greeting(helloMessage))
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}
Now we can send {"name":"kojot"} and see the response {"content":"Hello, kojot!"}. It looks like it’s working fine, but there is one small problem. If we open Apic in another tab, we can see that our message isn’t broadcast to the other sessions. We need to fix it.
Sharing messages with WebSocket sessions
In a typical Spring Boot application, this is quite simple. We can enable a built-in message broker with the @EnableWebSocketMessageBroker annotation. Spring WebFlux doesn’t have this option, so we need to implement it ourselves. The idea is to have a Flux which is shared across all sessions and is fed by new incoming messages. This solution is based on Matt Raible’s great article.
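The idea of one stream shared by every session can be tried out with nothing but the JDK. The sketch below is only an illustration under stated assumptions: it swaps Reactor’s Flux for java.util.concurrent.SubmissionPublisher, each list stands in for a connected WebSocket session, and BroadcastSketch and broadcast() are hypothetical names that don’t appear in the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical JDK-only stand-in for "a shared stream fed by incoming messages".
final class BroadcastSketch {

    // Publishes every greeting once and returns what each simulated session received.
    static List<List<String>> broadcast(List<String> greetings, int sessionCount) {
        List<List<String>> sessions = new ArrayList<>();
        List<CompletableFuture<Void>> finished = new ArrayList<>();

        try (SubmissionPublisher<String> shared = new SubmissionPublisher<>()) {
            for (int i = 0; i < sessionCount; i++) {
                List<String> received = new CopyOnWriteArrayList<>();
                sessions.add(received);
                // Every session subscribes to the same publisher.
                finished.add(shared.consume(received::add));
            }
            // A message arriving from any client is submitted exactly once.
            greetings.forEach(shared::submit);
        } // close() signals completion to all subscribers

        // Wait until every subscriber has processed all items.
        finished.forEach(CompletableFuture::join);
        return sessions;
    }

    public static void main(String[] args) {
        System.out.println(broadcast(List.of("Hello, Ann!", "Hello, Bob!"), 2));
    }
}
```

In this sketch a subscriber only sees items submitted after it subscribed, which is the behaviour we want for a chat-like broadcast: a session that connects later doesn’t get the old messages replayed.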
First, we need to change our ReactiveWebSocketHandler.
ReactiveWebSocketHandler
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    private final GreetingsService greetingsService = new GreetingsService();
    private final GreetingsPublisher greetingsPublisher;
    private final Flux<String> publisher;

    public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
        this.publisher = Flux.create(greetingsPublisher).share();
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(helloMessage -> greetingsService.greeting(helloMessage))
                .doOnNext(greetings -> greetingsPublisher.push(greetings))
                .subscribe();

        final Flux<WebSocketMessage> message = publisher
                .map(greetings -> webSocketSession.textMessage(greetings));

        return webSocketSession.send(message);
    }
}
After receiving the message we push it to the greetings publisher. Note that we use the Flux.share() method to make sure that all subscribers get the data.
GreetingsPublisher
import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.FluxSink;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {

    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                .onFailure(ex -> log.error("Could not take greeting from queue", ex));
            }
        });
    }
}
The GreetingsPublisher emits the data to our Flux publisher with the sink.next(greeting) method as soon as it’s pushed to the queue. This class is a bean, so you can inject it in other places to broadcast messages through WebSocket. Let’s write the Vue frontend to test it across browsers.
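As a side note on the publisher, the queue-to-sink handoff can be sketched in isolation with plain JDK classes. This is only an illustration, not the project’s code: QueueDrainSketch and relay() are hypothetical names, a Consumer plays the role of sink.next(), and two things are my own assumptions rather than part of the original class, namely that the worker is a daemon thread and that it leaves the loop when it is interrupted.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the queue/worker pair inside GreetingsPublisher.
final class QueueDrainSketch implements AutoCloseable {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    QueueDrainSketch(Consumer<String> sink) {
        this.worker = new Thread(() -> drain(sink), "greetings-drain");
        this.worker.setDaemon(true); // assumption: don't keep the JVM alive for this thread
        this.worker.start();
    }

    // Forwards each queued greeting to the sink as soon as it is available.
    private void drain(Consumer<String> sink) {
        try {
            while (true) {
                sink.accept(queue.take()); // blocks until something is pushed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and leave the loop
        }
    }

    boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void close() {
        worker.interrupt();
    }

    // Pushes all greetings, waits up to five seconds for delivery, returns what the sink saw.
    static List<String> relay(List<String> greetings) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch all = new CountDownLatch(greetings.size());
        try (QueueDrainSketch publisher = new QueueDrainSketch(g -> {
            received.add(g);
            all.countDown();
        })) {
            greetings.forEach(publisher::push);
            all.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(relay(List.of("Hello, Ann!", "Hello, Bob!")));
    }
}
```

Because a single worker drains a FIFO queue, greetings reach the sink in the order they were pushed, no matter which thread pushed them.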
Vue.js - frontend
After creating the project with Vue CLI, we don’t have to install anything else, because WebSockets are well supported by the browsers.
Greetings {{ item }}
That’s it! We have a fully reactive web application with Spring WebFlux, Vue and WebSocket. The source code of this project is available on GitHub here.
Please feel free to comment below 😉
Bug with Spring Boot 2.1.7
There is a bug in Spring Boot 2.1.7. If we reconnect to the WebSocket multiple times, new messages no longer arrive. With 2.1.6 everything works fine. I will update this post if I find the reason why it fails.
2 Comments
Fabiano
hi! It seems I’m having the same issue you’ve got – when I disconnect the client and reconnect again – I cannot receive the messages anymore. have you found out the reason for this behavior?
Thank you.
Very good article.
kojot
To be honest, no, I haven’t worked on it. If you use Spring WebFlux, I recommend you to move to RSocket protocol. It’s got connection resumption.