diff --git a/experiments/benchmark.py b/experiments/benchmark.py
index d2f0a22..f042882 100644
--- a/experiments/benchmark.py
+++ b/experiments/benchmark.py
@@ -74,7 +74,7 @@ with open('./data/musicians.csv', 'r') as f:
     print('SNM Skeleton (%i):' % len(clusters), timer() - start)
 
     start = timer()
-    clusters = list(quickjoin(artists, distance=levenshtein, radius=2))
+    clusters = list(quickjoin(artists, distance=levenshtein, radius=2, processes=8))
     print('QuickJoin (%i):' % len(clusters), timer() - start)
 
     start = timer()
diff --git a/fog/clustering/blocking.py b/fog/clustering/blocking.py
index c96522a..d899f06 100644
--- a/fog/clustering/blocking.py
+++ b/fog/clustering/blocking.py
@@ -25,7 +25,7 @@ def blocking_worker(payload):
     if serialized:
         similarity = dill.loads(similarity)
 
-    matches = []
+    pairs = []
 
     n = len(block)
     for i in range(n):
@@ -38,9 +38,9 @@ def blocking_worker(payload):
                 continue
 
             if similarity(A, B):
-                matches.append((A, B))
+                pairs.append((A, B))
 
-    return matches
+    return pairs
 
 
 def blocking(data, block=None, blocks=None, similarity=None, distance=None,
@@ -133,8 +133,8 @@ def blocking(data, block=None, blocks=None, similarity=None, distance=None,
             )
 
             with Pool(processes=processes) as pool:
-                for matches in pool.imap_unordered(blocking_worker, pool_iter):
-                    yield from matches
+                for pairs in pool.imap_unordered(blocking_worker, pool_iter):
+                    yield from pairs
 
     yield from clusters_from_pairs(
         clustering(),
diff --git a/fog/clustering/pairwise.py b/fog/clustering/pairwise.py
index ad0099c..1cc737e 100644
--- a/fog/clustering/pairwise.py
+++ b/fog/clustering/pairwise.py
@@ -124,7 +124,7 @@ def pairwise_worker(payload):
     similarity, I, J, offset_i, offset_j = payload
     similarity = dill.loads(similarity)
 
-    matches = []
+    pairs = []
 
     diagonal_chunk = offset_i == offset_j
 
@@ -138,9 +138,9 @@ def pairwise_worker(payload):
             B = J[j]
 
             if similarity(A, B):
-                matches.append((offset_i + i, offset_j + j))
+                pairs.append((offset_i + i, offset_j + j))
 
-    return matches
+    return pairs
 
 
 def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
diff --git a/fog/clustering/quickjoin.py b/fog/clustering/quickjoin.py
index 8d086e9..449f279 100644
--- a/fog/clustering/quickjoin.py
+++ b/fog/clustering/quickjoin.py
@@ -16,7 +16,10 @@
 # Applications. SISAP 2013. Lecture Notes in Computer Science, vol 8199.
 # Springer, Berlin, Heidelberg
 #
+import dill
 import random
+from multiprocessing import Pool
+
 from fog.clustering.utils import clusters_from_pairs
 
 
@@ -70,10 +73,21 @@ def quickjoin_self_bruteforce(S, distance, radius):
                 yield (A, B)
 
 
+def worker(payload):
+    distance, radius, S1, S2 = payload
+
+    distance = dill.loads(distance)
+
+    if S2 is None:
+        return list(quickjoin_self_bruteforce(S1, distance, radius))
+
+    return list(quickjoin_bruteforce(S1, S2, distance, radius))
+
+
 def quickjoin(data, distance, radius, block_size=500,
               min_size=2, max_size=float('inf'),
               mode='connected_components',
-              seed=None):
+              seed=None, processes=1):
     """
     Function returning an iterator over found clusters using the
     QuickJoin algorithm.
@@ -93,6 +107,8 @@ def quickjoin(data, distance, radius, block_size=500,
         mode (string, optional): 'fuzzy_clusters', 'connected_components'.
             Defaults to 'connected_components'.
         seed (number, optional): Seed for RNG. Defaults to None.
+        processes (number, optional): Number of processes to use.
+            Defaults to 1.
 
     Yields:
         list: A viable cluster.
@@ -104,9 +120,11 @@ def quickjoin(data, distance, radius, block_size=500,
     if type(data) is not list:
         data = list(data)
 
-    def clustering():
+    # Iterator recursively partitioning the data set using QuickJoin's method
+    def blocks():
         stack = [(data, None)]
 
+        # "Recursivity" through stack
        while len(stack) != 0:
             S1, S2 = stack.pop()
 
@@ -117,7 +135,7 @@ def quickjoin(data, distance, radius, block_size=500,
                 N = len(S)
 
                 if N <= block_size:
-                    yield from quickjoin_self_bruteforce(S, distance, radius)
+                    yield (S, None)
                     continue
 
                 # Randomly selecting pivots. They must be different
@@ -146,7 +164,7 @@ def quickjoin(data, distance, radius, block_size=500,
             N = N1 + N2
 
             if N <= block_size:
-                yield from quickjoin_bruteforce(S1, S2, distance, radius)
+                yield (S1, S2)
                 continue
 
             p1 = rng.randint(0, N - 1)
@@ -168,8 +186,29 @@ def quickjoin(data, distance, radius, block_size=500,
             stack.append((Lw1, Gw2))
             stack.append((Gw1, Lw2))
 
+    # Iterator performing bruteforce distance computation over found blocks
+    def clustering():
+
+        for S1, S2 in blocks():
+            if S2 is None:
+                yield from quickjoin_self_bruteforce(S1, distance, radius)
+            else:
+                yield from quickjoin_bruteforce(S1, S2, distance, radius)
+
+    def clustering_parallel():
+        with Pool(processes=processes) as pool:
+            pickled_distance = dill.dumps(distance)
+
+            pool_iter = (
+                (pickled_distance, radius, S1, S2)
+                for S1, S2 in blocks()
+            )
+
+            for pairs in pool.imap_unordered(worker, pool_iter):
+                yield from pairs
+
     yield from clusters_from_pairs(
-        clustering(),
+        clustering() if processes == 1 else clustering_parallel(),
         min_size=min_size,
         max_size=max_size,
         mode=mode
diff --git a/test/clustering/quickjoin_test.py b/test/clustering/quickjoin_test.py
index 103c7b0..c0602f7 100644
--- a/test/clustering/quickjoin_test.py
+++ b/test/clustering/quickjoin_test.py
@@ -39,3 +39,7 @@ class TestQuickJoin(object):
         clusters = Clusters(quickjoin(UNIVERSITIES, distance=levenshtein, radius=1))
 
         assert clusters == UNIVERSITY_CLUSTERS
+
+        parallel_clusters = Clusters(quickjoin(UNIVERSITIES, distance=levenshtein, radius=1, processes=2))
+
+        assert parallel_clusters == UNIVERSITY_CLUSTERS
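For reference, a minimal usage sketch of the new `processes` parameter. The `names` sample data is made up for illustration, and the sketch assumes `quickjoin` is importable from `fog.clustering` (as in the benchmark above) together with a Levenshtein distance function such as the one from the `Levenshtein` package used in the tests:

```python
from Levenshtein import distance as levenshtein
from fog.clustering import quickjoin

# Hypothetical sample data, for illustration only
names = ['Daft Punk', 'Daft Punks', 'Dft Punk', 'Justice', 'Justise']

if __name__ == '__main__':
    # With processes > 1, the distance function is pickled with dill and the
    # bruteforce comparison of each block is dispatched to a multiprocessing Pool
    for cluster in quickjoin(names, distance=levenshtein, radius=1, processes=2):
        print(cluster)
```

Note the `if __name__ == '__main__':` guard: on platforms that spawn rather than fork (e.g. Windows, or macOS with the default start method on recent Pythons), `multiprocessing.Pool` re-imports the calling module, so unguarded top-level code would run again in every worker.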