一、反应式编程初探

什么是反应式编程

有这么一个场景:

假设你定了一年的费用,并支付了一整年的订阅费用。可是过了几天,你却没有收到一份报纸,打电话给报社问为什么没有收到报纸。他告诉你:“你支付的是一年的费用,现在一年还没结束,当这一年结束的时候,我们会把这一年的报纸一次性交付给你。”这听起来很不合理,报纸是一种很具有时效性的东西,昨天的报纸对于今天来说已经没有多大意义了,可况年底将一年的报纸打包发给你,正常情况下应该是将每天都给你报纸。

好在这只是一个假设的场景,并不是真实的。从上面这个假设的场景中,我们感觉到了不合理

再来对比一下命令式编程反应式编程

命令式编程:非常类似于上文中虚拟的订阅报纸的方法。我们普通的编程方法几乎都是命令式编程,按顺序执行一批代码,下一个任务的执行依赖于上一个任务的成功执行,最后等到所有代码执行完毕,我们才能拿到最终的输出结果,这就好像是必须等一年才能拿到今年的报纸。

反应式编程:类似于生活中真实的订阅报纸。虽然我订阅的是一年份的报纸,但是我每天都可以收到新的报纸。我们不需要等到所有代码都执行完毕才能取到数据,任务可以是并行处理的,我们可以得到中间的数据,每个任务处理一部分数据,最后进行汇总。

这么说可能还是有点绕,可以想想一个这么场景。

我们可以选择这两种方式打水仗。

  • 一种是用一个个气球装满水,然后直接往对方身上扔,水球扔到对方身上,水一下子迸发而出。这就好像是命令式编程。

  • 另外一种则是使用一根水管,连续不断的将水喷到对方身上。这就好像是反应式编程。

为什么需要反应式编程?

反应式编程旨在提供无阻塞异步回压的异步处理。
举个例子:什么时候需要所谓的无阻塞异步处理呢?

比如HTTP服务器,对于这些服务器来说,它肯定服务的不只是一个人,它要服务很多人。如果按照同步的方法,那么服务器一次就要被一个客户端独占了。而在我们通常的使用场景中,一个客户端并不是时时刻刻都在向服务器发送请求,比如我们浏览网页的时候,我们点进去一个页面,get到服务器端返回来的页面以后,我们起码也要在这个页面上浏览好几秒钟才会再次向服务器发送请求跳转到下一个页面吧。如果服务器被一个客户端独占了,自然就没有办法响应其它客户端的请求。那肯定不行。怎么办呢?这时候就采用异步的方法。
所谓的同步和异步就好像是打电话和发邮件的区别。打电话得保证双方都得同时占线,而且双方打电话的时候就不能干其它事情了,如果电话那头话说得又非常慢,你还得耐着性子等着他说完。发邮件则不是,你给对方发邮件,对方空闲下来就会去看看邮箱,没有空的时候他就会忙他的事情。你也是,发完邮件以后你就去干自己的事情,不用像打电话一样必须等着对方说完才会去做自己的事情,这样能极大的提高对时间的利用效率。
这里也是,若HTTP服务器采用异步的方法,服务器就不会被一个客户端独占了。你向服务器发送请求,服务器不会马上响应你,而是会等到它忙忘手中的事情才回来理你。这样的话服务器不会马上响应你的需求,影响大吗?得看你的实际需求,如果你是在浏览网页,可能基本上没什么影响,因为你并不是时时刻刻都会向服务器发起请求,而是隔三岔五的请求一下。而对于服务器来说,这样它就能同时服务很多客户端了,而不会被一个客户端长时间占线。

反应式编程的规范

反应式编程的规范可以总结为4个接口PublisherSubscriberSubscriptionProcessor

Publisher负责生成数据,并将数据发送给Subscriber。一个Subscriber对应着一个Subscription。Subscriber可以使用Subscription来管理其订阅情况。

  1. 首先是Subscriber订阅Publisher。订阅成功以后,Subscriber就会得到一个Subscription对象。Subscription中有两个方法:request方法和cancel方法。

  2. Subscriber得到一个Subscription对象以后,就可以调用该Subscription对象的request方法,请求Publisher发送数据给Subscriber。通过调用cancel方法就可以取消对Publisher的订阅。

  3. Processor则是用来处理Publisher发送给Subscriber的数据。

对应的关系如图所示:
在这里插入图片描述

二、上手反应式编程(使用Spring中的Reactor)

对比反应式编程和命令式编程代码

先来看看两段代码。第一段是命令式编程。第二段则是Spring中的反应式编程框架Reactor来实现的。

这两段代码都能完成相同的功能

String name = "Simon";
String capitalName = name.toUpperCase();
String greeting = "Hello " + capitalName + "!";
System.out.println(greeting);
Mono.just("Simon")
            .map(n -> n.toUpperCase())
            .map(cn -> "Hello " + cn + "!")
            .subscribe(gn -> System.out.println(gn));

添加相应依赖

正式使用Reactor之前需要添加相应依赖。

如果使用Spring Boot,那么则需要添加如下依赖

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

Reactor还提供了test功能,需要的话则可以添加如下依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

添加了测试功能以后就可以使用StepVerifier进行测试

Flux<String> fruitFlux = Flux
	      .just("Apple", "Orange", "Grape", "Banana", "Strawberry");
    
StepVerifier.create(fruitFlux)
    .expectNext("Apple")
    .expectNext("Orange")
    .expectNext("Grape")
    .expectNext("Banana")
    .expectNext("Strawberry")
    .verifyComplete();

Mono和Flux

Reactor有两种核心类型:一种是Mono,一种是Flux。Mono的英文意思是单声道。对比如下两种图可知,Mono处理的数据项不超过一个,也就是所谓的。而Flux则可以处理一个或者是多个数据项

Mono示意图
在这里插入图片描述
Flux示意图
在这里插入图片描述

三、反应式编程的常见操作

1.创建操作

在使用反应式编程之前,我们得先创建反应式类型的数据,也就是说得先将数据封装为Flux或者是Mono类型的对象。

根据对象创建

Flux<String> fruitFlux = Flux
	      .just("Apple", "Orange", "Grape", "Banana", "Strawberry");

/*
创建好放映室类型的对象以后,就可以开始反应式操作了。
需要注意的是反应式类型的数据离不开subscribe
只有被订阅了整个数据才会开始流动起来,操作才会一步一步往下走
*/
fruitFlux.subscribe(f -> System.out.println(f));

根据数组创建

String[] fruits = new String[] {
    "Apple", "Orange", "Grape", "Banana", "Strawberry" };
	  
Flux<String> fruitFlux = Flux.fromArray(fruits);

根据List创建

List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");

Flux<String> fruitFlux = Flux.fromIterable(fruitList);

根据Java Stream创建

Stream<String> fruitStream = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");

Flux<String> fruitFlux = Flux.fromStream(fruitStream);

根据区间生成数据

//Flux会生成五个数1,2,3,4,5。被订阅以后将会发布这五个数
Flux<Integer> intervalFlux = Flux.range(1, 5);

根据时间生成数据

//每秒钟生成一个数据,从0开始,总共生成五个数据,最终生成的数据为0,1,2,3,4
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);

2.组合操作

使用 mergeWith进行合并

mergeWith不能保证合并的先后顺序

Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));//delayElements(Duration.ofMillis(500))每延迟500ms才发送一个数据
Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples")
    .delaySubscription(Duration.ofMillis(250))//delaySubscription(Duration.ofMillis(250))表示订阅成功后延迟250ms才开始发送数据
    .delayElements(Duration.ofMillis(500));

Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

示意图
在这里插入图片描述

使用 zip 进行合并

zip能保证合并的顺序,依次从两个Flux中分别取出一个数据合并为一个Tuple2类型的数据

Flux<String> characterFlux = Flux
    .just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples");

Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux);

//打印输出
zippedFlux.subscribe(p -> System.out.println(p.getT1() + ";" + p.getT2()));

/*
打印结果:
Garfield ; Lasagna
Kojak ; Lollipops
Barbossa ; Apples
*/

示意图:
在这里插入图片描述

使用zip的合并函数就可以使得合并以后的数据并不是Tuple2类型的数据

Flux<String> characterFlux = Flux
    .just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples");

//zip(合并源头1, 合并源头2, 合并操作)
Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);

zippedFlux.subscribe(p -> System.out.println(p));
/*
打印结果:
Garfield eats Lasagna
Kojak eats Lollipops
Barbossa eats Apples
*/

使用 first 进行合并

first操作将会选择第一个发布消息的Flux。

Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
    .delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);

fastFlux.subscribe(p -> System.out.println(p));
/*
由于slowFlux会在订阅的100ms后在开始发布数据,
因此slowFlux的数据将会比fastFlux的数据出现得晚,
因此会选择fashFlux中的数据
*/

/*
打印结果:
hare
cheetah
squirrel
*/

3.转换和过滤操作

过滤操作

使用 skip 过滤数据
Flux<String> countFlux = Flux.just("one", "two", "three", "four", "five").skip(3);//跳过前三个数据

//最后countFlux发布的数据为"four"和"five"

示意图:
在这里插入图片描述

Flux<String> countFlux = Flux.just("one", "two", "three", "four", "five")
        .delayElements(Duration.ofSeconds(1))//每延迟一秒才会发送一个数据,发送第一个数据之前也会被延迟一秒
        .skip(Duration.ofSeconds(4));//跳过前四秒发送的数据

//最后countFlux中的数据为"four"和"five"

示意图如下:
在这里插入图片描述

在这里插入图片描述

使用 take 过滤数据

skip是跳过指定数据,take是取走指定数据

Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
        .take(3);

nationalParkFlux.subscribe(s -> System.out.println(s));
/*
运行结果:
Yellowstone
Yosemite
Grand Canyon
*/
Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));

nationalParkFlux.subscribe(s -> System.out.println(s));
/*
运行结果:
Yellowstone
Yosemite
Grand Canyon
*/
使用 filter 进行过滤
Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));//过滤掉表达式为false的值
        
/*
过滤掉包含空格" "的字符串
最终nationalParkFlux发布的数据为:
"Yellowstone", "Yosemite", "Zion"
*/
使用 distinct 进行过滤
Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();
/*
过滤掉重复的字符串
最终nationalParkFlux发布的数据为:
"dog", "cat", "bird", "anteater"
*/

转换操作

使用 map 进行转换

map操作实际上是创建了一个新的Flux对象,不过在创建新的Flux对象之前会先进行转换操作,并且map操作是同步进行的。

private static class Player {
    private final String firstName;
    private final String lastName;

    @Override
    public String toString() {
        return firstName + " " + lastName;
    }
}

@Test
public void map() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .map(n -> {
            String[] split = n.split("\\s");// "\s"表示空格
            return new Player(split[0], split[1]);
        });
	//转换以后playerFlux中就包含了三个player对象
    playerFlux.subscribe(f -> log.info("player = {}", f));
}

日志打印输出:

16:56:56.004 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:56:56.015 [main] INFO sia5.FluxTransformingTests - player = Michael Jordan
16:56:56.017 [main] INFO sia5.FluxTransformingTests - player = Scottie Pippen
16:56:56.017 [main] INFO sia5.FluxTransformingTests - player = Steve Kerr

示意图:
在这里插入图片描述

使用 flatMap 进行转换

flatMap并不像map操作那样简单地将一个对象转化为另一个对象,而是将对象转化为新的Mono或者是Flux对象。得到的Mono或者是Flux对象会扁平化为新的Flux对象。

@Test
public void flatMap() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .flatMap(n -> Mono.just(n)
                 .map(p -> {
                     String[] split = p.split("\\s");
                     return new Player(split[0], split[1]);
                 })
                 .subscribeOn(Schedulers.parallel())//表示每个订阅都应该在并行的线程中进行处理
                );
    playerFlux.subscribe(f ->log.info("player = {}",f));
}

subscribeOn() 的名称与 subscribe() 类似,但它们却截然不同。subscribe() 是一个动词,它订阅一个响应式流并有效地将其启动,而subscribeOn()则更具描述性,它指定了应该如何并发地处理订阅
在本例中,使用了Schedulers.parallel(),它是使用固定大小线程池的工作线程(大小与 CPU 内核的数量一样)。但是调度程序支持多个并发模型,如下表所示

Schedulers 方法描述
.immediate()在当前线程中执行订阅
.single()在单个可重用线程中执行订阅,对所有调用方重复使用同一线程
.newSingle()在每个调用专用线程中执行订阅
.elastic()在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下 60 秒)
.parallel()在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于 CPU 核心的数量。

日志打印输出

subscribeOn(Schedulers.parallel())指定了每个订阅都应该在并行的线程中进行处理,所以最终得到的数据的先后顺序和输入时的顺序不一定一致。

16:53:07.958 [parallel-2] INFO sia5.FluxTransformingTests - player = Scottie Pippen
16:53:07.963 [parallel-3] INFO sia5.FluxTransformingTests - player = Michael Jordan
16:53:07.963 [parallel-3] INFO sia5.FluxTransformingTests - player = Steve Kerr

示意图:
在这里插入图片描述

在反应式流上缓存数据
将数据缓存到多个List中
Flux<String> fruitFlux = Flux.just(
    "apple", "orange", "banana", "kiwi", "strawberry");

//每三个元素组成一个List<String>集合,然后封装到新的Flux对象中
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

bufferedFlux.subscribe(s -> log.info("bufferedFlux = {}", s));

打印结果

17:08:53.863 [main] INFO sia5.FluxBufferingTests - bufferedFlux = [apple, orange, banana]
17:08:53.864 [main] INFO sia5.FluxBufferingTests - bufferedFlux = [kiwi, strawberry]
遇到了一个问题,未解决
@Test
  public void bufferAndFlatMap() throws Exception {
    Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
            .buffer(3)
            .flatMap(x ->
                    Flux.fromIterable(x)
                            .map(y -> y.toUpperCase())
//                            .subscribeOn(Schedulers.parallel())
                            .log()
            ).subscribe();
  }

注释掉subscribeOn(Schedulers.parallel())或者是将其修改为:subscribeOn(Schedulers.immediate())就可以打印出完整日志。

18:34:11.257 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
18:34:11.276 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
18:34:11.279 [main] INFO reactor.Flux.MapFuseable.1 - | request(32)
18:34:11.279 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(APPLE)
18:34:11.279 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(ORANGE)
18:34:11.279 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(BANANA)
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.2 - | request(32)
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.2 - | onNext(KIWI)
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.2 - | onNext(STRAWBERRY)
18:34:11.280 [main] INFO reactor.Flux.MapFuseable.2 - | onComplete()

否则打印出来的日志如下:明显差了处理"apple", "orange", "banana", "kiwi", "strawberry"这几个元素的日志

18:37:07.504 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
18:37:07.531 [main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
18:37:07.533 [main] INFO reactor.Flux.SubscribeOn.1 - request(32)
18:37:07.535 [main] INFO reactor.Flux.SubscribeOn.2 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
18:37:07.535 [main] INFO reactor.Flux.SubscribeOn.2 - request(32)
将数据缓存到一个 List 中
Flux<String> fruitFlux = Flux.just(
    "apple", "orange", "banana", "kiwi", "strawberry");

Mono<List<String>> fruitListMono = fruitFlux.collectList();
fruitListMono.subscribe(s -> s.forEach(System.out::println));

/*
打印结果:
apple
orange
banana
kiwi
strawberry
*/
将数据缓存到一个 Map 中
Flux<String> animalFlux = Flux.just(
    "aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono<Map<Character, String>> animalMapMono =
    animalFlux.collectMap(a -> a.charAt(0));

animalMapMono.subscribe(s -> s.forEach((key, value) -> System.out.println("key = " + key + " ; " + "value = " + value)));

/*
打印结果:
key = a ; value = aardvark
key = e ; value = eagle
key = k ; value = kangaroo
*/

4.逻辑操作

all 逻辑操作

all:Mono或者是Flux发布的数据全部都满足条件才返回true。

Flux<String> animalFlux = Flux.just(
    "aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));//animalFlux发布的每一个数据是否都包含了字母'a'?
hasAMono.subscribe(System.out::println);

/*
打印结果:
true
*/

Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));//animalFlux发布的数据中不是都包含字母'k',故返回false
hasKMono.subscribe(System.out::println);

/*
打印结果:
false
*/

any 逻辑操作

all:Mono或者是Flux发布任意一个数据满足条件则返回true。

Flux<String> animalFlux = Flux.just(
    "aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));//animalFlux发布的数据中是否有一个数据包含了字母't'
hasAMono.subscribe(System.out::println);

/*
打印结果:
true
*/
Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐