(define-interface async-channels-interface (export make-async-channel send-async receive-async receive-async-selective)) (define-structure async-channels async-channels-interface (open scheme threads locks queues cells big-util srfi-9) (begin (define-record-type :async-channel (really-make-async-channel lock messages receive-waiting) async-channel? (lock async-channel-lock) (messages async-channel-messages) (receive-waiting async-channel-receive-waiting)) (define *async-channel-lock* (make-lock)) (define (make-async-channel) (really-make-async-channel #f (make-queue) (make-queue))) (define (send-async channel message) (obtain-lock *async-channel-lock*) (if (queue-empty? (async-channel-receive-waiting channel)) (begin (enqueue! (async-channel-messages channel) message) (release-lock *async-channel-lock*) (relinquish-timeslice)) (let* ((pair (dequeue! (async-channel-receive-waiting channel))) (waiting-lock (car pair)) (cell (cdr pair))) (cell-set! cell (cons pair message)) ;; hand *async-channel-lock* to the receive operation (release-lock waiting-lock)))) (define (receive-async channel) (obtain-lock *async-channel-lock*) (if (queue-empty? (async-channel-messages channel)) (let ((waiting-lock (make-lock)) (cell (make-cell #f))) (obtain-lock waiting-lock) (enqueue! (async-channel-receive-waiting channel) (cons waiting-lock cell)) (release-lock *async-channel-lock*) (obtain-lock waiting-lock) ;; we have *async-channel-lock* again (release-lock *async-channel-lock*) (cdr (cell-ref cell))) (let ((message (dequeue! (async-channel-messages channel)))) (release-lock *async-channel-lock*) message))) (define (choice-enabled? choice) (let* ((channel (car choice)) (waiting (async-channel-messages channel))) (not (queue-empty? waiting)))) (define (receive-async-selective choices) (obtain-lock *async-channel-lock*) (cond ((any choice-enabled? choices) => (lambda (choice) (let* ((channel (car choice)) (action (cdr choice)) (message (dequeue! (async-channel-messages channel)))) (release-lock *async-channel-lock*) (action message)))) (else (let ((waiting-lock (make-lock)) (cell (make-cell #f))) (obtain-lock waiting-lock) (let ((ids (map (lambda (ignore) (cons waiting-lock cell)) choices))) (for-each (lambda (choice id) (let ((channel (car choice))) (enqueue! (async-channel-receive-waiting channel) id))) choices ids) (release-lock *async-channel-lock*) (obtain-lock waiting-lock) ;; we have *async-channel-lock* again (let ((message (cdr (cell-ref cell))) (id (car (cell-ref cell)))) (let recur ((ids ids) (choices choices)) (cond ((null? ids) (values)) ((eq? id (car ids)) (let* ((choice (car choices)) (action (cdr choice))) (recur (cdr ids) (cdr choices)) (release-lock *async-channel-lock*) (action message))) (else (let* ((choice (car choices)) (channel (car choice))) (delete-from-queue! (async-channel-receive-waiting channel) (car ids)) (recur (cdr ids) (cdr choices)))))))))))) ))