// Example: observing one source both directly and through `delay`, which
// forwards the events emitted by an observable with a fixed delay.
import core.time : msecs;
import std.algorithm.iteration : each;
import std.datetime.stopwatch : StopWatch;
import vibe.core.core : runTask, sleep;
import vibe.core.log : logInfo;

ObservableSource!int source;
StopWatch sw;

// First observer: logs every event together with the time elapsed on `sw`.
auto t1 = runTask({
	source
		.subscribe()
		.each!((evt) {
			logInfo("%3d ms, immediate: %s", sw.peek.total!"msecs", evt);
		});
});

// Second observer: subscribes to the same source through `delay(50.msecs)`.
auto t2 = runTask({
	source
		.delay(50.msecs)
		.subscribe()
		.each!((evt) {
			logInfo("%3d ms, delayed: %s", sw.peek.total!"msecs", evt);
		});
});

// quick and dirty wait to let the tasks subscribe
sleep(10.msecs);

// Start measuring only now, so the logged times are relative to the first event.
sw.start();
source.put(1);
source.put(2);
sleep(100.msecs);
source.put(3);
source.close();

// Wait for both observer tasks to finish before leaving the example.
t1.join();
t2.join();

// expected output:
//   0 ms, immediate: 1
//   0 ms, immediate: 2
//  50 ms, delayed: 1
//  50 ms, delayed: 2
// 100 ms, immediate: 3
// 150 ms, delayed: 3
Forwards the events emitted by an observable with a fixed delay.