Python multiprocessing: Pass the shared resource along to children

Update 11 Aug '20: It turned out I had misunderstood how the multiprocessing module works! A Process can inherit resources created in the parent process's scope, but that does not include module-level globals; i.e., if used_ids and used_ids_lock were defined in the block guarded by if __name__ == '__main__', it would work (though that is not recommended).
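
For illustration, here is a minimal sketch of that inheritance approach (my own sketch, not from the tool below; it assumes a fork-based start method, as on Linux, where a child begins with a copy of the parent's memory. Under the spawn start method the child re-imports the module, the __main__ block does not run there, and these names would not exist):

import multiprocessing

def worker(n):
    # both names below are inherited from the parent at fork time; the
    # lock's underlying semaphore is genuinely shared between processes,
    # but the set is only a per-child copy
    with shared_lock:
        print('%d inherited? %s' % (n, n in seen))

if __name__ == '__main__':
    shared_lock = multiprocessing.Lock()  # created before the children exist
    seen = {1, 2}
    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()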

Python's documentation briefly touches upon this subject (see the "Explicitly pass resources to child processes" section).

So, one correct way of structuring the code is:

#!/usr/bin/env python
################################################################################
# simple tool to generate n random numbers.
#
# usage:
#   main.py N_RECORDS CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int
#
#   N_RECORDS is an integer
#   CONCURRENCY_STRATEGY can only be either 't' or 'p'
#   CONCURRENCY_FACTOR is an integer (preferably less than or equal to the
#    number of cores on the machine.)
#
# Example:
#  main.py 30000 p 5
#  generates 30000 random numbers using 5 multiprocessing.Process'es
#
from __future__ import print_function
import sys, multiprocessing, time, threading

def get_random_id(used_ids, used_ids_lock):
    """generates a single unique random id.

    each id is ensured to be unique by checking it against `used_ids`, the set
    of ids used so far. access to `used_ids` is synchronised via `used_ids_lock`.
    once a unique id is generated, it is written to `used_ids` and returned.
    """
    result = None
    with used_ids_lock:
        while result is None:
            _id = int(time.time() * (10 ** 6))
            if _id not in used_ids:
                used_ids.add(_id)
                result = _id
    return result

def generate_random_ids(n_ids, batch_no, used_ids, used_ids_lock):
    """generates `n_ids` unique ids and writes them to a file whose name is
    derived from `batch_no`.
    """
    with open('data-batch-%s.txt' % batch_no, 'w') as out:
        for i in range(0, n_ids):
            _id = get_random_id(used_ids, used_ids_lock)
            print('%s' % (_id), file=out)

def via_processes(nrecords, nprocs):
    """generates `nrecords` records and writes them to files using
    `nprocs` `multiprocessing.Process`es."""
    used_ids_lock = multiprocessing.Lock()
    used_ids = set()
    procs = [multiprocessing.Process(target=generate_random_ids,
                                     args=(int(nrecords/nprocs),
                                           i,
                                           used_ids,
                                           used_ids_lock))
             for i in range(0, nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


def via_threads(nrecords, nthreads):
    """generates `nrecords` records and writes them to files using
    `nthreads` `threading.Thread`s."""
    used_ids_lock = threading.Lock()
    used_ids = set()
    threads = [threading.Thread(target=generate_random_ids,
                                args=(int(nrecords/nthreads),
                                      i,
                                      used_ids,
                                      used_ids_lock))
               for i in range(0, nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

################################################################################

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(1)
    nrecords = int(sys.argv[1])
    concurrency_strategy = sys.argv[2]
    concurrency_factor = int(sys.argv[3])
    if concurrency_strategy == 't':
        via_threads(nrecords, concurrency_factor)
    elif concurrency_strategy == 'p':
        via_processes(nrecords, concurrency_factor)
    else:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(1)
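
One caveat worth flagging: a multiprocessing.Lock passed as an argument is genuinely shared (multiprocessing handles it specially when setting up the child), but a plain set is simply copied into each child, so the children never see each other's entries. If the set itself must be shared across processes, a multiprocessing.Manager proxy is one option; a minimal sketch of that approach (mine, not part of the tool above):

import multiprocessing

def record(shared_ids, lock, value):
    # shared_ids is a Manager proxy: mutations are forwarded to a server
    # process, so every child observes the same underlying mapping
    with lock:
        shared_ids[value] = True

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_ids = manager.dict()  # Manager has no set proxy; use dict keys as a set
    lock = manager.Lock()
    procs = [multiprocessing.Process(target=record, args=(shared_ids, lock, i))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sorted(shared_ids.keys()))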

Recently, I needed to generate a large number of random database records, which led me to stumble upon some curious (and seemingly buggy) behaviour in Python's multiprocessing module, where lock objects do not work as one might expect.

I wrote a sample piece of code to demonstrate the behaviour. It generates unique integers using a shared set, with access to the set synchronised by locks.

When it is run with the threading module, i.e. threading.Thread plus threading.Lock, it generates no duplicate numbers; all threads live in the same process, so they genuinely share the one set and the one lock.
For example:

$ rm data*; ./main.py 30000 t 5; cat data* | sort | uniq -cd | wc -l
0

However, when the program is run with the multiprocessing module, i.e. multiprocessing.Process plus multiprocessing.Lock, there are quite a few duplicates in the output.
For example:

$ rm data*; ./main.py 10000 p 5 ; cat data* | sort | uniq -cd | wc -l
448

A very peculiar observation: using multiprocessing to generate relatively small batches of unique numbers works just as expected; on my machine I wasn't able to capture any duplicates when generating fewer than 5-6k numbers, above which the results invariably contained duplicates. The threading version, on the other hand, is always reliable (though slower) and produces the expected output.
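
It helps to keep in mind what the "random" id actually is: a microsecond timestamp. Two workers that are not effectively coordinated can draw the same value within the same microsecond, and the more ids requested, the more chances to collide. Here is a standalone snippet (mine, separate from the tool) to observe the raw collision rate between two uncoordinated processes on your machine:

import time, multiprocessing

def stamps(queue, n):
    # collect n microsecond timestamps with no coordination at all
    queue.put({int(time.time() * 10**6) for _ in range(n)})

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=stamps, args=(queue, 100000))
             for _ in range(2)]
    for p in procs:
        p.start()
    a, b = queue.get(), queue.get()  # drain before join to avoid blocking
    for p in procs:
        p.join()
    print('%d colliding ids' % len(a & b))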

Here's the simple script I mentioned earlier:

#!/usr/bin/env python
################################################################################
# simple tool to generate n random numbers.
#
# usage:
#   main.py N_RECORDS CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int
#
#   N_RECORDS is an integer
#   CONCURRENCY_STRATEGY can only be either 't' or 'p'
#   CONCURRENCY_FACTOR is an integer (preferably less than or equal to the
#    number of cores on the machine.)
#
# Example:
#  main.py 30000 p 5
#  generates 30000 random numbers using 5 multiprocessing.Process'es
#
from __future__ import print_function
import sys, multiprocessing, time, threading

used_ids = set()
used_ids_lock = None

def get_random_id():
    """generates a single unique random id.

    each id is ensured to be unique by checking it against `used_ids`, the set
    of ids used so far. access to `used_ids` is supposedly synchronised.
    once a unique id is generated, it is written to `used_ids` and returned.
    """
    result = None
    with used_ids_lock:
        while result is None:
            _id = int(time.time() * (10 ** 6))
            if _id not in used_ids:
                used_ids.add(_id)
                result = _id
    return result

def generate_random_ids(n_ids, batch_no):
    """generates `n_ids` unique ids and writes them to a file whose name is
    derived from `batch_no`.
    """
    with open('data-batch-%s.txt' % batch_no, 'w') as out:
        for i in range(0, n_ids):
            _id = get_random_id()
            print('%s' % (_id), file=out)

def via_processes(nrecords, nprocs):
    """generates `nrecords` records and writes them to files using
    `nprocs` `multiprocessing.Process`es."""
    global used_ids_lock
    used_ids_lock = multiprocessing.Lock()
    procs = [multiprocessing.Process(target=generate_random_ids,
                                     args=(int(nrecords/nprocs), i,))
             for i in range(0, nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


def via_threads(nrecords, nthreads):
    """generates `nrecords` records and writes them to files using
    `nthreads` `threading.Thread`s."""
    global used_ids_lock
    used_ids_lock = threading.Lock()
    threads = [threading.Thread(target=generate_random_ids,
                                args=(int(nrecords/nthreads), i,))
               for i in range(0, nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

################################################################################

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(1)
    nrecords = int(sys.argv[1])
    concurrency_strategy = sys.argv[2]
    concurrency_factor = int(sys.argv[3])
    if concurrency_strategy == 't':
        print('generating...')
        via_threads(nrecords, concurrency_factor)
    elif concurrency_strategy == 'p':
        print('generating...')
        via_processes(nrecords, concurrency_factor)
    else:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(1)
    print('done')
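
Incidentally, the start method is the key variable in how much of the parent a child process sees. Under fork (the historical default on Unix) the child starts with a snapshot of the parent's memory, globals included; under spawn (the default on Windows, and on macOS since Python 3.8) the child re-imports the module from scratch, so in the script above used_ids_lock would still be None there. A tiny experiment to see the difference (Python 3 on Unix, since 'fork' is unavailable on Windows):

import multiprocessing

flag = None

def child():
    # under 'fork' this prints the value assigned in the parent below;
    # under 'spawn' the module is re-imported in the child, the __main__
    # block does not run there, and flag is still None
    print('child sees flag = %s' % flag)

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')  # swap in 'spawn' to compare
    flag = 'set in parent'
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()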
