Python multiprocessing: Pass the shared resource along to children
Update 11 Aug '20: Turns out I had misunderstood how the multiprocessing module works! A Process can inherit resources created in the parent process scope, but that does not include globals; i.e. if used_ids and used_ids_lock were defined inside the block guarded by if __name__ == '__main__' it would work (though that is not recommended).
Python's documentation briefly touches upon this subject (see the "Explicitly pass resources to child processes" section).
So, one correct way of structuring the code is:
#!/usr/bin/env python
################################################################################
# simple tool to generate n random numbers.
#
# usage:
# main.py N_RECORDS CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int
#
# N_RECORDS is an integer
# CONCURRENCY_STRATEGY can only be either 't' or 'p'
# CONCURRENCY_FACTOR is an integer (preferably less than or equal to the number
# of cores on the machine.)
#
# Example:
# main.py 30000 p 5
# generates 30000 random numbers using 5 multiprocessing.Process'es
#
from __future__ import print_function
import os, sys, multiprocessing, itertools, time, threading
def get_random_id(used_ids, used_ids_lock):
    """generates a single unique random id.
    each id is ensured to be unique by checking it against `used_ids`, the set of used ids.
    the access to `used_ids` is supposedly guaranteed to be synchronised.
    once a unique id is generated, it is written to `used_ids` and returned.
    """
    result = None
    with used_ids_lock:
        while result is None:
            _id = int(time.time() * (10 ** 6))
            if _id not in used_ids:
                used_ids.add(_id)
                result = _id
    return result
def generate_random_ids(n_ids, batch_no, used_ids, used_ids_lock):
    """generates `n_ids` number of unique ids and writes to a file whose name is
    produced using `batch_no`.
    """
    with open('data-batch-%s.txt' % batch_no, 'w') as out:
        for i in range(0, n_ids):
            _id = get_random_id(used_ids, used_ids_lock)
            print('%s' % (_id), file=out)
def via_processes(nrecords, nprocs):
    """generates `nrecords` number of records and writes them to file using
    `nprocs` number of `multiprocessing.Process`es."""
    used_ids_lock = multiprocessing.Lock()
    used_ids = set()
    procs = [multiprocessing.Process(target=generate_random_ids,
                                     args=(int(nrecords/nprocs),
                                           i,
                                           used_ids,
                                           used_ids_lock))
             for i in range(0, nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
def via_threads(nrecords, nthreads):
    """generates `nrecords` number of records and writes them to file using
    `nthreads` number of `threading.Thread`s."""
    used_ids_lock = threading.Lock()
    used_ids = set()
    threads = [threading.Thread(target=generate_random_ids,
                                args=(int(nrecords/nthreads),
                                      i,
                                      used_ids,
                                      used_ids_lock))
               for i in range(0, nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
################################################################################
if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(0)
    nrecords = int(sys.argv[1])
    concurrency_strategy = sys.argv[2]
    concurrency_factor = int(sys.argv[3])
    if concurrency_strategy == 't':
        via_threads(nrecords, concurrency_factor)
    elif concurrency_strategy == 'p':
        via_processes(nrecords, concurrency_factor)
    else:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(0)
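A follow-up note, not in the original post: the version above hands each Process its own copy of used_ids through args; if the collection itself needs to be visible to every worker, one option is to keep both it and the lock in a multiprocessing.Manager. The sketch below is only an illustration under that assumption: worker is a made-up helper, and a Manager list stands in for a set since the standard manager does not provide a set proxy.

from __future__ import print_function
import multiprocessing, time

def get_unique_id(used_ids, used_ids_lock):
    # same idea as the post: a microsecond timestamp, checked against the shared collection
    while True:
        with used_ids_lock:
            _id = int(time.time() * (10 ** 6))
            if _id not in used_ids:
                used_ids.append(_id)   # a Manager list stands in for a set here
                return _id

def worker(n_ids, used_ids, used_ids_lock):
    for _ in range(n_ids):
        get_unique_id(used_ids, used_ids_lock)

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    used_ids = manager.list()       # proxy object, shared by all children
    used_ids_lock = manager.Lock()  # proxy to a single lock
    procs = [multiprocessing.Process(target=worker, args=(100, used_ids, used_ids_lock))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    snapshot = used_ids[:]          # plain-list copy of the shared collection
    print('%d generated, %d unique' % (len(snapshot), len(set(snapshot))))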
Recently, I needed to generate a large number of random database records, which led me to stumble upon a curious (and buggy) behaviour in Python's multiprocessing module, where lock objects do not behave as expected.
I wrote a sample piece of code to demonstrate the behaviour. The code generates unique integers using a shared set, with access to it synchronised using Locks.
When it is run using the threading module, i.e. Lock and Thread, it generates no duplicate numbers.
For example:
$ rm data*; ./main.py 30000 t 5; cat data* | sort | uniq -cd | wc -l
0
However, if the program is run using the multiprocessing module, i.e. Lock and Process, there are quite a few duplicates in the output.
For example:
$ rm data*; ./main.py 10000 p 5; cat data* | sort | uniq -cd | wc -l
448
A very peculiar observation is that using multiprocessing to generate relatively small batches of unique numbers works just as expected: on my machine I wasn't able to capture any duplicates when generating fewer than 5-6k numbers, but above that threshold the results invariably contained duplicates. threading, on the other hand, seems to be reliable (though slower) and always produces the expected output.
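Here is also a tiny standalone sketch, not part of the script below, that illustrates the underlying difference: every multiprocessing.Process ends up working on its own copy of module-level state, whereas threads all mutate the same object. The shared set and the add_and_report helper are made up purely for this illustration.

from __future__ import print_function
import multiprocessing, threading

shared = set()   # module-level state, analogous to `used_ids` in the script below

def add_and_report(label):
    shared.add(label)
    print('%s sees: %s' % (label, sorted(shared)))

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=add_and_report, args=('proc-%d' % i,))
             for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # each child printed only its own entry, and the parent's set is still empty:
    print('parent after processes: %s' % sorted(shared))

    threads = [threading.Thread(target=add_and_report, args=('thread-%d' % i,))
               for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # the threads shared one set, so the parent now sees all three entries:
    print('parent after threads: %s' % sorted(shared))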
Here's the simple script I mentioned earlier:
#!/usr/bin/env python
################################################################################
# simple tool to generate n random numbers.
#
# usage:
# main.py N_RECORDS CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int
#
# N_RECORDS is an integer
# CONCURRENCY_STRATEGY can only be either 't' or 'p'
# CONCURRENCY_FACTOR is an integer (preferably less than or equal to the number
# of cores on the machine.)
#
# Example:
# main.py 30000 p 5
# generates 30000 random numbers using 5 multiprocessing.Process'es
#
from __future__ import print_function
import os, sys, multiprocessing, itertools, time, threading
used_ids = set()
used_ids_lock = None
def get_random_id():
    """generates a single unique random id.
    each id is ensured to be unique by checking it against `used_ids`, the set of used ids.
    the access to `used_ids` is supposedly guaranteed to be synchronised.
    once a unique id is generated, it is written to `used_ids` and returned.
    """
    result = None
    with used_ids_lock:
        while result is None:
            _id = int(time.time() * (10 ** 6))
            if _id not in used_ids:
                used_ids.add(_id)
                result = _id
    return result
def generate_random_ids(n_ids, batch_no):
    """generates `n_ids` number of unique ids and writes to a file whose name is
    produced using `batch_no`.
    """
    with open('data-batch-%s.txt' % batch_no, 'w') as out:
        for i in range(0, n_ids):
            _id = get_random_id()
            print('%s' % (_id), file=out)
def via_processes(nrecords, nprocs):
    """generates `nrecords` number of records and writes them to file using
    `nprocs` number of `multiprocessing.Process`es."""
    global used_ids_lock
    used_ids_lock = multiprocessing.Lock()
    procs = [multiprocessing.Process(target=generate_random_ids,
                                     args=(int(nrecords/nprocs), i,))
             for i in range(0, nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
def via_threads(nrecords, nthreads):
    """generates `nrecords` number of records and writes them to file using
    `nthreads` number of `threading.Thread`s."""
    global used_ids_lock
    used_ids_lock = threading.Lock()
    threads = [threading.Thread(target=generate_random_ids,
                                args=(int(nrecords/nthreads), i,))
               for i in range(0, nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
################################################################################
if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(0)
    nrecords = int(sys.argv[1])
    concurrency_strategy = sys.argv[2]
    concurrency_factor = int(sys.argv[3])
    if concurrency_strategy == 't':
        print('generating...')
        via_threads(nrecords, concurrency_factor)
    elif concurrency_strategy == 'p':
        print('generating...')
        via_processes(nrecords, concurrency_factor)
    else:
        print('main.py N_RECORDS:int CONCURRENCY_STRATEGY:"t"|"p" CONCURRENCY_FACTOR:int')
        sys.exit(0)
    print('done')