1 /** Provides a channel/range style observer framework. 2 3 Observables provide a similar functionality to `Signal`, but they are 4 composable and provide a sequential subscriber API. Their API is also 5 `nothrow` for the most part, so that the subscriber side cannot interfere 6 with the side that emits events. 7 8 Copyright: Copyright © 2020-2021 Sönke Ludwig 9 Authors: Sönke Ludwig 10 */ 11 module observable.observable; 12 13 import observable.signal : Signal, SignalConnection; 14 15 import core.time : Duration; 16 import std.meta : allSatisfy, staticMap; 17 import std.typecons : RefCounted, RefCountedAutoInitialize, refCounted; 18 import taggedalgebraic.taggedunion; 19 import vibe.core.log : logException; 20 import vibe.core.sync : LocalManualEvent, createManualEvent; 21 import vibe.internal.array : FixedRingBuffer; 22 23 24 /// 25 unittest { 26 import vibe.core.core : runTask, sleep; 27 import core.time : msecs; 28 29 ObservableSource!int source; 30 31 auto t1 = runTask({ 32 try { 33 auto observer = source 34 .subscribe(); 35 36 assert(observer.consumeOne == 1); 37 assert(observer.consumeOne == 2); 38 assert(observer.consumeOne == 3); 39 assert(observer.empty); 40 } catch (Exception e) assert(false, e.toString); 41 }); 42 43 auto t2 = runTask({ 44 try { 45 auto observer = source 46 .map!(i => i * 2) 47 .subscribe(); 48 49 assert(observer.consumeOne == 2); 50 assert(observer.consumeOne == 4); 51 assert(observer.consumeOne == 6); 52 assert(observer.empty); 53 } catch (Exception e) assert(false, e.msg); 54 }); 55 56 // quick and dirty wait to let the tasks subscribe 57 sleep(10.msecs); 58 59 source.put(1); 60 source.put(2); 61 source.put(3); 62 source.close(); 63 64 t1.join(); 65 t2.join(); 66 } 67 68 // basic operation should be nothrow 69 nothrow unittest { 70 ObservableSource!int source; 71 auto observer = source.subscribe(); 72 int r; 73 source.put(1); 74 source.close(); 75 assert(observer.tryConsumeOne(r)); 76 assert(r == 1); 77 assert(!observer.tryConsumeOne(r)); 78 assert(source.closed); 79 assert(observer.empty); 80 } 81 82 83 /** Creates an observer for the specified observable. 84 85 An observer returns the events emitted by the observable in a sequential, 86 blocking manner. Usually, a separate task is used to drain the observer to 87 ensure sequential processing with no concurrency/race-conditions. 88 */ 89 Observer!(ObservableType!O) subscribe(O)(O observable) 90 if (isObservable!O) 91 { 92 assert (!observable.closed, "Subscribing to closed observable."); 93 Observer!(ObservableType!O) ret; 94 ret.initialize(observable); 95 return ret; 96 } 97 98 99 /** Determines whether a type qualifies as an observable. 100 101 An observable must have the following members: 102 103 $(UL 104 $(LI `Event`: the type of the events emitted by the observer - this must 105 be an instance of `ObservedEvent`) 106 $(LI `connect(C, A...)(ref SignalConnection, C callable, A args): Used 107 by observers and operators to get notfied of new events emitted by 108 an observer) 109 $(LI `closed`: A boolean property that signifies whether the observable 110 has been closed (cannot emit any more events)) 111 ) 112 */ 113 enum isObservable(O) = 114 is(O.Event) 115 && __traits(compiles, (SignalConnection c) { O.init 116 .connect(c, delegate(O.Event itm, int, int, int) {}, 1, 2, 3); }) 117 && is(typeof(O.init.closed) == bool); 118 119 static assert(isObservable!(Observable!int)); 120 static assert(isObservable!(ObservableSource!int)); 121 122 /** Extracts the event tyoe of an observable. 123 */ 124 alias ObservableType(O) = typeof(O.Event.init.eventValue); 125 126 127 /** The fundamental source for generating observable events. 128 129 This is the type used to manually generate events using the `put` method. 130 The events can be observed using the `Observable` returned by the 131 `observable` property. 132 */ 133 struct ObservableSource(T, EXTRA_STORAGE = void) { 134 private { 135 Observable!(T, EXTRA_STORAGE) m_observable; 136 } 137 138 alias Event = Observable!(T, EXTRA_STORAGE).Event; 139 140 @disable this(this); 141 this(ref return scope ObservableSource rhs) { this.tupleof = rhs.tupleof; } 142 143 @property ref inout(Observable!(T, EXTRA_STORAGE)) observable() inout { return m_observable; } 144 145 alias observable this; 146 147 /** Closes the observable. 148 149 In the closed state, `put` may no longer be called and all subscribed 150 observers will receive a notification that no more events will be 151 emitted. 152 */ 153 void close() { m_observable.close(); } 154 155 /** Emits an event. 156 157 All subscribed observers will receive a copy of the event. 158 */ 159 void put(T event) { m_observable.put(event); } 160 161 static if (!is(EXTRA_STORAGE == void)) { 162 @property ref EXTRA_STORAGE extraStorage() { return m_observable.extraStorage; } 163 } 164 } 165 166 /** A basic observable. 167 168 This is the observable provided by `ObservableSource` and some of the 169 observer modifier functions. Instead of `ObservableSource` an `Observable` 170 should usually be made accessible by external code, in order to avoid the 171 possibility of injecting events from the outside. 172 */ 173 struct Observable(T, EXTRA_STORAGE = void) 174 { 175 alias Event = ObservedEvent!T; 176 177 private static struct Payload { 178 Signal!Event signal; 179 bool closed; 180 static if (!is(EXTRA_STORAGE == void)) 181 EXTRA_STORAGE extraStorage; 182 183 @disable this(this); 184 this(ref return scope Payload rhs) { this.tupleof = rhs.tupleof; } 185 186 ~this() 187 { 188 close(); 189 } 190 191 void close() 192 { 193 if (this.closed) return; 194 195 this.closed = true; 196 this.signal.emit(Event.close()); 197 } 198 } 199 200 alias PayloadRef = RefCounted!(Payload, RefCountedAutoInitialize.no); 201 202 private { 203 PayloadRef m_payload; 204 } 205 206 @disable this(this); 207 this(ref return scope Observable rhs) 208 { 209 rhs.initialize(); 210 m_payload = rhs.m_payload; 211 } 212 213 @property bool closed() const { return m_payload.refCountedStore.isInitialized && m_payload.closed; } 214 215 void connect(C, ARGS...)(ref SignalConnection connection, C callable, ARGS args) 216 if (is(typeof(callable(Event.init, args)))) 217 { 218 static assert(__traits(compiles, () nothrow { callable(Event.init, args); }), 219 "Observable connection callback must be nothrow."); 220 221 initialize(); 222 m_payload.signal.socket.connect(connection, callable, args); 223 } 224 225 static if (!is(EXTRA_STORAGE == void)) { 226 private @property ref EXTRA_STORAGE extraStorage() 227 { 228 initialize(); 229 return m_payload.extraStorage; 230 } 231 } 232 233 private void close() 234 { 235 initialize(); 236 m_payload.close(); 237 } 238 239 private void put(T event) 240 { 241 assert(!this.closed, "Calling Observable.put() on a closed observable."); 242 243 initialize(); 244 m_payload.signal.emit(Event.event(event)); 245 } 246 247 private void initialize() 248 { 249 m_payload.refCountedStore.ensureInitialized(); 250 } 251 } 252 253 template ObservedEvent(T) 254 { 255 import std.meta : AliasSeq; 256 257 enum has_nothrow_postblit = __traits(compiles, () nothrow { 258 T a = T.init; 259 T b = T.init; 260 a = b; 261 }); 262 263 static if (has_nothrow_postblit) alias A = AliasSeq!(forceNothrowPostblit); 264 else alias A = AliasSeq!(); 265 266 @A 267 struct U { 268 T event; 269 Void close; 270 } 271 272 alias ObservedEvent = TaggedUnion!U; 273 } 274 275 276 /** An observer tied to a specific observable. 277 278 This type provides a channel and an input range interface to enable 279 sequential consumption of events emitted by an observer. 280 */ 281 struct Observer(T) 282 { 283 private static struct Payload { 284 FixedRingBuffer!T buffer; 285 bool closed; 286 LocalManualEvent event; 287 SignalConnection conn; 288 289 void put(ObservedEvent!T itm) 290 { 291 import std.algorithm.comparison : max; 292 293 if (closed) return; 294 295 if (itm.isClose) closed = true; 296 else { 297 if (buffer.full) 298 buffer.capacity = max(16, buffer.capacity * 3 / 2); 299 buffer.put(itm.eventValue); 300 } 301 302 event.emit(); 303 } 304 } 305 306 private alias Event = ObservedEvent!T; 307 private alias PayloadRef = RefCounted!(Payload, RefCountedAutoInitialize.no); 308 309 private { 310 PayloadRef m_payload; 311 } 312 313 /** Determines whether there are any events left. 314 315 Note that this property needs to wait until either at least one event 316 is available, or until the observer was closed. 317 */ 318 @property bool empty() 319 { 320 auto ec = m_payload.event.emitCount; 321 322 while (m_payload.buffer.empty) { 323 if (m_payload.closed) return true; 324 ec = m_payload.event.waitUninterruptible(ec); 325 } 326 327 return false; 328 } 329 330 /** Determines whether any more events are currently pending. 331 332 This property is `true` *iff* an event is immediately available, i.e. 333 that `front` yields a value without waiting. Note that `empty` is 334 guaranteed to return `false` in this case. 335 */ 336 @property bool pending() 337 const { 338 return m_payload.buffer.length >= 1; 339 } 340 341 /** Input range `front` property. 342 343 The caller must make sure that there is actually still an element 344 available before invoking this property, either by evaluating `empty`, 345 or by external knowledge. Failing to do so will result in an abnormal 346 program termination. 347 */ 348 @property ref T front() 349 { 350 if (this.empty) assert(false, "Calling .front on an empty subscriber"); 351 return m_payload.buffer.front; 352 } 353 354 /** Input range `popFront` method. 355 356 The caller must make sure that there is actually still an element 357 available before invoking this method, either by evaluating `empty`, 358 or by external knowledge. Failing to do so will result in an abnormal 359 program termination. 360 */ 361 void popFront() 362 { 363 if (this.empty) assert(false, "Calling .popFront on an empty subscriber"); 364 m_payload.buffer.popFront(); 365 } 366 367 /** Reads a single event. 368 369 Throws: If the observer is closed and all events have been read, 370 `ObserverClosedException` will be thrown. 371 */ 372 T consumeOne() 373 { 374 T ret; 375 if (!tryConsumeOne(ret)) 376 throw new ObserverClosedException; 377 return ret; 378 } 379 380 /** Attempts to read a single event. 381 382 Returns: `true` is returned if an event was successfully read. Otherwise 383 a value of `false` is returned, which means that the observable 384 was closed and no more events are available. 385 */ 386 bool tryConsumeOne(ref T dst) 387 { 388 import std.algorithm.mutation : swap; 389 390 if (this.empty) return false; 391 392 swap(dst, m_payload.buffer.front); 393 m_payload.buffer.popFront(); 394 return true; 395 } 396 397 private void initialize(O)(ref O observable) 398 { 399 assert(!m_payload.refCountedStore.isInitialized, "Double-initializing observer!?"); 400 m_payload.refCountedStore.ensureInitialized(); 401 m_payload.event = createManualEvent(); 402 observable.connect(m_payload.conn, &m_payload.put); 403 } 404 } 405 406 unittest { 407 ObservableSource!int o; 408 auto obs = o.subscribe(); 409 assert(!obs.pending); 410 o.put(1); 411 assert(obs.front == 1); 412 assert(obs.pending); 413 assert(!obs.empty); 414 o.put(2); 415 assert(obs.pending); 416 obs.popFront(); 417 assert(obs.front == 2); 418 assert(obs.pending); 419 assert(!obs.empty); 420 obs.popFront(); 421 assert(!obs.pending); 422 o.close(); 423 assert(obs.empty); 424 } 425 426 import std.range : isInputRange; 427 static assert(isInputRange!(Observer!int)); 428 429 430 /** Thrown by `Observer.consumeOne` in case there are no more events left. 431 */ 432 class ObserverClosedException : Exception { 433 this(string file = __FILE__, size_t line = __LINE__) 434 { 435 super("Attempt to consume event from closed observer.", file, line); 436 } 437 } 438 439 440 /** Applies a transformation on the events emitted by an observer. 441 442 The return value is an observer that emits the transformed events. 443 */ 444 auto map(alias fun, O)(O source) 445 if (isObservable!O) 446 { 447 alias T = ObservableType!O; 448 alias TM = typeof(fun(T.init)); 449 450 static struct OM { 451 private O source; 452 453 alias Event = ObservedEvent!TM; 454 455 @property bool closed() const { return source.closed; } 456 457 void connect(C, ARGS...)(ref SignalConnection connection, C callable, ARGS args) 458 { 459 source.connect(connection, function(ObservedEvent!T val, C callable, ARGS args) { 460 final switch (val.kind) with (ObservedEvent!T.Kind) { 461 case close: callable(ObservedEvent!TM.close(), args); break; 462 case event: callable(ObservedEvent!TM.event(fun(val.eventValue)), args); break; 463 } 464 }, callable, args); 465 } 466 } 467 468 static assert(isObservable!OM); 469 470 return OM(source); 471 } 472 473 /// 474 unittest { 475 ObservableSource!int source; 476 auto observer = source 477 .map!(i => 2 * i) 478 .subscribe(); 479 480 source.put(1); 481 source.put(2); 482 source.put(3); 483 484 assert(observer.consumeOne == 2); 485 assert(observer.consumeOne == 4); 486 assert(observer.consumeOne == 6); 487 } 488 489 490 /** Combines multiple observers into one. 491 492 The events of the input observables will be stored in a `TaggedUnion` and 493 will be emitted by the combined observable interleaved in the same order as 494 they occurred. 495 496 The values of the tagged union can be accessed using `evt.observer0Value`, 497 `evt.observer1Value` etc., or by type using `evt.value!T`, with `T` being 498 the event type of one or more of the source observers. 499 */ 500 auto merge(OBSERVERS...)(OBSERVERS observers) 501 if (allSatisfy!(isObservable, OBSERVERS)) 502 { 503 import std.algorithm.mutation : swap; 504 import std.algorithm.searching : all; 505 506 alias RawTypes = staticMap!(ObservableType, OBSERVERS); 507 alias Types = ObserverNamedTypes!RawTypes; 508 alias TM = TaggedUnion!Types; 509 510 static struct ES { 511 OBSERVERS sources; 512 SignalConnection[OBSERVERS.length] connections; 513 514 @disable this(this); 515 this(ref return scope ES rhs) { this.tupleof = rhs.tupleof; } 516 } 517 518 static void onSourceEvent(size_t i)(ObservedEvent!(RawTypes[i]) val, 519 ObservableSource!(TM, ES) target) 520 nothrow { 521 final switch (val.kind) with (ObservedEvent!(RawTypes[i]).Kind) { 522 case close: 523 scope ES* es = () @trusted { return &target.extraStorage(); } (); 524 assert(es.sources[i].closed, "Observer sent close, but is not closed!?"); 525 assert(!target.closed, "An observer was closed after the merged observer is already closed!?"); 526 527 es.connections[i].disconnect(); 528 529 bool all_closed = true; 530 static foreach (i; 0 .. OBSERVERS.length) 531 if (!es.sources[i].closed) 532 all_closed = false; 533 if (all_closed) target.close(); 534 break; 535 case event: 536 auto et = mixin("TM.observer"~i.stringof~"(val.eventValue)"); 537 target.put(et); 538 break; 539 } 540 } 541 542 ObservableSource!(TM, ES) ret; 543 scope ES* es = () @trusted { return &ret.extraStorage(); } (); 544 foreach (i, O; OBSERVERS) { 545 swap(es.sources[i], observers[i]); 546 547 if (!es.sources[i].closed) 548 es.sources[i].connect(es.connections[i], &onSourceEvent!i, ret); 549 } 550 return ret; 551 } 552 553 /// 554 unittest { 555 ObservableSource!int oint; 556 ObservableSource!string ostr; 557 558 auto observer = merge(oint, ostr) 559 .subscribe(); 560 561 oint.put(1); 562 ostr.put("foo"); 563 oint.put(2); 564 oint.put(3); 565 oint.close(); 566 ostr.put("bar"); 567 ostr.close(); 568 569 assert(observer.consumeOne.value!int == 1); 570 assert(observer.consumeOne.value!string == "foo"); 571 assert(observer.consumeOne.value!int == 2); 572 assert(observer.consumeOne.value!int == 3); 573 assert(observer.consumeOne.value!string == "bar"); 574 assert(observer.empty); 575 } 576 577 private struct ObserverNamedTypes(TYPES...) 578 { 579 static foreach (i, T; TYPES) 580 mixin("TYPES["~i.stringof~"] observer"~i.stringof~";"); 581 } 582 583 584 /** Records the time at which each event was emitted. 585 586 Returns an observable that emits `TimestampedEvent` events with the 587 `MonoTime` timestamp of the original occurrence of each event. 588 */ 589 auto timestamp(O)(ref O source) 590 { 591 import core.time : MonoTime; 592 alias T = ObservableType!O; 593 return source.map!(val => TimestampedEvent!T(val, MonoTime.currTime())); 594 } 595 596 struct TimestampedEvent(T) 597 { 598 import core.time : MonoTime; 599 T event; 600 MonoTime timestamp; 601 } 602 603 604 /** Forwards the events emitted by an observable with a fixed delay. 605 */ 606 auto delay(O)(ref O source, Duration delay) 607 if (isObservable!O) 608 { 609 import vibe.core.core : runTask, sleep; 610 import core.time : MonoTime; 611 612 alias T = ObservableType!O; 613 614 ObservableSource!T delayed; 615 616 runTask(function (O source, ObservableSource!T delayed, Duration delay) nothrow { 617 scope (exit) delayed.close(); 618 619 foreach (val; source.timestamp.subscribe) { 620 auto tt = val.timestamp + delay; 621 auto tm = MonoTime.currTime(); 622 if (tm < tt) { 623 try sleep(tt - tm); 624 catch (Exception e) { 625 logException(e, "Sleep in observer delay() got interrupted unexpectedly"); 626 break; 627 } 628 } 629 delayed.put(val.event); 630 } 631 }, source, delayed, delay); 632 633 return delayed; 634 } 635 636 /// 637 unittest { 638 import core.time : msecs; 639 import std.algorithm.iteration : each; 640 import std.datetime.stopwatch : StopWatch; 641 import vibe.core.core : runTask, sleep; 642 import vibe.core.log : logInfo; 643 644 ObservableSource!int source; 645 StopWatch sw; 646 647 auto t1 = runTask({ 648 source 649 .subscribe() 650 .each!((evt) { 651 logInfo("%3d ms, immediate: %s", sw.peek.total!"msecs", evt); 652 }); 653 }); 654 655 auto t2 = runTask({ 656 source 657 .delay(50.msecs) 658 .subscribe() 659 .each!((evt) { 660 logInfo("%3d ms, delayed: %s", sw.peek.total!"msecs", evt); 661 }); 662 }); 663 664 // quick and dirty wait to let the tasks subscribe 665 sleep(10.msecs); 666 667 sw.start(); 668 669 source.put(1); 670 source.put(2); 671 sleep(100.msecs); 672 source.put(3); 673 source.close(); 674 675 t1.join(); 676 t2.join(); 677 678 // expected output: 679 // 0 ms, immediate: 1 680 // 0 ms, immediate: 2 681 // 50 ms, delayed: 1 682 // 50 ms, delayed: 2 683 // 100 ms, immediate: 3 684 // 150 ms, delayed: 3 685 }