RxJava 与响应式编程

RxJava 是什么

首先 Java 是一门编程语言。
ReactiveX 则是与语言无关的一组异步编程和提供可观察流的 API。 RxJava 既是 ReactiveX 的 Java 版本 API 实现。

何为响应式

响应式编程是一种以事件为基础处理异步数据流的规范。在响应式里边基本单元都被抽象为“流”的形式。和传统编程模式相比,响应式里边充满了事件,将逐步产生的数据串成一个扁平化的事件序列。

RxJava 的经典例子

这些例子在 这个 项目里边,同时也是参照这篇文章的内容而发的。目的是用自己的理解讲解示例代码和 Rx 中的概念。

首先来一个最简单的案例:

Observable<String> obs = Observable.just("Hello RxJava!");
          obs.subscribe(System.out::println);
          

上面会输出结果:

Hello RxJava!
          

当然这个例子几乎没有任何实际的意义。但是它五脏却全。 第一行代码,产生了一个 Observable 对象 “obs”,接收了一个普通字符串 “Hello RxJava!“,
Observable 是 RxJava 的核心接口,它是一个可“被观察”的对象。 第二行代码订阅了 obs 对象,接着 obs 对象会产生带有数据的事件供订阅者消费。参数 System.out::println 是 Java8 的简洁写法,如果你不明白,请看下面的完整写法:

obs.subscribe(new Action1<String>() {
              @Override
              public void call(String data) {
                  System.out.println(data);
              }
          });
          

从上面可以看出,原来参数是一个回调。而回调中的数据 “data” 既是构建 obs 时的字符串。此时 obs 已经产生了一个流,虽然只有一个事件。 上述例子,事件很明显的体现出来了。那就是 obs 被订阅的时候,产生的回调。但是这个例子对“流”的概念体现不明显。

第二个例子,换 from 方法用数组填充 obs 。

List<String> words = Arrays.asList(
           "the",
           "quick",
           "brown",
           "fox",
           "jumped",
           "over",
           "the",
           "lazy",
           "dog"
          );
          
          Observable.from(words).subscribe(word->System.out.println(word));
          

输出:

the
          quick
          brown
          fox
          jumped
          over
          the
          lazy
          dog
          

可以发现输出了9次,也就是单独发生9次回调,产生了9个事件。也就是产生了一个按顺序的“事件序列”,也就是“流”。
如果我们把多个流串起来会怎样呢?

Observable.from(words)
            .zipWith(Observable.range(1, Integer.MAX_VALUE), 
              (string, count)->String.format("%2d. %s", count, string))
            .subscribe(System.out::println);
          

zipWith 方法把“源流”中的数据(参数 string,也就是事件回调中的字符串)和 另一个流(Observable.range 所产生)中的数据组合起来。
注:Observable.range(1, Integer.MAX_VALUE) 产生一个 从 1 到 “最大 Int 值”的事件序列(流) obs。zipWith 则是把源 obs 和 这个 obs 组合起来。 所以当第一个事件产生,”the” 被返回的时候,数字1也在 range 流的一个回调中产生。然后在zipWith 的第二个参数:回调函数,处理这两个数据结果:(格式化为一个字符串)

(string, count)->String.format("%2d. %s", count, string))
          

最后订阅输出:

1. the
          2. quick
          3. brown
          4. fox
          5. jumped
          6. over
          7. the
          8. lazy
          9. dog
          

把多个事件序列串行,事件在多个流之间传递,一层一层的处理数据,最终产生结果。

例如,下面这个方法会产生一个极大的事件序列:

Observable.range(1, Integer.MAX_VALUE)
          

但是因为源流只产生9个事件,所以这个流只会被经过9次。
因为 Observable.range 并不是一次性产生最终1 到 “最大 Int 值” 的结果,而是只有被消费一次的时候才会计算一个值,也就是延时计算。
跟 Java8 的 Streams API 的概念一样。在流里边,例如我指定从一个一百万的序列里边取前50个,并不是一次性把100万个值产生然后取前50个值。而是仅计算前50次,取这50个结果。计算被延时,并且不会产生资源浪费。

要求:从 “the quick brown fox jumped over the lazy dogs” 句子中提取单词数组,依次输出每一个字母。不能重复并且按照顺序。

List<String> words = Arrays.asList(
           "the",
           "quick",
           "brown",
           "fox",
           "jumped",
           "over",
           "the",
           "lazy",
           "dogs"
          );
          
          Observable.from(words)
           .flatMap(word -> Observable.from(word.split("")))
           .distinct()
           .sorted()
           .zipWith(Observable.range(1, Integer.MAX_VALUE),
             (string, count) -> String.format("%2d. %s", count, string))
           .subscribe(System.out::println);
          

flatMap 将源流中的单词拆分成字母产生新流,distinct 过滤掉流中的重复元素,sorted 将字母排序,zipWith 格式化字符串。它的目的是排序输出上述语句中存在的所有不重复字母。
最终结果:

 1. a
           2. b
           3. c
           4. d
           5. e
           6. f
           7. g
           8. h
           9. i
          10. j
          11. k
          12. l
          13. m
          14. n
          15. o
          16. p
          17. q
          18. r
          19. s
          20. t
          21. u
          22. v
          23. w
          24. x
          25. y
          26. z
          

是不是有点意思?那么我们用这种基于流产生数据的模式干点活:有两个时刻产生数据的数据源,这两个数据源隔数秒就产生一次数据,数据量很大。我们要求一个在只处理周末数据另一个仅处理工作日数据。

首先,判断周末:

private static boolean isSlowTickTime() {
           return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY || 
                  LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
          }
          

不过,我们是在写 Demo 代码,真等到周末那岂不是黄花菜都凉了。所以我们假设这个方法的意图是判断周末,但是把内在逻辑换一换:

private static long start = System.currentTimeMillis();
          public static Boolean isSlowTime() {
             return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
          }
          

这段代码的意思是,在一个 15 秒内调用都返回 true,在下一个15秒内调用返回 false,一直交替,模拟周末和工作日的快速交替过程。

接下来我们创建两个数据源,分别是 fast 和 slow。interval 方法会构建一个定时产生数据的 obs,数据产生 的间隔取决于参数中的数字和时间单位。
例如这段代码会产生一个隔1秒产生一次数据的 obs,一个隔3秒产生一次数据的 obs。而数据则是从0开始递增的值。(具体数据不重要)

Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
          Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);
          

然后我们用 merge 方法合并这两个流。slow 过滤掉不是周末的数据(也就是仅输出周末日的数据),fast 过滤掉是周末的数据(即输出工作日的数据)。最后由一个流 clock 统一输出。
tick 参数就是 interval 构建的流产生的数字,它不重要,因为我们要体会的是事件的调度过程,不是消费数据。

Observable<Long> clock = Observable.merge(
                 slow.filter(tick-> isSlowTime()),
                 fast.filter(tick-> !isSlowTime())
          );
          

订阅 clock,输出 clock 流中的数据产生时的时间:

clock.subscribe(tick-> System.out.println(new Date()));
          Thread.sleep(60_000); // 加上 sleep 是为了防止main方法结束程序退出
          

我们看输出:

Wed Nov 23 00:02:00 CST 2016
          Wed Nov 23 00:02:01 CST 2016
          Wed Nov 23 00:02:02 CST 2016
          Wed Nov 23 00:02:03 CST 2016
          Wed Nov 23 00:02:04 CST 2016
          Wed Nov 23 00:02:05 CST 2016
          Wed Nov 23 00:02:06 CST 2016
          Wed Nov 23 00:02:07 CST 2016
          Wed Nov 23 00:02:08 CST 2016
          Wed Nov 23 00:02:09 CST 2016
          Wed Nov 23 00:02:10 CST 2016
          Wed Nov 23 00:02:11 CST 2016
          Wed Nov 23 00:02:12 CST 2016
          Wed Nov 23 00:02:13 CST 2016
          Wed Nov 23 00:02:14 CST 2016
          Wed Nov 23 00:02:17 CST 2016
          Wed Nov 23 00:02:20 CST 2016
          Wed Nov 23 00:02:23 CST 2016
          Wed Nov 23 00:02:26 CST 2016
          Wed Nov 23 00:02:29 CST 2016
          Wed Nov 23 00:02:30 CST 2016
          Wed Nov 23 00:02:31 CST 2016
          Wed Nov 23 00:02:32 CST 2016
          Wed Nov 23 00:02:33 CST 2016
          Wed Nov 23 00:02:34 CST 2016
          Wed Nov 23 00:02...
          

你们注意看15和16行之间,是每一秒一条数据到每三秒一条输出的转变。20行和21行是三秒到一秒的转变。一直交替下去。
这便是 ReactiveX 的响应式编程,是不是有点神奇。但又有点不知所措,它究竟该用在什么地方?

先别急,上面的内容都是基于静态数据构建的 obs,静态 Observable 只会在被订阅的时候产生事件流,并且都是历史数据,毕竟静态 Observable 创建以后就不会有新数据添加进来了。

下一节介绍如何接入动态的数据源,还有很多后续内容,待更。