Assume we have a large collection of data points. We will map over the data points using a function and then use an associative and commutative reducer function. The associativity and commutativity of the reducer function allows us to parallelize this operation.
First, we need to import the necessary libraries:
(defpackage parallel-map-reduce
(:use cl)
(:import-from bordeaux-threads make-thread join-thread)
(:import-from ppcre split)
(:import-from utils ->> merge-with))
#<PACKAGE "PARALLEL-MAP-REDUCE">
(in-package parallel-map-reduce)
#<PACKAGE "PARALLEL-MAP-REDUCE">
Now, the implementation:
(defun map-reduce (fn re xs num)
(let* ((n (length xs))
(ys (make-array n :initial-contents xs))
(size (floor n num)))
(->> (loop for i from 0 below n by size collect
(make-array (min size (- n i))
:displaced-to ys
:displaced-index-offset i))
(mapcar (lambda (xs)
(bordeaux-threads:make-thread
(lambda () (reduce re (map 'list fn xs))))))
(mapcar #'bordeaux-threads:join-thread)
(reduce re))))
MAP-REDUCE
I am going to test this function to calculate the frequencies of words in a text file. First, I am going to need a function that removes all non-alpha-numeric characters:
(let ((junk (coerce "\@#$%^&*[]_+-=(){}\'\:\",/<>“”‘’–—" ’list)))
(defun clean (word)
(remove-if (lambda (x) (member x junk)) (string-downcase word))))
CLEAN
Now, a function to load the lines:
(defun read-lines (input)
(let (res)
(handler-case
(do () (nil) (push (clean (read-line input)) res))
(end-of-file () (reverse res)))))
READ-LINES
and finally the test code:
(let* ((file "test.txt")
(num 20)
(fn (lambda (x) (mapcar (lambda (i) (cons i 1)) (split "\s+" x))))
(re (lambda (x y) (merge-with #'+ (append x y))))
(xs (with-open-file (in file :direction :input)
(read-lines in))))
(time (sort (map-reduce fn re xs num) #'> :key #'cdr))
(time (sort (reduce re (mapcar fn xs)) #'> :key #'cdr))
nil)
NIL
Evaluation took:
1.051 seconds of real time
1.223333 seconds of total run time (1.163333 user, 0.060000 system)
116.37% CPU
2,412,238,511 processor cycles
3,081,296 bytes consed
Evaluation took:
9.274 seconds of real time
9.273333 seconds of total run time (9.256666 user, 0.016667 system)
99.99% CPU
21,280,707,454 processor cycles
12,615,600 bytes consed