mirror of https://github.com/python/cpython.git
312 lines
6.8 KiB
Python
312 lines
6.8 KiB
Python
|
#
|
||
|
# A test of `multiprocessing.Pool` class
|
||
|
#
|
||
|
|
||
|
import multiprocessing
|
||
|
import time
|
||
|
import random
|
||
|
import sys
|
||
|
|
||
|
#
|
||
|
# Functions used by test code
|
||
|
#
|
||
|
|
||
|
def calculate(func, args):
    """Apply *func* to *args* and describe the outcome in one line.

    func -- callable to invoke; its ``__name__`` appears in the report.
    args -- sequence of positional arguments, unpacked into the call.

    Returns a string of the form
    ``'<process name> says that <func name><args> = <result>'``, where the
    process name is that of the process (or thread, when the module-level
    ``multiprocessing`` name is bound to ``multiprocessing.dummy``) that
    executed the call.  Any exception raised by *func* propagates.
    """
    result = func(*args)
    # Process objects expose their name through the ``name`` attribute; there
    # is no ``get_name()`` method, so calling it raised AttributeError.
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )
|
||
|
|
||
|
def calculatestar(args):
    """Call calculate() with the items of *args* as positional arguments.

    The test code hands this function to ``Pool.imap``,
    ``Pool.imap_unordered`` and ``Pool.map``, each of which passes one item
    of the task list per call; here that item is a ``(func, func_args)``
    tuple which is unpacked for calculate().

    Returns whatever calculate() returns.
    """
    packed_task = args
    return calculate(*packed_task)
|
||
|
|
||
|
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
|
||
|
|
||
|
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
|
||
|
|
||
|
def f(x):
    """Return the reciprocal of ``x - 5.0``.

    Raises ZeroDivisionError when *x* equals 5; the error-handling checks in
    the test code rely on that.
    """
    shifted = x - 5.0
    return 1.0 / shifted
|
||
|
|
||
|
def pow3(x):
    """Return *x* raised to the third power."""
    return pow(x, 3)
|
||
|
|
||
|
def noop(x):
    """Ignore *x* and return None; used as the no-work task in the benchmark."""
    return None
|
||
|
|
||
|
#
|
||
|
# Test code
|
||
|
#
|
||
|
|
||
|
def test():
    """Exercise ``multiprocessing.Pool`` end to end, reporting on stdout.

    Runs, in order: result retrieval through ``apply_async``, ``imap``,
    ``imap_unordered`` and ``map``; two simple benchmarks; error
    propagation; timeouts; callbacks; and the ``close()``, ``terminate()``
    and garbage-collection shutdown paths.  Failed checks raise
    AssertionError, except for a callback mismatch, which is only reported
    on stdout.

    The pools are created from whatever the module-level name
    ``multiprocessing`` is bound to at call time.

    Note: this code targets Python 2 -- the benchmarks use ``xrange`` and
    compare the result of the builtin ``map`` against lists.  Every
    ``print`` is given exactly one parenthesised argument so that it emits
    the same text whether it is parsed as a statement or as a function call.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print('')

    #
    # Tests
    #

    TASKS = ([(mul, (i, 7)) for i in range(10)] +
             [(plus, (i, 8)) for i in range(10)])

    # All three batches are submitted before any result is read back.
    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t%s' % (r.get(),))
    print('')

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t%s' % (x,))
    print('')

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t%s' % (x,))
    print('')

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t%s' % (x,))
    print('')

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = map(pow3, xrange(N))
    print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    # The serial and both pooled runs must agree element for element.
    assert A == B == C, (len(A), len(B), len(C))
    print('')

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = map(noop, L)
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print('')

    # Drop the large benchmark lists before continuing.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    # f(5) divides by zero; the worker's exception must surface in the caller.
    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, range(10)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            it.next()
        except ZeroDivisionError:
            # Only f(5) divides by zero; an error for any other item is a
            # genuine failure and must not be swallowed.
            if i != 5:
                raise
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have yielded all ten items, i.e. it kept going after
    # the failing one.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print('')

    #
    # Testing timeouts
    #

    # Written without a trailing newline so the progress dots follow the
    # heading on the same line.
    sys.stdout.write('Testing ApplyResult.get() with timeout:')
    res = pool.apply_async(calculate, TASKS[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    sys.stdout.write('Testing IMapIterator.next() with timeout:')
    it = pool.imap(calculatestar, TASKS)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    #
    # Testing callback
    #

    print('Testing callback:')

    A = []
    # Expected contents of A: mul(7, 8) followed by pow3(0) .. pow3(9).
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # A task submitted before close() must still complete.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Run one task to completion before queueing the sleeping tasks.
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so the workers can be inspected after the pool
    # object itself has been released.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]

    # Release the pool and its pending results.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Script entry point: "processes" (the default) runs the tests against
    # real worker processes, "threads" runs them against the thread-backed
    # multiprocessing.dummy API.
    multiprocessing.freeze_support()

    # Validate the command line explicitly rather than with ``assert``:
    # asserts are stripped under -O, and a wrong argument count should show
    # the usage text instead of a traceback.
    cli_args = sys.argv[1:]

    if cli_args in ([], ['processes']):
        print(' Using processes '.center(79, '-'))
    elif cli_args == ['threads']:
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so that test() and calculate() pick
        # up the thread-based implementation.
        import multiprocessing.dummy as multiprocessing
    else:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    test()
|