/**
	Signals/slots implementation.

	See `Signal` for usage examples.

	Copyright: Copyright © 2007-2021 Sönke Ludwig
	Authors: Sönke Ludwig
*/
module observable.signal;

import core.sync.mutex;
import core.thread;
import std.algorithm;
import std.traits : isInstanceOf;
import vibe.core.core;
import vibe.core.concurrency;


/** A thread local signal-slot implementation.
*/
struct Signal(P...) {
	alias Params = P;
	alias Slot = void delegate(P) nothrow;

	private {
		SignalSocket!Params m_socket;
	}

	@disable this(this);

	/// Copy constructor; the copy is initialized from the socket of `rhs`.
	this(ref return scope Signal rhs)
	{
		m_socket = rhs.m_socket;
	}

	/// Determines if any connections are connected to the signal.
	@property bool empty() const { return m_socket.empty; }

	/// Returns a reference to the socket of this signal.
	@property ref SignalSocket!Params socket() { return m_socket; }

	/// Detaches all connections from the signal.
	void disconnectAll() { m_socket.disconnectAll(); }

	/** Emits the signal.

		This will iterate through all connections in the order in which they
		were registered and calls the associated slot with the given parameters.

		Note that it is legal to disconnect any single connection of the signal
		from within a callback, as well as calling `disconnectAll`. However,
		disconnecting multiple connections from a callback may result in some
		outstanding callbacks to get skipped.
	*/
	void emit(P params) nothrow { m_socket.emit(params); }
}

///
@safe nothrow unittest {
	// declare a signal
	Signal!int signal;

	{
		// define a connection
		SignalConnection conn;

		// establish a connection between the signal and a delegate
		signal.socket.connect(conn, (i) { assert(i == 42); });

		// emitting the signal will call the above delegate
		signal.emit(42);
	}

	// once there are no copies of the connection left, it will
	// automatically disconnect, so the following emit will
	// not have an effect.
	signal.emit(13);


	// if a class connects to a member function, it must store the connection,
	// so that the connection life time is limited to the instance lifetime.
	class Test {
		SignalConnection conn;

		this()
		@safe nothrow {
			signal.socket.connect(conn, &slot);
		}

		void slot(int i)
		@safe nothrow {
			assert(i == 32);
		}
	}

	auto t = new Test;
	signal.emit(32);
}

/// This example shows the recommended convention for defining signals.
unittest {
	class Widget {
		private {
			Signal!bool m_visibilityChangeSignal;
		}

		@property ref SignalSocket!bool visibilityChangeSignal() { return m_visibilityChangeSignal.socket; }

		void setVisibility(bool v)
		{
			// ...
			m_visibilityChangeSignal.emit(v);
		}
	}

	class Layout {
		private {
			SignalConnection m_visConn;
			Widget m_client;
		}

		void setClient(Widget w)
		{
			m_client = w;
			// automatically disconnects a possible connection to a previous widget
			w.visibilityChangeSignal.connect(m_visConn, &onChildVisibilityChange);
		}

		void onChildVisibilityChange(bool)
		nothrow {
			// ...
		}
	}

	auto l = new Layout;
	auto w = new Widget;
	l.setClient(w);
	w.setVisibility(true);
}

// connecting @system callbacks and using @system parameters should be @system
unittest {
	Signal!int sig;
	struct C { void opCall(int) @safe nothrow {} }
	struct CS { void opCall(int) @system nothrow {} }
	struct US { this(this) @system nothrow {} }

	SignalConnection c;
	static assert(__traits(compiles, () @safe { sig.socket.connect(c, (i) @safe {}); }));
	static assert(!__traits(compiles, () @safe { sig.socket.connect(c, (i) @system {}); }));
	static assert(__traits(compiles, () @safe { C clbl; sig.socket.connect(c, clbl); }));
	static assert(!__traits(compiles, () @safe { CS clbl; sig.socket.connect(c, clbl); }));
	() @safe { sig.emit(0); } ();

	Signal!US usig;
	static assert(__traits(compiles, () @system { usig.socket.connect(c, (i) @safe {}); }));
	static assert(!__traits(compiles, () @safe { usig.socket.connect(c, (i) @safe {}); }));
	static assert(__traits(compiles, (ref US p) @system { usig.emit(p); }));
	static assert(!__traits(compiles, (ref US p) @safe { usig.emit(p); }));
}

// test disconnectAll from within a callback
nothrow unittest {
	Signal!() sig;
	bool visited = false;
	void foo() { sig.disconnectAll(); visited = true; }
	void bar() { assert(false); }
	SignalConnectionContainer c;
	sig.socket.connect(c, &foo);
	sig.socket.connect(c, &bar);
	sig.emit();
	assert(visited);
}

// test disconnecting the current connection from within a callback
unittest {
	Signal!() sig;
	SignalConnection fconn, bconn;
	int visited = 0;
	void foo() { fconn.disconnect(); visited++; }
	void bar() { visited++; }
	sig.socket.connect(fconn, &foo);
	sig.socket.connect(bconn, &bar);
	sig.emit();
	assert(visited == 2);
}

// test fixed parameters
unittest {
	Signal!(int) sig;
	SignalConnection conn;
	bool emitted = false;
	void foo(int i, string s) { assert(i == 42 && s == "foo"); emitted = true; }
	sig.socket.connect(conn, &foo, "foo");
	sig.emit(42);
	assert(emitted);
	emitted = false;
	conn.disconnect();
	sig.emit(13);
	assert(!emitted);
}

// test copyability
unittest {
	Signal!int orig;
	Signal!int copy = orig;

	assert(orig.m_socket.m_pivot !is null);
	assert(copy.m_socket.m_pivot !is null);
	assert(orig.m_socket.m_pivot is copy.m_socket.m_pivot);

	bool got_it = false;

	SignalConnection c;
	orig.socket.connect(c, (i) {
		assert(i == 42);
		got_it = true;
	});

	copy.emit(42);

	assert(got_it);
}


/** Represents a single connection between a signal and a slot.

	This type is reference counted. When the reference count drops to zero,
	the connection will be detached automatically.
*/
struct SignalConnection {
	private ConnectionHead m_ctx;

	// Wraps an existing connection head and takes an additional reference to it.
	private this(ConnectionHead h) @safe nothrow { m_ctx = h; retain(); }

	this(this) @safe nothrow { retain(); }

	~this() @safe nothrow { release(); }

	/** Determines whether this handle refers to a connection head that is
		still linked into a socket's connection list.
	*/
	@property bool connected() const @safe nothrow { return m_ctx !is null && m_ctx.prev !is null; }

	/** Detaches the connection from its signal and drops this handle's
		reference.

		The connection head is unlinked from the list if it is still linked,
		so other copies of this connection will report `connected == false`
		afterwards. The head is disposed once the last reference is gone.
		Calling this on an empty handle is a no-op.
	*/
	void disconnect()
	@safe nothrow {
		if (!m_ctx) return;
		assert(m_ctx.rc > 0, "Stale connection reference");

		if (this.connected) {
			// remove from queue
			m_ctx.prev.next = m_ctx.next;
			m_ctx.next.prev = m_ctx.prev;
			m_ctx.prev = null;
			m_ctx.next = null;
		}

		if (--m_ctx.rc == 0) // free context
			m_ctx.dispose();

		m_ctx = null;
	}

	// Releases whatever this handle currently refers to and replaces it with
	// a freshly created head for `callable`/`fixed_args`. The returned head is
	// not linked into any list yet - that is left to the caller.
	private CallableConnectionHead!(Socket, Callable, FixedArgs) setCallable(Socket, Callable, FixedArgs...)(ref Callable callable, ref FixedArgs fixed_args)
	{
		release();

		auto cch = CallableConnectionHead!(Socket, Callable, FixedArgs).make(callable, fixed_args);
		m_ctx = cch;
		return cch;
	}

	// Adds one reference to the current head, if any.
	private void retain()
	@safe nothrow {
		if (m_ctx) m_ctx.rc++;
	}

	// Drops this handle's reference. The last reference goes through
	// disconnect(), so that the head is also unlinked and disposed.
	private void release()
	@safe nothrow {
		if (m_ctx) {
			assert(m_ctx.rc > 0, "Stale connection reference");
			if (m_ctx.rc == 1) {
				disconnect();
				assert(!m_ctx);
			} else {
				--m_ctx.rc;
				m_ctx = null;
			}
		}
	}
}

///
unittest {
	Signal!() sig;
	SignalConnection conn;
	size_t cnt = 0;

	void slot() { cnt++; }

	// disconnecting using a disconnect() call
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(cnt == 1);
	conn.disconnect();
	assert(!conn.connected);
	sig.emit();
	assert(cnt == 1);

	// disconnecting by destroying/overwriting the connection object
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(cnt == 2);
	conn = SignalConnection.init;
	assert(!conn.connected);
	sig.emit();
	assert(cnt == 2);

	// disconnecting through the signal
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(cnt == 3);
	sig.disconnectAll();
	assert(!conn.connected);
	sig.emit();
	assert(cnt == 3);

	// disconnecting by destroying the signal
	sig.socket.connect(conn, &slot);
	sig.emit();
	assert(cnt == 4);
	destroy(sig);
	assert(!conn.connected);
	sig.emit();
	assert(cnt == 4);
}

unittest {
	Signal!() sig;
	SignalConnection conn, conn2;
	size_t cnt = 0;

	void slot() { cnt++; }

	sig.socket.connect(conn, &slot);
	assert(conn.connected && !conn2.connected);
	sig.emit();
	assert(cnt == 1);
	conn2 = conn;
	assert(conn.connected && conn2.connected);
	sig.emit();
	assert(cnt == 2);
	conn.disconnect();
	assert(!conn.connected && !conn2.connected);
	sig.emit();
	assert(cnt == 2);
}

unittest {
	Signal!() sig;
	SignalConnection conn, conn2;
	size_t cnt = 0;

	void slot() { cnt++; }

	sig.socket.connect(conn, &slot);
	assert(conn.connected && !conn2.connected);
	sig.emit();
	assert(cnt == 1);
	conn2 = conn;
	assert(conn.connected && conn2.connected);
	sig.emit();
	assert(cnt == 2);
	destroy(conn);
	assert(!conn.connected && conn2.connected);
	sig.emit();
	assert(cnt == 3);
	destroy(conn2);
	assert(!conn.connected && !conn2.connected);
	sig.emit();
	assert(cnt == 3);
}

/** A container for multiple connections that share the same life time.
*/
struct SignalConnectionContainer {
	import std.container.array : Array;

	private {
		// the first few connections are stored inline, any further ones
		// go to m_connections
		SignalConnection[4] m_smallConnections;
		size_t m_smallConnectionCount;
		Array!SignalConnection m_connections;
	}

	~this()
	@safe nothrow {
		clear();
	}

	/// Stores a copy of `conn` in the container.
	void add(SignalConnection conn)
	@safe nothrow {
		if (m_smallConnectionCount < m_smallConnections.length)
			m_smallConnections[m_smallConnectionCount++] = conn;
		else {
			() @trusted { m_connections ~= conn; } ();
		}
	}

	/// Removes all stored connections, releasing the references held by the container.
	void clear()
	@safe nothrow {
		() @trusted { m_connections.clear(); } ();
		m_smallConnections[0 .. m_smallConnectionCount] = SignalConnection.init;
		m_smallConnectionCount = 0;
	}
}

unittest {
	SignalConnectionContainer c;
	Signal!() sig;
	size_t cnt = 0;
	foreach (i; 0 .. 10) sig.socket.connect(c, { cnt++; });
	assert(cnt == 0);
	sig.emit();
	assert(cnt == 10);
	c.clear();
	sig.emit();
	assert(cnt == 10);
}

unittest {
	SignalConnectionContainer c;
	Signal!() sig;
	size_t cnt = 0;
	foreach (i; 0 .. 10) sig.socket.connect(c, { cnt++; });
	assert(cnt == 0);
	sig.emit();
	assert(cnt == 10);
	c = SignalConnectionContainer.init;
	sig.emit();
	assert(cnt == 10);
}


/** Signal side endpoint used to connect to slots.

	All `connect` overloads taking a `SignalConnection` first release whatever
	that connection object referred to before and then append the new slot to
	the end of the connection list. The overloads taking a
	`SignalConnectionContainer` create a new connection and add it to the
	container.
*/
static struct SignalSocket(PARAMS...)
{
	alias Params = PARAMS;

	// NOTE: the int fixed parameter of Pivot is used to store the reference
	//       count for enabling copyability of SignalSocket
	private static struct DummyCalllable { void opCall(Params, int) { assert(false, "Signal pivot invoked!?"); } }
	private alias Pivot = CallableConnectionHead!(SignalSocket, DummyCalllable, int);

	private {
		// sentinel node of the circular, doubly linked connection list
		Pivot m_pivot;
	}

	@disable this(this);

	/// Copy constructor; both sockets share the same pivot (and thus the same connections) afterwards.
	this(ref return scope SignalSocket rhs)
	{
		if (!rhs.m_pivot) rhs.initializePivot();

		m_pivot = rhs.m_pivot;
		m_pivot.fixedParams[0]++;
	}

	~this()
	{
		if (m_pivot) {
			// the last socket referencing the pivot tears down all connections
			if (!--m_pivot.fixedParams[0]) {
				disconnectAll();
				assert(m_pivot.next is m_pivot && m_pivot.prev is m_pivot);
				m_pivot.dispose();
			}
		}
	}

	/// Connects a `@system nothrow` delegate, storing the connection in `c`.
	void connect(ref SignalConnection c, void delegate(Params) @system nothrow callable)
	{
		push(c.setCallable!SignalSocket(callable));
	}

	/// ditto
	void connect(ref SignalConnectionContainer cc, void delegate(Params) @system nothrow callable)
	{
		SignalConnection c;
		connect(c, callable);
		cc.add(c);
	}

	/// Connects a `@safe nothrow` delegate, storing the connection in `c`.
	void connect(ref SignalConnection c, void delegate(Params) @safe nothrow callable)
	{
		push(c.setCallable!SignalSocket(callable));
	}

	/// ditto
	void connect(ref SignalConnectionContainer cc, void delegate(Params) @safe nothrow callable)
	{
		SignalConnection c;
		connect(c, callable);
		cc.add(c);
	}

	/** Connects an arbitrary `nothrow` callable.

		`fixed_args` are stored together with the callable and are passed to
		it after the signal parameters on each emit.
	*/
	void connect(Callable, FixedParams...)(ref SignalConnection c, Callable callable, FixedParams fixed_args)
		if (
			!is(Callable : void delegate(Params))
			&& is(typeof(Callable.init(Params.init, FixedParams.init)))
			&& __traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
		)
	{
		push(c.setCallable!SignalSocket(callable, fixed_args));
	}

	/// ditto
	void connect(Callable, FixedParams...)(ref SignalConnectionContainer cc, Callable callable, FixedParams fixed_args)
		if (
			!is(Callable : void delegate(Params))
			&& is(typeof(Callable.init(Params.init, FixedParams.init)))
			&& __traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
		)
	{
		SignalConnection c;
		connect(c, callable, fixed_args);
		cc.add(c);
	}

	// Overloads for callables that may throw. They are routed through
	// nothrowWrap, which receives the original callable as a fixed argument.
	deprecated("Use a nothrow callable.")
	{
		void connect(ref SignalConnection c, void delegate(Params) callable)
		{
			connect(c, &nothrowWrap!(typeof(callable)), callable);
		}

		void connect(ref SignalConnectionContainer cc, void delegate(Params) callable)
		{
			connect(cc, &nothrowWrap!(typeof(callable)), callable);
		}

		void connect(Callable, FixedParams...)(ref SignalConnection c, Callable callable, FixedParams fixed_args)
			if (
				!is(Callable : void delegate(Params))
				&& is(typeof(Callable.init(Params.init, FixedParams.init)))
				&& !__traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
			)
		{
			connect(c, &nothrowWrap!(Callable, FixedParams), callable, fixed_args);
		}

		void connect(Callable, FixedParams...)(ref SignalConnectionContainer cc, Callable callable, FixedParams fixed_args)
			if (
				!is(Callable : void delegate(Params))
				&& is(typeof(Callable.init(Params.init, FixedParams.init)))
				&& !__traits(compiles, () nothrow { Callable.init(Params.init, FixedParams.init); })
			)
		{
			connect(cc, &nothrowWrap!(Callable, FixedParams), callable, fixed_args);
		}

		// Invokes `callable` and logs any `Exception` it throws instead of
		// propagating it.
		private static void nothrowWrap(C, FP...)(Params args0, C callable, FP args1)
		{
			try callable(args0, args1);
			catch (Exception e) {
				import vibe.core.log : logDebug, logError;
				logError("Signal callback has thrown - ignoring: %s", e.msg);
				logDebug("Full error: %s", e.toString());
			}
		}
	}

	// True if the pivot has not been created yet or if it is the only list element.
	private @property bool empty() const { return !m_pivot || m_pivot.next is m_pivot; }

	// Appends `ctx` to the connection list, creating the pivot on first use.
	private void push(TypedConnectionHead!Params ctx)
	{
		if (!m_pivot) initializePivot();

		// enqueue as the last element
		ctx.prev = m_pivot.prev;
		ctx.next = m_pivot;
		m_pivot.prev.next = ctx;
		m_pivot.prev = ctx;
	}

	// Unlinks every connection until only the pivot is left in the list.
	private void disconnectAll()
	{
		if (!m_pivot) return;

		while (m_pivot.next !is m_pivot)
			SignalConnection(m_pivot.next).disconnect();

		assert(!m_pivot || (m_pivot.next is m_pivot && m_pivot.prev is m_pivot));
	}

	// Creates the pivot with a socket reference count of 1 and links it to itself.
	private void initializePivot()
	{
		assert(!m_pivot);

		DummyCalllable nf;
		int refcount = 1;
		m_pivot = Pivot.make(nf, refcount);
		m_pivot.prev = m_pivot.next = m_pivot;
	}

	// Calls all connected slots in list order.
	private void emit(ref Params params)
	nothrow {
		if (!m_pivot || m_pivot is m_pivot.next) return;

		// make emit @system in case Params cannot be copied/destroyed @safely
		static if (!__traits(compiles, () @safe { Params p; auto q = p; }))
			systemFun();

		// NOTE using SignalConnection to ensure that the ConnectionHeads don't
		//      get destroyed while iterating over the list
		auto el = SignalConnection(m_pivot.next);
		do {
			// remember the successor before the call, in case the callback
			// disconnects the current element
			SignalConnection elnext;
			if (el.m_ctx.next !is m_pivot)
				elnext = SignalConnection(el.m_ctx.next);

			(cast(TypedConnectionHead!Params)el.m_ctx).call(params);

			// go to the next best element if possible
			if (el.m_ctx.next is m_pivot) break;
			if (el.connected) el = SignalConnection(el.m_ctx.next);
			else el = elnext;
		} while (el.connected);
	}
}

// Node of the doubly linked connection list, including its reference count.
private class ConnectionHead {
	ConnectionHead prev, next;
	int rc = 1;

	abstract void dispose() @safe nothrow;
}

// Connection node that can be invoked with the signal's parameter types.
private class TypedConnectionHead(P...) : ConnectionHead {
	abstract void call(ref P params) @safe nothrow;
}

// Connection node that stores a callable of type `C` together with the fixed
// parameters `FP` that get appended to the signal parameters on each call.
private final class CallableConnectionHead(S, C, FP...) : TypedConnectionHead!(S.Params) {
	import vibe.internal.allocator : Mallocator, make, dispose;
	import std.traits : hasIndirections;
	import core.memory : GC;

	private static struct Payload { C callable; FP fixedParams; }

	C callable;
	FP fixedParams;

	this(ref C c, ref FP fp)
	{
		// move the arguments where possible and fall back to copying otherwise
		static if (is(typeof(move(c))))
			this.callable = move(c);
		else this.callable = c;

		static foreach (i; 0 .. FP.length) {
			static if (is(typeof(move(fp[i]))))
				this.fixedParams[i] = move(fp[i]);
			else this.fixedParams[i] = fp[i];
		}
	}

	override void call(ref S.Params params)
	@trusted {
		callable(params, fixedParams);
	}

	static CallableConnectionHead make(ref C c, ref FP fp)
	{
		// force this to be @system in case c() is @system, because the emit()
		// API is always @safe
		static if (!__traits(compiles, () @safe { S.Params p1; FP p2; c(p1, p2); })) {
			systemFun();
		}

		// NOTE: using the GC is mandatory, so that when a class stores the
		//       connection to its own member function, it isn't kept alive
		//       indefinitely due to the manually allocated GC range that keeps a
		//       reference
		auto ret = new CallableConnectionHead(c, fp);
		assert(ret.rc == 1);
		return ret;
	}

	final override void dispose()
	@trusted nothrow {
		import core.memory : GC;
		scope (failure) assert(false);
		auto thiscopy = this;
		destroy(thiscopy);
	}
}


/** A cross-thread signal struct with multiple endpoint dispatch.

	Multiple slots can be connected using connect. Each of the
	connected slots will be called when emit is called with the arguments passed
	to each slot. The order of slot invocation is undefined.

	Slots will always be called asynchronously in the same thread in which they
	were registered. Thus, SharedSignal is thread-safe, even if non-thread-safe
	delegates are used as slots. However, the argument types passed to emit()
	must be weakly shared.

	Examples:
	---
	shared(SharedSignal!int) signal;

	void setup()
	{
		auto mainthr = Thread.getThis();

		signal = new shared(SharedSignal!int);
		signal.connect((value){
			assert(Thread.getThis() is mainthr);
			writefln("Value: %d", value);
		});

		auto thr = new Thread({ signal.emit(42); });
		thr.start();
	}
	---

	See_Also: vibe.core.concurrency.isWeaklyIsolated
*/
class SharedSignal(P...) {
	///
	public alias shared(void delegate(P)) Slot;

	// Slots registered from one particular thread, together with the task
	// that was started to receive emitted parameters for them.
	private static struct ThreadSlots {
		Thread thread;
		Task task;
		Slot[] slots;
	}

	private {
		Mutex m_mutex;
		ThreadSlots[] m_threads;
	}

	this()
	{
		m_mutex = new Mutex;
	}

	/** Registers `slot` for the calling thread.

		Note that the returned connection is currently a default initialized
		`SignalConnection` (see the FIXME in `connectSync`), so `disconnect`
		has to be used to remove the slot again.
	*/
	SignalConnection connect(Slot slot)
	shared {
		auto thr = Thread.getThis();
		synchronized (m_mutex) return (cast()this).connectSync(thr, slot);
	}

	/// Removes a slot that was previously registered from the calling thread.
	void disconnect(Slot slot)
	shared nothrow {
		auto thr = Thread.getThis();
		try synchronized (m_mutex) (cast()this).disconnectSync(thr, slot);
		catch (Exception e) assert(false, e.msg);
	}

	/// Determines whether `slot` is registered for the calling thread.
	bool isSlotConnected(Slot slot)
	shared {
		auto thr = Thread.getThis();
		synchronized (m_mutex) return (cast()this).isSlotConnectedSync(thr, slot);
	}

	/// Sends `params` to the receiver task of each thread that has registered slots.
	void emit(P params)
	shared {
		Task[16] taskbuf;
		Task[] tasks;
		// collect the tasks under the lock, but send outside of it
		synchronized (m_mutex) tasks = (cast()this).collectTasksSync(taskbuf);

		foreach (t; tasks)
			send(t, params);
	}

	private SignalConnection connectSync(Thread thr, Slot slot)
	{
		//auto ret = SignalConnection({ (cast(shared)this).disconnect(slot); });
		SignalConnection ret; // FIXME: provide a functional connection!

		foreach (ref ts; m_threads)
			if (ts.thread is thr) {
				assert(!ts.slots.canFind(slot), "Double-connecting slot.");
				// reuse an entry that was cleared by disconnectSync, if any
				auto idx = ts.slots.countUntil(cast(Slot)null);
				if (idx >= 0) ts.slots[idx] = slot;
				else ts.slots ~= slot;
				return ret;
			}

		// first slot for this thread: start its receiver task
		ThreadSlots ts;
		ts.thread = thr;
		ts.task = runTask(&emitterTask);
		ts.slots = [slot];
		m_threads ~= ts;

		return ret;
	}

	private void disconnectSync(Thread thr, Slot slot)
	{
		foreach (ref ts; m_threads)
			if (ts.thread is thr) {
				auto idx = ts.slots.countUntil(slot);
				assert(idx >= 0, "Removing unconnected slot.");
				// countUntil yields -1 if the slot is unknown; with asserts
				// compiled out this must not be used as an index
				if (idx >= 0) ts.slots[idx] = null;
			}
	}

	private bool isSlotConnectedSync(Thread thr, Slot slot)
	{
		foreach (ref ts; m_threads)
			if (ts.thread is thr)
				return ts.slots.canFind(slot);
		return false;
	}

	// Returns the receiver tasks of all threads, using `buffer` as storage
	// if it is large enough and a newly allocated array otherwise.
	private Task[] collectTasksSync(Task[] buffer)
	{
		if (m_threads.length <= buffer.length) {
			foreach (i, ref ts; m_threads)
				buffer[i] = ts.task;
			return buffer[0 .. m_threads.length];
		}

		auto tasks = new Task[m_threads.length];
		foreach (i, ref ts; m_threads)
			tasks[i] = ts.task;
		return tasks;
	}

	// Receiver loop: waits for emitted parameters and calls all slots that
	// are registered for the thread this task runs in.
	private void emitterTask()
	{
		auto thr = Thread.getThis();

		while(true) {
			receive((P params){
				Slot[] slots;
				synchronized (m_mutex) {
					foreach (ref ts; m_threads)
						if (ts.thread is thr) {
							slots = ts.slots;
							break;
						}
				}
				// entries cleared by disconnectSync are null and get skipped
				foreach (s; slots)
					if (s) s(params);
			});
		}
	}
}


// Calling this from a function with inferred attributes forces that function
// to be inferred as @system.
private void systemFun() @system nothrow {}