Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] DataFlow implementation #1354

Closed
wants to merge 1 commit into from
Closed

[DRAFT] DataFlow implementation #1354

wants to merge 1 commit into from

Conversation

elizarov
Copy link
Contributor

@elizarov elizarov commented Jul 18, 2019

DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of DataFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally).

The name reflects the fact that DataFlow represent an cell in a "data flow programming" model (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the flows, with combineLatest being especially useful to combine multiple data flows using arbitrary functions.

It is also a play of Android's LiveData, providing very similar API.

Fixes #1082

Open issues

  1. The name. Shall we stick with DataFlow or pick something else?

  2. I've made it fusible with flowOn and conflated operators (they shall have no effect on the DataFlow). As a result, the sequence of dataFlow.flowOn(ctx).buffer() becomes different from a sequence of dataFlow.buffer().flowOn(ctx). In the former case cxt that is "applied" to dataflow is ignored. In the later case ctx is applied to the buffering coroutine is created between the data flow and the downstream collector. Is it clear why there is difference?

DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of DataFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
* A data flow can be [closed][close] and all its collectors will complete.
*
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
* The ability to read the current [value] is provided only for convenience of the code that updates the data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not specified how to get the current value using the flow read-only interface, is counter.first() work?

May multiple counter.first() invocations return the same value?

Copy link
Contributor Author

@elizarov elizarov Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. counter.first() does compute and return the current value. I'll mention it in the docs.

public var value: T

/**
* Closes this data fl9w.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: flow

* A data flow can be [closed][close] and all its collectors will complete.
*
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
* The ability to read the current [value] is provided only for convenience of the code that updates the data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is CAS not supported by design? The CounterModel example is not thread-safe.

Copy link
Contributor Author

@elizarov elizarov Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is assume that the owner of LiveData that makes updates is single-threaded. Do you have compelling use-cases to support CAS there to allow coordinated updates from multiple threads?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I often use a Channel or a Mutex to resolve this kind of issues, so these use cases can be resolved outside the DataFlow class.

Flow is a safe abstraction, but the example counter.value++ is not. I am not proposing a some kind of CAS operation, however there is not possible to declare that counter.value++ simply works.

Similar to other @qwwdfsad's considerations, we can consider DataFlow<T> : Flow<T>, FlowCollector<T>, this adds a lot of cerimonies but it make evident any issue in counter.emit( counter.first() + 1).

/**
* Value of this data flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where this restriction came from? Are we sure this is desired behavior for most of the users?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other Reactive frameworks, such entity does not only represent Flow, but also FlowCollector.
Example of usages:

Note that I (currently) do not advocate for doing the same thing, but it is something definitely worth consideration (maybe @gildor can give us a few pointers here?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've thought about making it implement FlowCollector (it is easy to add), but I could not find a complessing use-case to do it. In Kotlin word it will be less convenient, since FlowCollector.emit is a suspending function, do you do not actually need suspending update function. You can simply update DataFlow.value from any context, even non-suspending one.

P.S. If we create some kind of EventFlow as a complement (which would replace ArrayBroadcastChannel and would not conflate events), then it would be logical to call its updating function emit and naturally implement FlowCollector interface there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @elizarov

FlowCollector.emit is a suspending function, do you do not actually need suspending update function

Overriding a suspending function with a non-suspending one is a current Kotlin limitation.
DataFlow shares this issue with ConflatedChannel and UnlimitedChannel.

On the other hand in current design to update value (dataFlow.value=42) is cheap, instead to read a value using the public interface is really expensive (flow.first()).

Copy link
Contributor

@gildor gildor Jul 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qwwdfsad Personally, I do not have good use cases for FlowCollector for DataFlow (same as rarely use io.reactivex.Observer that implemented by Subject in our codebase). so my opinion that some simple adapter function that allows forwarding events from Flow/Channel/Lambda to DataFlow would be enough

But maybe I just do not see some really common use cases for it, but because it's very easy to create extension function for any type to forward any values to DataFlow, I think it's not really needed in Kotlin

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for allowing same value to be set twice. If someone wants to dedup it, they can as well use distinctUntilChanged.
One use case for dispatching the same value is using it for UI actions, like a refresh.

private val refreshTrigger = DataFlow<Unit>().also {
   it.value = Unit
}
val data = refreshTrigger.flatMapLatest {
    repositroy.loadData()
}
fun refresh() { 
    refreshTrigger.value = Unit
}

If there is an EventFlow, it could also fulfill this need.

public fun <T> DataFlow(value: T): DataFlow<T> = DataFlowImpl(value ?: NULL)

@SharedImmutable
private val NONE = Symbol("NONE")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move implementation to a different file (so users won't see it when exploring our public API)

* cost, where `N` is the number of collectors.
*/
@FlowPreview
public interface DataFlow<T> : Flow<T> {
Copy link
Collaborator

@qwwdfsad qwwdfsad Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFlow sounds more like a BroacastChannel with (possibly cached or replayable) data.
This primitive represents a single state as a flow of consecutive updates, I'd rather name it StateFlow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that StateFlow is better, for more use cases it's essentially observable data field than some abstract state. At least it how we usually use it

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for DataFlow. State sounds more limiting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yigit @gildor @qwwdfsad Bikeshedding the name here: #1082 (comment)

* Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException].
* Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException].
*/
public var value: T
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding my previous comment about FlowCollector.

I am afraid that mutable value will mix imperative and declarative approaches.
If we designed something like Compose, then yes, updating a var is a way to go.
But we are already in the world of Flow that requires more ceremonies, but is pretty approachable and clear in return. I am afraid that the ability to mutate var instead of writing state.emit(update) will make end-users code less clear and understandable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should emit be suspendable or regular function/

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an open question. Either we can play the same trick as with channels (suspendable send + non-suspendable offer and guarantee, that for non-closed conflated channel offer always succeeds) and provide two methods (naming TBD) or make emit non-suspendable

Copy link
Contributor

@digitalbuddha digitalbuddha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm delighted for this api. My request is a way to transform a flow into a data flow. My use case: I have a db that exposes flows for each query, the flow emits each time a table changes. I'd like to broadcast the flow and collect it multiple times (2 parts of screen). Ideally I'd like to do dbFlow.toDataFlow and retain that in a field, then I can have many parts of my screen collect the query changes without having to create more than one dbFlow.

@JakeWharton
Copy link
Contributor

That's #1261

@elizarov
Copy link
Contributor Author

This prototype PR is superseded by the actual state flow design that is explained in issue #1973 and PR for StateFlow implementation in #1974.

@elizarov elizarov closed this Apr 29, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants