RxJava教程(二):基本概念、数据流创建和线程调度

概述

RxJava 很早就开始接触和使用了,但只是仅仅会一些简单的使用而已,于是打算通过一系列的博客来加深对RxJava的理解。
写这篇文章的时候,RxJava最新版本已经是 2.1.5 了,那么我们就以最新版本为基础来介绍 RxJava 的使用。
使用之前要加入一下依赖:


RxAndroid 是一个 RxJava 扩展库,更好的兼容了 Android 特性,比如主线程,UI事件等。

基本概念

RxJava 官方的解释是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。简要概括一下,它就是一个实现异步操作的库。它的本质体现在异步两个字上面。
RxJava 的异步的实现,是通过一种扩展的观察者模式来实现的,观察者模式相信我们都不陌生。
RxJava 提供众多的操作符以及它的链式操作可以替代深度回调逻辑,可以使代码简短优雅。
想要使用RxJava,我们先来了解一下几个基本概念。

  • Observable/Observer (可观察者,即被观察者/观察者):发射数据流/接收数据流
  • Consumer:它也是一个 Observer,只有一个 accept() 回调
  • subscribe (订阅):建立 Observable 和 Observer 的联系
  • subscribeOn:为 Observable 对数据的处理指定一个调度器
  • observeOn:为下游对数据的操作指定一个调度器
  • Disposable:用于解除订阅以及查询订阅关系是否解除
  • Operators操作符:可以理解为对数据流的操作,包括创建、过滤、变换、组合、聚合等。
  • Flowable/Subscriber:(被观察者/观察者):一种观察者模式组合,支持背压
  • Publisher:Flowable 的父类
  • Subscription:可以通过request发起请求数据,通过cancel取消订阅关系。
  • Single/SingleObserver:一种观察者模式组合
  • Completable/CompletableObserver:一种观察者模式组合
  • Maybe/MaybeObserver:一种观察者模式组合

订阅关系:Observable/Observer是一对,Flowable/Subscriber是一对。

基本用法

正如我们实现一个基本的观察者模式一样,你要创建被观察者和观察者,然后通过订阅事件使他们联系起来。
下面介绍一个RxJava的最基本的实现:

创建Observable


create 方法创建一个 ObservableCreate 对象。
ObservableEmitter 相当于一个事件发射器,每执行一次 onNext(),观察者就会收到一次数据,数据发送完毕后调用 onComplete() 方法。
在事件处理过程中出异常时,触发onError() ,同时队列自动终止,不允许再有事件发出。在一个正确运行的事件序列中, onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

创建Observer


观察者的 onNext() 回调会收到被观察者发送的数据。

subscribe(订阅)


执行后输出:

通过上面三步实现了 RxJava 最简单的用法,其中并没有涉及到线程切换等操作,这些后面再介绍。

创建Observable

请看下文。

创建Observer

RxJava 支持多种不同方式的 Observer 回调。

  • subscribe():忽略 onNext 以及 onComplete 等事件。
  • subscribe(Observer<? super T> observer):以 Observer 为参数。
  • subscribe(Consumer<? super T> onNext):只接受 onNext
  • subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError):接受 onNextonError
  • subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete):接受 onNext onErroronComplete
  • subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe):接受 onNext onErroronComplete,接受参数为 Disposable 的一个回调,用于解除订阅,这中实现就和 Observer 类似了,四个回调。

线程调度

Scheduler(调度器)

在上面的例子中,并没有涉及到线程切换的操作。如果只是这样在一个线程中同步使用还没有将RxJava的优势体现出来。我们在使用过程中会经常遇到这种情况,比如,我们会将网络请求等耗时操作放到后台线程中,将UI操作放到主线程中执行。
RxJava 提供了线程调度的功能,我们可以借助于 Scheduler 来完成。另外 RxAndroid 提供了 AndroidSchedulers 调度器来供开发者使用。
SchedulerAndroidSchedulers 提供了6种线程调度器:

调度器 使用场景
Schedulers.io() 主要用于一些耗时IO操作,比如读写文件,数据库存取,网络交互等。这个调度器具有线程缓存机制,它会根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.io()中的线程池数量是无限制大的,大量的I/0操作将创建许多线程,我们需要在性能和线程数量中做出取舍。
Schedulers.computation() 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Schedulers.newThread() 开启一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,因此,Schedulers.newThread( )的效率没有Schedulers.io( )高。
Schedulers.from(Executor executor) 使用指定的 Executor 来作为线程调度器
Schedulers.single() 拥有一个线程单例,所有的任务都在这一个线程中执行。
Schedulers.trampoline() 在当前线程执行一个任务,但不是立即执行,用trampoline()将它加入队列。这个调度器将会处理它的队列并且按程序运行队列中每一个任务。
AndroidSchedulers.mainThread() Android中的主线程执行任务,为Android开发定制。

实现线程调度

实现线程的调度可以通过 subscribeOn()observerOn() 实现。

  • subscribeOn():指定被观察者在哪个调度器上执行,跟调用的位置没有关系。直到遇到observeOn改变线程调度器。
  • observerOn():指定下游观察者对数据的操作运行在哪个调度器上。在调用位置切换线程。

使用时需要注意:

  • subscribeOn() 可以多次调用,但只有第一次的调用会起作用。
  • observerOn() 可以多次调用,每调用一次切换一次线程。

示例1

在这个例子中,我们通过 subscribeOn(Schedulers.io()) 指定被观察者在IO线程中进行图片下载,然后通过 observeOn(AndroidSchedulers.mainThread()) 在主线程中更新UI。


结果:

示例2

这个例子主要来介绍一下线程调度的时机问题,被观察者所在的线程肯定是由 subscribeOn() 来指定,然后就直到遇到 observeOn() 再切换线程,否则就在当前线程执行下去。
看下面一段伪代码:


如果我们不指定线程调度器,被观察者和观察者会在什么线程执行呢?我们通过在前面的例子中添加一些打印信息会发现,它们会默认在当前线程中执行。

doOnSubscribe()

这里再提一个方法 doOnSubscribe(),它是在 subscribe() 调用后而且在事件发送前执行。前面我们说过,有多个 subscribeOn() 来对别观察者指定线程,只会有第一个起作用,但是多个 subscribeOn() 却可以影响 doOnSubscribe() 的执行线程。
先来测试一下我们的结论:

这里通过 subscribeOn 两次指定被观察者执行线程,一个是IO线程,一个指定主线程。
结果:


执行在 IO 线程,是第一次指定生效。

上面例子稍加改动,再来看一下:

结果:


可以看到,subscribeOn 是可以重新指定 doOnSubscribe 的执行线程的。

关于内存泄漏

RxJava的使用不当极有可能会导致内存泄漏。比如,使用RxJava发布一个订阅后,当Activity被finish,此时订阅逻辑还未完成,如果没有及时取消订阅,就会导致Activity无法被回收,从而引发内存泄漏。
解决办法:

  • 手动为 RxJava 的每一次订阅进行控制,在指定的时机进行取消订阅。这个时候,CompositeDisposable 可能会被排上用场。
  • 使用开源框架 RxLifecycle。

Disposable 和 CompositeDisposable

前面说过,RxJava 可能会导致内存泄漏,RxJava 提供了 Disposable 接口来处理这类问题,它提供了两个方法。

  • dispose():主动解除订阅
  • isDisposed():查询是否解除订阅 true 代表 已经解除订阅

我们可以在合适的时候取消订阅,来避免内存泄漏。
onSubscribe(@NonNull Disposable d) 方法会传递一个实现了 Disposable 接口的对象,我们可以把这个对象保存下来,然后在合适的时机调用 dispose() 取消订阅。

再来介绍一下 CompositeDisposable。它是一个 Disposable 的容器,可以容纳多个 Disposable。我们可以使用 CompositeDisposable 来管理订阅可以有效地避免内存泄漏。
如果 CompositeDisposable 容器已经是处于 dispose 的状态,那么所有加进来的 Disposable 都会被自动切断。

下面来给出它的用法:


0

为什么要使用 RxJava,它的好处是什么?

我们为什么要使用RxJava?RxJava有哪些好处?

RxJava最精髓的地方,在于它的思想,它把一切数据看成“流”,数据是流动的,在流动的过程中进行加工,最终把加工好的数据呈现在流的末端。
我们为什么要使用RxJava,其实RxJava解决的还是我们之前解决的问题,其实RxJava的解决过程跟我们之前的解决过程是一样的,只是Rxjava经过了更好的抽象,使得代码之间的耦合性变的更低。
举个例子,在Android中,为了不阻塞UI线程,所有的耗时操作都需要异步执行,一旦有了异步,就会涉及到线程通信,这也就是Android经典的Handler机制了。在异步这个领域内,我们要注意以下几点:

  1. Activity与WorkThread的生命周期的不同步导致的内存泄漏
  2. 不断的new Thread导致进程中有多个Thread,进而增加了Thread对CPU执行时间争抢的难度,有可能导致UI Thread迟迟抢不到CPU,导致UI卡顿。
  3. 使用Handler进行Thread间通信时,要注意对Message的obtain方式,防止Message浪费内存
  4. Activity与Handler生命周期不一致导致的内存泄漏
  5. 要完成一件事,得在WorkThread与UI Thread之间不停的切换,而带来的成本就是代码不停的上下翻动。

在RxJava中,线程间的通信只是一个很小的配角,RxJava把所有的操作都看成时对数据的加工,而UI Thread 与WorkThread只是“数据加工厂”需要的环境。当数据留到这个“数据加工厂”时,只需要指定它所需要的环境即可。
代码演示如下:

如果不算注释,整个的代码量是很小的。这其中包含了之前的handler机制,把各个方法块儿通过“.”这个符号联系了起来,并且指明了方法块儿与方法块儿之间的关系属性。
开发中很大的一块就是程序的升级,需求变更,RxJava很好的把完成一件事情的所有步骤都解藕,使得代码之间是可以插拔的。如果新需求进来时,只需要在合适的地方添加新的“数据加工厂”即可,或者去到相应的“数据加工厂”,而且完成这件事情的整个过程是很清晰的。

RxJava的流程:

生产数据源(携带原始数据)-(数据类型)-> 数据加工厂1(对数据进行加工完之后,进行包装,重新发送到流中) – (新数据类型) -> 数据加工厂2(再次对数据进行加工,然后发出去) -> (新数据类型) -> 数据最终接收工厂

只要记住总流程,就知道RxJava怎么用了。

0

RxJava教程(一):响应式编程介绍

简介

响应式编程是一种以异步数据流为核心的编程方式。这里的数据一般是一些事件,而流则是在时间序列上的一系列的事件。任何东西都可以转化为数据流,如变量、用户输入事件、数据结构等。

我们可以很灵活地操纵数据流,如可以将两个甚至多个数据流融合成一个数据流,可以从数据流中过滤出感兴趣的事件,还可以将数据流中的事件转化为其他新的事件。

数据流中的事件通常可以分成三种类型:普通事件、错误事件和结束事件。以用户的键盘输入事件为例,当用户依次敲击“A”、“B”、“C”键的时候,就会产生三个输入事件,计算机接收到这些事件并对其做出响应一一将字母“ A”、“B ”、“ C”显示在显示器上。当用户敲击回车键时,可以将其作为一个结束事件来表示数据流的结束,即用户输入结束。而在输入过程中发生的任何错误都可以作为数据流中的错误事件。

继续阅读“RxJava教程(一):响应式编程介绍”
0

使用Retrofit + RxJava 实现优雅的处理服务器返回异常、错误

异常&错误

实际开发经常有这种情况,比如登录请求,接口返回的

信息包括请求返回的状态:失败还是成功,错误码,User对象等等。如果网络等原因引起的登录失败可以归结为异常,如果是用户信息输入错误导致的登录失败算是错误。

假如服务器返回的是统一数据格式:

/**

* 标准数据格式

* @param <T>

*/

public class Response<T> {

public int state;

public String message;

public T data;

}

网络异常导致的登录失败,在使用Retrofit+RxJava请求时都会直接调用subscribe的onError事件;

密码错误导致的登录失败,在使用Retrofit+RxJava请求时都会调用subscribe的onNext事件;

无论是异常还是错误,都要在subscribe里面处理异常信息,如下代码:

APIWrapper.getInstance().login(“username”, “password”)

.subscribe(new Observer<Response<User>>() {

@Override

public void onCompleted() {

}

@Override

public void onError(Throwable e) {

}

@Override

public void onNext(Response<User> data) {

if(data.state == 1001){

//…..

}else if(data.state == 1002){

}

}

});

现在我希望在发生任何错误的情况下,都会调用onError事件,并且由model来处理错误信息。那么,此时我们就应该有一个ExceptionEngine来处理事件流中的错误信息了。

在工作流中处理异常

在正常情况下,我们获取网络数据的流程通常如下:

请求接口->解析数据->更新UI

整个数据请求过程都是发生在Rx中的工作流之中。当有异常产生的时候,我们要尽量不在ui层里面进行判断,换句话说,我们没有必要去告诉ui层具体的错误信息,只需要让他弹出一个信息(Toast或者Dialog)展示我们给它的信息就行。

请求接口和数据解析都可能出错,所以在这两层进行错误处理。为了更好的解耦,我们通过拦截器拦截错误,然后根据错误类型分发信息。

拦截器

数据解析层的拦截器

这个拦截器主要是为了获取具体的错误信息,分发给上层的UI,给用户以提示,增强用户体验。

public Observable<Weather> getWeather(String cityName){

return weatherService.getWeather(cityName)

//拦截服务器返回的错误

.map(new ServerResponseFunc<Weather>())

//HttpResultFunc()为拦截onError事件的拦截器,后面会讲到,这里先忽略

.onErrorResumeNext(new HttpResponseFunc<Weather>());

}

//拦截固定格式的公共数据类型Response<T>,判断里面的状态码

private class ServerResponseFunc<T> implements Func1<Response<T>, T> {

@Override

public T call(Response<T> reponse) {

//对返回码进行判断,如果不是0,则证明服务器端返回错误信息了,便根据跟服务器约定好的错误码去解析异常

if (reponse.state != 0) {

//如果服务器端有错误信息返回,那么抛出异常,让下面的方法去捕获异常做统一处理

throw new ServerException(reponse.state,reponse.message);

}

//服务器请求数据成功,返回里面的数据实体

return reponse.data;

}

}

所以整个逻辑是这样的:

所以在前三步的过程中,只要发生异常(服务器返回的错误也抛出了)都会抛出,这时候就触发了RxJava的OnError事件。

处理onError事件的拦截器

这个拦截器主要是将异常信息转化为用户”能看懂”的友好提示。

private class HttpResponseFunc<T> implements Func1<Throwable, Observable<T>> {

@Override

public Observable<T> call(Throwable throwable) {

//ExceptionEngine为处理异常的驱动器

return Observable.error(ExceptionEngine.handleException(throwable));

}

}

两个拦截器以前使用,代码如下:

public Observable<Weather> getWeather(String cityName){

return weatherService.getWeather(cityName)

//拦截服务器返回的错误

.map(new ServerResponseFunc<Weather>())

//HttpResponseFunc()为拦截onError事件的拦截器

.onErrorResumeNext(new HttpResponseFunc<Weather>());

}

调用:

APIWrapper.getInstance().getWeather(“北京”)

.subscribe(new SampleProgressObserver<Weather>(MainActivity.this) {

@Override

public void onNext(WeatherBean weatherBean) {

tv.setText(weatherBean.toString());

}

});

相关类:

public class RxSubscriber<T> extends ErrorSubscriber<T> {

@Override

public void onStart() {

super.onStart();

DialogHelper.showProgressDlg(context, “正在加载数据”);

}

@Override

public void onCompleted() {

DialogHelper.stopProgressDlg();

}

@Override

protected void onError(ApiException ex) {

DialogHelper.stopProgressDlg();

Toast.makeText(context, ex.message, Toast.LENGTH_SHORT).show();

}

@Override

public void onNext(T t) {

}

}

public abstract class ErrorSubscriber<T> extends Observer<T> {

@Override

public void onError(Throwable e) {

if(e instanceof ApiException){

onError((ApiException)e);

}else{

onError(new ApiException(e,123));

}

}

/**

* 错误回调

*/

protected abstract void onError(ApiException ex);

}

处理异常的驱动器

package com.v791202;

import android.net.ParseException;

import com.google.gson.JsonParseException;

import org.json.JSONException;

import java.net.ConnectException;

import retrofit2.adapter.rxjava.HttpException;

/**

* Created by 791202.com

*/

public class ExceptionEngine {

//对应HTTP的状态码

private static final int UNAUTHORIZED = 401;

private static final int FORBIDDEN = 403;

private static final int NOT_FOUND = 404;

private static final int REQUEST_TIMEOUT = 408;

private static final int INTERNAL_SERVER_ERROR = 500;

private static final int BAD_GATEWAY = 502;

private static final int SERVICE_UNAVAILABLE = 503;

private static final int GATEWAY_TIMEOUT = 504;

public static ApiException handleException(Throwable e){

ApiException ex;

if (e instanceof HttpException){ //HTTP错误

HttpException httpException = (HttpException) e;

ex = new ApiException(e, ERROR.HTTP_ERROR);

switch(httpException.code()){

case UNAUTHORIZED:

case FORBIDDEN:

case NOT_FOUND:

case REQUEST_TIMEOUT:

case GATEWAY_TIMEOUT:

case INTERNAL_SERVER_ERROR:

case BAD_GATEWAY:

case SERVICE_UNAVAILABLE:

default:

ex.message = “网络错误”; //均视为网络错误

break;

}

return ex;

} else if (e instanceof ServerException){ //服务器返回的错误

ServerException resultException = (ServerException) e;

ex = new ApiException(resultException, resultException.code);

ex.message = resultException.message;

return ex;

} else if (e instanceof JsonParseException

|| e instanceof JSONException

|| e instanceof ParseException){

ex = new ApiException(e, ERROR.PARSE_ERROR);

ex.message = “解析错误”; //均视为解析错误

return ex;

}else if(e instanceof ConnectException){

ex = new ApiException(e, ERROR.NETWORD_ERROR);

ex.message = “连接失败”; //均视为网络错误

return ex;

}else {

ex = new ApiException(e, ERROR.UNKNOWN);

ex.message = “未知错误”; //未知错误

return ex;

}

}

}

/**

* 约定异常

*/

public class ERROR {

/**

* 未知错误

*/

public static final int UNKNOWN = 1000;

/**

* 解析错误

*/

public static final int PARSE_ERROR = 1001;

/**

* 网络错误

*/

public static final int NETWORD_ERROR = 1002;

/**

* 协议出错

*/

public static final int HTTP_ERROR = 1003;

}

public class ApiException extends Exception {

public int code;

public String message;

public ApiException(Throwable throwable, int code) {

super(throwable);

this.code = code;

}

}

public class ServerException extends RuntimeException {

public int code;

public String message;

}

DialogHelper.Java

public class DialogHelper {

/**

* 通用Dialog

*

*/

// 因为本类不是activity所以通过继承接口的方法获取到点击的事件

public interface OnOkClickListener {

abstract void onOkClick();

}

/**

* Listener

*/

public interface OnCancelClickListener {

abstract void onCancelClick();

}

private static AlertDialog mDialog;

public static void showDialog(Context context, String title, String content, final OnOkClickListener listenerYes,

final OnCancelClickListener listenerNo) {

showDialog(context, context.getString(android.R.string.ok), context.getString(android.R.string.cancel), title, content, listenerYes, listenerNo);

}

public static void showDialog(Context context, String ok, String cancel, String title, String content, final OnOkClickListener listenerYes,

final OnCancelClickListener listenerNo) {

AlertDialog.Builder builder = new AlertDialog.Builder(context);

builder.setMessage(content);

// 设置title

builder.setTitle(title);

// 设置确定按钮,固定用法声明一个按钮用这个setPositiveButton

builder.setPositiveButton(ok,

new DialogInterface.OnClickListener() {

public void onClick(DialogInterface dialog, int which) {

// 如果确定被电击

if (listenerYes != null) {

listenerYes.onOkClick();

}

mDialog = null;

}

});

// 设置取消按钮,固定用法声明第二个按钮要用setNegativeButton

builder.setNegativeButton(cancel,

new DialogInterface.OnClickListener() {

public void onClick(DialogInterface dialog, int which) {

// 如果取消被点击

if (listenerNo != null) {

listenerNo.onCancelClick();

}

mDialog = null;

}

});

// 控制这个dialog可不可以按返回键,true为可以,false为不可以

builder.setCancelable(false);

// 显示dialog

mDialog = builder.create();

if (!mDialog.isShowing())

mDialog.show();

}

public static void showDialog(Context context, int ok, int cancel, int title, int content, final OnOkClickListener listenerYes,

final OnCancelClickListener listenerNo) {

showDialog(context, context.getString(ok), context.getString(cancel), context.getString(title), context.getString(content), listenerYes, listenerNo);

}

static ProgressDialog progressDlg = null;

/**

* 启动进度条

*

* @param strMessage 进度条显示的信息

* @param // 当前的activity

*/

public static void showProgressDlg(Context ctx, String strMessage) {

if (null == progressDlg) {

if (ctx == null) return;

progressDlg = new ProgressDialog(ctx);

//设置进度条样式

progressDlg.setProgressStyle(ProgressDialog.STYLE_SPINNER);

//提示的消息

progressDlg.setMessage(strMessage);

progressDlg.setIndeterminate(false);

progressDlg.setCancelable(true);

progressDlg.show();

}

}

public static void showProgressDlg(Context ctx) {

showProgressDlg(ctx, “”);

}

/**

* 结束进度条

*/

public static void stopProgressDlg() {

if (null != progressDlg && progressDlg.isShowing()) {

progressDlg.dismiss();

progressDlg = null;

}

if (null != dialog && dialog.isShowing()) {

dialog.dismiss();

dialog = null;

}

}

private static Dialog dialog;

public static void showDialogForLoading(Context context, String msg, boolean cancelable) {

if (null == dialog) {

if (null == context) return;

View view = LayoutInflater.from(context).inflate(R.layout.layout_loading_dialog, null);

TextView loadingText = (TextView)view.findViewById(R.id.loading_tip_text);

loadingText.setText(msg);

dialog = new Dialog(context, R.style.loading_dialog_style);

dialog.setCancelable(cancelable);

dialog.setCanceledOnTouchOutside(cancelable);

dialog.setContentView(vwww.791202.comiew, new LinearLayout.LayoutParams(LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.MATCH_PARENT));

Activity activity = (Activity) context;

if (activity.isFinishing()) return;

dialog.show();

}

}

}

可能本博客也不是最好的解决方案,如果有更好的想法,我愿与你互相交流!

分享: Retrofit+RxJava错误预处理

看到bobo_wang的文章,不仅感觉有受益匪浅,这里做下介绍。

首先定义如下Transformer转换器。

public static <T> Observable.Transformer<Response<T>, T> sTransformer() {

return responseObservable -> responseObservable.map(tResponse -> {

if (!tResponse.success) throw new RuntimeException(tResponse.code);

return tResponse.dwww.791202.comata;

}).onErrorResumeNext(new HttpResponseFunc<>());

}

public static <T> Observable.Transformer<T, T> switchSchedulers() {

return observable -> observable.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread());

}

private static class HttpResponseFunc<T> implements Func1<Throwable, Observable<T>> {

@Override public Observable<T> call(Throwable throwable) {

//ExceptionEngine为处理异常的驱动器

return Observable.error(new Throwable(throwable));

}

}

调用:

public void login(View v){

apiservice.login(name,pwd)

.compose(Transformers.sTransformer())

.compose(Transformers.switchSchedulers())

.subscribe(subscriber);

}

private Subscriber<UserModel> subscriber = new Subscriber<UserModel>() {

@Override public void onCompleted() {

// do onCompleted

}

@Override public void onError(Throwable e) {

// do on success != true;

// do on http error

// do on other error

}

@Override public void onNext(UserModel model) {

// parse data

}

};

接口:

@FormUrlEncoded @POST(“interface?login”)

Observable<Response<UserModel>> login(@Field(“name”) String name,@Field(“pwd”) String pwd);

最后再来点干货。

Transformer 和 Func 处理的区别

如上的处理,定义了 一个 sTransformer 和一个 HttpResponseFunc,

从中可以明显感觉的到sTransformer其实也是可以用Func1来定义的,

public void login(View v){

apiservice.login(name,pwd)

.compose(Transformers.switchSchedulers())

.map(new TransFuc<UserModel>())

.onErrorReturn(new HttpResponseFunc<>())

.subscribe(subscriber);

}

public static class TransFuc<T> implements Func1<Response<T>, T> {

@Override public T call(Response<T> tResponse) {

if (!tResponse.success) throw new RuntimeException(tResponse.code);

return tResponse.data;

}

}

Transformer作用于整个流,Func1是一个操作符,作用于数据项。

不规范数据的处理

有时候服务器返回的数据并不是十分规范的,比如

正常返回是这样的

{

“success”: true, // 是否成功

“status”: “1”, // 状态码

“data”: {

// 内容

}

}

错误时时这样的

{

“success”: false, // 是否成功

“status”: “0”, // 状态码

“data”: “371” //错误码

}

这时候如果我么用泛型处理

public class Response<T> {

public boolean success;

public String status;

public T data;

}

针对这种数据,我们的泛型该怎么写成了问题,错误的时候是String,正确的时候是Bean?

如果我们直接写成JavaBean,那么我们会得到一个错误,LinkedTreeMap cannot be cast to xxx

类型转换错误.这时候只能将泛型写成String来处理了,使用如下Transformer

public static Observable.Transformer<String, UserModel> trans() {

return stringObservable -> stringObservable.map(s -> {

Response parseJson = GsonUtil.parseJson(s, Response.class);

if (null == parseJson) {

throw new RuntimeException(“null == parseJson”);

}

if (PatternsUtil.isNum(parseJson.data.toString())) {

throw new RuntimeException(parseJson.data.toString());

}

return GsonUtil.parseJson(s, UserModel.class);

}).onErrorResumeNext(new HttpResponseFunc<>());

}

使用就变成了如下这样

public void login(View v){

apiservice.login(name,pwd)

.compose(Transformers.switchSchedulers())

.compose(Transformers.trans())

.subscribe(subscriber);

}

封装的越来越简介了,用到实际项目吧!

0