写给Rikka自己的RxJava2说明书

网友投稿 257 2022-09-21

写给Rikka自己的RxJava2说明书

目录

为了便于学习和检索,这里列出一个目录:

1. 前言 2. 两句话概括RxJava 3. 一个RxJava2+Retrofti的示例 4. Single是啥?有什么用 5. just()源码 6. map操作符 7. onSubscribe(Disponsable disposable) 8. Disposable 9. subscribeOn 10. observerOn 11. Schedulers 12. 总结

前言

今天来学习并总结RxJava2。具体学到多深我不知道…反正就是一个阶段性的学习的blog。 因为之前用过但是不是很顶,公司的项目的话在用RxJava1,之后的重构应该会用RxJava2,所以这之前得认真学习一哈。

今天主要从实际开发出发,比如Retrofit+RxJava2的例子,然后从例子中去剖析RxJava的用法、特点。 尽量写的比较简单好懂(本来本篇blog讲的就不深,如果还写的天马行空,我自己都看不懂…)至于源码那就随缘了,之前读过,有一个大致的了解,这篇blog应该还会继续去追究源码,多读一次更加深印象一次。

两句话概括Rxjava

我用自己目前浅薄的知识来概括Rxjava,便于后面的学习:

一个RxJava2+Retrofit2的示例

PS:这里我没有找到一个很好的Api(那种要填post的Path的),只能拿到一个GET请求的API(= =!一堆网上的都过期了,我又不想申请,懒得找了淦,不过其实用法都大同小异,认真想一想的话POST其实很简单的)

按序号分步走:1、导入RxJava、Gson、Retorfit包并申请权限 大概就是下面这样(版本可能老了一丢丢,但这是学习版,都是2.0,所以问题不大)

'com.squareup.ok api 'com.squareup.retrofit2:retrofit:2.3.0' api 'com.squareup.retrofit2:converter-gson:2.3.0' api 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' api 'io.reactivex.rxjava2:rxjava:2.0.7' api 'io.reactivex.rxjava2:rxandroid:2.0.1' api 'com.jakewharton.rxbinding2:rxbinding:2.0.0' api 'com.google.code.gson:gson:2.8.5'

然后在Manifest中申请网络权限,不然申请网络权限会 permission denied

2、写一个api网络申请接口类、根据接口的回调写JavaBean

我们一般会把Retrofit网络申请放在一个接口类中,因为很多时候我们不止申请一个请求,而且解耦和封装更加符合编程规范(←重点)

我这里找的接口是 : interface Api { @GET("widget/getSearchDefaultWords#") Single> getSearchDefault();}

对Retrofit2不熟悉的人可能看不懂,但我这里也就简单的讲明一下用法,它有两行,一行注解,一行接口函数。 第一行很简单,就是做一个拼凑,把url拼凑成一个请求头​​​Android 深入Http(1)HTTP原理和机制​​ ↑我的这篇文章里 讲过我们在浏览器 输入一个网址的时候,一个URL 转换成 一个Http Request

那么​​@GET("widget/getSearchDefaultWords#")​​​的含义就是将 widget/getSearchDefaultWords# 这个api的path, 拼接成:​​​GET /widget/getSearchDefaultWords# HTTP/1.1​​这么一个Http Request的请求行。

​​Single> getSearchDefault();​​​这一行则是Api的方法,我们通过调用这个 ​​getSearchDefault()​​​ 的方法,就能返回一个 ​​Single>​​​的返回体。​​​Single​​​是什么后面会讲(当然也可以换成​​Observer​​),然后其里面是 Repo的List,因为我们申请接口的返回是存放在在一个List/Array中的,所以我们要用List去装这些Repo(等下会知道)。

而Repo就是返回的结果啦,我们一开始怎么知道返回的结果是什么?所以我们要去Postman或者能测试接口的软件,去看看这个接口返回的数据是什么。

那么上面的Body就是我们的返回Response,从大括号和中括号也可以看出来,我们这些Repo用中括号阔起来放在大括号里面,就说明大括号是个数组List,而里面的中括号则是这个List的一个元素,这也是为什么我们前面要用List来放Repo。我们复制全部的Body,到Android Studio中新建一个类 Repo,然后键盘敲ALT+INS,然后选择GsonFormat,在转换框中粘贴进去,按下回车,就会变成下面这样:

这个插件就是把JSON格式转换成一个JavaBean的神器!(如果没有该插件的话可以去百度查查怎么在Android Studio下载,就几步,很快捷的)

这样的话,我们的api就算是封装好了。

3、构建Retrofit,并使用RxJava进行网络请求 最后一步来了,就是进行网络请求。 我们先建立一个Retrofit对象,并且让他添加RxJava和Gson的适配工厂

= " Gson gson = new Gson(); Retrofit retrofit = new Retrofit.Builder() .baseUrl(url) //添加RxJava2的适配 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) //添加Gson的转换 .addConverterFactory(GsonConverterFactory.create(gson)) .build();

然后创建Api 的对象并且实现Api的方法:

= retrofit.create(Api.class); api.getSearchDefault() //将网络请求放在后台 .subscribeOn(Schedulers.io()) //将结果的返回放在前台 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new SingleObserver>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(List repo) { Log.d(TAG, repo.get(0).toString()); } @Override public void onError(Throwable e) { Log.e(TAG, e.getMessage()); } });

其中

.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())

这两个后面在解析,是有关于线程调度的,而且一定要加这两行,不让Retrofit是不会把网络请求放在后台的…那样的话网络请求可能会阻塞,一定会报错。

// subscirbe == enqueue.subscribe(new SingleObserver>() { ... });

至于上面这个 ​​subscribe​​​方法其实就相当于Ok​​enqueue​​​方法,让这个网络去执行! 然后参数就是网络的回调,有 ​​​onSubscribe​​​、​​onSuccess​​​、​​onError​​方法,分别代表执行、网络请求成功并返回结果,网络请求失败。我们在success中打印结果。

接下来我们来运行程序,得到下面的结果:

Ok,看来请求成功辽。

从上面的图中可以分析一下:从onSubsrcibe到onSuccess,用了1s多的时间,这就是一个网络请求的时间。那么onSubscribe就是程序一开始执行所会回调的程序。

那我们就来仔细的去了解其中的几个方法,首先最开始的Single的介绍开始

Single是啥?有什么用

为了更好的介绍Single,我们可以来写一下Single的一个简单的示例:

= Single.just("1"); single.subscribe(new SingleObserver() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(String s) { Log.d(TAG, s); } @Override public void onError(Throwable e) { } });

在这里,我们要引入 订阅者 和 观察者 两个概念:

订阅者/被观察者就是接收消息的一方,订阅事件的一端,在有事件传过来的时候,可以得到这些事件。然后可以对事件进行处理。在本例中,订阅者就是​​​subscribe()​​括号里面的东西,就是那个 ​​SingleObserver​​ 对象它所做的三个方法 ,就是对 发送方发送的 Disaponable、String、Throwable进行处理。观察者就是发送消息的一方,发送事件的一端,当执行完任务后,会得到一份结果,但自己不能处理,只能给订阅者处理,自己看着他处理(我自己烤了个鸡腿,但是我只能递给你,眼巴巴的看着你吃QAQ)所以这也是为什么:发送事件的一方叫做观察者,接收事件的一方又叫做被观察者在本例中,观察者就是​​​Single single​​​,它使用​​just()​​​方法向 被观察者 发送了一个字符 “1”,然后使用了​​subscribe()​​方法来观察 被观察者 怎么处理这个“1”。

构造一个Single,这个just(“1”) 就是指订阅者在订阅的一瞬间,发送者就会给接收者发送一个 “1”。 程序的输出的确就是1没错。

我们接下来来分析一下just() 是具体干了什么,从源码的角度分析。(这里开始刚源码啦)

just()源码

我们来追踪一下just()的源码:

@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static Single just(final T item) { //这一行是判空操作 ObjectHelper.requireNonNull(item, "value is null"); return RxJavaPlugins.onAssembly(new SingleJust(item)); }

那个return句子有两个东西, 一个是 ​​onAssembly()​​的东西,另一个是它传入的SingleJust对象,,Assembly就是集合的意思,这说明它会对括号里的参数进行一个“装箱”。我们来看看onAssembly()方法:

@NonNull public static Single onAssembly(@NonNull Single source) { //Fuction是一个转换器,把传入的对象 转换成一个 要处理的 传出的对象 //onSingleAssembly是一个转换后的结果..我们一般用不上这个参数 Function f = onSingleAssembly; if (f != null) { return apply(f, source); } return source; }

这句话就是把传入的对象进行一个加工处理。 然后就出来了,也就是说,return语句的关键在于另一条​​​SingleJust​​这个对象。

我们打开了SingleJust的类,把焦点聚集在下面这个方法上

//SingleJsust 重写了下面方法 @Override protected void subscribeActual(SingleObserver s) { //woc!这不就是外面 subscribe调用的方法咩?它们绝壁有关联的 s.onSubscribe(Disposables.disposed()); s.onSuccess(value); }

可以这么说,Single通过just() 方法创建了一个SingleJust的对象。

然后SingleJust里面有 ​​subscribeActual()​​​ 方法,它会调用参数的onSubscribe()和onSuccess()方法,而这就和Single调用的subscribe()方法一样,有一个大胆的猜测就是我们调用 Single的 ​​subscribe()​​​ 其实就是调用 SingleJust的 ​​subscribeActual()​​方法。

抱着这个猜测,我们去查看subscrible()的源码:

第一、二、三行分别是判空和包装,不看!

try/catch只看try语句,所以到最后,它就是调用了 ​​subscribeActual()​​这个方法!

那么在这里就可以总结下了:

Single通过just()来构造了一个对象,这个对象使用​​subscrible()​​​方法的执行其实就使用了SingleJust的​​subscribeActual()​​方法。他直接调用了onSubscribe啊onSuccess!所以我们 重写了 subscribe(),等于我们定制了 subscribleActual()方法

这就是 ​​subscrible()​​​的作用,它会直接调用我们重写的​​SingObserver​​​的东西。 所以 当我们在网络请求搞完了,retrofit得到了一个response之后,我门就是拿着它去走 subscrible()方法的回调的~

接下来,我们先不讲解线程调度,也不讲回调方法,我们来看一下RxJava一个特有的东西:操作符

map操作符

map()方法的作用是:转换数据类型 比如说我得到的网络申请结果是一个Integer,但是我要发给 订阅者 的是String,那么我在每次发送之前就要把Integer转换成String,就要在​​​map()​​里面做:

= Single.just(1); single.map(new Function() { @Override public String apply(Integer integer) throws Exception { return String.valueOf(integer); } });

我们传入了一个1,然后在使用map()去构造​​Function​​​然后重写了​​apply​​方法,最后在apply方法中,我们就是做类型的装换哒。

以上就是map操作符的用法,看起来是不是比较简单呢?但是如果你想深入一点研究这个map,会发现它还是有点用法的规矩的。

首先map在链式构造里面不等于建造模式,因为它不是返回this,比如:

.map(new Function() { .... }).subscribe(new SingleObserver() { ... });

这是因为使用map后返回的对象已经不同了= =。我们来看看map()方法:

是通过传入的function来转换成一个 ​​SingleMap​​对象。

然后返回的值和之前的 ​​SingleJust​​都不同啦!结构都变了…后面如果使用了subscrible(),那调用的就不是SingleJust的subscribeActual()了,而是调用SingleMap()的subscribeActual()了,而这个方法的具体实现如下:

@Override protected void subscribeActual(final SingleObserver t) { //帮SingleObserver去做网络请求了,所以map在中间相当于做了一个代理 source.subscribe(new MapSingleObserver(t, mapper)); }

实际上,​​SingleMap​​​的​​subscribeActual()​​​方法里面会创建一个​​MapSingleObserver​​,来调用Observer的那三个方法:

有两个参数:

​​SingleObserver​​​ t我们自己重写的SingleObserver(),里面有我们自己定制的onSuccess()、onSubscribe()、onError()​​Funcation​​​ mapper这个是我们在map中重写的​​​apply()方法​​

在往下面看,发现MapSingleObserver也会走那三个方法,而更重要的是在 onSuccess()方法中对结果 value值进行了apply()的处理,然后返回这个结果。

到这里我们就大概知道map所做的事情了,我们可以用下面几幅图来描述一下map所做的事情:

①:当没有使用map的时候,就是调用SingleJust的subscribe()方法。

②:当我们使用了map操作符后

map就是做了个代理,我们使用了map的subscribeActual(),它会帮我们去做后台请求,又能把结果转换目标类型,这样看的话其实很简单。

了解了map这个操作符,我们就来了解一下 ​​Disponsable​​这个对象,但是由于它出现在onSubscribe()里面,所以我们先看一下onSubscribe()这个方法是干什么的吧~

onSubscribe(Disponsable disposable)

我们在 ​​onSubscribe(Disponsable disposable)​​​ 看到里面有个参数:​​Disponsable​​​ 它是做什么的呢?

用来取消订阅 Rxjava可以让订阅者和被订阅者断开,之后的内容不再发送;发消息的人也可以停下来。 可以通过调用Dispinsable的方法来取消后续的发送。比如:

@Override public void onSubscribe(Disposable d) { d.dispose(); Log.d(TAG, "onSubscribe"); }

那它在执行完onSubscribe后,就不会再走 onSuccess() 或者 onError方法了。

那么Disponsable用途是干嘛的捏? 一般是用来防止内存、流量泄露。比如我(被观察者)已经拿完了并且处理完应该要做完的事件,但是这个时候观察者还在后台的不断的申请资源和发送事件,那一到我这个主线程来也会进入onSuccess()方法,我是不知道的,所以会多出很多不必要的东西来占用内存。 so,通过disposable来断开观察者和被观察者的连接。

Ok,Disposable的用处讲完了,那么我们就来从源码的角度分析Disposable吧。

Disposable

首先不同的Observerable,它们的Disponsable都不一样,因为要做的切断的具体操作是不一样。 它就是切断观察者和被观察者的联系。

它是一个接口:

public interface Disposable { void dispose(); boolean isDisposed();}

这两个方法都特别好理解。用在最上层的Observerable 就实现dispose()了。 它在走onNext()前会判断当前自己的disposable()里面的机制会不会让自己停止,如果是的话,那就不会再走onNext()了。

Dispose会从上一直往下传,每一层如果停掉了当前了Disponse,那它也会停掉了上一层的传递。就是这样。

subscribeOn

subscribeOn也是一个操作符,但它操作的不是数据,而是线程。它把数据、请求放到了后台线程去。 所以我们还是要去看它的subscribeActual()做了什么。

@Override protected void subscribeActual(final SingleObserver s) { final SubscribeOnObserver parent = new SubscribeOnObserver(s, source); s.onSubscribe(parent); //通过schedule做线程切换 Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); }

​​parent​​是最上层的Observer,并把它包装成了一个Runnable对象。而它重写了run方法就是:

@Override public void run() { //把请求网上调了 source.subscribe(this); }

就相当于:

scheduler还是使用了Executor去进行线程切换,而不直接使用Executor的原因是scheduler的场景在RxJava2更多。

从图中也可以看出,如果多次使用subscribeOn(),则最上层(即第一行)的subscribeOn()才会起作用 而 ​​​Disposable f = scheduler.scheduleDirect(parent)​​ 中的f作为Dsponsable可以被我所控制,如果我们在外边调用了dispose(),则里面也会调用这个f的dispose()。

至于切线程的具体细节,这里就不多讲了。

ObserverOn

研究它和研究SubscribeOn()一样,也是看subscribeActual():

@Override protected void subscribeActual(final SingleObserver s) { source.subscribe(new ObserveOnSingleObserver(s, scheduler)); }

它直接就把数据交往上层了,在数据的过去时没做任何操作:

所以它真正做的事情不在subscribeActual()里面。

因为ObserverOn是掌管数据回来的线程,所以用脚都能想到,它的处理,肯定是在数据回来,也就是Obseverable的onSuccess和onError()中:

@Override public void onSuccess(T value) { this.value = value; //scheduleDirect切换线程 Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { this.error = e; //scheduleDirect切换线程 Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { Throwable ex = error; if (ex != null) { actual.onError(ex); } else { actual.onSuccess(value); } }

因为onSuccess()中调用了​​scheduleDirect()​​,所以我们要去看run方法的实现。而run方法中,它就是执行了actual的回调方法。

它是这么走的:

所以从图中可以看出,如果多次使用ObserverOn,则最后一个使用ObserverOn()生效。

那我们看到了它们且线程的流程,那我们来看下​​Schedulers​​具体做了什么事情。

Schedulers

.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.newThread())

​​Schedulers.io()​​​代表I/O线程,其实绝大多数的后台操作都是在这个线程上面做。 而和IO线程差不多的就是 ​​​newThread()线程​​。我们通过前面看到的 scheduleDirect()在做什么:

@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //Worker就是用Executor来做线程切换 //通过createWorker()来走创建一个Executor final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); w.schedule(new Runnable() { @Override public void run() { try { decoratedRun.run(); } finally { w.dispose(); } } }, delay, unit); return w; }

​​Worker​​​就是用executor来做线程切换,它来走run()方法。io()和newThread()的区别是io是从Executor的线程中启用一个可用的线程来做事情。而newTreahd直接创建一个新线程,这两个其实都差不多。

切回主线程的方法:

.observeOn(AndroidSchedulers.mainThread())

点进去会到下面这里:

没错,看到了Handler,其实这里就是用Handler来做切换线程的。

到这里,其实框架就了解完了。

总结

RxJava就是异步,把请求数据调到想要的线程,把返回的数据返回到前台。

通过​​subscribe()​​​,最终会执行​​subscribeActual(xxObserverable)​​要把整个过程看成链式的,数据的传递、线程的切换会更加清晰操作符map和flatmap都是做返回体类型的转换​​Disponsable​​的作用是切断观察者和被观察者的联系subscribeOn和ObserverOn通过 Schedulers做线程切换,实质是通过Handler或Executor来切换

说实话,我连背压、FlatMap都没有讲,但是了解了RxJava的结构,这些还会难吗。

先这样,下次做RxJava估计是进行更加细致的总结。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:京东就“京东金融短视频广告”致歉:严肃处理相关管理者和责任人!
下一篇:codeforces 415B Mashmokh and Tokens
相关文章

 发表评论

暂时没有评论,来抢沙发吧~