The Kitchen Sink and Other Oddities

Atabey Kaygun

Parallel map-reduce in Common Lisp

Description of the problem

Assume we have a large collection of data points. We want to map a function over the data points and then combine the results with an associative and commutative reducer function. Because the reducer is associative and commutative, we can split the data into chunks, reduce each chunk independently in its own thread, and then reduce the partial results.
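
To see why these properties matter, here is a small sketch that uses ordinary addition as the reducer. The helper chunk-list and the sample data are made up for illustration and are not part of the implementation below. Reducing each chunk separately and then reducing the partial results gives the same answer as a single sequential reduce:

;; Hypothetical helper: split a list into consecutive chunks of at most SIZE items.
(defun chunk-list (xs size)
  (loop while xs
        collect (loop repeat size
                      while xs
                      collect (pop xs))))

(let* ((xs '(1 2 3 4 5 6 7 8 9 10))
       (fn (lambda (x) (* x x)))
       ;; Sequential version: map, then reduce.
       (sequential (reduce #'+ (mapcar fn xs)))
       ;; Chunked version: each partial result could come from its own thread.
       (partials (mapcar (lambda (chunk) (reduce #'+ (mapcar fn chunk)))
                         (chunk-list xs 4)))
       (chunked (reduce #'+ partials)))
  (list sequential partials chunked))
(385 (30 174 181) 385)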

Implementation in Lisp

First, we need to import the necessary libraries:

(defpackage parallel-map-reduce
  (:use cl)
  (:import-from bordeaux-threads make-thread join-thread)
  (:import-from ppcre split)
  (:import-from utils ->> merge-with))
#<PACKAGE "PARALLEL-MAP-REDUCE">

(in-package parallel-map-reduce)
#<PACKAGE "PARALLEL-MAP-REDUCE">

Now, the implementation:

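(defun map-reduce (fn re xs num)
  "Map FN over XS and combine the results with RE, using roughly NUM threads."
  (let* ((n (length xs))
         (ys (make-array n :initial-contents xs))
         ;; Guard against a chunk size of 0 when XS has fewer than NUM elements.
         (size (max 1 (floor n num))))
    (->> (loop for i from 0 below n by size collect
              ;; Each chunk is a displaced view into YS, not a copy.
              (make-array (min size (- n i))
                          :displaced-to ys
                          :displaced-index-offset i))
         ;; One thread per chunk: map FN, then reduce with RE.
         (mapcar (lambda (xs)
                   (bordeaux-threads:make-thread
                    (lambda () (reduce re (map 'list fn xs))))))
         ;; Wait for every thread, then combine the partial results.
         (mapcar #'bordeaux-threads:join-thread)
         (reduce re))))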
MAP-REDUCE
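
The chunking step relies on displaced arrays: each chunk is a view into the underlying array rather than a copy. The following sketch isolates that loop using only standard Common Lisp. The name chunk-views and the sample data are hypothetical, chosen just for this illustration:

;; Hypothetical helper: return displaced-array views of at most SIZE elements each.
(defun chunk-views (ys size)
  (let ((n (length ys)))
    (loop for i from 0 below n by size
          collect (make-array (min size (- n i))
                              :displaced-to ys
                              :displaced-index-offset i))))

(let* ((ys (make-array 7 :initial-contents '(a b c d e f g)))
       (views (chunk-views ys 3)))
  ;; A write to the underlying array is visible through the view,
  ;; which shows that no copying took place.
  (setf (aref ys 0) 'z)
  (mapcar (lambda (v) (coerce v 'list)) views))
((Z B C) (D E F) (G))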

I am going to test this function by calculating the frequencies of words in a text file. First, I need a function that downcases a string and strips a fixed set of punctuation characters:

(let ((junk (coerce "\@#$%^&*[]_+-=(){}\'\:\",/<>“”‘’–—" 'list)))
  (defun clean (word)
    (remove-if (lambda (x) (member x junk)) (string-downcase word))))
CLEAN

Next, a function that reads all the lines from a stream, cleaning each one as it goes:

(defun read-lines (input)
  (let (res)
    (handler-case
        (do () (nil) (push (clean (read-line input)) res))
      (end-of-file () (reverse res)))))
READ-LINES

and finally the test code:

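(let* ((file "test.txt")
       (num 20)
       ;; Map step: turn a line into an alist of (word . 1) pairs.
       ;; Note the doubled backslash: "\s+" would be read as the string "s+".
       (fn (lambda (x) (mapcar (lambda (i) (cons i 1)) (split "\\s+" x))))
       ;; Reduce step: concatenate two alists and merge entries for equal words.
       (re (lambda (x y) (merge-with #'+ (append x y))))
       (xs (with-open-file (in file :direction :input)
             (read-lines in))))
  ;; Parallel version first, then the sequential version for comparison.
  (time (sort (map-reduce fn re xs num) #'> :key #'cdr))
  (time (sort (reduce re (mapcar fn xs)) #'> :key #'cdr))
  nil)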
NIL

The first timing below is for the parallel version and the second is for the sequential one:

Evaluation took:
  1.051 seconds of real time
  1.223333 seconds of total run time (1.163333 user, 0.060000 system)
  116.37% CPU
  2,412,238,511 processor cycles
  3,081,296 bytes consed

Evaluation took:
  9.274 seconds of real time
  9.273333 seconds of total run time (9.256666 user, 0.016667 system)
  99.99% CPU
  21,280,707,454 processor cycles
  12,615,600 bytes consed
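
The reducer in the test code depends on merge-with, which comes from a utils library that is not shown in this post. Judging only from how it is called, it appears to take a combining function and an association list, and to collapse the entries that share a key. A minimal sketch under that assumption might look like the following. The name merge-with-sketch and the choice of a hash table with an equal test are my own guesses, not necessarily what the library does:

;; Hypothetical stand-in for merge-with: combine the values of equal keys with FN.
(defun merge-with-sketch (fn alist)
  (let ((table (make-hash-table :test #'equal)))
    (loop for (key . value) in alist
          do (multiple-value-bind (old found) (gethash key table)
               (setf (gethash key table)
                     (if found (funcall fn old value) value))))
    ;; Convert the table back into an alist; the order of entries is unspecified.
    (loop for key being the hash-keys of table using (hash-value value)
          collect (cons key value))))

(sort (merge-with-sketch #'+ '(("the" . 1) ("cat" . 1) ("the" . 1)))
      #'> :key #'cdr)
(("the" . 2) ("cat" . 1))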