RxJava学习之基本使用

RxJava现在在Android开发中越来越流行,作为一个Android开发者我也必须紧跟步伐学习学习RxJava,这篇文章就记录了RxJava中我认为比较常用的一些场景。

也给大伙推荐篇比较好的RxJava文章

RxJava基础

大家都知道JDK中提供了观察者模式的实现,它主要两个重要元素:

  • 被观察者 Observable
  • 观察者 Observer

至于Java中观察者模式的使用,大家可以自行Google下。

RxJava中也有两个重要的元素:

  • 被观察者(事件源) Observable
  • 观察者(事件订阅者) Subscriber

因此RxJava的设计看起来也有点类似JDK中的观察者模式,都有被观察者和观察者。
JDK观察者模式中当有操作需要时是由被观察者通知观察者来进行更新操作
RxJava中是由被观察者Observable发出事件给观察者Subscriber接收,然后观察者Subscriber调用noNext()进行处理,直到调用onComplete)()onError()结束

Gradle依赖

1
2
compile 'io.reactivex:rxjava:1.0.1'
compile 'io.reactivex:rxandroid:1.0.1'

基础方式创建Observable和Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//使用Observable.create()方式创建一个Observable事件源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
});
//创建一个观察者Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("RxJava" "onComplete");
}
@Override
public void onError(Throwable e) {
Log.d("RxJava" "onError");
}
@Override
public void onNext(String s) {
Log.d("RxJava" s);
}
};
//观察者订阅事件源
observable.subscribe(subscriber);

上面是RxJava的基础使用方式,这种方式使用起来和观察者模式还是比较像的,首先创建一个被观察者Observable,再创建一个观察者Subscriber,然后观察者订阅这个被观察者,一旦订阅之后Observable就会执行上面的call(Subscriber subscriber)方法(参数里面的Subscriber参数就是我们创建的观察者实例),通过该方法我们手动调用Subscriber方法的onNext和onCompleted方法。这里有个要注意的就是我们必须自己手动调用onNext和onCompleted方法,否则不会自己执行。

简化创建Observable和Subscriber

上面提到的ObservableSubscriber创建方式是RxJava中最基本的方式,但是上面的方式使用起来还是感觉有点繁琐,必须按部就班的来。

RxJava中也提供了简单的创建方式,比如:Observable.just()方式创建Observable,完整是示例如下:

1
2
3
4
5
6
7
8
9
10
11
//创建一个只发出一个事件就结束的对象
Observable<String> observable = Observable.just("hello world");
//创建一个只关心onNext处理的subscriber
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d("RxJava" s);
}
};
//此方法有重载版本,可以传递处理onError,onComplete的Action
observable.subscribe(onNextAction);

上面的方法中使用Observable.just()方法可以快速的创建一个发送hello world事件的事件源,而如果我们只关心观察者对事件的处理,而不关心事件处理结束和事件发生错误时的处理,我们则可以创建Action1对象来替代Subscriber进行事件处理。

上面observable.subscribe(onNextAction)一旦订阅事件,就会自动的执行Action1中的call方法,该方法的作用等同于Subscriber中的onNext方法的作用,至于为什么一旦订阅就会自动执行call方法,而前面的一个例子中我们却需要手动调用Subscriber中的onNext方法,这个原因大家可以去源码实现中找答案,我就不介绍了。

当然如果你除了处理事件外,也需要对事件结束和事件错误时进行其他处理,则可以使用observable.subscribe(Action1)另一个重载方法observable.subscribe(Action1,Action1,Action1)分别接收对应onNext,onCompleted,onError.

Action1中的call方法只能接收一个参数,RxJava中也提供了很多其他的几种Action,从Action0Action9分表代表其call方法能接收0个参数到9个参数,另外还有一个ActionN其能接收N个参数。

RxJava其实是支持链式写法 的,所以上面的写法可以适用如下的方式实现:

1
2
3
4
5
6
7
Observable.just("hello world")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("RxJava" s);
}
});

上面介绍了RxJava的基本使用,下面接着介绍RxJava中一些比较常用的功能函数。

事件变换map

这个map是干什么用的呢?我举个例子:比如说我们有一个Observable对象,这个对象发送的事件是一串用户密码字符串,但是Subscriber进行处理的时候需要的是一个包含加密后的密码串,这个时候我们就可以使用map操作符将一个Observable对象发送的事件修改成另一个事件,下面的代码里通过map将hello world转换成其hashCode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just("hello world")
//map操作将一个事件变换为另外一个事件,只会影响到当前的subscriber
//此处将Observable的String事件转换成Integer事件,所以事件是可用改变的
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer hashCode) {
//这里的值就是hello world的hash值
Log.d("RxJava" s);
}
});

例子中map(Func1<source, target>)操作符通过Func1<source, target>类将source事件转换修改成target事件

通过上面的介绍可以看出map是用来变换修改Observable所发出的事件

Observable变换flatMap

map是用来变换Observable所发出的事件,而flatMap就更强大,它可以将Observable转换成一个全新的Observable,依旧上例子代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just("laohu")
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
return Observable.just(new User(s));
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});

上面的例子中通过flatMap操作符将一个发送laohu事件的Observable转换成一个发送User对象的Observable,该变化是完全生成一个新的Observable

Observable.from

在我们开发过程中经常需要对一个数组或是一个集合数据进行处理,比如我通过班级编号可以查出这个班的所有学生,查询出来后需要打印出每个学生的名字,那么我们使用上面介绍的方式该怎么做呢?

1
2
3
4
5
6
7
8
9
10
List<Student> students= ...
Observable.just(students)
.subscribe(new Action1<List<Student>>() {
@Override
public void call(List<Student> students) {
for(User user : users) {
Log.d("RxJava" students.getName());
}
}
});

上面的做法中,很明显我们是在Subscriber中对列表进行循环打印出每个学生的名字,这种方法是不是感觉很多余,我既然都拿到列表了我干嘛还要多次一举使用RxJava去进行循环处理。这时使用Observable.from就可以解决这个问题,该方法可以将集合或数组进行循环处理,每次发送一个事件元素给Subscriber进行处理,在Subscriber中只需要针对单个Student进行姓名打印就可以了,改进之后的写法如下:

1
2
3
4
5
6
7
8
List<Student> students= ...
Observable.from(students)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.d("RxJava" student.getName());
}
});

改进之后的写法看着是不是很简单,from配合flatMap可以实现很多很复杂的操作,后面我们再举例

事件过滤filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
...
}
String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {//名字长度大于2的年龄设为10
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});

上面的例子中,会将年龄小于等于10的用户数据过滤掉不进行处理,因此在filtercall方法中进行判断,年龄小于等于10的数据返回false即可将该数据过滤掉。

选取指定数量数据take()

上面的例子中如果我只想对符合条件的前两个数据进行处理该怎么做呢,这时我们可以使用take()操作符来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
//只取符合条件的前两个结果
.take(2)
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});

doOnNext

上面的例子中如果我们取到前两个符合条件的数据进行处理之前,我们要先进行缓存处理,这个时候我们就可以使用doOnNext操作符进行处理,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
//只取符合条件的前两个结果
.take(2)
//在subscribe执行之前进行额外的操作,比如将数据保存到磁盘上
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
save(user);
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});

线程调度

比如我们有一个同步请求网络数据的服务,在Android中使用RxJava进行处理该怎么做呢?我们可以使用subscribeOn()指定被观察者(事件)的运行线程,使用observeOn()指定观察者(订阅者)的运行线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just("查询条件")
.subscribeOn(Schedulers.io())//在子线程中进行查询操作
.flatMap(new Func1<String, Observable<Result>>() {
@Override
public Observable<Result> call(String whereClause) {
Result result = queryResult(whereClause);
return Observable.just(result);
}
})
.observeOn(AndroidSchedulers.mainThread())//在UI线程中处理结果
.subscribe(new Action1<Result>() {
@Override
public void call(final Result result) {
Log.d("RxJava" result.toString());
}
});

上面这些是我目前使用RxJava用到的一些功能函数和操作符,后面学习了其他操作符之后,我会用另外的一篇博文记录下来。

write by 老胡
2016年10月30日

本文章发表在 独立博客 ittiger.cn个人CSDN博客


原创文章,本文采用知识共享署名 2.5(中国大陆许可协议)进行许可,欢迎转载,但转载请注明来自ittiger.cn,并保证转载后文章内容的完整性。本人(laohu)保留所有版权相关权利。



评论