Parallel quickjoin

Yomguithereal 2018-07-11 18:17:57 +02:00
parent 7931944719
commit 9f13609ad7
5 changed files with 57 additions and 14 deletions

View File

@@ -74,7 +74,7 @@ with open('./data/musicians.csv', 'r') as f:
 print('SNM Skeleton (%i):' % len(clusters), timer() - start)

 start = timer()
-clusters = list(quickjoin(artists, distance=levenshtein, radius=2))
+clusters = list(quickjoin(artists, distance=levenshtein, radius=2, processes=8))
 print('QuickJoin (%i):' % len(clusters), timer() - start)

 start = timer()
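The benchmark now hardcodes processes=8, which only helps on a machine with that many cores to spare. A hedged variant of the same call, reusing the script's existing quickjoin and levenshtein imports, would size the pool from the host instead:

import multiprocessing

# Hypothetical tweak: derive the pool size from the machine rather than hardcoding 8.
clusters = list(quickjoin(artists, distance=levenshtein, radius=2,
                          processes=multiprocessing.cpu_count()))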

View File

@@ -25,7 +25,7 @@ def blocking_worker(payload):
     if serialized:
         similarity = dill.loads(similarity)

-    matches = []
+    pairs = []
     n = len(block)

     for i in range(n):
@@ -38,9 +38,9 @@ def blocking_worker(payload):
                 continue

             if similarity(A, B):
-                matches.append((A, B))
+                pairs.append((A, B))

-    return matches
+    return pairs


 def blocking(data, block=None, blocks=None, similarity=None, distance=None,
@@ -133,8 +133,8 @@ def blocking(data, block=None, blocks=None, similarity=None, distance=None,
         )

         with Pool(processes=processes) as pool:
-            for matches in pool.imap_unordered(blocking_worker, pool_iter):
-                yield from matches
+            for pairs in pool.imap_unordered(blocking_worker, pool_iter):
+                yield from pairs

     yield from clusters_from_pairs(
         clustering(),
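Why dill appears in these workers: multiprocessing ships payloads through the stdlib pickle, which cannot serialize lambdas or locally defined closures, and similarity/distance predicates are typically exactly that. A minimal standalone sketch of the round-trip these workers rely on (not fog's API, just the pattern):

import dill

# A lambda would defeat stdlib pickle, but dill handles it.
similarity = lambda a, b: a[0] == b[0]

payload = dill.dumps(similarity)   # serialize in the parent process
restored = dill.loads(payload)     # deserialize inside the worker

assert restored('abc', 'axe')      # behaves like the original callable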

View File

@@ -124,7 +124,7 @@ def pairwise_worker(payload):
     similarity, I, J, offset_i, offset_j = payload
     similarity = dill.loads(similarity)

-    matches = []
+    pairs = []

     diagonal_chunk = offset_i == offset_j
@@ -138,9 +138,9 @@
             B = J[j]

             if similarity(A, B):
-                matches.append((offset_i + i, offset_j + j))
+                pairs.append((offset_i + i, offset_j + j))

-    return matches
+    return pairs


 def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
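Unlike the blocking worker, the pairwise worker reports global index pairs (offset_i + i, offset_j + j) rather than the items themselves, presumably to keep the inter-process payload small. A hedged sketch of mapping such pairs back to records, assuming data is the original list:

data = ['apple', 'apply', 'orange']

# What a worker chunk might return: indices into data, not items.
index_pairs = [(0, 1)]

item_pairs = [(data[i], data[j]) for i, j in index_pairs]
# -> [('apple', 'apply')]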

View File

@@ -16,7 +16,10 @@
 #   Applications. SISAP 2013. Lecture Notes in Computer Science, vol 8199.
 #   Springer, Berlin, Heidelberg
 #
+import dill
 import random
+from multiprocessing import Pool
+
 from fog.clustering.utils import clusters_from_pairs
@@ -70,10 +73,21 @@ def quickjoin_self_bruteforce(S, distance, radius):
                 yield (A, B)


+def worker(payload):
+    distance, radius, S1, S2 = payload
+
+    distance = dill.loads(distance)
+
+    if S2 is None:
+        return list(quickjoin_self_bruteforce(S1, distance, radius))
+
+    return list(quickjoin_bruteforce(S1, S2, distance, radius))
+
+
 def quickjoin(data, distance, radius, block_size=500,
               min_size=2, max_size=float('inf'),
               mode='connected_components',
-              seed=None):
+              seed=None, processes=1):
     """
     Function returning an iterator over found clusters using the QuickJoin
     algorithm.
@@ -93,6 +107,8 @@ def quickjoin(data, distance, radius, block_size=500,
         mode (string, optional): 'fuzzy_clusters', 'connected_components'.
             Defaults to 'connected_components'.
         seed (number, optional): Seed for RNG. Defaults to None.
+        processes (number, optional): Number of processes to use.
+            Defaults to 1.

     Yields:
         list: A viable cluster.
@@ -104,9 +120,11 @@
     if type(data) is not list:
         data = list(data)

-    def clustering():
+    # Iterator recursively partitioning the data set using QuickJoin's method
+    def blocks():
         stack = [(data, None)]

+        # "Recursivity" through stack
         while len(stack) != 0:
             S1, S2 = stack.pop()
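The new "Recursivity" comment flags that QuickJoin's recursive partitioning is driven by an explicit stack of pending (S1, S2) pairs rather than actual recursive calls, which sidesteps Python's recursion limit on deeply partitioned data. A generic, fog-independent sketch of the pattern:

def iter_leaves(tree):
    # Depth-first traversal with an explicit stack instead of recursion.
    stack = [tree]
    while len(stack) != 0:
        node = stack.pop()
        if isinstance(node, list):
            stack.extend(node)
        else:
            yield node

assert sorted(iter_leaves([1, [2, 3], [[4]]])) == [1, 2, 3, 4]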
@@ -117,7 +135,7 @@
                 N = len(S)

                 if N <= block_size:
-                    yield from quickjoin_self_bruteforce(S, distance, radius)
+                    yield (S, None)
                     continue

                 # Randomly selecting pivots. They must be different
@@ -146,7 +164,7 @@
                 N = N1 + N2

                 if N <= block_size:
-                    yield from quickjoin_bruteforce(S1, S2, distance, radius)
+                    yield (S1, S2)
                     continue

                 p1 = rng.randint(0, N - 1)
@@ -168,8 +186,29 @@
                 stack.append((Lw1, Gw2))
                 stack.append((Gw1, Lw2))

+    # Iterator performing bruteforce distance computation over found blocks
+    def clustering():
+        for S1, S2 in blocks():
+            if S2 is None:
+                yield from quickjoin_self_bruteforce(S1, distance, radius)
+            else:
+                yield from quickjoin_bruteforce(S1, S2, distance, radius)
+
+    def clustering_parallel():
+        with Pool(processes=processes) as pool:
+            pickled_distance = dill.dumps(distance)
+
+            pool_iter = (
+                (pickled_distance, radius, S1, S2)
+                for S1, S2 in blocks()
+            )
+
+            for pairs in pool.imap_unordered(worker, pool_iter):
+                yield from pairs
+
     yield from clusters_from_pairs(
-        clustering(),
+        clustering() if processes == 1 else clustering_parallel(),
         min_size=min_size,
         max_size=max_size,
         mode=mode
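The refactor splits the old clustering() generator in two: blocks() only partitions the data set and yields (S1, S2) block pairs, while clustering() and clustering_parallel() run the bruteforce joins over those blocks sequentially or through a Pool, so both paths share the exact same partitioning. A usage sketch of the new keyword, assuming the module is importable as fog.clustering.quickjoin per the package layout above (hamming here is a stand-in for whatever distance you would really use):

from fog.clustering.quickjoin import quickjoin

def hamming(a, b):
    # Toy distance for same-length strings.
    return sum(x != y for x, y in zip(a, b))

if __name__ == '__main__':
    data = ['abc', 'abd', 'xyz', 'xyw']

    # processes=1 (the default) keeps the sequential path;
    # anything above routes pair generation through a process pool.
    for cluster in quickjoin(data, distance=hamming, radius=1, processes=2):
        print(cluster)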

View File

@@ -39,3 +39,7 @@ class TestQuickJoin(object):
         clusters = Clusters(quickjoin(UNIVERSITIES, distance=levenshtein, radius=1))

         assert clusters == UNIVERSITY_CLUSTERS
+
+        parallel_clusters = Clusters(quickjoin(UNIVERSITIES, distance=levenshtein, radius=1, processes=2))
+
+        assert parallel_clusters == UNIVERSITY_CLUSTERS
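Worth noting for this assertion: pool.imap_unordered yields worker results in whatever order they finish, so the parallel run may discover the same pairs in a different sequence; routing both results through the Clusters helper (which presumably normalizes ordering) is what keeps the comparison stable.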