1 /** Provides a channel/range style observer framework.
2 
3 	Observables provide a similar functionality to `Signal`, but they are
4 	composable and provide a sequential subscriber API. Their API is also
5 	`nothrow` for the most part, so that the subscriber side cannot interfere
6 	with the side that emits events.
7 
8 	Copyright: Copyright © 2020-2021 Sönke Ludwig
9 	Authors: Sönke Ludwig
10 */
11 module observable.observable;
12 
13 import observable.signal : Signal, SignalConnection;
14 
15 import core.time : Duration;
16 import std.meta : allSatisfy, staticMap;
17 import std.typecons : RefCounted, RefCountedAutoInitialize, refCounted;
18 import taggedalgebraic.taggedunion;
19 import vibe.core.log : logException;
20 import vibe.core.sync : LocalManualEvent, createManualEvent;
21 import vibe.internal.array : FixedRingBuffer;
22 
23 
24 ///
25 unittest {
26 	import vibe.core.core : runTask, sleep;
27 	import core.time : msecs;
28 
29 	ObservableSource!int source;
30 
31 	auto t1 = runTask({
32 		try {
33 			auto observer = source
34 				.subscribe();
35 
36 			assert(observer.consumeOne == 1);
37 			assert(observer.consumeOne == 2);
38 			assert(observer.consumeOne == 3);
39 			assert(observer.empty);
40 		} catch (Exception e) assert(false, e.toString);
41 	});
42 
43 	auto t2 = runTask({
44 		try {
45 			auto observer = source
46 				.map!(i => i * 2)
47 				.subscribe();
48 
49 			assert(observer.consumeOne == 2);
50 			assert(observer.consumeOne == 4);
51 			assert(observer.consumeOne == 6);
52 			assert(observer.empty);
53 		} catch (Exception e) assert(false, e.msg);
54 	});
55 
56 	// quick and dirty wait to let the tasks subscribe
57 	sleep(10.msecs);
58 
59 	source.put(1);
60 	source.put(2);
61 	source.put(3);
62 	source.close();
63 
64 	t1.join();
65 	t2.join();
66 }
67 
68 // basic operation should be nothrow
69 nothrow unittest {
70 	ObservableSource!int source;
71 	auto observer = source.subscribe();
72 	int r;
73 	source.put(1);
74 	source.close();
75 	assert(observer.tryConsumeOne(r));
76 	assert(r == 1);
77 	assert(!observer.tryConsumeOne(r));
78 	assert(source.closed);
79 	assert(observer.empty);
80 }
81 
82 
83 /** Creates an observer for the specified observable.
84 
85 	An observer returns the events emitted by the observable in a sequential,
86 	blocking manner. Usually, a separate task is used to drain the observer to
87 	ensure sequential processing with no concurrency/race-conditions.
88 */
89 Observer!(ObservableType!O) subscribe(O)(O observable)
90 	if (isObservable!O)
91 {
92 	assert (!observable.closed, "Subscribing to closed observable.");
93 	Observer!(ObservableType!O) ret;
94 	ret.initialize(observable);
95 	return ret;
96 }
97 
98 
99 /** Determines whether a type qualifies as an observable.
100 
101 	An observable must have the following members:
102 
103 	$(UL
104 		$(LI `Event`: the type of the events emitted by the observer - this must
105 			be an instance of `ObservedEvent`)
106 		$(LI `connect(C, A...)(ref SignalConnection, C callable, A args): Used
107 			by observers and operators to get notfied of new events emitted by
108 			an observer)
109 		$(LI `closed`: A boolean property that signifies whether the observable
110 			has been closed (cannot emit any more events))
111 	)
112 */
113 enum isObservable(O) =
114 	is(O.Event)
115 	&& __traits(compiles, (SignalConnection c) { O.init
116 		.connect(c, delegate(O.Event itm, int, int, int) {}, 1, 2, 3); })
117 	&& is(typeof(O.init.closed) == bool);
118 
119 static assert(isObservable!(Observable!int));
120 static assert(isObservable!(ObservableSource!int));
121 
122 /** Extracts the event tyoe of an observable.
123 */
124 alias ObservableType(O) = typeof(O.Event.init.eventValue);
125 
126 
127 /** The fundamental source for generating observable events.
128 
129 	This is the type used to manually generate events using the `put` method.
130 	The events can be observed using the `Observable` returned by the
131 	`observable` property.
132 */
133 struct ObservableSource(T, EXTRA_STORAGE = void) {
134 	private {
135 		Observable!(T, EXTRA_STORAGE) m_observable;
136 	}
137 
138 	alias Event = Observable!(T, EXTRA_STORAGE).Event;
139 
140 	@disable this(this);
141 	this(ref return scope ObservableSource rhs) { this.tupleof = rhs.tupleof; }
142 
143 	@property ref inout(Observable!(T, EXTRA_STORAGE)) observable() inout { return m_observable; }
144 
145 	alias observable this;
146 
147 	/** Closes the observable.
148 
149 		In the closed state, `put` may no longer be called and all subscribed
150 		observers will receive a notification that no more events will be
151 		emitted.
152 	*/
153 	void close() { m_observable.close(); }
154 
155 	/** Emits an event.
156 
157 		All subscribed observers will receive a copy of the event.
158 	*/
159 	void put(T event) { m_observable.put(event); }
160 
161 	static if (!is(EXTRA_STORAGE == void)) {
162 		@property ref EXTRA_STORAGE extraStorage() { return m_observable.extraStorage; }
163 	}
164 }
165 
166 /** A basic observable.
167 
168 	This is the observable provided by `ObservableSource` and some of the
169 	observer modifier functions. Instead of `ObservableSource` an `Observable`
170 	should usually be made accessible by external code, in order to avoid the
171 	possibility of injecting events from the outside.
172 */
173 struct Observable(T, EXTRA_STORAGE = void)
174 {
175 	alias Event = ObservedEvent!T;
176 
177 	private static struct Payload {
178 		Signal!Event signal;
179 		bool closed;
180 		static if (!is(EXTRA_STORAGE == void))
181 			EXTRA_STORAGE extraStorage;
182 
183 		@disable this(this);
184 		this(ref return scope Payload rhs) { this.tupleof = rhs.tupleof; }
185 
186 		~this()
187 		{
188 			close();
189 		}
190 
191 		void close()
192 		{
193 			if (this.closed) return;
194 
195 			this.closed = true;
196 			this.signal.emit(Event.close());
197 		}
198 	}
199 
200 	alias PayloadRef = RefCounted!(Payload, RefCountedAutoInitialize.no);
201 
202 	private {
203 		PayloadRef m_payload;
204 	}
205 
206 	@disable this(this);
207 	this(ref return scope Observable rhs)
208 	{
209 		rhs.initialize();
210 		m_payload = rhs.m_payload;
211 	}
212 
213 	@property bool closed() const { return m_payload.refCountedStore.isInitialized && m_payload.closed; }
214 
215 	void connect(C, ARGS...)(ref SignalConnection connection, C callable, ARGS args)
216 		if (is(typeof(callable(Event.init, args))))
217 	{
218 		static assert(__traits(compiles, () nothrow { callable(Event.init, args); }),
219 			"Observable connection callback must be nothrow.");
220 
221 		initialize();
222 		m_payload.signal.socket.connect(connection, callable, args);
223 	}
224 
225 	static if (!is(EXTRA_STORAGE == void)) {
226 		private @property ref EXTRA_STORAGE extraStorage()
227 		{
228 			initialize();
229 			return m_payload.extraStorage;
230 		}
231 	}
232 
233 	private void close()
234 	{
235 		initialize();
236 		m_payload.close();
237 	}
238 
239 	private void put(T event)
240 	{
241 		assert(!this.closed, "Calling Observable.put() on a closed observable.");
242 
243 		initialize();
244 		m_payload.signal.emit(Event.event(event));
245 	}
246 
247 	private void initialize()
248 	{
249 		m_payload.refCountedStore.ensureInitialized();
250 	}
251 }
252 
253 template ObservedEvent(T)
254 {
255 	import std.meta : AliasSeq;
256 
257 	enum has_nothrow_postblit = __traits(compiles, () nothrow {
258 		T a = T.init;
259 		T b = T.init;
260 		a = b;
261 	});
262 
263 	static if (has_nothrow_postblit) alias A = AliasSeq!(forceNothrowPostblit);
264 	else alias A = AliasSeq!();
265 
266 	@A
267 	struct U {
268 		T event;
269 		Void close;
270 	}
271 
272 	alias ObservedEvent = TaggedUnion!U;
273 }
274 
275 
276 /** An observer tied to a specific observable.
277 
278 	This type provides a channel and an input range interface to enable
279 	sequential consumption of events emitted by an observer.
280 */
281 struct Observer(T)
282 {
283 	private static struct Payload {
284 		FixedRingBuffer!T buffer;
285 		bool closed;
286 		LocalManualEvent event;
287 		SignalConnection conn;
288 
289 		void put(ObservedEvent!T itm)
290 		{
291 			import std.algorithm.comparison : max;
292 
293 			if (closed) return;
294 
295 			if (itm.isClose) closed = true;
296 			else {
297 				if (buffer.full)
298 					buffer.capacity = max(16, buffer.capacity * 3 / 2);
299 				buffer.put(itm.eventValue);
300 			}
301 
302 			event.emit();
303 		}
304 	}
305 
306 	private alias Event = ObservedEvent!T;
307 	private alias PayloadRef = RefCounted!(Payload, RefCountedAutoInitialize.no);
308 
309 	private {
310 		PayloadRef m_payload;
311 	}
312 
313 	/** Determines whether there are any events left.
314 
315 		Note that this property needs to wait until either at least one event
316 		is available, or until the observer was closed.
317 	*/
318 	@property bool empty()
319 	{
320 		auto ec = m_payload.event.emitCount;
321 
322 		while (m_payload.buffer.empty) {
323 			if (m_payload.closed) return true;
324 			ec = m_payload.event.waitUninterruptible(ec);
325 		}
326 
327 		return false;
328 	}
329 
330 	/** Determines whether any more events are currently pending.
331 
332 		This property is `true` *iff* an event is immediately available, i.e.
333 		that `front` yields a value without waiting. Note that `empty` is
334 		guaranteed to return `false` in this case.
335 	*/
336 	@property bool pending()
337 	const {
338 		return m_payload.buffer.length >= 1;
339 	}
340 
341 	/** Input range `front` property.
342 
343 		The caller must make sure that there is actually still an element
344 		available before invoking this property, either by evaluating `empty`,
345 		or by external knowledge. Failing to do so will result in an abnormal
346 		program termination.
347 	*/
348 	@property ref T front()
349 	{
350 		if (this.empty) assert(false, "Calling .front on an empty subscriber");
351 		return m_payload.buffer.front;
352 	}
353 
354 	/** Input range `popFront` method.
355 
356 		The caller must make sure that there is actually still an element
357 		available before invoking this method, either by evaluating `empty`,
358 		or by external knowledge. Failing to do so will result in an abnormal
359 		program termination.
360 	*/
361 	void popFront()
362 	{
363 		if (this.empty) assert(false, "Calling .popFront on an empty subscriber");
364 		m_payload.buffer.popFront();
365 	}
366 
367 	/** Reads a single event.
368 
369 		Throws: If the observer is closed and all events have been read,
370 			`ObserverClosedException` will be thrown.
371 	*/
372 	T consumeOne()
373 	{
374 		T ret;
375 		if (!tryConsumeOne(ret))
376 			throw new ObserverClosedException;
377 		return ret;
378 	}
379 
380 	/** Attempts to read a single event.
381 
382 		Returns: `true` is returned if an event was successfully read. Otherwise
383 			a value of `false` is returned, which means that the observable
384 			was closed and no more events are available.
385 	*/
386 	bool tryConsumeOne(ref T dst)
387 	{
388 		import std.algorithm.mutation : swap;
389 
390 		if (this.empty) return false;
391 
392 		swap(dst, m_payload.buffer.front);
393 		m_payload.buffer.popFront();
394 		return true;
395 	}
396 
397 	private void initialize(O)(ref O observable)
398 	{
399 		assert(!m_payload.refCountedStore.isInitialized, "Double-initializing observer!?");
400 		m_payload.refCountedStore.ensureInitialized();
401 		m_payload.event = createManualEvent();
402 		observable.connect(m_payload.conn, &m_payload.put);
403 	}
404 }
405 
406 unittest {
407 	ObservableSource!int o;
408 	auto obs = o.subscribe();
409 	assert(!obs.pending);
410 	o.put(1);
411 	assert(obs.front == 1);
412 	assert(obs.pending);
413 	assert(!obs.empty);
414 	o.put(2);
415 	assert(obs.pending);
416 	obs.popFront();
417 	assert(obs.front == 2);
418 	assert(obs.pending);
419 	assert(!obs.empty);
420 	obs.popFront();
421 	assert(!obs.pending);
422 	o.close();
423 	assert(obs.empty);
424 }
425 
426 import std.range : isInputRange;
427 static assert(isInputRange!(Observer!int));
428 
429 
430 /** Thrown by `Observer.consumeOne` in case there are no more events left.
431 */
432 class ObserverClosedException : Exception {
433 	this(string file = __FILE__, size_t line = __LINE__)
434 	{
435 		super("Attempt to consume event from closed observer.", file, line);
436 	}
437 }
438 
439 
440 /** Applies a transformation on the events emitted by an observer.
441 
442 	The return value is an observer that emits the transformed events.
443 */
444 auto map(alias fun, O)(O source)
445 	if (isObservable!O)
446 {
447 	alias T = ObservableType!O;
448 	alias TM = typeof(fun(T.init));
449 
450 	static struct OM {
451 		private O source;
452 
453 		alias Event = ObservedEvent!TM;
454 
455 		@property bool closed() const { return source.closed; }
456 
457 		void connect(C, ARGS...)(ref SignalConnection connection, C callable, ARGS args)
458 		{
459 			source.connect(connection, function(ObservedEvent!T val, C callable, ARGS args) {
460 					final switch (val.kind) with (ObservedEvent!T.Kind) {
461 						case close: callable(ObservedEvent!TM.close(), args); break;
462 						case event: callable(ObservedEvent!TM.event(fun(val.eventValue)), args); break;
463 					}
464 				}, callable, args);
465 		}
466 	}
467 
468 	static assert(isObservable!OM);
469 
470 	return OM(source);
471 }
472 
473 ///
474 unittest {
475 	ObservableSource!int source;
476 	auto observer = source
477 		.map!(i => 2 * i)
478 		.subscribe();
479 
480 	source.put(1);
481 	source.put(2);
482 	source.put(3);
483 
484 	assert(observer.consumeOne == 2);
485 	assert(observer.consumeOne == 4);
486 	assert(observer.consumeOne == 6);
487 }
488 
489 
490 /** Combines multiple observers into one.
491 
492 	The events of the input observables will be stored in a `TaggedUnion` and
493 	will be emitted by the combined observable interleaved in the same order as
494 	they occurred.
495 
496 	The values of the tagged union can be accessed using `evt.observer0Value`,
497 	`evt.observer1Value` etc., or by type using `evt.value!T`, with `T` being
498 	the event type of one or more of the source observers.
499 */
500 auto merge(OBSERVERS...)(OBSERVERS observers)
501 	if (allSatisfy!(isObservable, OBSERVERS))
502 {
503 	import std.algorithm.mutation : swap;
504 	import std.algorithm.searching : all;
505 
506 	alias RawTypes = staticMap!(ObservableType, OBSERVERS);
507 	alias Types = ObserverNamedTypes!RawTypes;
508 	alias TM = TaggedUnion!Types;
509 
510 	static struct ES {
511 		OBSERVERS sources;
512 		SignalConnection[OBSERVERS.length] connections;
513 
514 		@disable this(this);
515 		this(ref return scope ES rhs) { this.tupleof = rhs.tupleof; }
516 	}
517 
518 	static void onSourceEvent(size_t i)(ObservedEvent!(RawTypes[i]) val,
519 		ObservableSource!(TM, ES) target)
520 	nothrow {
521 		final switch (val.kind) with (ObservedEvent!(RawTypes[i]).Kind) {
522 			case close:
523 				scope ES* es = () @trusted { return &target.extraStorage(); } ();
524 				assert(es.sources[i].closed, "Observer sent close, but is not closed!?");
525 				assert(!target.closed, "An observer was closed after the merged observer is already closed!?");
526 
527 				es.connections[i].disconnect();
528 
529 				bool all_closed = true;
530 				static foreach (i; 0 .. OBSERVERS.length)
531 					if (!es.sources[i].closed)
532 						all_closed = false;
533 				if (all_closed) target.close();
534 				break;
535 			case event:
536 				auto et = mixin("TM.observer"~i.stringof~"(val.eventValue)");
537 				target.put(et);
538 				break;
539 		}
540 	}
541 
542 	ObservableSource!(TM, ES) ret;
543 	scope ES* es = () @trusted { return &ret.extraStorage(); } ();
544 	foreach (i, O; OBSERVERS) {
545 		swap(es.sources[i], observers[i]);
546 
547 		if (!es.sources[i].closed)
548 			es.sources[i].connect(es.connections[i], &onSourceEvent!i, ret);
549 	}
550 	return ret;
551 }
552 
553 ///
554 unittest {
555 	ObservableSource!int oint;
556 	ObservableSource!string ostr;
557 
558 	auto observer = merge(oint, ostr)
559 		.subscribe();
560 
561 	oint.put(1);
562 	ostr.put("foo");
563 	oint.put(2);
564 	oint.put(3);
565 	oint.close();
566 	ostr.put("bar");
567 	ostr.close();
568 
569 	assert(observer.consumeOne.value!int == 1);
570 	assert(observer.consumeOne.value!string == "foo");
571 	assert(observer.consumeOne.value!int == 2);
572 	assert(observer.consumeOne.value!int == 3);
573 	assert(observer.consumeOne.value!string == "bar");
574 	assert(observer.empty);
575 }
576 
577 private struct ObserverNamedTypes(TYPES...)
578 {
579 	static foreach (i, T; TYPES)
580 		mixin("TYPES["~i.stringof~"] observer"~i.stringof~";");
581 }
582 
583 
584 /** Records the time at which each event was emitted.
585 
586 	Returns an observable that emits `TimestampedEvent` events with the
587 	`MonoTime` timestamp of the original occurrence of each event.
588 */
589 auto timestamp(O)(ref O source)
590 {
591 	import core.time : MonoTime;
592 	alias T = ObservableType!O;
593 	return source.map!(val => TimestampedEvent!T(val, MonoTime.currTime()));
594 }
595 
596 struct TimestampedEvent(T)
597 {
598 	import core.time : MonoTime;
599 	T event;
600 	MonoTime timestamp;
601 }
602 
603 
604 /** Forwards the events emitted by an observable with a fixed delay.
605 */
606 auto delay(O)(ref O source, Duration delay)
607 	if (isObservable!O)
608 {
609 	import vibe.core.core : runTask, sleep;
610 	import core.time : MonoTime;
611 
612 	alias T = ObservableType!O;
613 
614 	ObservableSource!T delayed;
615 
616 	runTask(function (O source, ObservableSource!T delayed, Duration delay) nothrow {
617 		scope (exit) delayed.close();
618 
619 		foreach (val; source.timestamp.subscribe) {
620 			auto tt = val.timestamp + delay;
621 			auto tm = MonoTime.currTime();
622 			if (tm < tt) {
623 				try sleep(tt - tm);
624 				catch (Exception e) {
625 					logException(e, "Sleep in observer delay() got interrupted unexpectedly");
626 					break;
627 				}
628 			}
629 			delayed.put(val.event);
630 		}
631 	}, source, delayed, delay);
632 
633 	return delayed;
634 }
635 
636 ///
637 unittest {
638 	import core.time : msecs;
639 	import std.algorithm.iteration : each;
640 	import std.datetime.stopwatch : StopWatch;
641 	import vibe.core.core : runTask, sleep;
642 	import vibe.core.log : logInfo;
643 
644 	ObservableSource!int source;
645 	StopWatch sw;
646 
647 	auto t1 = runTask({
648 		source
649 			.subscribe()
650 			.each!((evt) {
651 				logInfo("%3d ms, immediate: %s", sw.peek.total!"msecs", evt);
652 			});
653 	});
654 
655 	auto t2 = runTask({
656 		source
657 			.delay(50.msecs)
658 			.subscribe()
659 			.each!((evt) {
660 				logInfo("%3d ms, delayed: %s", sw.peek.total!"msecs", evt);
661 			});
662 	});
663 
664 	// quick and dirty wait to let the tasks subscribe
665 	sleep(10.msecs);
666 
667 	sw.start();
668 
669 	source.put(1);
670 	source.put(2);
671 	sleep(100.msecs);
672 	source.put(3);
673 	source.close();
674 
675 	t1.join();
676 	t2.join();
677 
678 	// expected output:
679 	//   0 ms, immediate: 1
680 	//   0 ms, immediate: 2
681 	//  50 ms, delayed: 1
682 	//  50 ms, delayed: 2
683 	// 100 ms, immediate: 3
684 	// 150 ms, delayed: 3
685 }