专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Spring Boot WebFlux + Server-sent事件示例

在本文中,我们将向您展示如何使用服务器发送的事件开发响应式Web应用程序。
Spring Boot 2.1.2.RELEASE
Spring WebFlux 5.1.4.RELEASE
Thymeleaf 3.0.11.RELEASE
JUnit 5.3.2
Maven 3

75_1.png

在Spring中,返回JSON和标头 MediaType.TEXT_EVENT_STREAM_VALUE

@RestController
public class CommentController {

    @GetMapping(path = "/comment/stream", 
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Comment> feed() {
        //...
    }
}

在Javascript中,用于EventSource向上述端点发送请求。

function loadComments () {
    this.source = null;
    this.start = function () {
        this.source = new EventSource("/comment/stream");
        this.source.addEventListener("message", function (event) {
            var comment = JSON.parse(event.data);
            //... update somewhere
        });
        this.source.onerror = function () {
            this.close();
        };
    };

    this.stop = function() {
        this.source.close();
    }
}
comment = new loadComments();
window.onload = function() {
    comment.start();
};
window.onbeforeunload = function() {
    comment.stop();
}

1.项目目录

75_2.png

2. Maven的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imddy.springboot</groupId>
    <artifactId>webflux-thymeleaf-sse</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>webflux-thymeleaf-sse</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <junit-jupiter.version>5.3.2</junit-jupiter.version>
    </properties>

    <dependencies>
        <!-- web mvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- webflux reactive -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- thymeleaf -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- exclude junit 4, prefer junit 5 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- junit 5 -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit-jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
            </plugin>

        </plugins>
    </build>
</project>

3. Spring Boot + Spring WebFlux

3、1 Spring基于WebFlux注释的控制器。启用数据流。写produces = MediaType.TEXT_EVENT_STREAM_VALUE

CommentController.java

package com.imddy.springboot.reactive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.imddy.springboot.reactive.model.Comment;
import com.imddy.springboot.reactive.repository.CommentRepository;

import reactor.core.publisher.Flux;

@RestController
public class CommentController {

    @Autowired
    private CommentRepository commentRepository;

    @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Comment> feed() {
        return this.commentRepository.findAll();
    }

}

MainController.java

package com.imddy.springboot.reactive.controller;

import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Controller
public class MainController {

    @GetMapping(path = {"/","/index"})
    public String index(final Model model) {
        return "index";
    }

}

3、2在repository,返回一个Flux对象。

CommentRepository.java 这个是个接口

package com.imddy.springboot.reactive.repository;

import com.imddy.springboot.reactive.model.Comment;

import reactor.core.publisher.Flux;

public interface CommentRepository {

    Flux<Comment> findAll();

}

ReactiveCommentRepository.java 这个是实现类

package com.imddy.springboot.reactive.repository;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.springframework.stereotype.Repository;

import com.imddy.springboot.reactive.model.Comment;
import com.imddy.springboot.reactive.utils.CommentGenerator;

import reactor.core.publisher.Flux;

@Repository
public class ReactiveCommentRepository implements CommentRepository{

     @Override
        public Flux<Comment> findAll() {

            //simulate data streaming every 1 second.
            return Flux.interval(Duration.ofSeconds(1))
                    .onBackpressureDrop()
                    .map(this::generateComment)
                    .flatMapIterable(x -> x);

        }

        private List<Comment> generateComment(long interval) {

            Comment obj = new Comment(
                CommentGenerator.randomAuthor(), 
                CommentGenerator.randomMessage(), 
                CommentGenerator.getCurrentTimeStamp());
            return Arrays.asList(obj);

        }
}

3、3一个用于生成随机注释的utils类。

CommentGenerator.java

package com.imddy.springboot.reactive.utils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class CommentGenerator {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final List<String> COMMENT_AUTHOR =
            Arrays.asList(
                    "Mkyong", "Oliver", "Jack", "Harry", "Jacob",
                    "Isla", "Emily", "Poppy", "Ava", "Isabella");

    private static final List<String> COMMENT_MESSAGE =
            Arrays.asList(
                    "I Love this!",
                    "Me too!",
                    "Wow",
                    "True!",
                    "Hello everyone here?",
                    "Good!");

    public static String randomAuthor() {
        return COMMENT_AUTHOR.get(RANDOM.nextInt(COMMENT_AUTHOR.size()));
    }

    public static String randomMessage() {
        return COMMENT_MESSAGE.get(RANDOM.nextInt(COMMENT_MESSAGE.size()));
    }

    public static String getCurrentTimeStamp() {
        return dtf.format(LocalDateTime.now());
    }
}

3、4评论模型。

Comment.java

package com.imddy.springboot.reactive.model;

public class Comment {

    private String author;
    private String message;
    private String timestamp;

    public Comment() {
    }

    public Comment(String author, String message, String timestamp) {
        this.author = author;
        this.message = message;
        this.timestamp = timestamp;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

}

3、5启动Spring Boot。

CommentWebApplication.java 这个是Spring Boot Application启动程序。

package com.imddy.springboot.reactive;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CommentWebApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(CommentWebApplication.class, args);
    }

}

application.properties 这个项目这个里面没有也可以。

logging.level.org.springframework.web=INFO

#thymelea模板配置
spring.thymeleaf.prefix=classpath:/templates/
spring.thymeleaf.suffix=.html
spring.thymeleaf.mode=HTML5
spring.thymeleaf.encoding=UTF-8
#热部署文件,页面不产生缓存,及时更新
spring.thymeleaf.cache=false

4. Thymeleaf

模板中没有特殊的反应标签,只使用正常循环。放在 templates目录下

index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link data-th-href="@{/css/bootstrap.min.css}" rel="stylesheet">
    <link data-th-href="@{/css/main.css}" rel="stylesheet">
</head>
<body>

<div class="container">

    <div class="row">
        <div id="title">
            <h1>Spring WebFlux + Server Sent Events</h1>
        </div>

        <table id="comments" class="table table-striped">
            <thead>
            <tr>
                <th width="10%">Author</th>
                <th width="60%">Message</th>
                <th width="30%">Date</th>
            </tr>
            </thead>
            <tbody>
            <tr class="result" data-th-each="comment : ${comments}">
                <td>[[${comment.author}]]</td>
                <td>[[${comment.message}]]</td>
                <td>[[${comment.timestamp}]]</td>
            </tr>
            </tbody>
        </table>
    </div>

</div>

<script data-th-src="@{/js/main.js}"></script>

</body>

</html>

5. JavaScript EventSource

关键是使用Javascript EventSource类发送请求并监听message事件,并将流数据反应更新到表中。 放在 /static/js/ 下

main.js

function loadComments() {

    this.source = null;

    this.start = function() {

        var commentTable = document.getElementById("comments");

        this.source = new EventSource("/comment/stream");

        this.source.addEventListener("message", function(event) {

            // These events are JSON, so parsing and DOM fiddling are needed
            var comment = JSON.parse(event.data);

            var row = commentTable.getElementsByTagName("tbody")[0]
                    .insertRow(0);
            var cell0 = row.insertCell(0);
            var cell1 = row.insertCell(1);
            var cell2 = row.insertCell(2);

            cell0.className = "author-style";
            cell0.innerHTML = comment.author;

            cell1.className = "text";
            cell1.innerHTML = comment.message;

            cell2.className = "date";
            cell2.innerHTML = comment.timestamp;

        });

        this.source.onerror = function() {
            this.close();
        };

    };

    this.stop = function() {
        this.source.close();
    }

}

comment = new loadComments();

/*
 * Register callbacks for starting and stopping the SSE controller.
 */
window.onload = function() {
    comment.start();
};
window.onbeforeunload = function() {
    comment.stop();
}

页面使用了bootstrap的样式,引入对应放在 /static/css/ 下

bootstrap.min.css 文件太长,我没有引入,使用的是 Bootstrap v4.2.1 (https://getbootstrap.com/)

/*!
 * Bootstrap v4.2.1 (https://getbootstrap.com/)
 * Copyright 2011-2018 The Bootstrap Authors
 * Copyright 2011-2018 Twitter, Inc.
 * Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE)
 */:root{--blue:#007bff;--indigo:#6610f2;--purple:#6f42c1;--pink:#e83e8c;--red:#dc3545;--
/*# sourceMappingURL=bootstrap.min.css.map */

main.css

#title{
    margin:40px 0;
}

6.单元测试

WebTestClient单元测试流式响应

TestCommentWebApplication.java 放在test测试目录下,这里其实不需要这个文件,它就是只是个测试。

package com.imddy.springboot.reactive;

import java.util.List;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

import com.imddy.springboot.reactive.model.Comment;

import static org.junit.jupiter.api.Assertions.assertEquals;

//junit 4
//@RunWith(SpringRunner.class)
//@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
//@AutoConfigureWebTestClient(timeout = "10000")//10 seconds
public class TestCommentWebApplication {

 @Autowired
 private WebTestClient webClient;

 @Test
 public void testCommentStream() {

     List<Comment> comments = webClient
             .get().uri("/comment/stream")
             .accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE))
             .exchange()
             .expectStatus().isOk()
             //.expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON) // caused timeout
             .returnResult(Comment.class)
             .getResponseBody()
             .take(3)
             .collectList()
             .block();

     comments.forEach(x -> System.out.println(x));

     assertEquals(3, comments.size());

 }

}

7.Demo运行测试

运行

75_3.png

结果

http://localhost:8080/

75_4.png

http://localhost:8080/comment/stream

75_5.png

/ 访问的是 thymeleaf映射的index.html 页面。

/comment/stream 为数据流。

参考:http://www.spring4all.com/article/6852 和 https://www.mkyong.com/spring-boot/spring-boot-webflux-server-sent-events-example/

文章永久链接:https://tech.souyunku.com/24796

未经允许不得转载:搜云库技术团队 » Spring Boot WebFlux + Server-sent事件示例

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们