1 /**
2 	Signals/slots implementation.
3 
4 	See `Signal` for usage examples.
5 
6 	Copyright: Copyright © 2007-2021 Sönke Ludwig
7 	Authors: Sönke Ludwig
8 */
9 module observable.signal;
10 
11 import core.sync.mutex;
12 import core.thread;
13 import std.algorithm;
14 import std.traits : isInstanceOf;
15 import vibe.core.core;
16 import vibe.core.concurrency;
17 
18 
19 /** A thread local signal-slot implementation.
20 */
struct Signal(P...) {
	alias Params = P;
	alias Slot = void delegate(P) nothrow;

	private {
		SignalSocket!Params m_socket;
	}

	// Copying is handled by the copy constructor below rather than a postblit.
	@disable this(this);

	/// Constructs a copy of `rhs` by copying its socket.
	this(ref return scope Signal rhs)
	{
		m_socket = rhs.m_socket;
	}

	/// Determines if any connections are connected to the signal.
	@property bool empty()
	const {
		return m_socket.empty;
	}

	/// Returns a reference to the socket of this signal.
	@property ref SignalSocket!Params socket()
	{
		return m_socket;
	}

	/// Detaches all connections from the signal.
	void disconnectAll()
	{
		m_socket.disconnectAll();
	}

	/** Emits the signal.

		Forwards `params` to the socket, which walks through all connections
		in the order in which they were registered and invokes the associated
		slot with the given parameters.

		Note that it is legal to disconnect any single connection of the signal
		from within a callback, as well as calling `disconnectAll`. However,
		disconnecting multiple connections from a callback may result in some
		outstanding callbacks to get skipped.
	*/
	void emit(P params)
	nothrow {
		m_socket.emit(params);
	}
}
56 
57 ///
@safe nothrow unittest {
	// a signal carrying a single int argument
	Signal!int signal;

	{
		// the connection handle only lives within this scope
		SignalConnection scoped;

		// attach a delegate literal to the signal through the handle
		signal.socket.connect(scoped, (value) { assert(value == 42); });

		// the delegate above gets called with 42
		signal.emit(42);
	}

	// the only copy of the connection has gone out of scope, which
	// disconnects the delegate, so nothing is called by this emit
	signal.emit(13);


	// a class that connects one of its member functions keeps the connection
	// as a field, tying the connection life time to the instance
	class Test {
		SignalConnection conn;

		this()
		@safe nothrow {
			signal.socket.connect(conn, &slot);
		}

		void slot(int i)
		@safe nothrow {
			assert(i == 32);
		}
	}

	auto listener = new Test;
	signal.emit(32);
}
98 
/// This example shows the recommended convention for defining signals.
unittest {
	class Widget {
		private {
			Signal!bool m_visibilityChangeSignal;
		}

		// only the socket is handed out; emitting stays with the widget
		@property ref SignalSocket!bool visibilityChangeSignal()
		{
			return m_visibilityChangeSignal.socket;
		}

		void setVisibility(bool v)
		{
			// ...
			m_visibilityChangeSignal.emit(v);
		}
	}

	class Layout {
		private {
			SignalConnection m_visConn;
			Widget m_client;
		}

		void setClient(Widget w)
		{
			m_client = w;
			// re-using m_visConn automatically disconnects a possible
			// connection to a previous widget
			w.visibilityChangeSignal.connect(m_visConn, &onChildVisibilityChange);
		}

		void onChildVisibilityChange(bool)
		nothrow {
			// ...
		}
	}

	auto widget = new Widget;
	auto layout = new Layout;
	layout.setClient(widget);
	widget.setVisibility(true);
}
139 
140 // connecting @system callbacks and using @system parameters should be @system
unittest {
	Signal!int sig;
	// callable objects with a @safe and a @system opCall, respectively
	struct C { void opCall(int) @safe nothrow {} }
	struct CS { void opCall(int) @system nothrow {} }
	// parameter type whose postblit is @system, i.e. it cannot be copied in @safe code
	struct US { this(this) @system nothrow {} }

	SignalConnection c;
	// connecting @safe callbacks must compile in @safe code, connecting
	// @system callbacks must be rejected there
	static assert(__traits(compiles, () @safe { sig.socket.connect(c, (i) @safe {}); }));
	static assert(!__traits(compiles, () @safe { sig.socket.connect(c, (i) @system {}); }));
	static assert(__traits(compiles, () @safe { C clbl; sig.socket.connect(c, clbl); }));
	static assert(!__traits(compiles, () @safe { CS clbl; sig.socket.connect(c, clbl); }));
	// emitting a signal with a safely copyable parameter is callable from @safe code
	() @safe { sig.emit(0); } ();

	// with a @system parameter type, both connect and emit are only
	// usable from @system code
	Signal!US usig;
	static assert(__traits(compiles, () @system { usig.socket.connect(c, (i) @safe {}); }));
	static assert(!__traits(compiles, () @safe { usig.socket.connect(c, (i) @safe {}); }));
	static assert(__traits(compiles, (ref US p) @system { usig.emit(p); }));
	static assert(!__traits(compiles, (ref US p) @safe { usig.emit(p); }));
}
160 
161 // test disconnectAll from within a callback
nothrow unittest {
	Signal!() sig;
	bool firstRan = false;

	// drops every connection of the signal while it is being emitted
	void first()
	{
		sig.disconnectAll();
		firstRan = true;
	}

	// must never run, because first() has disconnected it already
	void second()
	{
		assert(false);
	}

	SignalConnectionContainer connections;
	sig.socket.connect(connections, &first);
	sig.socket.connect(connections, &second);
	sig.emit();
	assert(firstRan);
}
173 
174 // test disconnecting the current connection from within a callback
unittest {
	Signal!() sig;
	SignalConnection fconn, bconn;
	int calls = 0;

	// disconnects its own connection while being called
	void selfRemoving()
	{
		fconn.disconnect();
		calls++;
	}

	// registered after selfRemoving and still expected to run
	void follower()
	{
		calls++;
	}

	sig.socket.connect(fconn, &selfRemoving);
	sig.socket.connect(bconn, &follower);
	sig.emit();
	assert(calls == 2);
}
186 
187 // test fixed parameters
unittest {
	Signal!(int) sig;
	SignalConnection conn;
	bool called = false;

	// receives the emitted int followed by the fixed argument given to connect
	void receiver(int num, string text)
	{
		assert(num == 42 && text == "foo");
		called = true;
	}

	sig.socket.connect(conn, &receiver, "foo");
	sig.emit(42);
	assert(called);

	// after disconnecting, emitting must not reach the receiver anymore
	called = false;
	conn.disconnect();
	sig.emit(13);
	assert(!called);
}
201 
202 // test copyability
unittest {
	Signal!int orig;
	Signal!int copy = orig;

	// copying creates the pivot on demand and shares it between both signals
	assert(orig.m_socket.m_pivot !is null);
	assert(copy.m_socket.m_pivot !is null);
	assert(orig.m_socket.m_pivot is copy.m_socket.m_pivot);

	bool received = false;

	// connect through the original ...
	SignalConnection conn;
	orig.socket.connect(conn, (value) {
		assert(value == 42);
		received = true;
	});

	// ... and emit through the copy
	copy.emit(42);

	assert(received);
}
223 
224 
225 /** Represents a single connection between a signal and a slot.
226 
227 	This type is reference counted. When the reference count drops to zero,
228 	the connection will be detached automatically.
229 */
struct SignalConnection {
	// Shared connection state; null for an unassigned connection.
	private ConnectionHead m_ctx;

	// Wraps an existing head and takes an additional reference to it.
	private this(ConnectionHead h)
	@safe nothrow {
		m_ctx = h;
		retain();
	}

	this(this)
	@safe nothrow {
		retain();
	}

	~this()
	@safe nothrow {
		release();
	}

	/// Determines whether this connection is currently linked into a signal.
	@property bool connected()
	const @safe nothrow {
		if (m_ctx is null) return false;
		return m_ctx.prev !is null;
	}

	/** Detaches the slot from its signal and drops this reference.

		All other copies of this connection will report `connected == false`
		afterwards, too.
	*/
	void disconnect()
	@safe nothrow {
		if (m_ctx is null) return;
		assert(m_ctx.rc > 0, "Stale connection reference");

		if (this.connected) {
			// unlink the head from the signal's list of connections
			auto before = m_ctx.prev;
			auto after = m_ctx.next;
			before.next = after;
			after.prev = before;
			m_ctx.prev = null;
			m_ctx.next = null;
		}

		// give up our reference and free the context with the last one
		m_ctx.rc--;
		if (m_ctx.rc == 0)
			m_ctx.dispose();

		m_ctx = null;
	}

	// Drops the current context (if any) and replaces it by a newly created
	// head for the given callable and fixed arguments. The returned head is
	// not yet linked into any signal.
	private CallableConnectionHead!(Socket, Callable, FixedArgs) setCallable(Socket, Callable, FixedArgs...)(ref Callable callable, ref FixedArgs fixed_args)
	{
		release();

		alias Head = CallableConnectionHead!(Socket, Callable, FixedArgs);
		auto head = Head.make(callable, fixed_args);
		m_ctx = head;
		return head;
	}

	// Adds a reference to the context, if there is one.
	private void retain()
	@safe nothrow {
		if (m_ctx !is null)
			m_ctx.rc++;
	}

	// Drops this reference; the last reference also disconnects the slot.
	private void release()
	@safe nothrow {
		if (m_ctx is null) return;

		assert(m_ctx.rc > 0, "Stale connection reference");

		if (m_ctx.rc != 1) {
			// other copies are still alive - just forget about the context
			m_ctx.rc--;
			m_ctx = null;
			return;
		}

		disconnect();
		assert(!m_ctx);
	}
}
288 
289 ///
unittest {
	Signal!() sig;
	SignalConnection conn;
	size_t calls = 0;

	void slot()
	{
		calls++;
	}

	// disconnecting using a disconnect() call
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(calls == 1);
	conn.disconnect();
	assert(!conn.connected);
	sig.emit();
	assert(calls == 1);

	// disconnecting by destroying/overwriting the connection object
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(calls == 2);
	conn = SignalConnection.init;
	assert(!conn.connected);
	sig.emit();
	assert(calls == 2);

	// disconnecting through the signal
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(calls == 3);
	sig.disconnectAll();
	assert(!conn.connected);
	sig.emit();
	assert(calls == 3);

	// disconnecting by destroying the signal
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(calls == 4);
	destroy(sig);
	assert(!conn.connected);
	sig.emit();
	assert(calls == 4);
}
333 
unittest {
	Signal!() sig;
	SignalConnection conn, conn2;
	size_t calls = 0;

	void slot()
	{
		calls++;
	}

	sig.socket.connect(conn, &slot);
	assert(conn.connected && !conn2.connected);
	sig.emit();
	assert(calls == 1);

	// a copy of the connection refers to the same slot, which is still
	// called just once per emit
	conn2 = conn;
	assert(conn.connected && conn2.connected);
	sig.emit();
	assert(calls == 2);

	// an explicit disconnect() through one copy disconnects all copies
	conn.disconnect();
	assert(!conn.connected && !conn2.connected);
	sig.emit();
	assert(calls == 2);
}
354 
unittest {
	Signal!() sig;
	SignalConnection conn, conn2;
	size_t calls = 0;

	void slot()
	{
		calls++;
	}

	sig.socket.connect(conn, &slot);
	assert(conn.connected && !conn2.connected);
	sig.emit();
	assert(calls == 1);

	conn2 = conn;
	assert(conn.connected && conn2.connected);
	sig.emit();
	assert(calls == 2);

	// destroying one copy keeps the slot connected through the other one
	destroy(conn);
	assert(!conn.connected && conn2.connected);
	sig.emit();
	assert(calls == 3);

	// destroying the last copy finally disconnects the slot
	destroy(conn2);
	assert(!conn.connected && !conn2.connected);
	sig.emit();
	assert(calls == 3);
}
379 
380 /** A container for multiple connections that share the same life time.
381 */
struct SignalConnectionContainer {
	import std.container.array : Array;

	private {
		// the first few connections are stored inline ...
		SignalConnection[4] m_smallConnections;
		size_t m_smallConnectionCount;
		// ... and any further ones go into this array
		// NOTE(review): Array appears to share its payload between copies -
		// confirm that copying a container and destroying one of the copies
		// doesn't clear the overflow connections of the other copy
		Array!SignalConnection m_connections;
	}

	~this()
	@safe nothrow {
		clear();
	}

	/// Stores a copy of `conn` in the container.
	void add(SignalConnection conn)
	@safe nothrow {
		if (m_smallConnectionCount >= m_smallConnections.length) {
			// inline storage is exhausted
			() @trusted { m_connections ~= conn; } ();
			return;
		}

		m_smallConnections[m_smallConnectionCount] = conn;
		m_smallConnectionCount++;
	}

	/// Removes all stored connections from the container.
	void clear()
	@safe nothrow {
		() @trusted { m_connections.clear(); } ();

		foreach (ref stored; m_smallConnections[0 .. m_smallConnectionCount])
			stored = SignalConnection.init;
		m_smallConnectionCount = 0;
	}
}
412 
unittest {
	SignalConnectionContainer container;
	Signal!() sig;
	size_t calls = 0;

	// ten connections exceed the container's inline storage of four
	foreach (i; 0 .. 10) {
		sig.socket.connect(container, { calls++; });
	}
	assert(calls == 0);

	sig.emit();
	assert(calls == 10);

	// clearing the container disconnects all of them
	container.clear();
	sig.emit();
	assert(calls == 10);
}
425 
unittest {
	SignalConnectionContainer container;
	Signal!() sig;
	size_t calls = 0;

	foreach (i; 0 .. 10) {
		sig.socket.connect(container, { calls++; });
	}
	assert(calls == 0);

	sig.emit();
	assert(calls == 10);

	// overwriting the container with its init value disconnects everything
	container = SignalConnectionContainer.init;
	sig.emit();
	assert(calls == 10);
}
438 
439 
440 /** Signal side endpoint used to connect to slots.
441 */
static struct SignalSocket(PARAMS...) {
	alias Params = PARAMS;

	// The pivot is the sentinel node of the circular, doubly linked list of
	// connection heads. It is never meant to be called, hence the asserting
	// dummy callable.
	// NOTE: the int fixed parameter of Pivot is used to store the reference
	//       count for enabling copyability of SignalSocket
	private static struct DummyCalllable { void opCall(Params, int) { assert(false, "Signal pivot invoked!?"); } }
	private alias Pivot = CallableConnectionHead!(SignalSocket, DummyCalllable, int);

	private {
		// created lazily by initializePivot(); null as long as nothing was
		// connected and no copy was made
		Pivot m_pivot;
	}

	@disable this(this);

	/// Creates a socket that shares the pivot (and thus all connections) of `rhs`.
	this(ref return scope SignalSocket rhs)
	{
		// the source needs a pivot, so that both sockets can refer to the same one
		if (!rhs.m_pivot) rhs.initializePivot();

		m_pivot = rhs.m_pivot;
		// count this socket as an additional owner of the pivot
		m_pivot.fixedParams[0]++;
	}

	~this()
	{
		if (m_pivot) {
			// the last socket referring to the pivot disconnects all slots
			// and disposes the pivot
			if (!--m_pivot.fixedParams[0]) {
				disconnectAll();
				assert(m_pivot.next is m_pivot && m_pivot.prev is m_pivot);
				m_pivot.dispose();
			}
		}
	}

	/** Connects a `nothrow` delegate to the signal.

		Any slot previously associated with `c` is released first and `c`
		then represents the new connection. The new slot is appended after
		all slots connected so far.
	*/
	void connect(ref SignalConnection c, void delegate(Params) @system nothrow callable)
	{
		push(c.setCallable!SignalSocket(callable));
	}

	/// Connects a `nothrow` delegate and adds the new connection to the container `cc`.
	void connect(ref SignalConnectionContainer cc, void delegate(Params) @system nothrow callable)
	{
		SignalConnection c;
		connect(c, callable);
		cc.add(c);
	}

	/// ditto
	void connect(ref SignalConnection c, void delegate(Params) @safe nothrow callable)
	{
		push(c.setCallable!SignalSocket(callable));
	}

	/// ditto
	void connect(ref SignalConnectionContainer cc, void delegate(Params) @safe nothrow callable)
	{
		SignalConnection c;
		connect(c, callable);
		cc.add(c);
	}

	/** Connects an arbitrary `nothrow` callable, optionally with fixed arguments.

		The callable will be invoked with the emitted parameters followed by
		`fixed_args`. The constraint accepts anything other than a plain
		`void delegate(Params)` that can be called this way without throwing.
	*/
	void connect(Callable, FixedParams...)(ref SignalConnection c, Callable callable, FixedParams fixed_args)
		if (
			!is(Callable : void delegate(Params))
			&& is(typeof(Callable.init(Params.init, FixedParams.init)))
			&& __traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
		)
	{
		push(c.setCallable!SignalSocket(callable, fixed_args));
	}

	/// ditto
	void connect(Callable, FixedParams...)(ref SignalConnectionContainer cc, Callable callable, FixedParams fixed_args)
		if (
			!is(Callable : void delegate(Params))
			&& is(typeof(Callable.init(Params.init, FixedParams.init)))
			&& __traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
		)
	{
		SignalConnection c;
		connect(c, callable, fixed_args);
		cc.add(c);
	}

	// Overloads for callables that are not nothrow. They get wrapped in
	// nothrowWrap, which passes the original callable along as a fixed
	// argument, and logs and ignores any Exception it throws.
	deprecated("Use a nothrow callable.")
	{
		void connect(ref SignalConnection c, void delegate(Params) callable)
		{
			connect(c, &nothrowWrap!(typeof(callable)), callable);
		}

		void connect(ref SignalConnectionContainer cc, void delegate(Params) callable)
		{
			connect(cc, &nothrowWrap!(typeof(callable)), callable);
		}

		void connect(Callable, FixedParams...)(ref SignalConnection c, Callable callable, FixedParams fixed_args)
			if (
				!is(Callable : void delegate(Params))
				&& is(typeof(Callable.init(Params.init, FixedParams.init)))
				&& !__traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
			)
		{
			connect(c, &nothrowWrap!(Callable, FixedParams), callable, fixed_args);
		}

		void connect(Callable, FixedParams...)(ref SignalConnectionContainer cc, Callable callable, FixedParams fixed_args)
			if (
				!is(Callable : void delegate(Params))
				&& is(typeof(Callable.init(Params.init, FixedParams.init)))
				&& !__traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
			)
		{
			connect(cc, &nothrowWrap!(Callable, FixedParams), callable, fixed_args);
		}

		// Calls `callable(args0, args1)`, catching and logging any Exception.
		private static void nothrowWrap(C, FP...)(Params args0, C callable, FP args1)
		{
			try callable(args0, args1);
			catch (Exception e) {
				import vibe.core.log : logDebug, logError;
				logError("Signal callback has thrown - ignoring: %s", e.msg);
				logDebug("Full error: %s", e.toString());
			}
		}
	}

	// True if there is no pivot yet or if the pivot is the only list element.
	private @property bool empty() const { return !m_pivot || m_pivot.next is m_pivot; }

	// Links `ctx` into the list as the last element, creating the pivot on demand.
	private void push(TypedConnectionHead!Params ctx)
	{
		if (!m_pivot) initializePivot();

		// enqueue as the last element
		ctx.prev = m_pivot.prev;
		ctx.next = m_pivot;
		m_pivot.prev.next = ctx;
		m_pivot.prev = ctx;
	}

	// Disconnects every slot, leaving just the pivot in the list.
	private void disconnectAll()
	{
		if (!m_pivot) return;

		// wrap the first element in a temporary SignalConnection and
		// disconnect it, until only the pivot is left
		while (m_pivot.next !is m_pivot)
			SignalConnection(m_pivot.next).disconnect();

		assert(!m_pivot || (m_pivot.next is m_pivot && m_pivot.prev is m_pivot));
	}

	// Creates the pivot with a socket reference count of one and links it
	// to itself, forming an empty list.
	private void initializePivot()
	{
		assert(!m_pivot);

		DummyCalllable nf;
		int refcount = 1;
		m_pivot = Pivot.make(nf, refcount);
		m_pivot.prev = m_pivot.next = m_pivot;
	}

	// Calls all connected slots with `params`, starting with the element
	// following the pivot.
	private void emit(ref Params params)
	nothrow {
		// nothing to do without any connected slot
		if (!m_pivot || m_pivot is m_pivot.next) return;

		// make emit @system in case Params cannot be copied/destroyed @safely
		static if (!__traits(compiles, () @safe { Params p; auto q = p; }))
			systemFun();

		// NOTE using SignalConnection to ensure that the ConnectionHeads don't
		// get destroyed while iterating over the list
		auto el = SignalConnection(m_pivot.next);
		do {
			// remember the successor before calling the slot, as the slot
			// may disconnect the current element
			SignalConnection elnext;
			if (el.m_ctx.next !is m_pivot)
				elnext = SignalConnection(el.m_ctx.next);

			(cast(TypedConnectionHead!Params)el.m_ctx).call(params);

			// go to the next best element if possible
			if (el.m_ctx.next is m_pivot) break;
			// still connected: follow the current link; otherwise fall back
			// to the successor remembered before the call
			if (el.connected) el = SignalConnection(el.m_ctx.next);
			else el = elnext;
		} while (el.connected);
	}
}
621 
// Base class of the nodes that make up a signal's list of connections.
private class ConnectionHead {
	// neighbours within the list; SignalConnection treats a null `prev` as
	// "not connected"
	ConnectionHead prev;
	ConnectionHead next;

	// number of references to this head; starts out at one
	int rc = 1;

	// Called by SignalConnection when `rc` reaches zero.
	abstract void dispose()
	@safe nothrow;
}
628 
// Connection head that knows the parameter types `P` of its signal.
private class TypedConnectionHead(P...) : ConnectionHead {
	// Invokes the slot with the emitted parameters.
	abstract void call(ref P params)
	@safe nothrow;
}
632 
// Connection head storing the callable `C` together with the fixed arguments
// `FP`, which get passed to the callable after the parameters of socket type `S`.
private final class CallableConnectionHead(S, C, FP...) : TypedConnectionHead!(S.Params) {
	C callable;
	FP fixedParams;

	// Takes over the callable and the fixed arguments, moving them where
	// `move` is applicable and copying them otherwise.
	this(ref C c, ref FP fp)
	{
		static if (is(typeof(move(c))))
			this.callable = move(c);
		else this.callable = c;

		static foreach (i; 0 .. FP.length) {
			static if (is(typeof(move(fp[i]))))
				this.fixedParams[i] = move(fp[i]);
			else this.fixedParams[i] = fp[i];
		}
	}

	// Calls the stored callable with the emitted parameters followed by the
	// fixed arguments.
	override void call(ref S.Params params)
	@trusted {
		callable(params, fixedParams);
	}

	// Creates a new head with a reference count of one.
	static CallableConnectionHead make(ref C c, ref FP fp)
	{
		// force this to be @system in case c() is @system, because the emit()
		// API is always @safe
		static if (!__traits(compiles, () @safe { S.Params p1; FP p2;  c(p1, p2); })) {
			systemFun();
		}

		// NOTE: using the GC is mandatory, so that when a class stores the
		// connection to its own member function, it isn't kept alive
		// indefinitely due to the manually allocated GC range that keeps a
		// reference
		auto ret = new CallableConnectionHead(c, fp);
		assert(ret.rc == 1);
		return ret;
	}

	// Destroys this object in place, which also destroys the stored callable
	// and fixed arguments. The memory itself is left to the GC.
	final override void dispose()
	@trusted nothrow {
		// dispose() is nothrow - anything thrown during destruction is fatal
		scope (failure) assert(false);
		// destroy() requires an lvalue
		auto thiscopy = this;
		destroy(thiscopy);
	}
}
686 
687 
688 /** A cross-thread signal struct with multiple endpoint dispatch.
689 
690 	Multiple slots can be connected using connect. Each of the
691 	connected slots will be called when emit is called with the arguments passed
692 	to each slot. The order of slot invocation is undefined.
693 
694 	Slots will always be called asynchronously in the same thread in which they
695 	were registered. Thus, SharedSignal is thread-safe, even if non-thread-safe
696 	delegates are used as slots. However, the argument types passed to emit()
697 	must be weakly shared.
698 
699 	Examples:
700 		---
701 		shared(SharedSignal!int) signal;
702 
703 		void setup()
704 		{
705 			auto mainthr = Thread.getThis();
706 
707 			signal = new shared(SharedSignal!int);
			signal.connect((value){
				assert(Thread.getThis() is mainthr);
				writefln("Value: %d", value);
			});

			auto thr = new Thread({ signal.emit(42); });
			thr.start();
715 		}
716 		---
717 
718 	See_Also: vibe.core.concurrency.isWeaklyIsolated
719 */
class SharedSignal(P...) {
	/// Type of the delegates that can be connected as slots.
	public alias Slot = shared(void delegate(P));

	// Book keeping for a single thread that has connected slots.
	private static struct ThreadSlots {
		Thread thread; // thread from which the slots were connected
		Task task;     // task running emitterTask() on behalf of that thread
		Slot[] slots;  // connected slots; disconnected entries are set to null
	}

	private {
		Mutex m_mutex; // protects m_threads
		ThreadSlots[] m_threads;
	}

	this()
	{
		m_mutex = new Mutex;
	}

	/** Connects a slot on behalf of the calling thread.

		Connecting the same slot twice from the same thread is an error.

		Returns:
			Currently always a default initialized, non-functional
			`SignalConnection` - use `disconnect` to remove the slot again.
	*/
	SignalConnection connect(Slot slot)
	shared {
		auto thr = Thread.getThis();
		synchronized (m_mutex) return (cast()this).connectSync(thr, slot);
	}

	/** Removes a slot that was connected from the calling thread.

		Passing a slot that is not connected is an error and triggers an
		assertion.
	*/
	void disconnect(Slot slot)
	shared nothrow {
		auto thr = Thread.getThis();
		try synchronized (m_mutex) (cast()this).disconnectSync(thr, slot);
		catch (Exception e) assert(false, e.msg);
	}

	/// Determines whether `slot` is currently connected for the calling thread.
	bool isSlotConnected(Slot slot)
	shared {
		auto thr = Thread.getThis();
		synchronized (m_mutex) return (cast()this).isSlotConnectedSync(thr, slot);
	}

	/** Sends `params` to the emitter task of each thread that has connected
		slots.

		The slots are not called directly from here, but by the receiving
		tasks.
	*/
	void emit(P params)
	shared {
		// up to 16 tasks are collected without a heap allocation
		Task[16] taskbuf;
		Task[] tasks;
		synchronized (m_mutex) tasks = (cast()this).collectTasksSync(taskbuf);

		// send outside of the lock
		foreach (t; tasks)
			send(t, params);
	}

	// Registers `slot` for `thr`. Must be called with m_mutex locked.
	private SignalConnection connectSync(Thread thr, Slot slot)
	{
		//auto ret = SignalConnection({ (cast(shared)this).disconnect(slot); });
		SignalConnection ret; // FIXME: provide a functional connection!

		foreach (ref ts; m_threads)
			if (ts.thread is thr) {
				assert(!ts.slots.canFind(slot), "Double-connecting slot.");
				// re-use the place of a previously disconnected slot, if any
				auto idx = ts.slots.countUntil(cast(Slot)null);
				if (idx >= 0) ts.slots[idx] = slot;
				else ts.slots ~= slot;
				return ret;
			}

		// first slot for this thread - start its emitter task
		ThreadSlots ts;
		ts.thread = thr;
		ts.task = runTask(&emitterTask);
		ts.slots = [slot];
		m_threads ~= ts;

		return ret;
	}

	// Clears the entry of `slot` for `thr`. Must be called with m_mutex locked.
	private void disconnectSync(Thread thr, Slot slot)
	{
		foreach (ref ts; m_threads)
			if (ts.thread is thr) {
				auto idx = ts.slots.countUntil(slot);
				assert(idx >= 0, "Removing unconnected slot.");
				// with assertions compiled out, idx may be -1 here, which
				// must not be used as an array index
				if (idx >= 0) ts.slots[idx] = null;
				// connectSync() creates at most one entry per thread
				return;
			}
	}

	// Looks up `slot` in the entry of `thr`. Must be called with m_mutex locked.
	private bool isSlotConnectedSync(Thread thr, Slot slot)
	{
		foreach (ref ts; m_threads)
			if (ts.thread is thr)
				return ts.slots.canFind(slot);
		return false;
	}

	// Returns the emitter tasks of all threads, using `buffer` as storage if
	// it is large enough. Must be called with m_mutex locked.
	private Task[] collectTasksSync(Task[] buffer)
	{
		if (m_threads.length <= buffer.length) {
			foreach (i, ref ts; m_threads)
				buffer[i] = ts.task;
			return buffer[0 .. m_threads.length];
		}

		auto tasks = new Task[m_threads.length];
		foreach (i, ref ts; m_threads)
			tasks[i] = ts.task;
		return tasks;
	}

	// Body of the per-thread task: waits for parameters sent by emit() and
	// calls the slots registered for the thread it runs in. Never returns.
	private void emitterTask()
	{
		auto thr = Thread.getThis();

		while(true) {
			receive((P params){
				// fetch the slots under the lock, but call them outside of it
				Slot[] slots;
				synchronized (m_mutex) {
					foreach (ref ts; m_threads)
						if (ts.thread is thr) {
							slots = ts.slots;
							break;
						}
				}
				// null entries are disconnected slots
				foreach (s; slots)
					if (s) s(params);
			});
		}
	}
}
847 
848 
// Empty function whose only purpose is being @system: calling it from a
// template function stops that function from being inferred as @safe.
private void systemFun()
@system nothrow {
}