关于 Spring For All

关于 Spring For All

Spring For All 的一切
最新动态

最新动态

Spring 5 会是咋样呢
Spring Boot

Spring Boot

快速构建并运行 Spring 应用程序
Spring Cloud

Spring Cloud

分布式系统的一套工具,可用于构建微服务
Spring Framework

Spring Framework

提供依赖注入、事务、Web应用、数据访问等模块
Spring Data

Spring Data

提供一致性数据访问模块
Spring Security

Spring Security

提供应用身份验证和授权支持
Spring Batch

Spring Batch

提供高容批处理操作模块
Spring AMQP

Spring AMQP

基于AMQP消息解决方案
Micro Service Arch.

Micro Service Arch.

微服务架构相关
开源项目及视频教程

开源项目及视频教程

做中国最好的 Spring 开源项目及视频教程
小马哥专栏

小马哥专栏

阿里技术布道者,专注 Spring Boot 及微服务

初学者怎么看spring的源码?比如mvc 的resolver

Spring Frameworkitmuch.com 回复了问题 • 6 人关注 • 5 个回复 • 236 次浏览 • 13 小时前 • 来自相关话题

「转」Spring Boot 2.0 - WebFlux framework

Spring Framework泥瓦匠BYSocket 发表了文章 • 0 个评论 • 273 次浏览 • 6 天前 • 来自相关话题

1、介绍
1.1 什么是响应式编程(Reactive Programming)?

简单来说,响应式编程是针对异步和事件驱动的非阻塞应用程序,并且需要少量线程来垂直缩放(即在 JVM 内)而不是水平(即通过集群)。

响应式应用的一个关键方面是“背压(backpressure)”的概念,这是确保生产者不会压倒消费者的机制。例如,当HTTP连接太慢时,从数据库延伸到HTTP响应的反应组件的流水线、数据存储库也可以减慢或停止,直到网络容量释放。

响应式编程也导致从命令式到声明异步组合逻辑的重大转变。与使用Java 8的 CompletableFuture 编写封锁代码相比,可以通过 lambda
表达式编写后续操作。

1.2 响应式 API(Reactive API)和 构建块(Building Blocks)

Spring Framework 5 将 Reactive Streams 作为通过异步组件和库进行背压通信的合同。Reactive Streams 是通过行业协作创建的规范,也已在Java 9中被采用为 java.util.concurrent.Flow。

Spring Framework 在内部使用 Reactor 自己的响应支持。Reactor 是一个 Reactive Streams 实现,进一步扩展基本的 Reactive Streams Publisher 、Flux 和 Mono 可组合的API类型,以提供对 0..N 和 0..1 的数据序列的声明性操作。

Spring Framework 在许多自己的 Reactive API 中暴露了 Flux 和
Mono。然而,在应用级别,一如既往,Spring 提供了选择,并完全支持使用RxJava。有关的更多信息,请查看 Sebastien Deleuze 发表的 "Understanding Reactive Types" 。

2、Spring WebFlux 模块

Spring Framework 5 包括一个新的 spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对REST,HTML浏览器和 WebSocket风格交互的响应式服务器Web应用程序的支持。

2.1、服务器端

在服务器端 WebFlux 支持2种不同的编程模型:

基于注解的 @Controller 和其他注解也支持 Spring MVC
Functional 、Java 8 lambda 风格的路由和处理





 

WebFlux 可以在支持 Servlet 3.1 非阻塞 IO API 以及其他异步运行时(如 Netty 和 Undertow )的 Servlet 容器上运行。每个运行时都适用于响应型 ServerHttpRequest 和 ServerHttpResponse,将请求和响应的正文暴露为 Flux<DataBuffer>,而不是具有响应背压的InputStream 和 OutputStream 。顶部作为 Flux<Object> 支持REST风格的 JSON 和 XML 序列化和反序列化,HTML视图呈现和服务器发送事件也是如此。

基于注解的编程模式

WebFlux中也支持相同的 @Controller 编程模型和 Spring MVC 中使用的相同注解。主要区别在于底层核心框架契约(即 HandlerMapping HandlerAdapter )是非阻塞的,并且在响应型 ServerHttpRequest
和 ServerHttpResponse 上运行,而不是在 HttpServletRequest 和HttpServletResponse 上运行。以下是一个响应式 Controller 的例子:
$(document).ready(function() {$('pre code').each(function(i, block) { hljs.highlightBlock( block); }); });@RestController
public class PersonController {

private final PersonRepository repository;

public PersonController(PersonRepository repository) {
this.repository = repository;
}

@PostMapping("/person")
Mono<Void> create(@RequestBody Publisher<Person> personStream) {
return this.repository.save(personStream).then();
}

@GetMapping("/person")
Flux<Person> list() {
return this.repository.findAll();
}

@GetMapping("/person/{id}")
Mono<Person> findById(@PathVariable String id) {
return this.repository.findOne(id);
}
}


 
函数式编程模式

HandlerFunctions

传入的HTTP请求由 HandlerFunction 处理,HandlerFunction 本质上是一个接收 ServerRequest 并返回 Mono<ServerResponse> 的函数。处理函数的注解对应方法将是一个 @RequestMapping 的方法。

ServerRequest 和 ServerResponse 是提供JDK-8友好访问底层HTTP消息的不可变接口。两者都通过在反应堆顶部建立完全反应:请求将身体暴露为 Flux 或 Mono; 响应接受任何 Reactive Streams Publisher 作为主体。

ServerRequest 可以访问各种HTTP请求元素:方法,URI,查询参数,以及通过单独的ServerRequest.Headers 接口 - 头。通过 body方法 提供对 body 的访问。例如,这是如何将请求体提取为 Mono <String>:
Mono<String> string = request.bodyToMono(String.class);
 
这里是如何将身体提取为 Flux,其中 Person 是可以从body内容反序列化的类(即如果body包含JSON,则由Jackson支持,或者如果是XML,则为JAXB)。
Flux<Person> people = request.bodyToFlux(Person.class);
 
上面的两个方法(bodyToMono 和 bodyToFlux)实际上是使用通用ServerRequest.body(BodyExtractor)函数的便利方法。
BodyExtractor 是一个功能策略界面,允许您编写自己的提取逻辑,但在 BodyExtractors 实用程序类中可以找到常见的 BodyExtractor 实例。所以,上面的例子可以替换为:
Mono<String> string = request.body(BodyExtractors.toMono(String.class);
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class);
 
类似地,ServerResponse 提供对HTTP响应的访问。由于它是不可变的,您可以使用构建器创建一个 ServerResponse 。构建器允许您设置响应状态,添加响应标题并提供正文。例如,这是如何使用200 OK状态创建响应,JSON内容类型和正文:Mono<Person> person = ... ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person);这里是如何使用201创建的状态,位置标题和空白体来构建响应:
URI location = ...
ServerResponse.created(location).build();
将这些组合在一起可以创建一个 HandlerFunction。例如,这里是一个简单的“Hello World”处理程序 lambda 的示例,它返回一个200状态的响应和一个基于 String 的主体:
HandlerFunction<ServerResponse> helloWorld = request -> ServerResponse.ok().body(fromObject("Hello World"));
 
使用 lambda 写处理函数,就像我们上面所说的那样很方便,但是在处理多个函数时可能缺乏可读性,变得不那么容易维护。因此,建议将相关处理函数分组到一个处理程序或控制器类中。例如,这是一个暴露了一个响应式的 Person 存储库的类:
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public class PersonHandler {

private final PersonRepository repository;

public PersonHandler(PersonRepository repository) {
this.repository = repository;
}

// 1
public Mono<ServerResponse> listPeople(ServerRequest request) {
Flux<Person> people = repository.allPeople();
return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person.class);
}

// 2
public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class);
return ServerResponse.ok().build(repository.savePerson(person));
}

// 3
public Mono<ServerResponse> getPerson(ServerRequest request) {
int personId = Integer.valueOf(request.pathVariable("id"));
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
Mono<Person> personMono = this.repository.getPerson(personId);
return personMono
.flatMap(person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)))
.switchIfEmpty(notFound);
}
}
1/ listPeople 是一个处理函数,它将数据库中发现的所有 Person
对象返回为JSON。

2/ createPerson 是一个处理函数,用于存储请求正文中包含的新Person。请注意,PersonRepository.savePerson(Person) 返回Mono<Void>:发出完成信号的空 Mono,当人从请求中读取并存储时,发出完成信号。因此,当接收到完成信号时,即当 Person 已被保存时,我们使用 build(Publisher<Void>) 方法来发送响应。

3/ getPerson 是一个处理函数,它通过路径变量id来标识一个人。我们通过数据库检索该Person,并创建一个JSON响应(如果找到)。如果没有找到,我们使用 switchIfEmpty(Mono<T>) 来返回 404 Not Found 响应。

RouterFunctions

传入请求将路由到处理函数,并使用一个 RouterFunction,它是一个服务器 ServerRequest 的函数,并返回一个 Mono<HandlerFunction>。如果请求与特定路由匹配,则返回处理函数; 否则返回一个空的 Mono。RouterFunction 与 @Controller 类中的 @RequestMapping 注解类似。

通常,您不要自己编写路由器功能,而是使用
RouterFunctions.route(RequestPredicate, HandlerFunction), 使用请求谓词和处理函数创建一个。如果谓词适用,请求将路由到给定的处理函数; 否则不执行路由,导致 404 Not Found 响应。虽然您可以编写自己的 RequestPredicate ,但是您不需要:RequestPredicates 实用程序类提供常用的谓词,基于路径,HTTP方法,内容类型等进行匹配。使用路由,我们可以路由到我们的 “Hello World” 处理函数:
RouterFunction<ServerResponse> helloWorldRoute =
RouterFunctions.route(RequestPredicates.path("/hello-world"),
request -> Response.ok().body(fromObject("Hello World")));

 
两个路由功能可以组成一个新的路由功能,路由到任一处理函数:如果第一个路由的谓词不匹配,则第二个被评估。组合的路由器功能按顺序进行评估,因此在通用功能之前放置特定功能是有意义的。您可以通过调用 RouterFunction.and(RouterFunction) 或通过调用
RouterFunction.andRoute(RequestPredicate, HandlerFunction) 来组成两个路由功能,这是 RouterFunction.and() 与 RouterFunctions.route() 的一种方便组合。

给定我们上面显示的 PersonH​​andler,我们现在可以定义路由功能,路由到相应的处理函数。我们使用 方法引用(method-references) 来引用处理函数:
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> personRoute =
route(GET("/person/{id}").and(accept(APPLICATION_JSON)), handler::getPerson)
.andRoute(GET("/person").and(accept(APPLICATION_JSON)), handler::listPeople)
.andRoute(POST("/person").and(contentType(APPLICATION_JSON)), handler::createPerson);

 
除路由功能之外,您还可以通过调用
RequestPredicate.and(RequestPredicate) 或
RequestPredicate.or(RequestPredicate) 来构成请求谓词。这些工作正如预期的那样:如果给定的谓词匹配,则生成的谓词匹配; 或者如果任一谓词都匹配。RequestPredicates 中发现的大多数谓词是组合的。例如,RequestPredicates.GET(String) 是
RequestPredicates.method(HttpMethod) 和RequestPredicates.path(String) 的组合。

启动服务器

现在只有一个难题遗留:在HTTP服务器中运行路由功能。您可以使用
RouterFunction<ServerResponse> route = ...
HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer server = HttpServer.create(HOST, PORT);
server.newHandler(adapter).block();
 
对于 Tomcat ,它看起来像这样:
RouterFunction<ServerResponse> route = ...
HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
HttpServlet servlet = new ServletHttpHandlerAdapter(httpHandler);
Tomcat server = new Tomcat();
Context rootContext = server.addContext("", System.getProperty("java.io.tmpdir"));
Tomcat.addServlet(rootContext, "servlet", servlet);
rootContext.addServletMapping("/", "servlet");
tomcatServer.start();
 
待完成:DispatcherHandler
HandlerFilterFunction
路由功能映射的路由可以通过调用
RouterFunction.filter(HandlerFilterFunction) 进行过滤,其中
HandlerFilterFunction 本质上是一个接收 ServerRequest 和
HandlerFunction 的函数,并返回一个 ServerResponse 。处理函数参数表示链中的下一个元素:通常是路由到的 HandlerFunction ,但是如果应用了多个过滤器,也可以是另一个 FilterFunction 。使用注解,可以使用 @ControllerAdvice 和 / 或 ServletFilter 来实现类似的功能。让我们在我们的路由中添加一个简单的安全过滤器,假设我们有一个 SecurityManager 可以确定是否允许特定的路径:
import static org.springframework.http.HttpStatus.UNAUTHORIZED;

SecurityManager securityManager = ...
RouterFunction<ServerResponse> route = ...

RouterFunction<ServerResponse> filteredRoute =
route.filter(request, next) -> {
if (securityManager.allowAccessTo(request.path())) {
return next.handle(request);
}
else {
return ServerResponse.status(UNAUTHORIZED).build();
}
});
 
在这个例子中可以看到,调用 next.handle(ServerRequest) 是可选的:我们只允许在允许访问时执行处理函数。

2.2 客户端(Client Side)

WebFlux 包括一个 functional, reactive WebClient,它为 RestTemplate 提供了一种完全无阻塞和响应式的替代方案。
它将网络输入和输出公开为客户端 HttpRequest 和ClientHttpResponse ,其中请求和响应的主体是 Flux <DataBuffer>而不是 InputStream 和 OutputStream。此外,它还支持与服务器端相同的响应式 JSON,XML和SSE 序列化机制,因此您可以使用类型化对象。以下是使用需要 ClientHttpConnector 实现的 WebClient 插入特定HTTP客户端(如 Reactor Netty)的示例:
WebClient client = WebClient.create("http://example.com");

Mono<Account> account = client.get()
.url("/accounts/{id}", 1L)
.accept(APPLICATION_JSON)
.exchange(request)
.flatMap(response -> response.bodyToMono(Account.class));
 
AsyncRestTemplate 还支持非阻塞交互。主要区别在于它不支持非阻塞流,例如 Twitter one ,因为它基本上仍然依赖于 InputStream 和
OutputStream。

2.4 请求体和响应体的转换(Request and Response Body Conversion)

spring-core 模块提供了响应式 Encoder (编码器) 和 Decoder (解码器),使得能够串行化字符串与类型对象的转换。spring-web 模块添加了 JSON(Jackson)和 XML(JAXB)实现,用于Web应用程序以及其他用于SSE流和零拷贝文件传输。

支持以下 Reactive API:

Reactor 3.x 支持开箱即用
io.reactivex.rxjava2:rxjava 依赖项在类路径上时支持 RxJava 2.x
当 ·io.reactivex:rxjava和io.reactivex:rxjava-reactive-streams`(RxJava 和 Reactive Streams 之间的适配器)依赖关系在类路径上时,支持 RxJava 1.x

例如,请求体可以是以下方式之一,它将在注解和功能编程模型中自动解码:

Account account - 在调用控制器之前,account 将无阻塞地被反序列化。
Mono<Account> account - controller 可以使用 Mono 来声明在反序列化 account 后执行的逻辑。
Single<Account> account - 和 Mono 类似,但是用的是 RxJava
Flux<Account> accounts - 输入流场景
Observable<Account> accounts - RxJava 的 输入流场景

响应体(response body)可以是以下之一:

Mono<Account> - 当 Mono 完成时,序列化而不阻塞给定的Account。
Single<Account> - 与上类似,但是使用的 RxJava
Flux<Account> - 流式场景,可能是SSE,具体取决于所请求的内容类型。
Observable<Account> - 与上类似, 但是使用的 RxJava Observable 类型
Flowable<Account> - 与上类似, 但是使用的 RxJava 2 Flowable 类型。
Publisher<Account> 或 Flow.Publisher<Account> - 支持任何实现Reactive Streams Publisher 的类型。
Flux<ServerSentEvent> - SSE 流。
Mono<Void> - 当 Mono 完成时,请求处理完成。
Account - 序列化而不阻塞给定的Account; 意味着同步、非阻塞的
Controller 方法。
Void - 特定于基于注解的编程模型,方法返回时,请求处理完成;
意味着同步、非阻塞的 Controller 方法。

当使用像 Flux 或 Observable 这样的流类型时,请求/响应或映射/路由级别中指定的媒体类型用于确定数据应如何序列化和刷新。例如,返回 Flux<Account> 的REST端点将默认序列化如下:

application/json : Flux<Account> 作为异步集合处理,并在完成事件发布时将其序列化为具有显式刷新的JSON数组。
application/stream+json : 一个 Flux<Account> 将作为一系列的Account 元素处理,作为以新行分隔的单个JSON对象,并在每个元素之后显式刷新。WebClient 支持JSON流解码,因此这对于服务器到服务器的用例来说是一个很好的用例。
text/event-stream : 一个 Flux<Account> 或 Flux<ServerSentEvent<Account >> 将作为一个 Stream 或ServerSentEvent 元素的流处理,作为单独的 SSE 元素,使用默认的JSON进行数据编码和每个元素之间的显式刷新。这非常适合将流暴露给浏览器客户端。WebClient 也支持读取SSE流。

2.4 响应式 Websocket 支持

WebFlux 包括响应式 WebSocket 客户端和服务器支持。Java WebSocket API(JSR-356),Jetty,Undertow和Reactor Netty都支持客户端和服务器。

在服务器端,声明一个 WebSocketHandlerAdapter,然后简单地添加映射到基于 WebSocketHandler 的端点:
@Bean
public HandlerMapping webSocketMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/foo", new FooWebSocketHandler());
map.put("/bar", new BarWebSocketHandler());

SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(10);
mapping.setUrlMap(map);
return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
在客户端,为上面列出的支持的库之一创建一个 WebSocketClient:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);
 
2.5 测试

spring-test 模块包括一个 WebTestClient,可用于测试具有或不具有正在运行的服务器的 WebFlux 服务器端点。

没有运行服务器的测试与来自Spring MVC的 MockMvc 相当,其中使用模拟请求和响应,而不是使用套接字通过网络连接。然而,WebTestClient 也可以针对正在运行的服务器执行测试。

更多请查看 sample tests

3、开始入门
3.1 Spring Boot Starter

通过 http://start.spring.io 提供的 Spring Boot WebFlux 启动器是最快的入门方式。它做所有必要的,所以你开始像Spring MVC一样编写@Controller类。只需转到 http://start.spring.io ,选择版本
2.0.0.BUILD-SNAPSHOT,并在依赖关系框中键入 respond。
默认情况下,启动器使用 Reactor Netty 运行,但依赖关系可以像往常一样通过 Spring Boot 更改为不同的运行时。有关更多详细信息和说明,请参阅 Spring Boo t参考文档页面。

3.2 手动引导(Manual Bootstrapping)

对于依赖关系,从 spring-webflux 和 spring-context 开始。
然后添加jackson-databind 和 io.netty:netty-buffer(暂时见SPR-14528)以获得JSON支持。最后添加一个支持的运行时的依赖项:

Tomcat — org.apache.tomcat.embed:tomcat-embed-core
Jetty — org.eclipse.jetty:jetty-server 和 org.eclipse.jetty:jetty-servlet
Reactor Netty — io.projectreactor.ipc:reactor-netty
Undertow — io.undertow:undertow-core

基于注解编程模式的引导:
ApplicationContext context = new AnnotationConfigApplicationContext(DelegatingWebFluxConfiguration.class); // (1)
HttpHandler handler = DispatcherHandler.toHttpHandler(context); // (2)
 
以上加载默认的 Spring Web 框架配置(1),然后创建一个DispatcherHandler,主类驱动请求处理(2),并适应 HttpHandler - 响应式HTTP请求处理的最低级别的Spring抽象。

函数编程模式的引导:
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); // (1)
context.registerBean(FooBean.class, () -> new FooBeanImpl()); // (2)
context.registerBean(BarBean.class); // (3)
context.refresh();

HttpHandler handler = WebHttpHandlerBuilder
.webHandler(RouterFunctions.toHttpHandler(...))
.applicationContext(context)
.build(); // (4)
以上创建了一个 AnnotationConfigApplicationContext 实例(1),可以利用新的功能 bean 注册API(2)使用 Java 8 供应商注册 bean,或者只需通过指定其类(3)即可。HttpHandler 是使用WebHttpHandlerBuilder(4)创建的。

然后可以将 HttpHandler 安装在支持的运行服务器之一中:
// Tomcat and Jetty (also see notes below)
HttpServlet servlet = new ServletHttpHandlerAdapter(handler);
...

// Reactor Netty
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create(host, port).newHandler(adapter).block();

// Undertow
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
对于特别是使用 WAR 部署的 Servlet 容器,可以使用作为WebApplicationInitializer 的
AbstractAnnotationConfigDispatcherHandlerInitializer,并由 Servlet容器自动检测。它负责注册 ServletHttpHandlerAdapter ,如上所示。您将需要实现一个抽象方法来指向您的 Spring 配置。

3.3 Examples

您将在以下项目中找到有助于构建反应式 Web 应用程序的代码示例:

Functional programming model sample
Spring Reactive Playground: playground for most Spring Web reactive features
Reactor website: the spring-functional
branch is a Spring 5 functional, Java 8 lambda-style application
Spring Reactive University session: live-coded project from this Devoxx BE 2106 university talk
Reactive Thymeleaf Sandbox
Mix-it 2017 website: Kotlin + Reactive + Functional web and bean registration API application
Reactor by example: code snippets coming from this InfoQ article
Spring integration tests: various features tested with Reactor StepVerifier


Webflux 实战项目

demo地址: https://anyim.cfapps.io/

作者:Anoyi
链接:http://www.jianshu.com/p/f4ff6d74ad4a

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  查看全部
1、介绍
1.1 什么是响应式编程(Reactive Programming)?

简单来说,响应式编程是针对异步和事件驱动的非阻塞应用程序,并且需要少量线程来垂直缩放(即在 JVM 内)而不是水平(即通过集群)。

响应式应用的一个关键方面是“背压(backpressure)”的概念,这是确保生产者不会压倒消费者的机制。例如,当HTTP连接太慢时,从数据库延伸到HTTP响应的反应组件的流水线、数据存储库也可以减慢或停止,直到网络容量释放。

响应式编程也导致从命令式到声明异步组合逻辑的重大转变。与使用Java 8的 CompletableFuture 编写封锁代码相比,可以通过 lambda
表达式编写后续操作。

1.2 响应式 API(Reactive API)和 构建块(Building Blocks)

Spring Framework 5 将 Reactive Streams 作为通过异步组件和库进行背压通信的合同。Reactive Streams 是通过行业协作创建的规范,也已在Java 9中被采用为 java.util.concurrent.Flow。

Spring Framework 在内部使用 Reactor 自己的响应支持。Reactor 是一个 Reactive Streams 实现,进一步扩展基本的 Reactive Streams Publisher 、Flux 和 Mono 可组合的API类型,以提供对 0..N 和 0..1 的数据序列的声明性操作。

Spring Framework 在许多自己的 Reactive API 中暴露了 Flux 和
Mono。然而,在应用级别,一如既往,Spring 提供了选择,并完全支持使用RxJava。有关的更多信息,请查看 Sebastien Deleuze 发表的 "Understanding Reactive Types" 。

2、Spring WebFlux 模块

Spring Framework 5 包括一个新的 spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对REST,HTML浏览器和 WebSocket风格交互的响应式服务器Web应用程序的支持。

2.1、服务器端

在服务器端 WebFlux 支持2种不同的编程模型:

基于注解的 @Controller 和其他注解也支持 Spring MVC
Functional 、Java 8 lambda 风格的路由和处理

3424642-7922d13b6c20ee6e.png

 

WebFlux 可以在支持 Servlet 3.1 非阻塞 IO API 以及其他异步运行时(如 Netty 和 Undertow )的 Servlet 容器上运行。每个运行时都适用于响应型 ServerHttpRequest 和 ServerHttpResponse,将请求和响应的正文暴露为 Flux<DataBuffer>,而不是具有响应背压的InputStream 和 OutputStream 。顶部作为 Flux<Object> 支持REST风格的 JSON 和 XML 序列化和反序列化,HTML视图呈现和服务器发送事件也是如此。

基于注解的编程模式

WebFlux中也支持相同的 @Controller 编程模型和 Spring MVC 中使用的相同注解。主要区别在于底层核心框架契约(即 HandlerMapping HandlerAdapter )是非阻塞的,并且在响应型 ServerHttpRequest
和 ServerHttpResponse 上运行,而不是在 HttpServletRequest 和HttpServletResponse 上运行。以下是一个响应式 Controller 的例子:
@RestController
public class PersonController {

private final PersonRepository repository;

public PersonController(PersonRepository repository) {
this.repository = repository;
}

@PostMapping("/person")
Mono<Void> create(@RequestBody Publisher<Person> personStream) {
return this.repository.save(personStream).then();
}

@GetMapping("/person")
Flux<Person> list() {
return this.repository.findAll();
}

@GetMapping("/person/{id}")
Mono<Person> findById(@PathVariable String id) {
return this.repository.findOne(id);
}
}


 
函数式编程模式

HandlerFunctions

传入的HTTP请求由 HandlerFunction 处理,HandlerFunction 本质上是一个接收 ServerRequest 并返回 Mono<ServerResponse> 的函数。处理函数的注解对应方法将是一个 @RequestMapping 的方法。

ServerRequest 和 ServerResponse 是提供JDK-8友好访问底层HTTP消息的不可变接口。两者都通过在反应堆顶部建立完全反应:请求将身体暴露为 Flux 或 Mono; 响应接受任何 Reactive Streams Publisher 作为主体。

ServerRequest 可以访问各种HTTP请求元素:方法,URI,查询参数,以及通过单独的ServerRequest.Headers 接口 - 头。通过 body方法 提供对 body 的访问。例如,这是如何将请求体提取为 Mono <String>:
Mono<String> string = request.bodyToMono(String.class);

 
这里是如何将身体提取为 Flux,其中 Person 是可以从body内容反序列化的类(即如果body包含JSON,则由Jackson支持,或者如果是XML,则为JAXB)。
Flux<Person> people = request.bodyToFlux(Person.class);

 
上面的两个方法(bodyToMono 和 bodyToFlux)实际上是使用通用ServerRequest.body(BodyExtractor)函数的便利方法。
BodyExtractor 是一个功能策略界面,允许您编写自己的提取逻辑,但在 BodyExtractors 实用程序类中可以找到常见的 BodyExtractor 实例。所以,上面的例子可以替换为:
Mono<String> string = request.body(BodyExtractors.toMono(String.class); 
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class);

 
类似地,ServerResponse 提供对HTTP响应的访问。由于它是不可变的,您可以使用构建器创建一个 ServerResponse 。构建器允许您设置响应状态,添加响应标题并提供正文。例如,这是如何使用200 OK状态创建响应,JSON内容类型和正文:Mono<Person> person = ... ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person);这里是如何使用201创建的状态,位置标题和空白体来构建响应:
URI location = ... 
ServerResponse.created(location).build();

将这些组合在一起可以创建一个 HandlerFunction。例如,这里是一个简单的“Hello World”处理程序 lambda 的示例,它返回一个200状态的响应和一个基于 String 的主体:
HandlerFunction<ServerResponse> helloWorld = request -> ServerResponse.ok().body(fromObject("Hello World"));

 
使用 lambda 写处理函数,就像我们上面所说的那样很方便,但是在处理多个函数时可能缺乏可读性,变得不那么容易维护。因此,建议将相关处理函数分组到一个处理程序或控制器类中。例如,这是一个暴露了一个响应式的 Person 存储库的类:
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public class PersonHandler {

private final PersonRepository repository;

public PersonHandler(PersonRepository repository) {
this.repository = repository;
}

// 1
public Mono<ServerResponse> listPeople(ServerRequest request) {
Flux<Person> people = repository.allPeople();
return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person.class);
}

// 2
public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class);
return ServerResponse.ok().build(repository.savePerson(person));
}

// 3
public Mono<ServerResponse> getPerson(ServerRequest request) {
int personId = Integer.valueOf(request.pathVariable("id"));
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
Mono<Person> personMono = this.repository.getPerson(personId);
return personMono
.flatMap(person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)))
.switchIfEmpty(notFound);
}
}

1/ listPeople 是一个处理函数,它将数据库中发现的所有 Person
对象返回为JSON。

2/ createPerson 是一个处理函数,用于存储请求正文中包含的新Person。请注意,PersonRepository.savePerson(Person) 返回Mono<Void>:发出完成信号的空 Mono,当人从请求中读取并存储时,发出完成信号。因此,当接收到完成信号时,即当 Person 已被保存时,我们使用 build(Publisher<Void>) 方法来发送响应。

3/ getPerson 是一个处理函数,它通过路径变量id来标识一个人。我们通过数据库检索该Person,并创建一个JSON响应(如果找到)。如果没有找到,我们使用 switchIfEmpty(Mono<T>) 来返回 404 Not Found 响应。

RouterFunctions

传入请求将路由到处理函数,并使用一个 RouterFunction,它是一个服务器 ServerRequest 的函数,并返回一个 Mono<HandlerFunction>。如果请求与特定路由匹配,则返回处理函数; 否则返回一个空的 Mono。RouterFunction 与 @Controller 类中的 @RequestMapping 注解类似。

通常,您不要自己编写路由器功能,而是使用
RouterFunctions.route(RequestPredicate, HandlerFunction), 使用请求谓词和处理函数创建一个。如果谓词适用,请求将路由到给定的处理函数; 否则不执行路由,导致 404 Not Found 响应。虽然您可以编写自己的 RequestPredicate ,但是您不需要:RequestPredicates 实用程序类提供常用的谓词,基于路径,HTTP方法,内容类型等进行匹配。使用路由,我们可以路由到我们的 “Hello World” 处理函数:
RouterFunction<ServerResponse> helloWorldRoute =
RouterFunctions.route(RequestPredicates.path("/hello-world"),
request -> Response.ok().body(fromObject("Hello World")));

 
两个路由功能可以组成一个新的路由功能,路由到任一处理函数:如果第一个路由的谓词不匹配,则第二个被评估。组合的路由器功能按顺序进行评估,因此在通用功能之前放置特定功能是有意义的。您可以通过调用 RouterFunction.and(RouterFunction) 或通过调用
RouterFunction.andRoute(RequestPredicate, HandlerFunction) 来组成两个路由功能,这是 RouterFunction.and() 与 RouterFunctions.route() 的一种方便组合。

给定我们上面显示的 PersonH​​andler,我们现在可以定义路由功能,路由到相应的处理函数。我们使用 方法引用(method-references) 来引用处理函数:
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> personRoute =
route(GET("/person/{id}").and(accept(APPLICATION_JSON)), handler::getPerson)
.andRoute(GET("/person").and(accept(APPLICATION_JSON)), handler::listPeople)
.andRoute(POST("/person").and(contentType(APPLICATION_JSON)), handler::createPerson);

 
除路由功能之外,您还可以通过调用
RequestPredicate.and(RequestPredicate) 或
RequestPredicate.or(RequestPredicate) 来构成请求谓词。这些工作正如预期的那样:如果给定的谓词匹配,则生成的谓词匹配; 或者如果任一谓词都匹配。RequestPredicates 中发现的大多数谓词是组合的。例如,RequestPredicates.GET(String) 是
RequestPredicates.method(HttpMethod) 和RequestPredicates.path(String) 的组合。

启动服务器

现在只有一个难题遗留:在HTTP服务器中运行路由功能。您可以使用
RouterFunction<ServerResponse> route = ...
HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer server = HttpServer.create(HOST, PORT);
server.newHandler(adapter).block();

 
对于 Tomcat ,它看起来像这样:
RouterFunction<ServerResponse> route = ...
HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
HttpServlet servlet = new ServletHttpHandlerAdapter(httpHandler);
Tomcat server = new Tomcat();
Context rootContext = server.addContext("", System.getProperty("java.io.tmpdir"));
Tomcat.addServlet(rootContext, "servlet", servlet);
rootContext.addServletMapping("/", "servlet");
tomcatServer.start();

 
待完成:DispatcherHandler
HandlerFilterFunction
路由功能映射的路由可以通过调用
RouterFunction.filter(HandlerFilterFunction) 进行过滤,其中
HandlerFilterFunction 本质上是一个接收 ServerRequest 和
HandlerFunction 的函数,并返回一个 ServerResponse 。处理函数参数表示链中的下一个元素:通常是路由到的 HandlerFunction ,但是如果应用了多个过滤器,也可以是另一个 FilterFunction 。使用注解,可以使用 @ControllerAdvice 和 / 或 ServletFilter 来实现类似的功能。让我们在我们的路由中添加一个简单的安全过滤器,假设我们有一个 SecurityManager 可以确定是否允许特定的路径:
import static org.springframework.http.HttpStatus.UNAUTHORIZED;

SecurityManager securityManager = ...
RouterFunction<ServerResponse> route = ...

RouterFunction<ServerResponse> filteredRoute =
route.filter(request, next) -> {
if (securityManager.allowAccessTo(request.path())) {
return next.handle(request);
}
else {
return ServerResponse.status(UNAUTHORIZED).build();
}
});

 
在这个例子中可以看到,调用 next.handle(ServerRequest) 是可选的:我们只允许在允许访问时执行处理函数。

2.2 客户端(Client Side)

WebFlux 包括一个 functional, reactive WebClient,它为 RestTemplate 提供了一种完全无阻塞和响应式的替代方案。
它将网络输入和输出公开为客户端 HttpRequest 和ClientHttpResponse ,其中请求和响应的主体是 Flux <DataBuffer>而不是 InputStream 和 OutputStream。此外,它还支持与服务器端相同的响应式 JSON,XML和SSE 序列化机制,因此您可以使用类型化对象。以下是使用需要 ClientHttpConnector 实现的 WebClient 插入特定HTTP客户端(如 Reactor Netty)的示例:
WebClient client = WebClient.create("http://example.com";);

Mono<Account> account = client.get()
.url("/accounts/{id}", 1L)
.accept(APPLICATION_JSON)
.exchange(request)
.flatMap(response -> response.bodyToMono(Account.class));

 
AsyncRestTemplate 还支持非阻塞交互。主要区别在于它不支持非阻塞流,例如 Twitter one ,因为它基本上仍然依赖于 InputStream 和
OutputStream。

2.4 请求体和响应体的转换(Request and Response Body Conversion)

spring-core 模块提供了响应式 Encoder (编码器) 和 Decoder (解码器),使得能够串行化字符串与类型对象的转换。spring-web 模块添加了 JSON(Jackson)和 XML(JAXB)实现,用于Web应用程序以及其他用于SSE流和零拷贝文件传输。

支持以下 Reactive API:

Reactor 3.x 支持开箱即用
io.reactivex.rxjava2:rxjava 依赖项在类路径上时支持 RxJava 2.x
当 ·io.reactivex:rxjava和io.reactivex:rxjava-reactive-streams`(RxJava 和 Reactive Streams 之间的适配器)依赖关系在类路径上时,支持 RxJava 1.x

例如,请求体可以是以下方式之一,它将在注解和功能编程模型中自动解码:

Account account - 在调用控制器之前,account 将无阻塞地被反序列化。
Mono<Account> account - controller 可以使用 Mono 来声明在反序列化 account 后执行的逻辑。
Single<Account> account - 和 Mono 类似,但是用的是 RxJava
Flux<Account> accounts - 输入流场景
Observable<Account> accounts - RxJava 的 输入流场景

响应体(response body)可以是以下之一:

Mono<Account> - 当 Mono 完成时,序列化而不阻塞给定的Account。
Single<Account> - 与上类似,但是使用的 RxJava
Flux<Account> - 流式场景,可能是SSE,具体取决于所请求的内容类型。
Observable<Account> - 与上类似, 但是使用的 RxJava Observable 类型
Flowable<Account> - 与上类似, 但是使用的 RxJava 2 Flowable 类型。
Publisher<Account> 或 Flow.Publisher<Account> - 支持任何实现Reactive Streams Publisher 的类型。
Flux<ServerSentEvent> - SSE 流。
Mono<Void> - 当 Mono 完成时,请求处理完成。
Account - 序列化而不阻塞给定的Account; 意味着同步、非阻塞的
Controller 方法。
Void - 特定于基于注解的编程模型,方法返回时,请求处理完成;
意味着同步、非阻塞的 Controller 方法。

当使用像 Flux 或 Observable 这样的流类型时,请求/响应或映射/路由级别中指定的媒体类型用于确定数据应如何序列化和刷新。例如,返回 Flux<Account> 的REST端点将默认序列化如下:

application/json : Flux<Account> 作为异步集合处理,并在完成事件发布时将其序列化为具有显式刷新的JSON数组。
application/stream+json : 一个 Flux<Account> 将作为一系列的Account 元素处理,作为以新行分隔的单个JSON对象,并在每个元素之后显式刷新。WebClient 支持JSON流解码,因此这对于服务器到服务器的用例来说是一个很好的用例。
text/event-stream : 一个 Flux<Account> 或 Flux<ServerSentEvent<Account >> 将作为一个 Stream 或ServerSentEvent 元素的流处理,作为单独的 SSE 元素,使用默认的JSON进行数据编码和每个元素之间的显式刷新。这非常适合将流暴露给浏览器客户端。WebClient 也支持读取SSE流。

2.4 响应式 Websocket 支持

WebFlux 包括响应式 WebSocket 客户端和服务器支持。Java WebSocket API(JSR-356),Jetty,Undertow和Reactor Netty都支持客户端和服务器。

在服务器端,声明一个 WebSocketHandlerAdapter,然后简单地添加映射到基于 WebSocketHandler 的端点:
@Bean
public HandlerMapping webSocketMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/foo", new FooWebSocketHandler());
map.put("/bar", new BarWebSocketHandler());

SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(10);
mapping.setUrlMap(map);
return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}

在客户端,为上面列出的支持的库之一创建一个 WebSocketClient:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);

 
2.5 测试

spring-test 模块包括一个 WebTestClient,可用于测试具有或不具有正在运行的服务器的 WebFlux 服务器端点。

没有运行服务器的测试与来自Spring MVC的 MockMvc 相当,其中使用模拟请求和响应,而不是使用套接字通过网络连接。然而,WebTestClient 也可以针对正在运行的服务器执行测试。

更多请查看 sample tests

3、开始入门
3.1 Spring Boot Starter

通过 http://start.spring.io 提供的 Spring Boot WebFlux 启动器是最快的入门方式。它做所有必要的,所以你开始像Spring MVC一样编写@Controller类。只需转到 http://start.spring.io ,选择版本
2.0.0.BUILD-SNAPSHOT,并在依赖关系框中键入 respond。
默认情况下,启动器使用 Reactor Netty 运行,但依赖关系可以像往常一样通过 Spring Boot 更改为不同的运行时。有关更多详细信息和说明,请参阅 Spring Boo t参考文档页面。

3.2 手动引导(Manual Bootstrapping)

对于依赖关系,从 spring-webflux 和 spring-context 开始。
然后添加jackson-databind 和 io.netty:netty-buffer(暂时见SPR-14528)以获得JSON支持。最后添加一个支持的运行时的依赖项:

Tomcat — org.apache.tomcat.embed:tomcat-embed-core
Jetty — org.eclipse.jetty:jetty-server 和 org.eclipse.jetty:jetty-servlet
Reactor Netty — io.projectreactor.ipc:reactor-netty
Undertow — io.undertow:undertow-core

基于注解编程模式的引导:
ApplicationContext context = new AnnotationConfigApplicationContext(DelegatingWebFluxConfiguration.class);  // (1)
HttpHandler handler = DispatcherHandler.toHttpHandler(context); // (2)

 
以上加载默认的 Spring Web 框架配置(1),然后创建一个DispatcherHandler,主类驱动请求处理(2),并适应 HttpHandler - 响应式HTTP请求处理的最低级别的Spring抽象。

函数编程模式的引导:
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); // (1)
context.registerBean(FooBean.class, () -> new FooBeanImpl()); // (2)
context.registerBean(BarBean.class); // (3)
context.refresh();

HttpHandler handler = WebHttpHandlerBuilder
.webHandler(RouterFunctions.toHttpHandler(...))
.applicationContext(context)
.build(); // (4)

以上创建了一个 AnnotationConfigApplicationContext 实例(1),可以利用新的功能 bean 注册API(2)使用 Java 8 供应商注册 bean,或者只需通过指定其类(3)即可。HttpHandler 是使用WebHttpHandlerBuilder(4)创建的。

然后可以将 HttpHandler 安装在支持的运行服务器之一中:
// Tomcat and Jetty (also see notes below)
HttpServlet servlet = new ServletHttpHandlerAdapter(handler);
...

// Reactor Netty
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create(host, port).newHandler(adapter).block();

// Undertow
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();

对于特别是使用 WAR 部署的 Servlet 容器,可以使用作为WebApplicationInitializer 的
AbstractAnnotationConfigDispatcherHandlerInitializer,并由 Servlet容器自动检测。它负责注册 ServletHttpHandlerAdapter ,如上所示。您将需要实现一个抽象方法来指向您的 Spring 配置。

3.3 Examples

您将在以下项目中找到有助于构建反应式 Web 应用程序的代码示例:

Functional programming model sample
Spring Reactive Playground: playground for most Spring Web reactive features
Reactor website: the spring-functional
branch is a Spring 5 functional, Java 8 lambda-style application
Spring Reactive University session: live-coded project from this Devoxx BE 2106 university talk
Reactive Thymeleaf Sandbox
Mix-it 2017 website: Kotlin + Reactive + Functional web and bean registration API application
Reactor by example: code snippets coming from this InfoQ article
Spring integration tests: various features tested with Reactor StepVerifier


Webflux 实战项目

demo地址: https://anyim.cfapps.io/

作者:Anoyi
链接:http://www.jianshu.com/p/f4ff6d74ad4a

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
 

Spring 的xml配置问题

Spring Frameworkcarl_zhao 回复了问题 • 2 人关注 • 1 个回复 • 125 次浏览 • 2017-09-18 14:58 • 来自相关话题

数据分片解决方案??

Spring FrameworkLevin 回复了问题 • 2 人关注 • 1 个回复 • 116 次浏览 • 2017-09-12 13:29 • 来自相关话题

使用@Component和@Configuration下面的@Bean两种方式装配的Bean有区别吗?

Spring Frameworkxiaobaxi 回复了问题 • 4 人关注 • 3 个回复 • 207 次浏览 • 2017-09-11 11:10 • 来自相关话题

Spring中的@Role注解是干啥的?

回复

Spring Frameworkspringdata 发起了问题 • 1 人关注 • 0 个回复 • 171 次浏览 • 2017-09-06 17:36 • 来自相关话题

@EnableTransactionManagement注解的mode属性是干啥的?

Spring Frameworkstrongant 回复了问题 • 2 人关注 • 1 个回复 • 197 次浏览 • 2017-09-06 16:13 • 来自相关话题

Spring Cache中的@Caching 注解是干啥用的?

Spring FrameworkDrTrang 回复了问题 • 2 人关注 • 1 个回复 • 198 次浏览 • 2017-09-04 14:22 • 来自相关话题

struts2+mybatis+spring 整合查询问题

Spring Frameworkxiaobaxi 回复了问题 • 4 人关注 • 3 个回复 • 202 次浏览 • 2017-08-29 10:41 • 来自相关话题

Idea 启动web项目报错:java.io.FileNotFoundException

Spring Frameworkxiaobaxi 回复了问题 • 3 人关注 • 2 个回复 • 167 次浏览 • 2017-08-29 10:15 • 来自相关话题