SharedFlow和StateFlow

托RxJava的福事件流,Kotlin现在也提供了自己的反应式流的实现,称之为Flow,和RxJava一样,KotlinFlow可以创建流并且响应,事件流同样有冷流和热流(冷流只有在订阅的时候才会发出事件,而热流则没有这种限制),StateFlow和ShareFlow是属于热流,(他们主要用来取代channel)
本文主要介绍一下SharedFlow和StateFlow,并对比下LiveData

SharedFlow

分为读写和只读 SharedFlow和MutableSharedFlow
SharedFlow是没有初始默认值,有3个可配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

  • replay:重新发射给新的订阅者的值的数量,可以将旧的数据回播给新的订阅者。不能为负数,默认为0。
  • extraBufferCapacity:在replay基础上的缓冲池的数量,当有剩余缓冲区空间时,调用emit发射数据不会被挂起,同样的不能为负数,默认值为0。这个值加上replay的总和,构成了SharedFlow的总缓冲区大小。
  • onBufferOverflow:配置一个emit在缓冲区溢出时的触发操作。默认为BufferOverflow.SUSPEND,缓存溢出时挂起。另外还有BufferOverflow.DROP_OLDEST在溢出时删除缓冲区中最旧的值,将新值添加到缓冲区,不会进行挂起。BufferOverflow.DROP_LATEST在缓冲区溢出时删除当前添加到缓冲区的最新值来保持缓冲区内容不变,不会进行挂起。

    Default Behavior

    replay-0-extraBuffer-0.gif
  1. 这个SharedFlow有三个事件和两个订阅者。第一个事件是在还没有订阅者的情况下发出的,所以它将永远丢失
  2. 当SharedFlow发出第二个事件时,它已经有了一个订阅者,这个订阅者得到了上述事件
  3. 在到达第三个事件之前,另一个订阅者出现了,但第一个订阅者被suspend,并保持这样直到获取该事件。这意味着emit()将无法将第三个事件传递给那个订阅者。当这种情况发生时,SharedFlow有两种选择,它要么缓冲该事件,并在恢复时将其发射给suspend的订阅者,要么在没有足够的缓冲区留给该事件时造成缓冲区溢出
  4. 在这种情况下,总的缓冲区为零-replay+extraBufferCapacity。换句话说,就是缓冲区溢出。因为onBufferOverflow是使用的BufferOverflow.SUSPEND,Flow将suspend,直到它能把事件传递给所有的订阅者。
  5. 当订阅者恢复时,Flow也会恢复,将事件传递给所有订阅者并继续其工作

    注意:SharedFlow规范禁止你在缓冲区总值为零时使用onBufferOverflow = BufferOverflow.SUSPEND以外的任何东西。因为tryEmit(value: T)不会暂停,如果你用默认的replay和extraBufferCapacity值来使用它,它就不会工作。换句话说,用tryEmit(value: T)发射事件的唯一方法是,至少要有一个总缓冲区。

With Replay

下面是replay=1的情形
replay-1-extraBuffer-0.gif

  • 当SharedFlow到达第一个没有任何活动订阅者的事件时,它不再暂停。由于replay=1,所以现在总的缓冲区大小为1。因此,这个Flow缓冲了第一个事件并继续前进。

  • 当它到达第二个事件时,缓冲区没有更多的空间了,所以它suspend了。

  • Flow保持suspend,直到订阅者恢复。一旦它恢复,它就会得到缓冲的第一个事件,以及最新的第二个事件。SharedFlow恢复了,但第一个事件永远消失了,因为第二个事件现在在重放缓冲区中占据了位置。

  • 在到达第三个事件之前,一个新的订阅者出现了。由于replay,它也得到一份最新事件的副本。

  • 当流最终到达第三个事件时,两个订阅者都得到了它的副本。

  • SharedFlow缓冲了这第三个事件,同时抛弃了之前的事件。后来,当第三个订阅者出现时,它也得到了第三个事件的副本。

    With extraBufferCapacity and onBufferOverflow

    这个过程与extraBufferCapacity类似,但没有类似replay的行为。第三个例子显示了一个SharedFlow,extraBufferCapacity = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST的场景
    replay-0-extraBuffer-1.gif

  • 这个行为和第一个例子是一样的。在一个suspend的订阅者和总缓冲区大小为1的情况下,SharedFlow缓冲了第一个事件。

  • 不同的行为开始于第二个事件的发射。由于onBufferOverflow = BufferOverflow.DROP_OLDEST,SharedFlow放弃了第一个事件,缓冲了第二个事件并继续进行。另外,注意到第二个订阅者没有得到缓冲事件的副本。记住,这个SharedFlow有extraBufferCapacity = 1,但replay = 0。

  • 这个Flow最终到达第三个事件,活动用户收到了这个事件。然后,Flow缓冲了这个事件,放弃了之前的事件。

  • 不久之后,suspend的用户恢复,触发SharedFlow向它发出缓冲的事件,并清理缓冲区。

StateFlow

一个StateFlow的结构像一个SharedFlow。这是因为StateFlow只不过是SharedFlow的一个特殊化子类

1
2
3
4
5
6
7
8
9
public interface StateFlow<out T> : SharedFlow<T> {
public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
public override var value: T

public fun compareAndSet(expect: T, update: T): Boolean
}

我们也可以通过一个SharedFlow来创建

1
2
3
4
5
6
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

与LiveData的重放污染不同,stateFlow只有在值发生更新的时候才会触发数据更新
image.png

LiveDat的数据倒灌
https://xuyisheng.top/livedata_bug_feature/

LiveData更新是根据更新version来决定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void considerNotify(ObserverWrapper observer) {
// 如果当前的宿主Activity不是Active状态,不分发
if (!observer.mActive) {
return;
}
// 宿主Activity的如果在后台,不分发
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
// 当前宿主Activity的mLastVersion大于等于LiveData的mVersion,不分发
// 这里就是数据倒灌的原因
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
// 分发事件
observer.mObserver.onChanged((T) mData);
}

与LiveData的比较

StateFlow 和 LiveData 具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。
但请注意,StateFlow 和 LiveData 的行为确实有所不同:

  • StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要。
  • 当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集数据流。
  • stateFlow不会有重放污染的问题,只有在值发生更新的时候才会触发数据更新

在一些横竖屏切换这些场景往往会遇到LiveData的重放污染问题,这个时候我们虽然可以通过SingleEvent去解决,但是StateFlow也是很不错的选择,不过此时没有生命周期管理的需要我们手段管理防止内存泄漏

参考