multiprocess for fuzzy clusters

Yomguithereal 2018-06-07 12:49:43 +02:00
parent 98cf8fec5e
commit b743e05022
5 changed files with 115 additions and 14 deletions


@@ -3,19 +3,32 @@ from timeit import default_timer as timer
 from fog.clustering import *
 from Levenshtein import distance as levenshtein
 
-with open('./data/universities.csv', 'r') as f:
+# with open('./data/universities.csv', 'r') as f:
+#     reader = csv.DictReader(f)
+#     universities = sorted(set(line['university'] for line in reader))
+
+# print('Universities: %i' % len(universities))
+
+# start = timer()
+# clusters = list(pairwise_leader(universities, distance=levenshtein, radius=2))
+# print('Leader (%i):' % len(clusters), timer() - start)
+
+# start = timer()
+# clusters = list(pairwise_fuzzy_clusters(universities, distance=levenshtein, radius=2, processes=2))
+# print('Fuzzy clusters (%i):' % len(clusters), timer() - start)
+
+# start = timer()
+# clusters = list(pairwise_connected_components(universities, distance=levenshtein, radius=2))
+# print('Connected components (%i):' % len(clusters), timer() - start)
+
+with open('./data/musicians.csv', 'r') as f:
     reader = csv.DictReader(f)
-    universities = set(line['university'] for line in reader)
+    artists = sorted(set(line['artist'] for line in reader))
+
+print('Artists: %i' % len(artists))
 
 start = timer()
-clusters = list(pairwise_leader(universities, distance=levenshtein, radius=2))
-print('Leader (%i):' % len(clusters), timer() - start)
-
-start = timer()
-clusters = list(pairwise_fuzzy_clusters(universities, distance=levenshtein, radius=2))
+clusters = list(pairwise_fuzzy_clusters(artists, distance=levenshtein, radius=2, processes=6))
 print('Fuzzy clusters (%i):' % len(clusters), timer() - start)
-
-start = timer()
-clusters = list(pairwise_connected_components(universities, distance=levenshtein, radius=2))
-print('Connected components (%i):' % len(clusters), timer() - start)
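For quick experimentation, the new processes option can be tried without the CSV fixture. A minimal sketch, assuming the same pairwise_fuzzy_clusters API as in the benchmark above; the inline artists list and the import path are illustrative, and the __main__ guard matters on platforms where multiprocessing spawns fresh interpreters:

from timeit import default_timer as timer

from fog.clustering import pairwise_fuzzy_clusters
from Levenshtein import distance as levenshtein

# Tiny inline stand-in for ./data/musicians.csv.
artists = ['Daft Punk', 'Daft Punkk', 'daft punk', 'Justice', 'Justcie']

if __name__ == '__main__':
    start = timer()
    # processes > 1 routes the pairwise comparisons through a Pool.
    clusters = list(pairwise_fuzzy_clusters(artists, distance=levenshtein,
                                            radius=2, processes=2))
    print('Fuzzy clusters (%i):' % len(clusters), timer() - start)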


@@ -5,11 +5,13 @@
 # Clustering algorithms computing every pairwise distance/similarity to build
 # suitable clusters.
 #
+import dill
 from collections import defaultdict
 from multiprocessing import Pool
 from phylactery import UnionFind
 
 from fog.clustering.utils import make_similarity_function
+from fog.utils import upper_triangular_matrix_chunk_iter
 
 
 def pairwise_leader(data, similarity=None, distance=None, radius=None,
@@ -111,8 +113,34 @@ def pairwise_leader(data, similarity=None, distance=None, radius=None,
         yield cluster
 
 
+def pairwise_fuzzy_clusters_worker(payload):
+    """
+    Worker function used to compute pairwise fuzzy clusters over chunks
+    of the final matrix in parallel.
+    """
+
+    similarity, I, J, offset_i, offset_j = payload
+    similarity = dill.loads(similarity)
+
+    matches = []
+
+    diagonal_chunk = offset_i == offset_j
+
+    for i in range(len(I)):
+        A = I[i]
+
+        for j in range(0 if not diagonal_chunk else i + 1, len(J)):
+            B = J[j]
+
+            if similarity(A, B):
+                matches.append((offset_i + i, offset_j + j))
+
+    return matches
+
+
 def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
-                            min_size=2, max_size=float('inf'), processes=1):
+                            min_size=2, max_size=float('inf'), processes=1,
+                            chunk_size=100):
     """
     Function returning an iterator over found clusters using an algorithm
     yielding fuzzy clusters.
@@ -158,6 +186,8 @@ def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
         max_size (number, optional): maximum number of items in a cluster for
             it to be considered viable. Defaults to infinity.
         processes (number, optional): number of processes to use. Defaults to 1.
+        chunk_size (number, optional): size of matrix chunks to send to
+            subprocesses. Defaults to 100.
 
     Yields:
         list: A viable cluster.
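Each chunk pair yielded by the iterator becomes one task for the pool, so chunk_size controls task granularity. A back-of-the-envelope sketch of the task count (the n value below is illustrative):

import math

n = 5000          # number of items to cluster (illustrative)
chunk_size = 100  # the default above
k = math.ceil(n / chunk_size)   # 50 chunk rows/columns
tasks = k * (k + 1) // 2        # 1275 upper-triangular chunk pairs
print(k, tasks)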
@@ -186,8 +216,22 @@ def pairwise_fuzzy_clusters(data, similarity=None, distance=None, radius=None,
                     graph[i].append(j)
                     graph[j].append(i)
     else:
+        pickled_similarity = dill.dumps(similarity)
+
+        # Iterator
+        pool_iter = (
+            (pickled_similarity, I, J, offset_i, offset_j)
+            for I, J, offset_i, offset_j
+            in upper_triangular_matrix_chunk_iter(data, chunk_size)
+        )
+
+        # Pool
         with Pool(processes=processes) as pool:
-            pass
+            for matches in pool.imap_unordered(pairwise_fuzzy_clusters_worker, pool_iter):
+                for i, j in matches:
+                    graph[i].append(j)
+                    graph[j].append(i)
 
     # Building clusters
     visited = set()
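The worker's chunk logic can be checked in isolation. Below is a hedged, in-process sketch of the same double loop (the chunk_matches name and the toy similarity are illustrative, not part of fog), showing how offset_i/offset_j translate chunk-local indices back into global ones:

def chunk_matches(similarity, I, J, offset_i, offset_j):
    matches = []
    # A diagonal chunk compares a slice against itself.
    diagonal_chunk = offset_i == offset_j

    for i, A in enumerate(I):
        # Starting at i + 1 on the diagonal skips self-comparisons
        # and mirrored (j, i) duplicates.
        for j in range(i + 1 if diagonal_chunk else 0, len(J)):
            if similarity(A, J[j]):
                matches.append((offset_i + i, offset_j + j))

    return matches

words = ['abc', 'abd', 'xyz', 'xya']
same_length_hamming = lambda a, b: sum(x != y for x, y in zip(a, b)) <= 1
print(chunk_matches(same_length_hamming, words, words, 0, 0))
# [(0, 1), (2, 3)] -> global index pairs, ready for the adjacency graph

Serializing the similarity function with dill, rather than relying on multiprocessing's default pickling, is what lets lambdas and closures such as the one above cross process boundaries.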

fog/utils.py Normal file (+40)

@@ -0,0 +1,40 @@
+# =============================================================================
+# Fog Utils
+# =============================================================================
+#
+# Miscellaneous functions used throughout the library.
+#
+import math
+
+
+def upper_triangular_matrix_chunk_iter(data, chunk_size):
+    """
+    Function returning an iterator over chunks of an upper triangular matrix.
+    It's a useful utility to parallelize pairwise distance computations, for
+    instance.
+
+    Args:
+        data (iterable): The matrix's data.
+        chunk_size (int): Size of the chunks to yield.
+
+    Yields:
+        tuple of slices: A matrix's chunk.
+
+    """
+    n = len(data)
+    c = math.ceil(n / chunk_size)
+
+    for j in range(c):
+        j_offset = j * chunk_size
+        j_limit = j_offset + chunk_size
+
+        for i in range(0, min(j + 1, c)):
+            i_offset = i * chunk_size
+            i_limit = i_offset + chunk_size
+
+            yield (
+                data[i_offset:i_limit],
+                data[j_offset:j_limit],
+                i_offset,
+                j_offset
+            )
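To make the chunking concrete, here is a small usage sketch with its expected output as comments, assuming the function is importable as fog.utils.upper_triangular_matrix_chunk_iter. Every unordered pair of items lands in exactly one chunk, and diagonal chunks pair a slice with itself:

from fog.utils import upper_triangular_matrix_chunk_iter

data = ['a', 'b', 'c', 'd', 'e']

for I, J, offset_i, offset_j in upper_triangular_matrix_chunk_iter(data, 2):
    print(I, J, offset_i, offset_j)

# ['a', 'b'] ['a', 'b'] 0 0   <- diagonal chunk
# ['a', 'b'] ['c', 'd'] 0 2
# ['c', 'd'] ['c', 'd'] 2 2   <- diagonal chunk
# ['a', 'b'] ['e'] 0 4
# ['c', 'd'] ['e'] 2 4
# ['e'] ['e'] 4 4             <- diagonal chunk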


@@ -4,4 +4,5 @@ pytest==3.5.1
 twine==1.11.0
 
 # Dependencies
+dill==0.2.7.1
 phylactery==0.0.4


@@ -16,5 +16,8 @@ setup(name='fog',
       python_requires='>=3',
       packages=find_packages(exclude=['experiments', 'test']),
       package_data={'docs': ['README.md']},
-      install_requires=['phylactery==0.0.4'],
+      install_requires=[
+        'dill==0.2.7.1',
+        'phylactery==0.0.4'
+      ],
       zip_safe=True)