Flow的基本使用

创建

  • flow builder

    1
    2
    3
    4
    5
    6
    7
    8
    9
    flow {
    for (i in 1..5) {
    delay(100)
    emit(i)
    }
    }.collect{
    println(it)
    }

  • flowOf()

    1
    2
    3
    4
    5
    6
    7
    flowOf(1,2,3,4,5)
    .onEach {
    delay(100)
    }
    .collect{
    println(it)
    }
  • asFlow

    1
    2
    3
    4
    5
    6
    listOf(1, 2, 3, 4, 5).asFlow()
    .onEach {
    delay(100)
    }.collect {
    println(it)
    }
  • channelFlow

    1
    2
    3
    4
    5
    6
    7
    8
    channelFlow {
    for (i in 1..5) {
    delay(100)
    send(i)
    }
    }.collect{
    println(it)
    }

    其中 chanelFlow是HotStream,实现了生产者和消费者异步非阻塞模型
    flow是Code Stream在没有切换线程情况下,生产者和消费者是同步非阻塞的

切换线程

相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn,下面的例子中,展示了 flow builder 和 map 操作符都会受到 flowOn 的影响

  • flowOn可以将执行此流的上下文更改为指定的上下文。
  • flowOn可以进行组合使用。
  • flowOn只影响前面没有自己上下文的操作符。已经有上下文的操作符不受后面flowOn影响。
  • 不管flowOn如何切换线程,collect始终是运行在调用它的协程调度器上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun test() {
lifecycleScope.launch {
flow {
for (i in 1..3) {
Log.d(TAG, "flow :${ currentCoroutineContext()}")
delay(100)
emit(i)
}
}.flowOn(Dispatchers.IO)
.map {
Log.d(TAG, "map :${ currentCoroutineContext()}")
it
}.flowOn(Dispatchers.Default)
.collect { value ->
Log.d(TAG, "collect:${ currentCoroutineContext()} value :${value}")
}
}
}
//打印
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :1
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :2
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :3

取消

我们知道Flow的执行是依赖于collect的,而它又必须在协程当中调用,因此取消Flow的主要依赖于collect所在的协程的状态。所以取消Flow只需要取消它所在的协程即可

1
2
3
4
5
6
7
8
9
10
11
fun test() {
val job = lifecycleScope.launch {
flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}.collect { value -> Log.d(TAG, "value :${value}") }
}
job.cancel()
}

或者通过操作符withTimeOutOrNull来取消

1
2
3
4
5
6
7
8
9
10
11
12
MainScope().launch {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
Log.d("xys", "Flow: $it")
}
}
}

Flow Lifecycle

RxJava的do操作符能够监听Observable的生命周期的各个阶段,Flow只有onStart、onCompletion来监听Flow的创建和结束

  • onStart:在上游流启动之前被调用。
  • onEach:在上游流的每个值被下游发出之前调用。
  • onCompletion:在流程完成或取消后调用,并将取消异常或失败作为操作的原因参数传递。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    fun test() {
    lifecycleScope.launch {
    flow {
    Log.d(TAG, "flow")
    emit(1)
    }.onStart {
    Log.d(TAG, "onStart ")
    }.onEach {
    Log.d(TAG, "onEach :${it}")
    }.onCompletion {
    Log.d(TAG, "onCompletion")
    }.collect { value ->
    Log.d(TAG, "collect :${value}")
    }
    }
    }
    //打印
    D/carman: onStart
    D/carman: flow
    D/carman: onEach :1
    D/carman: collect :1
    D/carman: onCompletion
    可以看到整个执行流程依次是onStart->flow{ …}->onEach->collect->onCompletion。

例举他们的使用场景: 比如,在 Android 开发中使用 Flow 创建网络请求时,通过 onStart 操作符调用 loading 动画以及网络请求结束后通过 onCompletion 操作符取消动画。
再比如,在借助这些操作符做一些日志的打印

1
2
3
4
5
6
7
8
9
10
fun <T> Flow<T>.log(opName: String) = onStart {
println("Loading $opName")
}.onEach {
println("Loaded $opName : $it")
}.onCompletion { maybeErr ->
maybeErr?.let {
println("Error $opName: $it")
} ?: println("Completed $opName")
}

对比

Flow Vs Sequences

Flow 跟 Sequences 之间的区别是 Flow 不会阻塞主线程的运行,而 Sequences 会阻塞主线程的运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun main() = runBlocking {

launch {
for (j in 1..5) {
delay(100)
println("I'm not blocked $j")
}
}

flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }

println("Done")
}
//执行结果
1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5

使用sequence

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun main() = runBlocking {

launch {
for (k in 1..5) {
delay(100)
println("I'm blocked $k")
}
}

sequence {
for (i in 1..5) {
Thread.sleep(100)
yield(i)
}
}.forEach { println(it) }

println("Done")
}
//执行结果
1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5

由此,可以得出 Flow 在使用各个 suspend 函数时(本例子中使用了collect、emit函数)不会阻塞主线程的运行。

Completion

Flow完成时(正常或出现异常),如果执行一个操作,可以通过 一下2种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// try …… finallyn
fun main() = runBlocking {
try {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
} finally {
println("Done")
}
}

//onCompletion
//但是 onCompletion 不能捕获异常,只能用于判断是否有异常。
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompletion { println("Done") }
.collect { println(it) }
}

//onCompleted 只能有正常结束

Backpressure

RxJava2 Flowable 支持的 Backpressure 策略,包括:

  • MISSING:创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理。
  • ERROR:如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。
  • BUFFER:Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。
  • DROP:如果 Flowable 的异步缓存池满了,会丢掉将要放入缓存池中的数据。
  • LATEST:如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点跟 DROP 策略一样,不同的是,不管缓存池的状态如何,LATEST 策略会将最后一条数据强行放入缓存池中。

而 Flow 的 Backpressure 是通过 suspend 函数实现。

buffer() 对应 BUFFER策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.buffer()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}

println("Cost $time ms")
}
//执行结果
Emit 1 (104ms)
Collect 1 starts (108ms)
Emit 2 (207ms)
Emit 3 (309ms)
Emit 4 (411ms)
Emit 5 (513ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1114ms)
Collect 3 starts (1114ms)
Collect 3 ends (1615ms)
Collect 4 starts (1615ms)
Collect 4 ends (2118ms)
Collect 5 starts (2118ms)
Collect 5 ends (2622ms)
Collected in 2689 ms

conflate() 对应 LATEST 策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fun main() = runBlocking {

val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.conflate()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}

println("Cost $time ms")
}
//执行结果
Emit 1 (106ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (520ms)
Collect 1 ends (613ms)
Collect 5 starts (613ms)
Collect 5 ends (1113ms)
Cost 1162 ms

异常处理

可以使用传统的try……catch

catch操作符

但是 onCompletion 不能捕获异常,只能用于判断是否有异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun test() {
lifecycleScope.launch {
flow {
Log.d(TAG, "flow")
emit(1)
}.onStart {
Log.d(TAG, "onStart ")
}.onEach {
Log.d(TAG, "onEach :${it}")
throw NullPointerException("空指针")
}.onCompletion { cause ->
Log.d(TAG, "onCompletion catch $cause")
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
//打印
D/carman: onStart
D/carman: flow
D/carman: onEach 1
D/carman: onCompletion catch java.lang.NullPointerException: 空指针
Process: com.example.myapplication, PID: 31145
java.lang.NullPointerException: 空指针
...
...

可以看到在onEach中抛出一个异常后,因为异常导致协程退出,所以collect没有执行,但是执行了onCompletion
onCompletion不应该是在collect后执行吗?为什么没有执行collect,反而执行了onCompletion。这个时候我们需要看下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow {
try {
collect(this)
} catch (e: Throwable) {
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
val sc = SafeCollector(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
sc.releaseIntercepted()
}
}

可以看到在onCompletion中,通过try/catch 块来捕获了collect方法,然后在catch分支里。通过invokeSafely执行了onCompletion中的代码,然后重新抛出异常。既然onCompletion又重新抛出了异常,那我们又该通过什么方式合理的处理这个异常呢?

try catch操作符可以捕获来自上游的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}
.catch{ println("catch exception") }
.collect { println(it) }
}
//执行结果
1
Flow completed exceptionally
catch exception

虽然我们同样的可以使用try/catch来处理异常,但是这种写法是不是看上去没有那么优雅。而且出现异常后,无法再继续往下执行,此时我们可以使用catch操作符

catch操作符的代码块可以分析异常并根据捕获到的异常以不同的方式对其做出反应:

  • 可以使用 throw 重新抛出异常。
  • 可以在catch代码块中通过emit将异常转换为新的值发射出去。
  • 可以将异常忽略,或用日志打印,或使用一些其他代码处理它。
  • 每个操作符只是针对它上游的流,如果下游的流中出现异常,我们需要再次添加一个catch操作符才能正常捕获。

    catch 操作符用于实现异常透明化处理。例如在 catch 操作符内,可以使用 throw 再次抛出异常、可以使用 emit() 转换为发射值、可以用于打印或者其他业务逻辑的处理等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
fun test() {
lifecycleScope.launch {
flow {
Log.d(TAG, "flow")
emit(1)
throw NullPointerException("空指针")
}.onStart {
Log.d(TAG, "onStart ")
}.onEach {
Log.d(TAG, "onEach ")
}.catch { cause ->
Log.d(TAG, "catch $cause")
emit(2)
}.onCompletion { cause ->
Log.d(TAG, "onCompletion catch $cause")
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
//打印结果
D/carman: onStart
D/carman: flow
D/carman: onEach 1
D/carman: catch java.lang.NullPointerException: 空指针
D/carman: collect :2
D/carman: onCompletion catch null



fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.catch{ println("catch exception") }
.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}
.collect { println(it) }
}
//执行结果
1
catch exception
Done

此时我们增加一个map操作符通过它再抛出一个新的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fun test() {
lifecycleScope.launch {
flow {
Log.d(TAG, "flow")
emit(1)
}.onStart {
Log.d(TAG, "onStart ")
}.onEach {
Log.d(TAG, "onEach $it")
throw NullPointerException("空指针")
}.catch { cause ->
Log.d(TAG, "catch $cause")
emit(2)
}.map {
Log.d(TAG, "map")
throw NullPointerException("新的异常")
it
}.onCompletion { cause ->
Log.d(TAG, "onCompletion2 catch $cause")
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: onStart
D/carman: flow
D/carman: onEach 1
D/carman: catch java.lang.NullPointerException: 空指针
D/carman: map
D/carman: onCompletion2 catch java.lang.NullPointerException: 新的异常
Process: com.example.myapplication, PID: 32168
java.lang.NullPointerException: 新的异常
...
...

程序直接崩溃了。这又是什么情况。这是因为每个操作符只是针对它上游的流,如果下游的流中出现异常,我们需要再次添加一个catch操作符才能正常捕获。
但是如果我们的异常是在collect末端操作符中出现,这个时候我们就只能通过try/catch整个Flow数据流或来处理,或者通过协程上下文中的CoroutineExceptionHandler来处理

retry、retryWhen操作符

像 RxJava 一样,Flow 也有重试的操作符。
如果上游遇到了异常,并使用了 retry 操作符,则 retry 会让 Flow 最多重试 retries 指定的次数。

1
2
3
4
5
6
7
public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

retry 操作符最终调用的是 retryWhen 操作符。下面的代码跟刚才的执行结果一致:

1
2
3
4
5
6
7
8
9
10
11
12
fun main() = runBlocking {

(1..5).asFlow().onEach {
if (it == 3) throw RuntimeException("Error on $it")
}
.onEach { println("Emitting $it") }
.retryWhen { cause, attempt ->
attempt < 2
}
.catch { it.printStackTrace() }
.collect()
}

线程操作

Flow 只需使用 flowOn 操作符,而不必像 RxJava 需要去深入理解 observeOn、subscribeOn 之间的区别

flowOn VS observeOn

RxJava 的 observeOn 操作符,接收一个 Scheduler 参数,用来指定下游操作运行在特定的线程调度器 Scheduler 上。
Flow 的 flowOn 操作符,接收一个 CoroutineContext 参数,影响的是上游的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {

flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.map {
it+1
}
.flowOn(customerDispatcher)
.collect {
println("${Thread.currentThread().name}: $it")
}
}

flow builder 和两个 map 操作符都会受到两个flowOn的影响,其中 flow builder 和第一个 map 操作符跟上面的例子一样,第二个 map 操作符会切换到指定的 customerDispatcher 线程池。flowOn设置的调度只对它之前的操作有影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//msg parse to IWSMessage instance
Observable.just(mResponseParsers)
.observeOn(Schedulers.io())
.map { parsers ->
var message: IWSMessage? = null
for(parser in parsers) {
val parseResult = parser.parse(messageType, msg.payload, msg.payloadEncoding)
if (parseResult != null) {
message = parseResult
break
}
}
message ?: EMPTY_MESSAGE
}.subscribeOn(AndroidSchedulers.mainThread())
.subscribe { message ->
message ?: return@subscribe
dispatchMessage(message, messageType)
}

subscribeOn和observeOn

RxJava observeOn()与subscribeOn()的关系
subscribeOn指示源Observable从哪个线程发出数据,这个线程将把数据流一直推到我们的Obsever
observerOn 指定下游运算符所在的线程
subscribeOn

  • subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的scheduler(比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程)
  • 一次性
  • 改变的是订阅的线程,即call()执行的线程

observeOn

  • obseverOn仅仅是改变了对发出事件后相关处理事件的程序所在的scheduler
  • 可以多次调用
  • 改变的是发送的线程 即onNext执行的线程

调用顺序:
在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。
其次:
从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create<Int> {
(1..3).forEach { e ->
HLogger.tag("RXJAVA").d("${Thread.currentThread().name}: $e")
it.onNext(e)
}
it.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe {
HLogger.tag("RXJAVA").d("${Thread.currentThread().name}: $it")
}
//打印
D/H.RXJAVA: H_IO-0: 1
D/H.RXJAVA: H_IO-0: 2
D/H.RXJAVA: H_IO-0: 3
D/H.RXJAVA: H_CPU-6: 1
D/H.RXJAVA: H_CPU-6: 2
D/H.RXJAVA: H_CPU-6: 3

交换.observeOn(Schedulers.computation()).subscribeOn(Schedulers.io()) 表现不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.create<Int> {
(1..3).forEach { e ->
HLogger.tag("RXJAVA").d("${Thread.currentThread().name}: $e")
it.onNext(e)
}
it.onComplete()
}.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())

.subscribe {
HLogger.tag("RXJAVA").d("${Thread.currentThread().name}: $it")
}
//打印
D/H.RXJAVA: H_IO-0: 1
D/H.RXJAVA: H_IO-0: 2
D/H.RXJAVA: H_IO-0: 3
D/H.RXJAVA: H_CPU-6: 1
D/H.RXJAVA: H_CPU-6: 2
D/H.RXJAVA: H_CPU-6: 3

Flow常用操作符

转换操作符

map

map操作符进行数据转换操作,同类型操作符mapNotNull、mapLatest,它会过滤掉空值,只发射不为空的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
fun test() {
lifecycleScope.launch {
flow {
emit(1)
}.map {
Log.d(TAG, "第一次转换")
it * 5
}.map {
Log.d(TAG, "第一次转换后的值 :$it")
"map $it"
}.collect { value ->
Log.d(TAG, "最终转换后的值 :${value}")
}
}
}
D/carman: 第一次转换
D/carman: 第一次转换后的值 :5
D/carman: 最终转换后的值 :map 5

fun test() {
val flow = flowOf("one", "two", "three",null, "four")
lifecycleScope.launch {
flow.mapNotNull {
it
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :one
D/carman: collect :two
D/carman: collect :three
D/carman: collect :four

transform

在使用 transform 操作符时,可以任意多次调用 emit ,以及发射任意值,这是 transform 跟 map 最大的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun test() {
lifecycleScope.launch {
(1..3).asFlow().transform {
emit(it)
emit("transform $it")
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :1
D/carman: collect :transform 1
D/carman: collect :2
D/carman: collect :transform 2
D/carman: collect :3
D/carman: collect :transform 3

类似的还有 transformWhile,控制流的截断,返回true则流继续执行,返回false则流截断

过滤操作符

filter

fliter操作符主要是对数据进行一个过滤,返回仅包含与给定匹配的原始流的值的流

1
2
3
4
5
6
7
8
9
10
fun test() {
lifecycleScope.launch {
(1..3).asFlow().filter {
it < 2
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :1

fliter还有很多同类型操作符,如:filterNot/filterIsInstance/filterNotNull。
filterNot效果恰恰与fliter想法,它取得是与判断条件相反的值

1
2
3
4
5
6
7
8
9
fun test() {
lifecycleScope.launch {
(1..3).asFlow().filterNot { it < 2 }.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :2
D/carman: collect :3

take

take操作符只取前几个emit发射的值,(原理是当发射次数大于等于count的值时,通过抛出异常来取消执行)

1
2
3
4
5
6
fun main() = runBlocking {

(1..5).asFlow()
.take(2)
.collect { println(it) }
}

类似的还有takeWhile与filter类似,不过判断条件为false时候将会中断后续的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun test() {
lifecycleScope.launch {
flowOf(1,1,1,2,3,4,4,5,1,2,2,3,3).map {
delay(100)
it
}.takeWhile {
it == 1
}.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :1
D/carman: collect :1
D/carman: collect :1

drop

drop操作符与take恰恰相反,它是丢弃掉指定的count数量后执行后续的流。

1
2
3
4
5
6
7
8
9
fun test() {
lifecycleScope.launch {
(1..3).asFlow().drop(2)
.collect { value ->
Log.d(TAG, "collect :${value}")
}
}
}
D/carman: collect :3

debounce

用于防抖操作,指定时间内的值只接收最新的一个

sample

sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

distinctUntilChangedBy

去重操作符,可以按照指定类型的参数进行去重

末端操作符

末端操作符都是suspend函数,所以需要运行在协程作用域中。

  • collect

  • single/first

  • toList/toSet/toCollection

  • count

  • fold/reduce

  • launchIn/produceIn/broadcastIn

    collect

    最常用的末端操作符

    collectIndexed

    带下标的collect,下标是Flow中的emit顺序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    MainScope().launch {
    val time = measureTimeMillis {
    flow {
    for (i in 0..3) {
    Log.d("test", "emit value---$i")
    emit(i.toString())
    }
    }.collectIndexed { index, value ->
    Log.d("test", "Result in $index --- $value")
    }
    }
    Log.d("test", "Time---$time")
    }

    集合转换

  • toCollection、toSet、toList

这些操作符用于将Flow转换为Collection、Set和List

last、lastOrNull、first、firstOrNull

返回Flow的最后一个值(第一个值),区别是last为空的话,last会抛出异常,而lastOrNull可空

reduce

类似于Kotlin集合中的reduce函数,能够对集合进行计算操作

  • 平方求和

    1
    2
    3
    4
    5
    6
    7
    8
    fun main() = runBlocking {

    val sum = (1..5).asFlow()
    .map { it * it }
    .reduce { a, b -> a + b }

    println(sum)
    }

    fold

    类似于kotlin中的fold函数,也需要设置初始值,比如下面的计算机阶乘

    1
    2
    3
    4
    5
    6
    fun main() = runBlocking {

    val sum = (1..5).asFlow().fold(1) { a, b -> a * b }

    println(sum)
    }

    状态操作符

    状态操作符不做任何修改,只是在合适的节点返回状态。

  • onStart:在上游生产数据前调用

  • onCompletion:在流完成或者取消时调用 (可以监听异常)

  • onEach:在上游每次emit前调用

  • onEmpty:流中未产生任何数据时调用

  • catch:对上游中的异常进行捕获

  • retry、retryWhen:在发生异常时进行重试,retryWhen中可以拿到异常和当前重试的次数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    MainScope().launch {
    Log.d("test", "Coroutine in ${Thread.currentThread().name}")
    val time = measureTimeMillis {
    flow {
    for (i in 0..3) {
    emit(i.toString())
    }
    throw Exception("Test")
    }.retryWhen { _, retryCount ->
    retryCount <= 3
    }.onStart {
    Log.d("test", "Start Flow in ${Thread.currentThread().name}")
    }.onEach {
    Log.d("test", "emit value---$it")
    }.onCompletion {
    Log.d("test", "Flow Complete")
    }.catch { error ->
    Log.d("test", "Flow Error $error")
    }.collect {
    Log.d("test", "Result---$it")
    }
    }
    Log.d("test", "Time---$time")
    }

    另外 onCompletion也可以监听异常,代码如下

    1
    2
    3
    .onCompletion { exception ->
    Log.d("test", "Result---$exception")
    }

    组合操作符

    zip

    将2个flow合并的操作符,如果 flowA 中 item 个数大于 flowB 中 item 个数,执行合并后新的 flow 的 item 个数 = 较小的 flow 的 item 个数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    fun main() = runBlocking {

    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
    .collect { println(it) }
    }
    //执行结果
    1 and one
    2 and two
    3 and three
    4 and four
    5 and five

    combine

    使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
    .collect { println(it) }
    }
    //结果
    1 and one
    2 and one
    3 and one
    3 and two
    4 and two
    5 and two
    5 and three
    5 and four
    5 and five

    flattenConcat
    不会组合多个flow 而是将他们作为单个流执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }

    flowOf(flowA,flowB)
    .flattenMerge(2)
    .collect{ println(it) }
    }

    //结果
    1
    one
    2
    3
    two
    4
    5
    three
    four
    five

    flatMapConcat

    flatMapConcat 由 map、flattenConcat 操作符实现。

    1
    2
    3
    @FlowPreview
    public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

    在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    fun currTime() = System.currentTimeMillis()

    var start: Long = 0

    fun main() = runBlocking {

    (1..5).asFlow()
    .onStart { start = currTime() }
    .onEach { delay(100) }
    .flatMapConcat {
    flow {
    emit("$it: First")
    delay(500)
    emit("$it: Second")
    }
    }
    .collect {
    println("$it at ${System.currentTimeMillis() - start} ms from start")
    }
    }
    //执行结果
    1: First at 114 ms from start
    1: Second at 619 ms from start
    2: First at 719 ms from start
    2: Second at 1224 ms from start
    3: First at 1330 ms from start
    3: Second at 1830 ms from start
    4: First at 1932 ms from start
    4: Second at 2433 ms from start
    5: First at 2538 ms from start
    5: Second at 3041 ms from start

flatMapMerge

flatMapMerge 由 map、flattenMerge 操作符实现。

1
2
3
4
5
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

flatMapMerge 是顺序调用内部代码块,并且并行地执行 collect 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapMerge {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
//执行结果
1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start

FloatMapLast

当发射了新值之后上个flow就会被取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapLatest {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
//结果
1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start

参考