2008-06-11 16:44:04 +00:00
|
|
|
#
|
|
|
|
# A test of `multiprocessing.Pool` class
|
|
|
|
#
|
Merged revisions 67348,67355,67359,67362,67364-67365,67367-67368,67398,67423-67424,67432,67440-67441,67444-67445,67454-67455,67457-67458 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r67348 | benjamin.peterson | 2008-11-22 20:09:41 -0600 (Sat, 22 Nov 2008) | 1 line
raise a better error
........
r67355 | georg.brandl | 2008-11-23 13:17:25 -0600 (Sun, 23 Nov 2008) | 2 lines
#4392: fix parameter name.
........
r67359 | georg.brandl | 2008-11-23 15:57:30 -0600 (Sun, 23 Nov 2008) | 2 lines
#4399: fix typo.
........
r67362 | gregory.p.smith | 2008-11-23 18:41:43 -0600 (Sun, 23 Nov 2008) | 2 lines
Document PY_SSIZE_T_CLEAN for PyArg_ParseTuple.
........
r67364 | benjamin.peterson | 2008-11-23 19:16:29 -0600 (Sun, 23 Nov 2008) | 2 lines
replace reference to debugger-hooks
........
r67365 | benjamin.peterson | 2008-11-23 22:09:03 -0600 (Sun, 23 Nov 2008) | 1 line
#4396 make the parser module correctly validate the with syntax
........
r67367 | georg.brandl | 2008-11-24 10:16:07 -0600 (Mon, 24 Nov 2008) | 2 lines
Fix typo.
........
r67368 | georg.brandl | 2008-11-24 13:56:47 -0600 (Mon, 24 Nov 2008) | 2 lines
#4404: make clear what "path" is.
........
r67398 | benjamin.peterson | 2008-11-26 11:39:17 -0600 (Wed, 26 Nov 2008) | 1 line
fix typo in sqlite3 docs
........
r67423 | jesse.noller | 2008-11-28 12:59:35 -0600 (Fri, 28 Nov 2008) | 2 lines
issue4238: bsd support for cpu_count
........
r67424 | christian.heimes | 2008-11-28 13:33:33 -0600 (Fri, 28 Nov 2008) | 1 line
Retain copyright of processing examples. This was requested by a Debian maintainer during packaging of the multiprocessing package for 2.4/2.5
........
r67432 | benjamin.peterson | 2008-11-28 17:18:46 -0600 (Fri, 28 Nov 2008) | 1 line
SVN format 9 is the same it seems
........
r67440 | jeremy.hylton | 2008-11-28 17:42:59 -0600 (Fri, 28 Nov 2008) | 4 lines
Move definition int sval into branch of ifdef where it is used.
Otherwise, you get a warning about an undefined variable.
........
r67441 | jeremy.hylton | 2008-11-28 18:09:16 -0600 (Fri, 28 Nov 2008) | 2 lines
Reflow long lines.
........
r67444 | amaury.forgeotdarc | 2008-11-28 20:03:32 -0600 (Fri, 28 Nov 2008) | 2 lines
Fix a small typo in docstring
........
r67445 | benjamin.peterson | 2008-11-29 21:07:33 -0600 (Sat, 29 Nov 2008) | 1 line
StringIO.close() stops you from using the buffer, too
........
r67454 | benjamin.peterson | 2008-11-30 08:43:23 -0600 (Sun, 30 Nov 2008) | 1 line
note the version that works
........
r67455 | martin.v.loewis | 2008-11-30 13:28:27 -0600 (Sun, 30 Nov 2008) | 1 line
Issue #4365: Add crtassem.h constants to the msvcrt module.
........
r67457 | christian.heimes | 2008-11-30 15:16:28 -0600 (Sun, 30 Nov 2008) | 1 line
w# requires Py_ssize_t
........
r67458 | benjamin.peterson | 2008-11-30 15:46:16 -0600 (Sun, 30 Nov 2008) | 1 line
fix pyspecific extensions that were broken by Sphinx's grand renaming
........
2008-11-30 22:46:23 +00:00
|
|
|
# Copyright (c) 2006-2008, R Oudkerk
|
|
|
|
# All rights reserved.
|
|
|
|
#
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
import multiprocessing
|
|
|
|
import time
|
|
|
|
import random
|
|
|
|
import sys
|
|
|
|
|
|
|
|
#
|
|
|
|
# Functions used by test code
|
|
|
|
#
|
|
|
|
|
|
|
|
def calculate(func, args):
|
|
|
|
result = func(*args)
|
|
|
|
return '%s says that %s%s = %s' % (
|
2008-08-19 19:17:39 +00:00
|
|
|
multiprocessing.current_process().name,
|
2008-06-11 16:44:04 +00:00
|
|
|
func.__name__, args, result
|
|
|
|
)
|
|
|
|
|
|
|
|
def calculatestar(args):
|
|
|
|
return calculate(*args)
|
|
|
|
|
|
|
|
def mul(a, b):
|
|
|
|
time.sleep(0.5*random.random())
|
|
|
|
return a * b
|
|
|
|
|
|
|
|
def plus(a, b):
|
|
|
|
time.sleep(0.5*random.random())
|
|
|
|
return a + b
|
|
|
|
|
|
|
|
def f(x):
|
|
|
|
return 1.0 / (x-5.0)
|
|
|
|
|
|
|
|
def pow3(x):
|
|
|
|
return x**3
|
|
|
|
|
|
|
|
def noop(x):
|
|
|
|
pass
|
|
|
|
|
|
|
|
#
|
|
|
|
# Test code
|
|
|
|
#
|
|
|
|
|
|
|
|
def test():
|
2008-11-28 11:23:26 +00:00
|
|
|
print('cpu_count() = %d\n' % multiprocessing.cpu_count())
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Create pool
|
|
|
|
#
|
|
|
|
|
|
|
|
PROCESSES = 4
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Creating pool with %d processes\n' % PROCESSES)
|
2008-06-11 16:44:04 +00:00
|
|
|
pool = multiprocessing.Pool(PROCESSES)
|
2008-11-28 11:23:26 +00:00
|
|
|
print('pool = %s' % pool)
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Tests
|
|
|
|
#
|
|
|
|
|
|
|
|
TASKS = [(mul, (i, 7)) for i in range(10)] + \
|
|
|
|
[(plus, (i, 8)) for i in range(10)]
|
|
|
|
|
|
|
|
results = [pool.apply_async(calculate, t) for t in TASKS]
|
|
|
|
imap_it = pool.imap(calculatestar, TASKS)
|
|
|
|
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Ordered results using pool.apply_async():')
|
2008-06-11 16:44:04 +00:00
|
|
|
for r in results:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\t', r.get())
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Ordered results using pool.imap():')
|
2008-06-11 16:44:04 +00:00
|
|
|
for x in imap_it:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\t', x)
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Unordered results using pool.imap_unordered():')
|
2008-06-11 16:44:04 +00:00
|
|
|
for x in imap_unordered_it:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\t', x)
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Ordered results using pool.map() --- will block till complete:')
|
2008-06-11 16:44:04 +00:00
|
|
|
for x in pool.map(calculatestar, TASKS):
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\t', x)
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Simple benchmarks
|
|
|
|
#
|
|
|
|
|
|
|
|
N = 100000
|
2008-11-28 11:23:26 +00:00
|
|
|
print('def pow3(x): return x**3')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
2008-11-28 11:23:26 +00:00
|
|
|
A = list(map(pow3, range(N)))
|
|
|
|
print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
|
|
|
|
(N, time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
2008-11-28 11:23:26 +00:00
|
|
|
B = pool.map(pow3, range(N))
|
|
|
|
print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
|
|
|
|
(N, time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
2008-11-28 11:23:26 +00:00
|
|
|
C = list(pool.imap(pow3, range(N), chunksize=N//8))
|
|
|
|
print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
|
|
|
|
' seconds' % (N, N//8, time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
assert A == B == C, (len(A), len(B), len(C))
|
2008-11-28 11:23:26 +00:00
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
L = [None] * 1000000
|
2008-11-28 11:23:26 +00:00
|
|
|
print('def noop(x): pass')
|
|
|
|
print('L = [None] * 1000000')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
2008-11-28 11:23:26 +00:00
|
|
|
A = list(map(noop, L))
|
|
|
|
print('\tmap(noop, L):\n\t\t%s seconds' % \
|
|
|
|
(time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
|
|
|
B = pool.map(noop, L)
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tpool.map(noop, L):\n\t\t%s seconds' % \
|
|
|
|
(time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
t = time.time()
|
|
|
|
C = list(pool.imap(noop, L, chunksize=len(L)//8))
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
|
|
|
|
(len(L)//8, time.time() - t))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
assert A == B == C, (len(A), len(B), len(C))
|
2008-11-28 11:23:26 +00:00
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
del A, B, C, L
|
|
|
|
|
|
|
|
#
|
|
|
|
# Test error handling
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing error handling:')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
try:
|
2008-11-28 11:23:26 +00:00
|
|
|
print(pool.apply(f, (5,)))
|
2008-06-11 16:44:04 +00:00
|
|
|
except ZeroDivisionError:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tGot ZeroDivisionError as expected from pool.apply()')
|
2008-06-11 16:44:04 +00:00
|
|
|
else:
|
2008-11-28 11:23:26 +00:00
|
|
|
raise AssertionError('expected ZeroDivisionError')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
try:
|
2008-11-28 11:23:26 +00:00
|
|
|
print(pool.map(f, list(range(10))))
|
2008-06-11 16:44:04 +00:00
|
|
|
except ZeroDivisionError:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tGot ZeroDivisionError as expected from pool.map()')
|
2008-06-11 16:44:04 +00:00
|
|
|
else:
|
2008-11-28 11:23:26 +00:00
|
|
|
raise AssertionError('expected ZeroDivisionError')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
try:
|
2008-11-28 11:23:26 +00:00
|
|
|
print(list(pool.imap(f, list(range(10)))))
|
2008-06-11 16:44:04 +00:00
|
|
|
except ZeroDivisionError:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
|
2008-06-11 16:44:04 +00:00
|
|
|
else:
|
2008-11-28 11:23:26 +00:00
|
|
|
raise AssertionError('expected ZeroDivisionError')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
it = pool.imap(f, list(range(10)))
|
2008-06-11 16:44:04 +00:00
|
|
|
for i in range(10):
|
|
|
|
try:
|
2008-11-28 11:23:26 +00:00
|
|
|
x = next(it)
|
2008-06-11 16:44:04 +00:00
|
|
|
except ZeroDivisionError:
|
|
|
|
if i == 5:
|
|
|
|
pass
|
|
|
|
except StopIteration:
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
if i == 5:
|
2008-11-28 11:23:26 +00:00
|
|
|
raise AssertionError('expected ZeroDivisionError')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
assert i == 9
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Testing timeouts
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing ApplyResult.get() with timeout:', end=' ')
|
2008-06-11 16:44:04 +00:00
|
|
|
res = pool.apply_async(calculate, TASKS[0])
|
|
|
|
while 1:
|
|
|
|
sys.stdout.flush()
|
|
|
|
try:
|
|
|
|
sys.stdout.write('\n\t%s' % res.get(0.02))
|
|
|
|
break
|
|
|
|
except multiprocessing.TimeoutError:
|
|
|
|
sys.stdout.write('.')
|
2008-11-28 11:23:26 +00:00
|
|
|
print()
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing IMapIterator.next() with timeout:', end=' ')
|
2008-06-11 16:44:04 +00:00
|
|
|
it = pool.imap(calculatestar, TASKS)
|
|
|
|
while 1:
|
|
|
|
sys.stdout.flush()
|
|
|
|
try:
|
|
|
|
sys.stdout.write('\n\t%s' % it.next(0.02))
|
|
|
|
except StopIteration:
|
|
|
|
break
|
|
|
|
except multiprocessing.TimeoutError:
|
|
|
|
sys.stdout.write('.')
|
2008-11-28 11:23:26 +00:00
|
|
|
print()
|
|
|
|
print()
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Testing callback
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing callback:')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
A = []
|
|
|
|
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
|
|
|
|
|
|
|
|
r = pool.apply_async(mul, (7, 8), callback=A.append)
|
|
|
|
r.wait()
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
r = pool.map_async(pow3, list(range(10)), callback=A.extend)
|
2008-06-11 16:44:04 +00:00
|
|
|
r.wait()
|
|
|
|
|
|
|
|
if A == B:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tcallbacks succeeded\n')
|
2008-06-11 16:44:04 +00:00
|
|
|
else:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Check there are no outstanding tasks
|
|
|
|
#
|
|
|
|
|
|
|
|
assert not pool._cache, 'cache = %r' % pool._cache
|
|
|
|
|
|
|
|
#
|
|
|
|
# Check close() methods
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing close():')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
for worker in pool._pool:
|
|
|
|
assert worker.is_alive()
|
|
|
|
|
|
|
|
result = pool.apply_async(time.sleep, [0.5])
|
|
|
|
pool.close()
|
|
|
|
pool.join()
|
|
|
|
|
|
|
|
assert result.get() is None
|
|
|
|
|
|
|
|
for worker in pool._pool:
|
|
|
|
assert not worker.is_alive()
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tclose() succeeded\n')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Check terminate() method
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing terminate():')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
pool = multiprocessing.Pool(2)
|
|
|
|
DELTA = 0.1
|
|
|
|
ignore = pool.apply(pow3, [2])
|
|
|
|
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
|
|
|
|
pool.terminate()
|
|
|
|
pool.join()
|
|
|
|
|
|
|
|
for worker in pool._pool:
|
|
|
|
assert not worker.is_alive()
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tterminate() succeeded\n')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
#
|
|
|
|
# Check garbage collection
|
|
|
|
#
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Testing garbage collection:')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
pool = multiprocessing.Pool(2)
|
|
|
|
DELTA = 0.1
|
|
|
|
processes = pool._pool
|
|
|
|
ignore = pool.apply(pow3, [2])
|
|
|
|
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
|
|
|
|
|
|
|
|
results = pool = None
|
|
|
|
|
|
|
|
time.sleep(DELTA * 2)
|
|
|
|
|
|
|
|
for worker in processes:
|
|
|
|
assert not worker.is_alive()
|
|
|
|
|
2008-11-28 11:23:26 +00:00
|
|
|
print('\tgarbage collection succeeded\n')
|
2008-06-11 16:44:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
multiprocessing.freeze_support()
|
|
|
|
|
|
|
|
assert len(sys.argv) in (1, 2)
|
|
|
|
|
|
|
|
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
|
2008-11-28 11:23:26 +00:00
|
|
|
print(' Using processes '.center(79, '-'))
|
2008-06-11 16:44:04 +00:00
|
|
|
elif sys.argv[1] == 'threads':
|
2008-11-28 11:23:26 +00:00
|
|
|
print(' Using threads '.center(79, '-'))
|
2008-06-11 16:44:04 +00:00
|
|
|
import multiprocessing.dummy as multiprocessing
|
|
|
|
else:
|
2008-11-28 11:23:26 +00:00
|
|
|
print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
|
2008-06-11 16:44:04 +00:00
|
|
|
raise SystemExit(2)
|
|
|
|
|
|
|
|
test()
|