Asked Wednesday, February 19, 2014

I have an RxJS sequence being consumed in the normal manner (code below).

However, in the observable's 'onNext' handler, some of the operations complete synchronously, while others require async callbacks that must be waited on before the next item in the input sequence is processed.

I'm a little confused about how to do this. Any ideas? Thanks!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);

 Answers

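Each operation you want to perform can be modeled as an observable. Even the synchronous operations can be modeled this way. You can then use map to convert your sequence into a sequence of sequences, and use concatAll to flatten it. concatAll subscribes to each inner sequence only after the previous one has completed, so the items are processed one at a time and in order.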



someObservable
    .map(function (item) {
        if (item === 'do-something-async') {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call? Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
        // results arrive here one at a time, in the original item order
    }, function (error) {
        console.log('error', error);
    }, function () {
        console.log('complete');
    });
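
The defer calls above matter because map runs as soon as each item arrives, while concatAll() may not subscribe to the resulting inner observable until much later. The same eager-versus-lazy distinction can be seen without Rx. The following is only a plain-JavaScript sketch of the idea: startWork, eagerJobs and lazyJobs are hypothetical names, and a wrapping function stands in for Rx.Observable.defer.

```javascript
// Records when each piece of work actually starts.
var started = [];

// Hypothetical unit of work: it has a side effect the moment it is called.
function startWork(item) {
  started.push(item);
  return 'done ' + item;
}

var items = ['a', 'b', 'c'];

// Eager: the work runs during the map itself, before anything consumes it.
var eagerJobs = items.map(function (item) {
  return startWork(item);
});
var startedAfterEagerMap = started.slice(); // ['a', 'b', 'c']

// Lazy: wrap the work in a function so nothing runs until a consumer calls it.
started = [];
var lazyJobs = items.map(function (item) {
  return function () { return startWork(item); };
});
var startedAfterLazyMap = started.slice(); // []

// The consumer decides when each job runs, one at a time and in order.
var firstResult = lazyJobs[0]();
var startedAfterFirstCall = started.slice(); // ['a']
```

In the Rx version, concatAll() plays the role of the consumer: it only "calls" the next deferred observable once the previous one has completed.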


To reply to some of your comments: at some point you need to impose some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async, and whether the function is actually async or sync is hidden as an implementation detail. This is true whether you are using JavaScript promises, Rx observables, C# Tasks, C++ futures, etc. The functions end up returning a promise/observable/task/future/etc., and if the function is actually synchronous, the object it returns is simply already completed.
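
As a rough illustration of that point, here is a sketch that uses plain promises rather than Rx (an assumption made so it runs without any library). The helpers toPromise and runInOrder are hypothetical names, not part of any API: the first gives every function the same asynchronous signature, and the second waits for each result before starting the next function.

```javascript
// Hypothetical helper: call func and always hand back a promise.
// A plain return value becomes an already-resolved promise, a returned
// promise is passed through, and a synchronous throw becomes a rejection.
function toPromise(func) {
  return new Promise(function (resolve) {
    resolve(func());
  });
}

// Hypothetical helper: run the functions strictly one after another and
// collect their results in input order.
function runInOrder(funcs) {
  var results = [];
  var chain = funcs.reduce(function (previous, func) {
    return previous
      .then(function () { return toPromise(func); })
      .then(function (result) { results.push(result); });
  }, Promise.resolve());
  return chain.then(function () { return results; });
}

// Usage: a mix of asynchronous and synchronous steps.
var order = [];
var finished = runInOrder([
  function () {
    order.push('slow start');
    return new Promise(function (resolve) {
      setTimeout(function () {
        order.push('slow end');
        resolve('slow');
      }, 50);
    });
  },
  function () {
    order.push('fast');
    return 'fast';
  }
]);

finished.then(function (results) {
  console.log(results); // results arrive in input order
  console.log(order);   // 'fast' is recorded only after 'slow end'
});
```

The caller never needs to know which step was really asynchronous, which is the same property the Rx version gets from wrapping synchronous work in an observable.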



Having said that, since this is JavaScript, you can cheat:



// Assumes each item in someObservable is itself a function to be run.
var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result : Rx.Observable.return(result);
    });
};

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
        // results arrive here one at a time, in the original item order
    }, function (error) {
        console.log('error', error);
    }, function () {
        console.log('complete');
    });

Answered Tuesday, February 18, 2014
mackennamelissac
