From 802f6ae19b211ada3f2eea600b513dd32e2648a3 Mon Sep 17 00:00:00 2001
From: Yomguithereal
Date: Fri, 6 Jul 2018 15:58:21 +0200
Subject: [PATCH] Adding a blocking clusterer

---
 experiments/pairwise_benchmark.py |   9 ++
 fog/clustering/__init__.py        |   1 +
 fog/clustering/blocking.py        | 148 ++++++++++++++++++++++++++++++
 fog/clustering/pairwise.py        |   4 +-
 test/clustering/blocking_test.py  |  32 +++++++
 5 files changed, 192 insertions(+), 2 deletions(-)
 create mode 100644 fog/clustering/blocking.py
 create mode 100644 test/clustering/blocking_test.py

diff --git a/experiments/pairwise_benchmark.py b/experiments/pairwise_benchmark.py
index 515419a..e6ffa81 100644
--- a/experiments/pairwise_benchmark.py
+++ b/experiments/pairwise_benchmark.py
@@ -1,4 +1,5 @@
 import csv
+from functools import partial
 from timeit import default_timer as timer
 from fog.clustering import *
 from fog.tokenizers import ngrams
@@ -28,6 +29,10 @@ with open('./data/universities.csv', 'r') as f:
     clusters = list(vp_tree(universities, distance=levenshtein, radius=2))
     print('VPTree (%i):' % len(clusters), timer() - start)
 
+    start = timer()
+    clusters = list(blocking(universities, blocks=partial(ngrams, 6), distance=levenshtein, radius=2))
+    print('Blocking (%i):' % len(clusters), timer() - start)
+
 print()
 with open('./data/musicians.csv', 'r') as f:
     reader = csv.DictReader(f)
@@ -44,6 +49,10 @@ with open('./data/musicians.csv', 'r') as f:
     clusters = list(key_collision(artists, key=fingerprint))
     print('Fingerprint key collision (%i)' % len(clusters), timer() - start)
 
+    start = timer()
+    clusters = list(blocking(artists, blocks=partial(ngrams, 6), distance=levenshtein, radius=2, processes=8))
+    print('Blocking (%i):' % len(clusters), timer() - start)
+
     start = timer()
     clusters = list(pairwise_fuzzy_clusters(artists, distance=levenshtein, radius=2, processes=8))
     print('Parallel Fuzzy clusters (%i):' % len(clusters), timer() - start)
diff --git a/fog/clustering/__init__.py b/fog/clustering/__init__.py
index 1d5a322..edcbcc0 100644
--- a/fog/clustering/__init__.py
+++ b/fog/clustering/__init__.py
@@ -1,3 +1,4 @@
+from fog.clustering.blocking import blocking
 from fog.clustering.jaccard_intersection_index import (
     jaccard_intersection_index
 )
diff --git a/fog/clustering/blocking.py b/fog/clustering/blocking.py
new file mode 100644
index 0000000..fd88066
--- /dev/null
+++ b/fog/clustering/blocking.py
@@ -0,0 +1,148 @@
+# =============================================================================
+# Fog Blocking Clustering
+# =============================================================================
+#
+# Implementation of a blocking clusterer that starts by dispatching given
+# items to one or more buckets before computing pairwise comparisons on them.
+#
+import dill
+from collections import defaultdict
+from multiprocessing import Pool
+from fog.clustering.utils import make_similarity_function
+
+# TODO: max_block_size to avoid ngram blocks with high document frequency
+
+
+def blocking_worker(payload):
+    """
+    Worker function used to compute pairwise distance/similarity over a whole
+    block.
+    """
+    similarity, block, serialized, graph = payload
+
+    if serialized:
+        similarity = dill.loads(similarity)
+
+    matches = []
+    n = len(block)
+
+    for i in range(n):
+        A = block[i]
+
+        for j in range(i + 1, n):
+            B = block[j]
+
+            if graph is not None and A in graph and B in graph[A]:
+                continue
+
+            if similarity(A, B):
+                matches.append((A, B))
+
+    return matches
+
+
+def blocking(data, block=None, blocks=None, similarity=None, distance=None,
+             radius=None, min_size=2, max_size=float('inf'), processes=1):
+    """
+    Function returning an iterator over found clusters using the blocking
+    method.
+
+    It works by dispatching given items into one or more buckets before
+    computing pairwise comparisons on each bucket.
+
+    Args:
+        data (iterable): Arbitrary iterable containing data points to gather
+            into clusters. Will be fully consumed.
+        block (callable): A function returning an item's single block.
+        blocks (callable): A function returning an item's blocks.
+        similarity (callable): If radius is specified, a function returning
+            the similarity between two points. Else, a function returning
+            whether two points should be deemed similar. Alternatively, one can
+            specify `distance` instead.
+        distance (callable): If radius is specified, a function returning
+            the distance between two points. Else, a function returning
+            whether two points should not be deemed similar. Alternatively, one
+            can specify `similarity` instead.
+        radius (number, optional): matching threshold for the given distance or similarity.
+        min_size (number, optional): minimum number of items in a cluster for
+            it to be considered viable. Defaults to 2.
+        max_size (number, optional): maximum number of items in a cluster for
+            it to be considered viable. Defaults to infinity.
+        processes (number, optional): number of processes to use.
+            Defaults to 1.
+
+    Yields:
+        list: A viable cluster.
+
+    """
+
+    # Formatting similarity
+    similarity = make_similarity_function(similarity=similarity, distance=distance, radius=radius)
+
+    # Single block or blocks? With multiple blocks, sets are needed to avoid duplicate pairs
+    if blocks is None:
+        buckets = defaultdict(list)
+        graph = defaultdict(list)
+        worker_graph = None
+
+        def add(x, y):
+            x.append(y)
+    else:
+        buckets = defaultdict(set)
+        graph = defaultdict(set)
+        worker_graph = graph
+
+        def add(x, y):
+            x.add(y)
+
+    # Grouping items into buckets
+    for item in data:
+        bs = blocks(item) if blocks is not None else [block(item)]
+
+        for b in bs:
+            add(buckets[b], item)
+
+    # Fuzzy clustering
+    if processes == 1:
+        for bucket in buckets.values():
+            if len(bucket) < 2:
+                continue
+
+            if type(bucket) is not list:
+                bucket = list(bucket)
+
+            for A, B in blocking_worker((similarity, bucket, False, worker_graph)):
+                add(graph[A], B)
+                add(graph[B], A)
+    else:
+
+        pickled_similarity = dill.dumps(similarity)
+
+        pool_iter = (
+            (pickled_similarity, bucket if type(bucket) is list else list(bucket), True, None)
+            for bucket
+            in buckets.values()
+            if len(bucket) > 1
+        )
+
+        with Pool(processes=processes) as pool:
+            for matches in pool.imap_unordered(blocking_worker, pool_iter):
+                for A, B in matches:
+                    add(graph[A], B)
+                    add(graph[B], A)
+
+    # Building clusters
+    visited = set()
+    for A, neighbors in graph.items():
+        if A in visited:
+            continue
+
+        if len(neighbors) + 1 < min_size:
+            continue
+        if len(neighbors) + 1 > max_size:
+            continue
+
+        visited.update(neighbors)
+
+        cluster = [A] + (neighbors if type(neighbors) is list else list(neighbors))
+        yield cluster
diff --git a/fog/clustering/pairwise.py b/fog/clustering/pairwise.py
index c32d48b..291bca0 100644
--- a/fog/clustering/pairwise.py
+++ b/fog/clustering/pairwise.py
@@ -246,7 +246,7 @@ def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
 
     # Pool
     with Pool(processes=processes) as pool:
-        for matches in pool.imap(pairwise_fuzzy_clusters_worker, pool_iter):
+        for matches in pool.imap_unordered(pairwise_fuzzy_clusters_worker, pool_iter):
             for i, j in matches:
                 graph[i].append(j)
                 graph[j].append(i)
@@ -362,7 +362,7 @@ def pairwise_connected_components(data, similarity=None, distance=None, radius=N
 
     # Pool
     with Pool(processes=processes) as pool:
-        for matches in pool.imap(pairwise_fuzzy_clusters_worker, pool_iter):
+        for matches in pool.imap_unordered(pairwise_fuzzy_clusters_worker, pool_iter):
             for i, j in matches:
                 sets.union(i, j)
 
diff --git a/test/clustering/blocking_test.py b/test/clustering/blocking_test.py
new file mode 100644
index 0000000..4cb5c50
--- /dev/null
+++ b/test/clustering/blocking_test.py
@@ -0,0 +1,32 @@
+# =============================================================================
+# Fog Blocking Unit Tests
+# =============================================================================
+import csv
+from test.clustering.utils import Clusters
+from Levenshtein import distance as levenshtein
+from fog.clustering import blocking
+from fog.tokenizers import ngrams
+
+DATA = [
+    'Abelard',
+    'Abelar',
+    'Atrium',
+    'Atrides',
+    'Belgian',
+    'Belgia',
+    'Telgia'
+]
+
+CLUSTERS = Clusters([
+    ('Abelard', 'Abelar'),
+    ('Belgian', 'Belgia')
+])
+
+
+class TestBlocking(object):
+    def test_basics(self):
+
+        # Blocking on first letter
+        clusters = Clusters(blocking(DATA, blocks=lambda x: x[0], distance=levenshtein, radius=1))
+
+        assert clusters == CLUSTERS
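
--
Not part of the patch: a minimal usage sketch of the new clusterer, for
reviewers. It reuses names from the unit test's data and the same
`python-Levenshtein` dependency the tests import; the trigram size and the
sample printout are illustrative assumptions, not taken from the repo:

    from functools import partial
    from Levenshtein import distance as levenshtein
    from fog.clustering import blocking
    from fog.tokenizers import ngrams

    names = ['Abelard', 'Abelar', 'Atrium', 'Atrides']

    # 'Abelard' and 'Abelar' share several trigrams, so they land in the same
    # buckets and get compared pairwise; their Levenshtein distance is 1, so
    # they match. 'Atrium' and 'Atrides' share their leading trigram's bucket
    # but are too distant to match, and singleton buckets are never compared.
    print(list(blocking(names, blocks=partial(ngrams, 3), distance=levenshtein, radius=1)))
    # e.g. [['Abelard', 'Abelar']] (member order inside a cluster may vary)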