;; Multicast-Channels / Exchanges
;;
;; An exchange accepts messages through EXCHANGE-SEND and forwards each one
;; to every port whose creation request the exchange's server thread handled
;; before that message.  Ports are created with MAKE-EXCHANGE-PORT and read
;; with PORT-RECEIVE-RV.
;;
;; The channel and thread primitives used below (MAKE-CHANNEL, SEND, RECEIVE,
;; SPAWN, MAKE-ASYNC-CHANNEL, SEND-ASYNC, RECEIVE-ASYNC-RV) are not defined
;; in this file; they are assumed to come from the surrounding CML-style
;; library -- TODO confirm the exact package.

;; An exchange is a pair of channels shared with its server thread:
;;   request-channel -- carries send-request / new-port-request records
;;                      to the server
;;   port-channel    -- carries freshly created ports back to the requester
(define-record-type :exchange
  (really-make-exchange request-channel port-channel)
  exchange?
  (request-channel exchange-request-channel)
  (port-channel exchange-port-channel))

;; A port wraps the asynchronous channel its tee thread writes messages to.
(define-record-type :port
  (make-port async-channel)
  port?
  (async-channel port-async-channel))

;; Request asking the server to forward MESSAGE to the current ports.
(define-record-type :send-request
  (make-send-request message)
  send-request?
  (message send-request-message))

;; Request asking the server to create a new port.  It carries no data.
(define-record-type :new-port-request
  (make-new-port-request)
  new-port-request?)

;; Single shared instance, since a new-port request has no fields.
(define the-new-port-request (make-new-port-request))

;; Create a new exchange and spawn its server thread.
;;
;; The server keeps one piece of state, OUT-PROC: a procedure that hands a
;; message to the most recently created port.  Each port runs a "tee" thread
;; that puts the message on the port's own async channel and then passes it
;; on to the OUT-PROC that was current when the port was created, so the
;; ports form a chain ending in a procedure that discards the message.
;;
;; Returns the exchange record.
(define (make-exchange)
  (let ((request-channel (make-channel))
        (port-channel (make-channel)))
    ;; Named MAKE-TEE-PORT rather than MAKE-PORT so that it does not shadow
    ;; the :port record constructor, which is called in its body.
    ;; Given the current OUT-PROC, returns two values: the new OUT-PROC
    ;; (feeds the new port's tee thread) and the new port record.
    (let ((make-tee-port
           (lambda (out-proc)
             (let ((async-channel (make-async-channel))
                   (in-channel (make-channel)))
               (spawn
                (lambda ()
                  (let tee ()
                    (let ((message (receive in-channel)))
                      ;; Deliver to this port first, then pass the message
                      ;; down the chain to the previously created ports.
                      (send-async async-channel message)
                      (out-proc message)
                      (tee)))))
               (values (lambda (message)
                         (send in-channel message))
                       (make-port async-channel))))))
      (spawn
       (lambda ()
         ;; With no ports yet, messages are simply dropped.
         (let server ((out-proc (lambda (ignore) 'ignore)))
           (let ((request (receive request-channel)))
             ;; NOTE: there is no ELSE clause, so a request that is neither
             ;; kind of record ends this loop and thereby the server thread.
             (cond
              ((new-port-request? request)
               (call-with-values
                   (lambda () (make-tee-port out-proc))
                 (lambda (next-out-proc port)
                   (send port-channel port)
                   (server next-out-proc))))
              ((send-request? request)
               (out-proc (send-request-message request))
               (server out-proc)))))))
      (really-make-exchange request-channel port-channel))))

;; Hand MESSAGE to EXCHANGE's server thread for forwarding to its ports.
(define (exchange-send exchange message)
  (send (exchange-request-channel exchange)
        (make-send-request message)))

;; Ask EXCHANGE's server thread for a new port and return the port it
;; sends back on the exchange's port channel.
(define (make-exchange-port exchange)
  (send (exchange-request-channel exchange)
        the-new-port-request)
  (receive (exchange-port-channel exchange)))

;; Return the result of RECEIVE-ASYNC-RV on PORT's async channel --
;; presumably a rendezvous yielding the next message; confirm against the
;; async-channel library.
(define (port-receive-rv port)
  (receive-async-rv (port-async-channel port)))