From 43befa3b59dc86b09dc5d69bae0dc1412ae6abc0 Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Sat, 22 Aug 2020 20:28:33 +0200 Subject: [PATCH] tests: multiprocessing: major overhaul Changes include: - consistent naming - replaced unnecessary use of (sleep) by better synchronization mechanisms - tests are run with timeouts - clean up stray threads which would otherwise wait for all eternity - better error messages in case of test failures --- src/tests/normal-tests/multiprocessing.lsp | 1024 +++++++++----------- 1 file changed, 480 insertions(+), 544 deletions(-) diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index 0f3b8506f..d0d356075 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -9,42 +9,16 @@ (suite 'mp) - -;; Auxiliary routines for multithreaded tests - -(defun kill-and-wait (process-list &optional original wait) - "Kills a list of processes, which may be the difference between two lists, -waiting for all processes to finish. Currently it has no timeout, meaning -it may block hard the lisp image." - (let ((process-list (set-difference process-list original))) - (when (member mp:*current-process* process-list) - (error "Found myself in the kill list")) - (mapc #'mp:process-kill process-list) - (when wait - (loop for i in process-list - do (mp:process-join i))) - process-list)) - -(defun mp-test-run (closure) - (let* ((all-processes (mp:all-processes)) - (output (multiple-value-list (funcall closure)))) - (sleep 0.2) ; time to exit some processes - (let ((leftovers (kill-and-wait (mp:all-processes) all-processes))) - (cond (leftovers - (format t "~%;;; Stray processes: ~A" leftovers)) - (t - (values-list output)))))) - -(defmacro def-mp-test (name body expected-value) - "Runs some test code and only returns the output when the code exited without -creating stray processes." - (let ((all-processes (gensym)) - (output (gensym)) - (leftover (gensym))) - `(test ,name - (is-equal - (mp-test-run #'(lambda () ,body)) - ,expected-value)))) +;;; Important Note: +;;; +;;; Testing multithreading primitives such as locks or semaphores +;;; often requires synchronizing multiple threads. To keep the tests +;;; as simple as possible, this is synchronization is often done by +;;; using busy waits. As a consequence, test failures may manifest as +;;; timeouts. Some tests for semaphores and barriers also use mutexes +;;; for synchronization purposes and will fail if mutexes don't work +;;; correctly. Nearly all tests also assume that creating, killing or +;;; joining threads works properly, which is not tested separately. ;; Locks @@ -57,13 +31,13 @@ creating stray processes." ;;; When a WITH-LOCK is interrupted, it is not able to release ;;; the resulting lock and an error is signaled. ;;; -(test mp-0001-with-lock - (let ((flag t) - (lock (mp:make-lock :name "mp-0001-with-lock" :recursive nil))) +(test-with-timeout mp.mutex.with-lock + (let ((flag 0) + (lock (mp:make-lock :name "mutex.with-lock" :recursive nil))) (mp:with-lock (lock) (let ((background-process (mp:process-run-function - "mp-0001-with-lock" + "mutex.with-lock" #'(lambda () (handler-case (progn @@ -73,288 +47,54 @@ creating stray processes." (error (c) (princ c)(terpri) (setf flag c))) - (setf flag 2))))) + (setf flag 3))))) ;; The background process should not be able to get ;; the lock, and will simply wait. Now we interrupt it ;; and the process should gracefully quit, without ;; signalling any serious condition - (and (progn (sleep 1) - (is (mp:process-kill background-process))) - (progn (sleep 1) - (is (not (mp:process-active-p background-process)))) - (is (eq flag 1))))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-kill background-process)) + (mp:process-join background-process) + (is (= flag 1)))))) - -;; Semaphores - -;;; Date: 14/04/2012 -;;; Ensure that at creation name and counter are set -(test sem-make-and-counter - (is (loop with name = "sem-make-and-counter" - for count from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (eq (mp:semaphore-name sem) name) - (= (mp:semaphore-count sem) count) - (zerop (mp:semaphore-wait-count sem)))))) - -;;; Date: 14/04/2012 -;;; Ensure that signal changes the counter by the specified amount -(test sem-signal-semaphore-count - (is - (loop with name = "sem-signal-semaphore-count" - for count from 0 to 10 - always (loop for delta from 0 to 10 - for sem = (mp:make-semaphore :name name :count count) - always (and (= (mp:semaphore-count sem) count) - (null (mp:signal-semaphore sem delta)) - (= (mp:semaphore-count sem ) (+ count delta))))))) - -;;; Date: 14/04/2012 -;;; A semaphore with a count of zero blocks a process -(def-mp-test sem-signal-one-process - (let* ((flag nil) - (sem (mp:make-semaphore :name "sem-signal-one")) - (a-process (mp:process-run-function - "sem-signal-one-process" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t))))) - (and (null flag) - (mp:process-active-p a-process) - (progn (mp:signal-semaphore sem) (sleep 0.2) flag) - (= (mp:semaphore-count sem) 0))) - t) - -;;; Date: 14/04/2012 -;;; We can signal multiple processes -(test sem-signal-n-processes - (loop for count from 1 upto 10 always - (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process - (loop for i from 1 upto count - collect (mp:process-run-function - "sem-signal-n-processes" - #'(lambda () - (mp:wait-on-semaphore sem) - (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) count) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - count (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem count) - (sleep 0.2) - (= counter count)) - "Counter should be ~s (but is ~s)." count counter) - (is (= (mp:semaphore-count sem) 0)))))) - -;;; Date: 14/04/2012 -;;; When we signal N processes and N+M are waiting, only N awake -(test sem-signal-only-n-processes - (loop for m from 1 upto 3 always - (loop for n from 1 upto 4 always - (let* ((counter 0) - (lock (mp:make-lock :name "sem-signal-n-processes")) - (sem (mp:make-semaphore :name "sem-signal-n-processs")) - (all-process - (loop for i from 1 upto (+ n m) - collect (mp:process-run-function - "sem-signal-n-processes" - #'(lambda () - (mp:wait-on-semaphore sem) - (mp:with-lock (lock) (incf counter))))))) - (sleep 0.1) ; let threads settle on semaphore - (and (is (zerop counter)) - (is (every #'mp:process-active-p all-process)) - (is (= (mp:semaphore-wait-count sem) (+ m n)) - "Number of threads waiting on semaphore should be ~s (but is ~s)." - (+ m n) (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem n) - (sleep 0.02) - (= counter n))) - (is (= (mp:semaphore-wait-count sem) m) - "Number of threads waitng on semaphore should be ~s (but is ~s)." - m (mp:semaphore-wait-count sem)) - (is (progn (mp:signal-semaphore sem m) - (sleep 0.02) - (= counter (+ n m))) - "Counter should be ~s (but is ~s)." (+ n m) counter)))))) - -;;; Date: 14/04/2012 -;;; It is possible to kill processes waiting for a semaphore. -;;; -(def-mp-test sem-interruptible - (loop with sem = (mp:make-semaphore :name "sem-interruptible") - with flag = nil - for count from 1 to 10 - for all-processes = (loop for i from 1 upto count - collect (mp:process-run-function - "sem-interruptible" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag t)))) - always (and (progn (sleep 0.2) (null flag)) - (every #'mp:process-active-p all-processes) - (= (mp:semaphore-wait-count sem) count) - (mapc #'mp:process-kill all-processes) - (progn (sleep 0.2) (notany #'mp:process-active-p all-processes)) - (null flag) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) - -;;; Date: 14/04/2012 -;;; When we kill a process, it is removed from the wait queue. -;;; -(def-mp-test sem-interrupt-updates-queue - (let* ((sem (mp:make-semaphore :name "sem-interrupt-updates-queue")) - (process (mp:process-run-function - "sem-interrupt-updates-queue" - #'(lambda () (mp:wait-on-semaphore sem))))) - (sleep 0.2) - (and (= (mp:semaphore-wait-count sem) 1) - (mp:process-active-p process) - (progn (mp:process-kill process) - (sleep 0.2) - (not (mp:process-active-p process))) - (zerop (mp:semaphore-wait-count sem)) - t)) - t) - -;;; Date: 14/04/2012 -;;; When we kill a process, it signals another one. This is tricky, -;;; because we need the awake signal to arrive _after_ the process is -;;; killed, but the process must still be in the queue for the semaphore -;;; to awake it. The way we solve this is by intercepting the kill signal. -;;; -(test sem-interrupted-resignals - (let* ((sem (mp:make-semaphore :name "sem-interrupted-resignals")) - (flag1 nil) - (flag2 nil) - (process1 (mp:process-run-function - "sem-interrupted-resignals" - #'(lambda () - (unwind-protect - (mp:wait-on-semaphore sem) - (sleep 4) - (setf flag1 t) - )))) - (process2 (mp:process-run-function - "sem-interrupted-resignals" - #'(lambda () - (mp:wait-on-semaphore sem) - (setf flag2 t))))) - (sleep 0.2) - (and (is (= (mp:semaphore-wait-count sem) 2)) - (is (mp:process-active-p process1)) - (is (mp:process-active-p process2)) - ;; We kill the process but ensure it is still running - (is (progn (mp:process-kill process1) - (mp:process-active-p process1))) - (is (null flag1)) - ;; ... and in the queue - (is (= (mp:semaphore-wait-count sem) 2)) - ;; We awake it and it should awake the other one - (is (progn (format t "~%;;; Signaling semaphore") - (mp:signal-semaphore sem) - (sleep 1) - (zerop (mp:semaphore-wait-count sem)))) - (is flag2)))) - -;;; Date: 14/04/2012 -;;; 1 producer and N consumers, non-blocking, because the initial count -;;; is larger than the consumed data. -(def-mp-test sem-1-to-n-non-blocking - (loop with counter = 0 - with lock = (mp:make-lock :name "sem-1-to-n-communication") - for n from 1 to 10 - for m = (round 128 n) - for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count length) - for producers = (progn - (setf counter 0) - (loop for i from 0 below n - collect (mp:process-run-function - "sem-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - do (mp:wait-on-semaphore sem) - do (mp:with-lock (lock) (incf counter))))))) - do (mapc #'mp:process-join producers) - always (and (= counter length) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) - -;;; Date: 14/04/2012 -;;; 1 producer and N consumers, blocking due to a slow producer. -(def-mp-test sem-1-to-n-blocking - (loop with lock = (mp:make-lock :name "sem-1-to-n-communication") - for n from 1 to 10 - for m = (round 10000 n) - for length = (* n m) - for sem = (mp:make-semaphore :name "sem-1-to-n-communication" :count 0) - for counter = 0 - for producers = (loop for i from 0 below n - collect (mp:process-run-function - "sem-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - do (mp:wait-on-semaphore sem)) - (mp:with-lock (lock) (incf counter))))) - do (loop for i from 0 below length - do (mp:signal-semaphore sem)) - do (mapc #'mp:process-join producers) - always (and (= counter n) - (zerop (mp:semaphore-count sem)) - (zerop (mp:semaphore-wait-count sem)))) - t) - - -;; Mutexes ;;; Date: 12/04/2012 ;;; Non-recursive mutexes should signal an error when they ;;; cannot be relocked. -(test mutex-001-recursive-error - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) - (and - (mp:get-lock mutex) - (eq (mp:lock-owner mutex) mp:*current-process*) - (handler-case - (progn (mp:get-lock mutex) nil) - (error (c) t)) - (mp:giveup-lock mutex) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) +(test-with-timeout mp.mutex.recursive-error + (let* ((mutex (mp:make-lock :name 'mutex-001-recursive-error))) + (is (mp:get-lock mutex)) + (is (eq (mp:lock-owner mutex) mp:*current-process*)) + (signals error (mp:get-lock mutex)) + (is (mp:giveup-lock mutex)) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) ;;; Date: 12/04/2012 ;;; Recursive locks increase the counter. -(test mutex-002-recursive-count - (is-true - (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) - (and - (loop for i from 1 upto 10 - always (and (mp:get-lock mutex) - (= (mp:lock-count mutex) i) - (eq (mp:lock-owner mutex) mp:*current-process*))) - (loop for i from 9 downto 0 - always (and (eq (mp:lock-owner mutex) mp:*current-process*) - (mp:giveup-lock mutex) - (= (mp:lock-count mutex) i))) - (null (mp:lock-owner mutex)) - (zerop (mp:lock-count mutex)))))) +(test-with-timeout mp.mutex.recursive-count + (let* ((mutex (mp:make-lock :name 'mutex-002-recursive-count :recursive t))) + (is (loop for i from 1 upto 10 + always (and (mp:get-lock mutex) + (= (mp:lock-count mutex) i) + (eq (mp:lock-owner mutex) mp:*current-process*)))) + (is (loop for i from 9 downto 0 + always (and (eq (mp:lock-owner mutex) mp:*current-process*) + (mp:giveup-lock mutex) + (= (mp:lock-count mutex) i)))) + (is (null (mp:lock-owner mutex))) + (is (zerop (mp:lock-count mutex))))) ;;; Date: 12/04/2012 ;;; When multiple threads compete for a mutex, they should ;;; all get the same chance of accessing the resource ;;; -(def-mp-test mutex-003-fairness - (let* ((mutex (mp:make-lock :name 'mutex-001-fairness)) +;;; Disabled since underlying OS functions don't guarantee fairness -- +;;; mg 2020-08-22 +#+(or) +(test-with-timeout mp.mutex.fairness + (let* ((mutex (mp:make-lock :name "mutex.fairness")) (nthreads 10) (count 10) (counter (* nthreads count)) @@ -382,82 +122,296 @@ creating stray processes." (loop for p in all-processes do (mp:process-join p)) (loop for i from 0 below nthreads - always (= (aref array i) count))))) - t) + (is (= (aref array i) count))))))) ;;; Date: 12/04/2012 -;;; It is possible to kill processes waiting for a lock. We launch a lot of -;;; processes, 50% of which are zombies: they acquire the lock and do not -;;; do anything. These processes are then killed, resulting in the others -;;; doing their job. +;;; It is possible to kill processes waiting for a lock. ;;; -(def-mp-test mutex-004-interruptible - (let* ((mutex (mp:make-lock :name "mutex-003-fairness")) - (nprocesses 20) - (counter 0)) - (flet ((normal-thread () - (mp:with-lock (mutex) - (incf counter))) - (zombie-thread () - (mp:with-lock (mutex) - (loop (sleep 10))))) - (let* ((all-processes (loop for i from 0 below nprocesses - for zombie = (zerop (mod i 2)) - for fn = (if zombie #'zombie-thread #'normal-thread) - collect (cons zombie - (mp:process-run-function - "mutex-003-fairness" - fn)))) - (zombies (mapcar #'cdr (remove-if-not #'car all-processes)))) - (and (zerop counter) ; No proces works because the first one is a zombie - (kill-and-wait zombies) - (progn (sleep 0.2) (= counter (/ nprocesses 2))) - (not (mp:lock-owner mutex)) - t)))) - t) +(test-with-timeout mp.mutex.interruptible + (let ((mutex (mp:make-lock :name "mutex.interruptible")) + (flag 0)) + (mp:get-lock mutex) + (let ((sleeper-thread + (mp:process-run-function + "mutex.interruptible" + #'(lambda () + (setf flag 1) + (mp:with-lock (mutex) + (setf flag 2)))))) + (loop until (/= flag 0)) + (sleep 0.1) + (is (mp:process-active-p sleeper-thread)) + (mp:process-kill sleeper-thread) + (mp:process-join sleeper-thread) + (is (= flag 1)) + (is (eq (mp:lock-owner mutex) mp:*current-process*))) + (mp:giveup-lock mutex))) + + +;; Semaphores + +;;; Date: 14/04/2012 +;;; Ensure that at creation name and counter are set +(test mp.sem.make-and-counter + (loop with name = "sem.make-and-counter" + for count from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (eq (mp:semaphore-name sem) name)) + (is (= (mp:semaphore-count sem) count)) + (is (zerop (mp:semaphore-wait-count sem))))) + +;;; Date: 14/04/2012 +;;; Ensure that signal changes the counter by the specified amount +(test-with-timeout mp.sem.signal-semaphore-count + (loop with name = "sem.signal-semaphore-count" + for count from 0 to 10 + do (loop for delta from 0 to 10 + for sem = (mp:make-semaphore :name name :count count) + do (is (= (mp:semaphore-count sem) count)) + (is (null (mp:signal-semaphore sem delta))) + (is (= (mp:semaphore-count sem ) (+ count delta)))))) + +;;; Date: 14/04/2012 +;;; A semaphore with a count of zero blocks a process +(test-with-timeout mp.sem.signal-one-process + (let* ((flag nil) + (sem (mp:make-semaphore :name "sem.signal-one")) + (a-process (mp:process-run-function + "sem.signal-one-process" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t))))) + (is (null flag)) + (is (mp:process-active-p a-process)) + (mp:signal-semaphore sem) + (mp:process-join a-process) + (is flag) + (is (= (mp:semaphore-count sem) 0)))) + +;;; Date: 14/04/2012 +;;; We can signal multiple processes +(test-with-timeout mp.sem.signal-n-processes + (loop for count from 1 upto 10 always + (let* ((counter 0) + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes + (loop for i from 1 upto count + collect (mp:process-run-function + "sem.signal-n-processes" + #'(lambda () + (mp:wait-on-semaphore sem) + (mp:with-lock (lock) (incf counter))))))) + (loop until (= (mp:semaphore-wait-count sem) count)) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem count) + (mapc #'mp:process-join all-processes) + (is (= counter count) + "Counter should be ~s (but is ~s)." count counter) + (is (= (mp:semaphore-count sem) 0))))) + +;;; Date: 14/04/2012 +;;; When we signal N processes and N+M are waiting, only N awake +(test-with-timeout mp.sem.signal-only-n-processes + (loop for m from 1 upto 3 do + (loop for n from 1 upto 4 do + (let* ((counter 0) + (lock (mp:make-lock :name "sem.signal-n-processes")) + (sem (mp:make-semaphore :name "sem.signal-n-processs")) + (all-processes + (loop for i from 1 upto (+ n m) + collect (mp:process-run-function + "sem.signal-n-processes" + #'(lambda () + (mp:wait-on-semaphore sem) + (mp:with-lock (lock) (incf counter))))))) + (loop until (= (mp:semaphore-wait-count sem) (+ m n))) + (is (zerop counter)) + (is (every #'mp:process-active-p all-processes)) + (mp:signal-semaphore sem n) + (loop until (= (count-if #'mp:process-active-p all-processes) m)) + (is (= counter n)) + (is (= (mp:semaphore-wait-count sem) m) + "Number of threads waiting on semaphore should be ~s (but is ~s)." + m (mp:semaphore-wait-count sem)) + (mp:signal-semaphore sem m) + (mapc #'mp:process-join all-processes) + (is (= counter (+ n m)) + "Counter should be ~s (but is ~s)." + (+ n m) counter))))) + +;;; Date: 14/04/2012 +;;; It is possible to kill processes waiting for a semaphore. +;;; +(test-with-timeout mp.sem.interruptible + (loop with sem = (mp:make-semaphore :name "sem.interruptible") + with flag = nil + for count from 1 to 10 + for all-processes = (loop for i from 1 upto count + collect (mp:process-run-function + "sem.interruptible" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag t)))) + do (loop until (= (mp:semaphore-wait-count sem) count)) + (is (null flag)) + (is (every #'mp:process-active-p all-processes)) + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes) + (is (null flag)) + ;; Usually, the wait count should be zero at this point. We may + ;; get higher values since the interrupt doesn't lock the mutex + ;; associated to the semaphore and thus multiple threads may write + ;; the wait count at the same time. However, interrupts are provided + ;; only for debugging purposes, for which this behaviour is acceptable. + (is (<= (mp:semaphore-wait-count sem) count)))) + +;;; Date: 14/04/2012 +;;; When we kill a process, it is removed from the wait queue. +;;; +(test-with-timeout mp.sem.interrupt-updates-queue + (let* ((sem (mp:make-semaphore :name "sem.interrupt-updates-queue")) + (process (mp:process-run-function + "sem.interrupt-updates-queue" + #'(lambda () (mp:wait-on-semaphore sem))))) + (loop until (= (mp:semaphore-wait-count sem) 1)) + (is (mp:process-active-p process)) + (mp:process-kill process) + (mp:process-join process) + ;; In contrast to the previous test, if we interrupt only a single thread + ;; the wait count must be correct, since only a single thread is writing. + (is (zerop (mp:semaphore-wait-count sem))))) + +;;; Date: 14/04/2012 +;;; When we kill a process, it signals another one. This is tricky, +;;; because we need the awake signal to arrive _after_ the process is +;;; killed, but the process must still be in the queue for the semaphore +;;; to awake it. +;;; +(test-with-timeout mp.sem.interrupted-resignals + (let* ((sem (mp:make-semaphore :name "sem.interrupted-resignals")) + (flag1 nil) + (flag2 nil) + (process1 (mp:process-run-function + "sem.interrupted-resignals-1" + #'(lambda () + (unwind-protect + (mp:wait-on-semaphore sem) + (loop repeat (* 60 100) do (sleep 1/100)) + (setf flag1 t))))) + (process2 (mp:process-run-function + "sem.interrupted-resignals-2" + #'(lambda () + (mp:wait-on-semaphore sem) + (setf flag2 t))))) + (loop until (= (mp:semaphore-wait-count sem) 2)) + (is (mp:process-active-p process1)) + (is (mp:process-active-p process2)) + ;; We kill the process but ensure it is still running + (mp:process-kill process1) + (is (mp:process-active-p process1)) + (is (null flag1)) + ;; Wait until the process is no longer waiting for the semaphore + (loop until (= (mp:semaphore-wait-count sem) 1)) + ;; ... then awake it and the other process should start working + (mp:signal-semaphore sem) + (mp:process-join process2) + (is (zerop (mp:semaphore-wait-count sem))) + (is flag2) + ;; Finally we kill the first process (which will by this time be + ;; stuck in the unwind-protect call) again. + (mp:process-kill process1) + (mp:process-join process1) + (is (null flag1)))) + +;;; Date: 14/04/2012 +;;; 1 producer and N consumers, non-blocking, because the initial count +;;; is larger than the consumed data. +(test-with-timeout mp.sem.1-to-n-non-blocking + (loop with counter = 0 + with lock = (mp:make-lock :name "sem.1-to-n-communication") + for n from 1 to 10 + for m = (round 128 n) + for length = (* n m) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count length) + for producers = (progn + (setf counter 0) + (loop for i from 0 below n + collect (mp:process-run-function + "sem.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + do (mp:wait-on-semaphore sem) + do (mp:with-lock (lock) (incf counter))))))) + do (mapc #'mp:process-join producers) + (is (= counter length)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) + +;;; Date: 14/04/2012 +;;; 1 producer and N consumers, blocking due to a slow producer. +(test-with-timeout mp.sem.1-to-n-blocking + (loop with lock = (mp:make-lock :name "sem.1-to-n-communication") + for n from 1 to 10 + for m = (round 10000 n) + for length = (* n m) + for sem = (mp:make-semaphore :name "sem.1-to-n-communication" :count 0) + for counter = 0 + for producers = (loop for i from 0 below n + collect (mp:process-run-function + "sem.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + do (mp:wait-on-semaphore sem)) + (mp:with-lock (lock) (incf counter))))) + do (loop for i from 0 below length + do (mp:signal-semaphore sem)) + do (mapc #'mp:process-join producers) + (is (= counter n)) + (is (zerop (mp:semaphore-count sem))) + (is (zerop (mp:semaphore-wait-count sem))))) ;; Mailbox ;;; Date: 14/04/2012 ;;; Ensure that at creation name and counter are set, and mailbox is empty. -(test mailbox-make-and-counter - (is - (loop with name = "mbox-make-and-counter" - for count from 4 to 63 - for mbox = (mp:make-mailbox :name name :count count) - always (and (eq (mp:mailbox-name mbox) name) - (>= (mp:mailbox-count mbox) count) - (mp:mailbox-empty-p mbox))))) +(test mp.mbox.make-and-counter + (loop with name = "mbox.make-and-counter" + for count from 4 to 63 + for mbox = (mp:make-mailbox :name name :count count) + do (is (eq (mp:mailbox-name mbox) name)) + (is (>= (mp:mailbox-count mbox) count)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; Ensure that the mailbox works in a nonblocking fashion (when the ;;; number of messages < mailbox size in a single producer and single ;;; consumer setting. We do not need to create new threads for this. -(test mbox-mailbox-nonblocking-io-1-to-1 - (is - (loop with count = 30 - with name = "mbox-mailbox-nonblocking-io-1-to-1" - with mbox = (mp:make-mailbox :name name :count count) - for l from 1 to 10 - for messages = (loop for i from 1 to l - do (mp:mailbox-send mbox i) - collect i) - always - (and (not (mp:mailbox-empty-p mbox)) - (equalp (loop for i from 1 to l - collect (mp:mailbox-read mbox)) - messages) - (mp:mailbox-empty-p mbox))))) +(test-with-timeout mp.mbox.nonblocking-io-1-to-1 + (loop with count = 30 + with name = "mbox.nonblocking-io-1-to-1" + with mbox = (mp:make-mailbox :name name :count count) + for l from 1 to 10 + for messages = (loop for i from 1 to l + do (mp:mailbox-send mbox i) + collect i) + do + (is (not (mp:mailbox-empty-p mbox))) + (is (equalp (loop for i from 1 to l + collect (mp:mailbox-read mbox)) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; The mailbox blocks a process when it saturates the write queue. -(def-mp-test mbox-blocks-1-to-1 +(test-with-timeout mp.mbox.blocks-1-to-1 (let* ((flag nil) - (mbox (mp:make-mailbox :name "mbox-signal-one" :count 32)) + (mbox (mp:make-mailbox :name "mbox.blocks-1-to-1" :count 32)) (size (mp:mailbox-count mbox)) (a-process (mp:process-run-function - "mbox-signal-one-process" + "mbox.blocks-1-to-1" #'(lambda () ;; This does not block (loop for i from 1 to size @@ -467,85 +421,83 @@ creating stray processes." (mp:mailbox-send mbox (1+ size)) ;; Now we unblock (setf flag nil))))) - (sleep 0.2) ; give time for all messages to arrive - (and (not (mp:mailbox-empty-p mbox)) ; the queue has messages - (mp:process-active-p a-process) ; the process is active - flag ; and it is blocked - (loop for i from 1 to (1+ size) ; messages arrive in order - always (= i (mp:mailbox-read mbox))) - (null flag) ; and process unblocked - (mp:mailbox-empty-p mbox) - t)) - t) + (sleep 0.2) + (is (not (mp:mailbox-empty-p mbox))) ; the queue has messages + (is (mp:process-active-p a-process)) ; the process is active + (is flag) ; and it is blocked + (is (loop for i from 1 to (1+ size) ; messages arrive in order + always (= i (mp:mailbox-read mbox)))) + (mp:process-join a-process) + (is (null flag)) ; and process unblocked + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; N producers and 1 consumer -(def-mp-test mbox-n-to-1-communication +(test-with-timeout mp.mbox.n-to-1-communication (loop with length = 10000 - with mbox = (mp:make-mailbox :name "mbox-n-to-1-communication" :count 128) - for n from 1 to 10 - for m = (round length n) - for messages = (loop for i from 0 below (* n m) collect i) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-n-to-1-producer" - (let ((proc-no i)) - #'(lambda () - (loop for i from 0 below m - for msg = (+ i (* proc-no m)) - do (mp:mailbox-send mbox msg)))))) - always (and (equalp - (sort (loop for i from 1 to (* n m) - collect (mp:mailbox-read mbox)) - #'<) - messages) - (mp:mailbox-empty-p mbox))) - t) + with mbox = (mp:make-mailbox :name "mbox.n-to-1-communication" :count 128) + for n from 1 to 10 + for m = (round length n) + for messages = (loop for i from 0 below (* n m) collect i) + for producers = (loop for i from 0 below n + do (mp:process-run-function + "mbox.n-to-1-producer" + (let ((proc-no i)) + #'(lambda () + (loop for i from 0 below m + for msg = (+ i (* proc-no m)) + do (mp:mailbox-send mbox msg)))))) + do (is (equalp + (sort (loop for i from 1 to (* n m) + collect (mp:mailbox-read mbox)) + #'<) + messages)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumer, but they do not block, because the ;;; queue is large enough and pre-filled with messages -(test mbox-1-to-n-non-blocking +(test-with-timeout mp.mbox.1-to-n-non-blocking (loop - for n from 1 to 10 - for m = (round 128 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for aux = (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for n from 1 to 10 + for m = (round 128 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mbox.1-to-n-non-blocking" :count length) + for flags = (make-array length :initial-element nil) + for aux = (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 14/04/2012 ;;; 1 producer and N consumers, which block, because the producer ;;; is started _after_ them and is slower. -(test mbox-1-to-n-blocking +(test-with-timeout mp.mbox.1-to-n-blocking (loop for n from 1 to 10 - for m = (round 10000 n) - for length = (* n m) - for mbox = (mp:make-mailbox :name "mbox-1-to-n-communication" :count length) - for flags = (make-array length :initial-element nil) - for producers = (loop for i from 0 below n - do (mp:process-run-function - "mbox-1-to-n-consumer" - #'(lambda () - (loop for i from 0 below m - for msg = (mp:mailbox-read mbox) - do (setf (aref flags msg) t))))) - do (loop for i from 0 below length - do (mp:mailbox-send mbox i)) - do (sleep 0.1) - always (and (is (every #'identity flags)) - (is (mp:mailbox-empty-p mbox))))) + for m = (round 10000 n) + for length = (* n m) + for mbox = (mp:make-mailbox :name "mp.mbox.1-to-n-blocking" :count length) + for flags = (make-array length :initial-element nil) + for consumers = (loop for i from 0 below n + collect (mp:process-run-function + "mbox.1-to-n-consumer" + #'(lambda () + (loop for i from 0 below m + for msg = (mp:mailbox-read mbox) + do (setf (aref flags msg) t))))) + do (loop for i from 0 below length + do (mp:mailbox-send mbox i)) + do (mapc #'mp:process-join consumers) + (is (every #'identity flags)) + (is (mp:mailbox-empty-p mbox)))) ;;; Date: 2016-11-10 @@ -561,19 +513,22 @@ creating stray processes." ;;; HOLDING-LOCK-P verifies, if the current process holds the ;;; lock. ;;; -(test mp-holding-lock-p - (let ((lock (mp:make-lock :name "mp-holding-lock-p" :recursive nil))) +(test-with-timeout mp.mutex.holding-lock-p + (let ((lock (mp:make-lock :name "mutex.holding-lock-p" :recursive nil))) (is-false (mp:holding-lock-p lock)) (mp:with-lock (lock) (is-true (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock))))) (is-false (mp:holding-lock-p lock)) (mp:process-run-function - "mp-holding-lock-p" + "mutex.holding-lock-p" #'(lambda () (is-false (mp:holding-lock-p lock)))))) + +;; Atomics + (ext:with-clean-symbols (test-struct test-class *x*) (defstruct (test-struct :atomic-accessors) (slot1 0) @@ -582,14 +537,13 @@ creating stray processes." ((slot1 :initform 0) (slot2))) (defvar *x*) - ;;; Date: 2018-09-21 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-update works correctly. ;;; - (test atomic-update + (test-with-timeout mp.atomics.atomic-update (let* ((n-threads 100) (n-updates 1000) (n-total (* n-threads n-updates))) @@ -614,8 +568,7 @@ creating stray processes." (list (first plist) (1+ (second plist))))) (mp:atomic-update (slot-value object 'slot1) #'1+) - (mp:atomic-update (test-struct-slot1 struct) #'1+) - (sleep 0.00001)))))) + (mp:atomic-update (test-struct-slot1 struct) #'1+)))))) (is (car cons) n-total) (is (cdr cons) n-total) (is (svref vector 1) n-total) @@ -627,14 +580,14 @@ creating stray processes." (eq (slot-value object 'slot2) 1))) (signals error (eval '(mp:compare-and-swap (test-struct-slot2 struct) 1 2)))))) - + ;;; Date: 2018-09-22 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-push and atomic-pop work correctly. ;;; - (test atomic-push/pop + (test-with-timeout mp.atomics.atomic-push/pop (let* ((n-threads 100) (n-updates 1000)) (setf *x* nil) @@ -644,21 +597,18 @@ creating stray processes." "" (lambda () (loop for i below n-updates do - (mp:atomic-push i (symbol-value '*x*)) - (sleep 0.00001)) + (mp:atomic-push i (symbol-value '*x*))) (loop repeat (1- n-updates) do - (mp:atomic-pop (symbol-value '*x*)) - (sleep 0.00001)))))) + (mp:atomic-pop (symbol-value '*x*))))))) (is (length *x*) n-threads))) - ;;; Date: 2018-09-29 ;;; From: Marius Gerbershagen ;;; Description: ;;; ;;; Verifies that atomic-incf and atomic-decf work correctly. ;;; - (test atomic-incf/decf + (test-with-timeout mp.atomics.atomic-incf/decf (let* ((n-threads 100) (n-updates 1000) (increment (1+ (random most-positive-fixnum))) @@ -680,8 +630,7 @@ creating stray processes." (mp:atomic-incf (car cons) increment) (mp:atomic-incf (svref vector 1) increment) (mp:atomic-incf (symbol-value '*x*) increment) - (mp:atomic-incf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-incf (slot-value object 'slot1) increment)))))) (is (car cons) final-value) (is (cdr cons) final-value) (is (svref vector 1) final-value) @@ -697,8 +646,7 @@ creating stray processes." (mp:atomic-decf (car cons) increment) (mp:atomic-decf (svref vector 1) increment) (mp:atomic-decf (symbol-value '*x*) increment) - (mp:atomic-decf (slot-value object 'slot1) increment) - (sleep 0.00001)))))) + (mp:atomic-decf (slot-value object 'slot1) increment)))))) (is (car cons) 0) (is (cdr cons) 0) (is (svref vector 1) 0) @@ -712,7 +660,7 @@ creating stray processes." ;;; Verifies that CAS expansion may be removed. ;;; (ext:with-clean-symbols (*obj* foo) - (test defcas/remcas + (test mp.atomics.defcas/remcas (mp:defcas foo (lambda (object old new) (assert (consp object)) (setf (car object) old @@ -730,13 +678,15 @@ creating stray processes." ;;; ;;; Verifies that CAS modifications honor the package locks. ;;; -(test cas-locked-package +(test mp.atomics.cas-locked-package (signals package-error (mp:defcas cl:car (lambda (obj old new) nil))) (signals package-error (mp:remcas 'cl:car)) (finishes (mp:defcas cor (lambda (obj old new) nil))) (finishes (mp:remcas 'cor))) +;; Barriers + ;;; Date: 2020-08-14 ;;; From: Daniel KochmaƄski ;;; Description: @@ -750,105 +700,91 @@ creating stray processes." (is (= 3 (mp:barrier-count barrier))) (is (= 0 (mp:barrier-arrivers-count barrier))))) -(test mp.barrier.blocking - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function +(let (barrier before-barrier after-barrier all-processes mutex) + (labels ((try-barrier () + (push (mp:process-run-function "try-barrier" (lambda () - (incf before-barrier) + (mp:with-lock (mutex) + (incf before-barrier)) (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 3 0) - (check-barrier 4 3 1) - (check-barrier 5 3 2) - (check-barrier 6 6 0)))) + (mp:with-lock (mutex) + (incf after-barrier)))) + all-processes)) + (check-barrier (before after arrivers) + (try-barrier) + (loop until (= before before-barrier)) + (loop until (= after after-barrier)) + (loop until (= arrivers (mp:barrier-arrivers-count barrier)))) + (wake-barrier () + (mp:barrier-unblock barrier :kill-waiting nil)) + (kill-barrier () + (mp:barrier-unblock barrier :kill-waiting t))) -(test mp.barrier.unblock-1 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier)))) - (wake-barrier () - (mp:barrier-unblock barrier :kill-waiting nil)) - (kill-barrier () - (mp:barrier-unblock barrier :kill-waiting t))) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (wake-barrier) - (sleep 0.01) - (check-barrier 3 2 1) - (check-barrier 4 2 2) - (kill-barrier) - (sleep 0.01) - (check-barrier 5 2 1)))) + (test-with-timeout mp.barrier.blocking + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 3 0) + (check-barrier 4 3 1) + (check-barrier 5 3 2) + (check-barrier 6 6 0) + ;; clean up + (mapc #'mp:process-join all-processes)) -(test mp.barrier.unblock-2 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :disable t) - (check-barrier 1 1 0) - (check-barrier 2 2 0) - (check-barrier 3 3 0) - (check-barrier 4 4 0)))) + (test-with-timeout mp.barrier.unblock-1 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (wake-barrier) + (mapc #'mp:process-join all-processes) + (check-barrier 3 2 1) + (check-barrier 4 2 2) + (kill-barrier) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 2 1) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes)) -(test mp.barrier.unblock-3 - (let ((barrier (mp:make-barrier 3)) - (before-barrier 0) - (after-barrier 0)) - (labels ((try-barrier () - (mp:process-run-function - "try-barrier" - (lambda () - (incf before-barrier) - (mp:barrier-wait barrier) - (incf after-barrier)))) - (check-barrier (before after arrivers) - (try-barrier) - (sleep 0.01) - (is (= before before-barrier)) - (is (= after after-barrier)) - (is (= arrivers (mp:barrier-arrivers-count barrier))))) - (mp:barrier-unblock barrier :reset-count 4) - (check-barrier 1 0 1) - (check-barrier 2 0 2) - (check-barrier 3 0 3) - (check-barrier 4 4 0) - (check-barrier 5 4 1) - (check-barrier 6 4 2)))) + (test-with-timeout mp.barrier.unblock-2 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :disable t) + (check-barrier 1 1 0) + (check-barrier 2 2 0) + (check-barrier 3 3 0) + (check-barrier 4 4 0) + ;; clean up + (mapc #'mp:process-join all-processes)) + + (test-with-timeout mp.barrier.unblock-3 + (setf barrier (mp:make-barrier 3) + before-barrier 0 + after-barrier 0 + all-processes nil + mutex (mp:make-lock)) + (mp:barrier-unblock barrier :reset-count 4) + (check-barrier 1 0 1) + (check-barrier 2 0 2) + (check-barrier 3 0 3) + (check-barrier 4 4 0) + (mapc #'mp:process-join all-processes) + (setf all-processes nil) + (check-barrier 5 4 1) + (check-barrier 6 4 2) + ;; clean up + (mapc #'mp:process-kill all-processes) + (mapc #'mp:process-join all-processes))))