diff --git a/experiments/pairwise_benchmarking.py b/experiments/pairwise_benchmarking.py index 3b45324..e559b6f 100644 --- a/experiments/pairwise_benchmarking.py +++ b/experiments/pairwise_benchmarking.py @@ -3,19 +3,32 @@ from timeit import default_timer as timer from fog.clustering import * from Levenshtein import distance as levenshtein -with open('./data/universities.csv', 'r') as f: +# with open('./data/universities.csv', 'r') as f: +# reader = csv.DictReader(f) + +# universities = sorted(set(line['university'] for line in reader)) + +# print('Universities: %i' % len(universities)) + +# start = timer() +# clusters = list(pairwise_leader(universities, distance=levenshtein, radius=2)) +# print('Leader (%i):' % len(clusters), timer() - start) + +# start = timer() +# clusters = list(pairwise_fuzzy_clusters(universities, distance=levenshtein, radius=2, processes=2)) +# print('Fuzzy clusters (%i):' % len(clusters), timer() - start) + +# start = timer() +# clusters = list(pairwise_connected_components(universities, distance=levenshtein, radius=2)) +# print('Connected components (%i):' % len(clusters), timer() - start) + +with open('./data/musicians.csv', 'r') as f: reader = csv.DictReader(f) - universities = set(line['university'] for line in reader) + artists = sorted(set(line['artist'] for line in reader)) + + print('Artists: %i' % len(artists)) start = timer() - clusters = list(pairwise_leader(universities, distance=levenshtein, radius=2)) - print('Leader (%i):' % len(clusters), timer() - start) - - start = timer() - clusters = list(pairwise_fuzzy_clusters(universities, distance=levenshtein, radius=2)) + clusters = list(pairwise_fuzzy_clusters(artists, distance=levenshtein, radius=2, processes=6)) print('Fuzzy clusters (%i):' % len(clusters), timer() - start) - - start = timer() - clusters = list(pairwise_connected_components(universities, distance=levenshtein, radius=2)) - print('Connected components (%i):' % len(clusters), timer() - start) diff --git a/fog/clustering/pairwise.py b/fog/clustering/pairwise.py index 243c75c..84df17e 100644 --- a/fog/clustering/pairwise.py +++ b/fog/clustering/pairwise.py @@ -5,11 +5,13 @@ # Clustering algorithms computing every pairwise distance/similarity to build # suitable clusters. # +import dill from collections import defaultdict from multiprocessing import Pool from phylactery import UnionFind from fog.clustering.utils import make_similarity_function +from fog.utils import upper_triangular_matrix_chunk_iter def pairwise_leader(data, similarity=None, distance=None, radius=None, @@ -111,8 +113,34 @@ def pairwise_leader(data, similarity=None, distance=None, radius=None, yield cluster +def pairwise_fuzzy_clusters_worker(payload): + """ + Worker function used to compute pairwise fuzzy clusters over chunks + of the final matrix in parallel. + + """ + similarity, I, J, offset_i, offset_j = payload + + similarity = dill.loads(similarity) + matches = [] + + diagonal_chunk = offset_i == offset_j + + for i in range(len(I)): + A = I[i] + + for j in range(0 if not diagonal_chunk else i + 1, len(J)): + B = J[j] + + if similarity(A, B): + matches.append((offset_i + i, offset_j + j)) + + return matches + + def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None, - min_size=2, max_size=float('inf'), processes=1): + min_size=2, max_size=float('inf'), processes=1, + chunk_size=100): """ Function returning an iterator over found clusters using an algorithm yielding fuzzy clusters. @@ -158,6 +186,8 @@ def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None, max_size (number, optional): maximum number of items in a cluster for it to be considered viable. Defaults to infinity. processes (number, optional): number of processes to use. Defaults to 1. + chunk_size (number, optional): size of matrix chunks to send to + subprocesses. Defaults to 100. Yields: list: A viable cluster. @@ -186,8 +216,22 @@ def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None, graph[i].append(j) graph[j].append(i) else: + + pickled_similarity = dill.dumps(similarity) + + # Iterator + pool_iter = ( + (pickled_similarity, I, J, offset_i, offset_j) + for I, J, offset_i, offset_j + in upper_triangular_matrix_chunk_iter(data, chunk_size) + ) + + # Pool with Pool(processes=processes) as pool: - pass + for matches in pool.imap_unordered(pairwise_fuzzy_clusters_worker, pool_iter): + for i, j in matches: + graph[i].append(j) + graph[j].append(i) # Building clusters visited = set() diff --git a/fog/utils.py b/fog/utils.py new file mode 100644 index 0000000..958de56 --- /dev/null +++ b/fog/utils.py @@ -0,0 +1,40 @@ +# ============================================================================= +# Fog Utils +# ============================================================================= +# +# Miscellaneous functions used throughout the library. +# +import math + + +def upper_triangular_matrix_chunk_iter(data, chunk_size): + """ + Function returning an iterator over chunks of an upper triangular matrix. + It's a useful utility to parallelize pairwise distance computations, for + instance. + + Args: + data (iterable): The matrix's data. + chunk_size (int): Size of the chunks to yield. + + Yields: + tuple of slices: A matrix's chunk. + + """ + n = len(data) + c = math.ceil(n / chunk_size) + + for j in range(c): + j_offset = j * chunk_size + j_limit = j_offset + chunk_size + + for i in range(0, min(j + 1, c)): + i_offset = i * chunk_size + i_limit = i_offset + chunk_size + + yield ( + data[i_offset:i_limit], + data[j_offset:j_limit], + i_offset, + j_offset + ) diff --git a/requirements.txt b/requirements.txt index 0057e35..1c8db4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ pytest==3.5.1 twine==1.11.0 # Dependencies +dill==0.2.7.1 phylactery==0.0.4 diff --git a/setup.py b/setup.py index d404f87..7099304 100644 --- a/setup.py +++ b/setup.py @@ -16,5 +16,8 @@ setup(name='fog', python_requires='>=3', packages=find_packages(exclude=['experiments', 'test']), package_data={'docs': ['README.md']}, - install_requires=['phylactery==0.0.4'], + install_requires=[ + 'dill==0.2.7.1', + 'phylactery==0.0.4' + ], zip_safe=True)