Source code for utils.observable

[docs]class Observable(object): """Textbook observer pattern""" def __init__(self): self.observers = set()
[docs] def enroll(self, function): self.observers.add(function)
[docs] def dispatch(self, *args, **kwargs): for f in self.observers: f(*args, **kwargs)
[docs]def forward(observable1, observable2): """Forwards messages from an observable to another with the same signature""" def forwarder(*args, **kwargs): observable2.dispatch(*args, **kwargs) observable1.enroll(forwarder)