-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add induceError and induceEnd #310
Conversation
Hey @semmel, thanks for updating with more info after you read #1. Based on the use case, would it work for you to implement polling using a periodic stream that you map to fetch requests? Here's a quick sketch of the idea: const fetchPolling = awaitPromises(map(() => fetch(...), periodic(5000))) You can always further map the fetch results after doing the awaitPromises if you need. If that won't work, maybe implementing Either way, you'll implement a function that receives a Sink (and a Scheduler). The Sink has methods you can use to produce events, errors, and end the stream. The Scheduler's currentTime method can (must, in fact) be used to get the The Let me know if that sounds like any of the above will work for you. |
Thanks @briancavalier for your answer! I did not know about I am sorry, but actually the My actual problem is modelling asynchronous state machines (SM) with streams when not all input action streams can run from the beginning. For example this SM
const [inducePollEvent, pollStream] = createAdapter();
let disposePolling = identity;
// :: Stream 'armed'|'running'|'complete'
const state = flow(
mergeArray([ // here go the SM actions
map(() => "run", fromClick('#run')),
map(() => "cancel", fromClick('#cancel')),
map(() => "ready", pollStream) // <-- feedback stream <--
]),
flatScan(
(state, action) => {
switch(state) {
case 'armed':
if (action === 'run') {
return flow(
fetch(`${api}/scheduleWork`),
fromPromise,
map(() => 'running'),
tap(() => {
// run the feedback stream imperatively <--
disposePolling = runStatusPolling(inducePollEvent);
})
);
else
return throwError(new Error(`Illegal action ${action} in state ${state}`));
case 'running':
if (action === 'cancel') {
disposePolling()
return now('armed');
}
else if (action === 'ready') {
return now('complete');
}
// …
case 'complete':
// …
}
},
'armed' // initial state
)
) Not all input action streams can run from the beginning: In this example In general I find this pattern of modelling a SM with streams and Does this make sense? How would you model it without Btw. the implementation for // flatScan :: ((acc: S, t: T) -> Stream S, S) -> Stream T -> Stream S
flatScan = (reducer, seed) => pipe(
concatMap((() => {
let state = seed;
return pipe(
sourceValue => reducer(state, sourceValue),
tap_o(next => { state = next; })
);
})())
); |
Sorry for the delay, @semmel. There are a few ways to model state machines mostjs without going into full imperative mode. For example, there's For your specific situation, though, the it looks like the real snag is that feedback from a later async action (polling) needs to loop back to an earlier node in the stream graph (the map -> "ready"). Most/core doesn't have an elegant way of doing that, and it's usually a situation where you end up needing something like Given all of that, I think your solution is quite reasonable 😄 |
@briancavalier Thank you for your analysis, and I am sorry for the long example. So back to my example and this PR: So in conclusion, in general I might have overestimated the usefulness of error events ("stream failure") in the streams? My intention to add I have not made up my mind if caring too much about stream failure makes sense, given that I can always feed back polling application errors in Btw. I think the distinction between "Stream Failure vs. Application Errors" very valuable. Conclusion No 2: So if you think |
tl;dr
No need to apologize! The example was very helpful.
I apologize if "quite reasonable" came across as negative in any way. This is a challenging use case, and I always use case, and I always end up using I'll try to explain a bit more of my perspective on why I'd like The concepts of "error" and "end" represent statefulness. Mostjs has stateful internals, but tries really hard not to expose that statefulness to developers so they don't accidentally create subtle dependencies on state, timing, ordering, etc. For example, Adding induceEvent(x)
induceError(e)
induceEvent(y) There are a few choices, but let's say it produces 1 event, That sounds reasonable, but now imagine that those lines of code are far apart in your code base, each in a different async operation. New problems arise: for example, you may have created a race between Here's another: induceEnd()
induceError(e) Again, imagine these are far apart and end up racing. Now sometimes your app fails and sometimes it doesn't. Or worse yet, important errors may go silent. I hope that provides a bit more context! |
Yes, that looks like a perfect fit! I'll try it soon. Btw. in most-subject's
Hey, no need to apologise. I did not understand "quite reasonable" in a negative way, on the contrary. I meant the "" just as citations marks. (I am not a native speaker, thus I should phrase my sentences more clearly so they don't come across wrong.)
Indeed I understand and I agree that providing such an api would give the wrong idea how to use streams. Thank you again for your explanations. |
Thanks for the great discussion, @semmel. I'm glad |
Follow up: I've updated |
I needed that in one place, and I guess, this completes the API.
Should be backwards compatible.
What do you think?
Edit:
I saw that #1 already discusses
induceEnd
andinduceError
and warns about race conditions, only after writing my own implementation in JS and also completing this PR.To illustrate my use case:
I have a loop which
fetch
,The results a processed by a consumer
Stream
into which IinduceValue
inside the loop. However, iffetch
fails severely (e.g. with HTTP error code 400-499) I'd like to process that using the same consumerStream
. Thus, inside the loop I need toinduceError
.