-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] DataFlow implementation #1354
Conversation
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of DataFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes #1082
* A data flow can be [closed][close] and all its collectors will complete. | ||
* | ||
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data. | ||
* The ability to read the current [value] is provided only for convenience of the code that updates the data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not specified how to get the current value using the flow
read-only interface, is counter.first()
work?
May multiple counter.first()
invocations return the same value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. counter.first()
does compute and return the current value. I'll mention it in the docs.
public var value: T | ||
|
||
/** | ||
* Closes this data fl9w. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: flow
* A data flow can be [closed][close] and all its collectors will complete. | ||
* | ||
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data. | ||
* The ability to read the current [value] is provided only for convenience of the code that updates the data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is CAS not supported by design? The CounterModel
example is not thread-safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It is assume that the owner of LiveData
that makes updates is single-threaded. Do you have compelling use-cases to support CAS there to allow coordinated updates from multiple threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I often use a Channel
or a Mutex
to resolve this kind of issues, so these use cases can be resolved outside the DataFlow
class.
Flow
is a safe abstraction, but the example counter.value++
is not. I am not proposing a some kind of CAS operation, however there is not possible to declare that counter.value++
simply works.
Similar to other @qwwdfsad's considerations, we can consider DataFlow<T> : Flow<T>, FlowCollector<T>
, this adds a lot of cerimonies but it make evident any issue in counter.emit( counter.first() + 1)
.
/** | ||
* Value of this data flow. | ||
* | ||
* Setting a value that is [equal][Any.equals] to the previous one does nothing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where this restriction came from? Are we sure this is desired behavior for most of the users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other Reactive frameworks, such entity does not only represent Flow
, but also FlowCollector
.
Example of usages:
Note that I (currently) do not advocate for doing the same thing, but it is something definitely worth consideration (maybe @gildor can give us a few pointers here?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've thought about making it implement FlowCollector
(it is easy to add), but I could not find a complessing use-case to do it. In Kotlin word it will be less convenient, since FlowCollector.emit
is a suspending function, do you do not actually need suspending update function. You can simply update DataFlow.value
from any context, even non-suspending one.
P.S. If we create some kind of EventFlow
as a complement (which would replace ArrayBroadcastChannel
and would not conflate events), then it would be logical to call its updating function emit
and naturally implement FlowCollector
interface there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @elizarov
FlowCollector.emit is a suspending function, do you do not actually need suspending update function
Overriding a suspending function with a non-suspending one is a current Kotlin limitation.
DataFlow
shares this issue with ConflatedChannel
and UnlimitedChannel
.
On the other hand in current design to update value (dataFlow.value=42
) is cheap, instead to read a value using the public interface is really expensive (flow.first()
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qwwdfsad Personally, I do not have good use cases for FlowCollector for DataFlow (same as rarely use io.reactivex.Observer that implemented by Subject in our codebase). so my opinion that some simple adapter function that allows forwarding events from Flow/Channel/Lambda to DataFlow would be enough
But maybe I just do not see some really common use cases for it, but because it's very easy to create extension function for any type to forward any values to DataFlow, I think it's not really needed in Kotlin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for allowing same value to be set twice. If someone wants to dedup it, they can as well use distinctUntilChanged
.
One use case for dispatching the same value is using it for UI actions, like a refresh.
private val refreshTrigger = DataFlow<Unit>().also {
it.value = Unit
}
val data = refreshTrigger.flatMapLatest {
repositroy.loadData()
}
fun refresh() {
refreshTrigger.value = Unit
}
If there is an EventFlow
, it could also fulfill this need.
public fun <T> DataFlow(value: T): DataFlow<T> = DataFlowImpl(value ?: NULL) | ||
|
||
@SharedImmutable | ||
private val NONE = Symbol("NONE") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move implementation to a different file (so users won't see it when exploring our public API)
* cost, where `N` is the number of collectors. | ||
*/ | ||
@FlowPreview | ||
public interface DataFlow<T> : Flow<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataFlow
sounds more like a BroacastChannel
with (possibly cached or replayable) data.
This primitive represents a single state as a flow of consecutive updates, I'd rather name it StateFlow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that StateFlow is better, for more use cases it's essentially observable data field than some abstract state. At least it how we usually use it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for DataFlow
. State sounds more limiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yigit @gildor @qwwdfsad Bikeshedding the name here: #1082 (comment)
* Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException]. | ||
* Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException]. | ||
*/ | ||
public var value: T |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding my previous comment about FlowCollector
.
I am afraid that mutable value
will mix imperative and declarative approaches.
If we designed something like Compose, then yes, updating a var
is a way to go.
But we are already in the world of Flow
that requires more ceremonies, but is pretty approachable and clear in return. I am afraid that the ability to mutate var
instead of writing state.emit(update)
will make end-users code less clear and understandable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should emit
be suspendable or regular function/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an open question. Either we can play the same trick as with channels (suspendable send
+ non-suspendable offer
and guarantee, that for non-closed conflated channel offer
always succeeds) and provide two methods (naming TBD) or make emit
non-suspendable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm delighted for this api. My request is a way to transform a flow into a data flow. My use case: I have a db that exposes flows for each query, the flow emits each time a table changes. I'd like to broadcast the flow and collect it multiple times (2 parts of screen). Ideally I'd like to do dbFlow.toDataFlow and retain that in a field, then I can have many parts of my screen collect the query changes without having to create more than one dbFlow.
That's #1261 |
4a49830
to
aff8202
Compare
DataFlow
is aFlow
analogue toConflatedBroadcastChannel
. Since Flow API is simpler than channels APIs, the implementation ofDataFlow
is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally).The name reflects the fact that
DataFlow
represent an cell in a "data flow programming" model (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the flows, withcombineLatest
being especially useful to combine multiple data flows using arbitrary functions.It is also a play of Android's
LiveData
, providing very similar API.Fixes #1082
Open issues
The name. Shall we stick with
DataFlow
or pick something else?I've made it fusible with
flowOn
andconflated
operators (they shall have no effect on theDataFlow
). As a result, the sequence ofdataFlow.flowOn(ctx).buffer()
becomes different from a sequence ofdataFlow.buffer().flowOn(ctx)
. In the former case cxt that is "applied" to dataflow is ignored. In the later case ctx is applied to the buffering coroutine is created between the data flow and the downstream collector. Is it clear why there is difference?