Source code for aweSOM.som

## Self-Organizing Map base code including training and fitting data,
## along with limited cluster plotting capabilities, ported from
## the POPSOM library by Li Yuan (2018) https://github.com/njali2001/popsom.git
## with modifications by Trung Ha (2024) for aweSOM

import sys
import numpy as np
import matplotlib.pyplot as plt

plt.rcParams.update({"font.size": 12})
from sklearn.metrics.pairwise import euclidean_distances

from numba import njit, prange
from scipy.ndimage import map_coordinates

seed = 42
np.random.seed(seed)


[docs] class Lattice: def __init__( self, xdim: int = 10, ydim: int = 10, alpha_0: float = 0.3, train: int = 1000, alpha_type: str = "decay", sampling_type: str = "sampling", ): """Initialize the SOM lattice. Args: xdim (int): The x dimension of the map. Default is 10. ydim (int): The y dimension of the map. Default is 10. alpha_0 (float): The initial learning rate, should be a positive non-zero real number. Default is 0.3. train (int): Number of total training iterations; include for all batches. Default is 1000. alpha_type (str): A string that determines whether the learning rate is static or decaying. Default is "decay". sampling_type (str): A string that determines whether the initial lattice is uniform or randomly sampled from the data. Default is "sampling". """ self.xdim = xdim self.ydim = ydim self.alpha = alpha_0 self.train = train self.init = sampling_type # uniform or random sampling initial lattice self.seed = seed self.epoch = 0 if alpha_type == "static": self.alpha_type = 0 self.alpha_0 = alpha_0 elif alpha_type == "decay": self.alpha_type = 1 self.alpha_0 = alpha_0 else: sys.exit("alpha_type must be either 'static' or 'decay'") if self.train > 200: self.save_frequency = ( self.train // 200 ) # how often to save the node weights else: self.save_frequency = 5 self.lattice_history = [] self.umat_history = []
[docs] def train_lattice( self, data: np.ndarray, features_names: list[str], labels: np.ndarray = None, number_of_steps: int = -1, save_lattice: bool = False, restart_lattice: np.ndarray = None, ): """Train the Model with numba JIT acceleration. Args: data (np.ndarray): A numpy 2D array where each row contains an unlabeled training instance. features_names (list[str]): A list of feature names. labels (np.ndarray, optional): A vector with one label (ground truth) for each observation in data. Defaults to None. number_of_steps (int): Number of steps taken this batch, used for keeping track of training restarts. Default is self.train. save_lattice (bool, optional): A flag that determines whether the node weights are saved to a file at the end of training. Defaults to False. restart_lattice (np.ndarray, optional): Vectors for the weights of the nodes from past realizations. Defaults to None. """ self.restart_lattice = restart_lattice self.save_lattice = save_lattice self.data_array = data self.features_names = features_names self.labels = labels if number_of_steps == -1: self.this_batch_train = self.train else: self.this_batch_train = number_of_steps # self.momentum_decay_rate = momentum_decay_rate # check if the dims are reasonable if self.xdim < 4 or self.ydim < 4: sys.exit("build: map is too small.") # train SOM self.fast_som()
[docs] def fast_som(self): """Performs the self-organizing map (SOM) training. This method initializes the SOM with random values or a subset of the data, and then trains the SOM by updating the node weights based on the input vectors. The training process includes adjusting the learning rate, shrinking the neighborhood size, and saving the node weights and U-matrix periodically. """ # some constants number_input_vectors = self.data_array.shape[0] number_features = self.data_array.shape[1] number_nodes = self.xdim * self.ydim this_batch_train = ( self.this_batch_train ) # only train for this number of steps; useful for restarting training if self.restart_lattice is not None: lattice = self.restart_lattice else: if self.init == "uniform": # vector with small init values for all nodes # NOTE: each row represents a node, each column represents a feature. lattice = np.random.uniform(0.0, 1.0, (number_nodes, number_features)) else: # sample a random subset of the data to initialize the lattice ix = np.random.randint(0, number_input_vectors - 1, number_nodes) lattice = self.data_array[ix, :] self.lattice = lattice.copy() self.lattice_history.append(lattice) # save the initial lattice self.umat_history.append(self.compute_umat()) # save the initial U-matrix alpha = self.alpha # starting learning rate if self.alpha_type == 1: alpha_freq = ( self.train // 25 ) # how often to decay the learning rate; at 24 steps, alpha_f ~ 1e-3 alpha_0 else: alpha_freq = 1 # compute the initial neighborhood size and step nsize_max = max(self.xdim, self.ydim) + 1 nsize_min = ( 8 if this_batch_train // 4 > nsize_max - 8 else nsize_max - this_batch_train // 4 ) # minimum neighborhood size; avoid the case where there are less training steps than available to shrink the neighborhood to 8 nsize_step = ( this_batch_train // 4 ) # for the third quarter of the training steps, shrink the neighborhood nsize_freq = nsize_step // ( nsize_max - nsize_min ) # how often to shrink the neighborhood # if self.epoch < 2 * nsize_step: # nsize = nsize_max # elif self.epoch >= 3 * nsize_step: # nsize = nsize_min # else: # nsize = nsize_max - self.epoch // nsize_freq nsize = ( nsize_max # start with the largest neighborhood size at each training batch ) epoch = self.epoch # counts the number of epochs per nsize_freq stop_epoch = epoch + this_batch_train print("starting epoch is: ", epoch, flush=True) print("stopping epoch is: ", stop_epoch, flush=True) print("Saving lattice every ", self.save_frequency, " epochs", flush=True) # constants for the Gamma function m = np.reshape( list(range(number_nodes)), (number_nodes, 1) ) # a vector with all node 1D addresses # x-y coordinate of ith node: m2Ds[i,] = c(xi, yi) m2Ds = self.coordinate(m, self.xdim) # this ensures that the same random order is not repeated if the training is restarted if self.epoch > 1: self.seed += 1 np.random.seed(self.seed) # Added 06/17/2024: use random number generator, shuffle the data and take the first train samples rng = np.random.default_rng() indices = np.arange(number_input_vectors) rng.shuffle(indices) # Added 06/25/2024: if the number of training steps is larger than the number of data points, repeat the shuffled indices if this_batch_train > number_input_vectors: indices = np.tile(indices, this_batch_train // number_input_vectors + 1) xk = self.data_array[indices[:this_batch_train], :] # implement momentum-based gradient descent # momentum_decay_rate = self.momentum_decay_rate # history of the loss function # loss_freq = 1000 # self.loss_history = np.zeros((self.train, number_features)) # self.average_loss = np.zeros((self.train//loss_freq, number_features)) print("Begin training", flush=True) while True: if epoch % int(self.train // 10) == 0: print("Evaluating epoch = ", epoch, flush=True) # if (epoch % loss_freq == 0) & (epoch != 0): # this_average_loss = np.mean(self.loss_history[epoch-loss_freq:epoch], axis=0) # average loss over the last [loss_freq] epochs # self.average_loss[epoch//loss_freq-1,:] = this_average_loss # if training step has gone over the step limit, terminate if epoch >= stop_epoch: print( "Terminating from step limit reached at epoch ", epoch, flush=True ) self.epoch = epoch if self.save_lattice: print("Saving final lattice", epoch, flush=True) np.save( f"lattice_{epoch}_{self.xdim}{self.ydim}_{self.alpha}_{self.train}.npy", lattice, ) break # get one random input vector xk_m = xk[epoch - self.epoch, :] # calculate the relative distance between input vector and nodes, take the closest node as the BMU # momentum = diff * momentum_decay_rate # momentum-based gradient descent diff = lattice - xk_m squ = diff**2 s = np.sum(squ, axis=1) c = np.argmin(s) # self.loss_history[epoch,:] = np.sqrt(s[c]) # update step gamma_m = np.outer( self.Gamma(c, m2Ds, alpha, nsize), np.ones(number_features) ) # lattice -= (diff + momentum) * gamma_m lattice -= diff * gamma_m # shrink the neighborhood size every [nsize_freq] epochs if ( (epoch - self.epoch) > 2 * nsize_step and (epoch - self.epoch) % nsize_freq == 0 and nsize > nsize_min ): nsize = ( nsize_max - ((epoch - self.epoch) - 2 * nsize_step) // nsize_freq ) print( f"Shrinking neighborhood size to {nsize} at epoch {epoch}", flush=True, ) # decay the learning rate every [alpha_freq] epochs if epoch % alpha_freq == 0 and self.alpha_type == 1 and epoch != 0: alpha *= 0.75 print(f"Decaying learning rate to {alpha} at epoch {epoch}", flush=True) # save lattice sparingly if epoch % self.save_frequency == 0: self.lattice = lattice.copy() # print("Saving lattice at epoch ", epoch, flush=True) self.lattice_history.append(self.lattice) # compute the umatrix and save it umat = self.compute_umat() # np.save(f"umat_{epoch}_{self.xdim}{self.ydim}_{self.alpha}_{self.train}.npy", umat) self.umat_history.append(umat) epoch += 1 print("Training complete", flush=True) # update the learning rate, lattice, and Umatrix after training self.alpha = alpha self.umat = self.compute_umat() self.lattice = lattice # lattice takes the shape of [X*Y, F]
@staticmethod @njit() def Gamma( index_bmu: int, m2Ds: np.ndarray, alpha: float, nsize: int, gaussian: bool = True, ): """Calculate the neighborhood function for a given BMU on a lattice. Args: index_bmu (int): The index of the BMU node on the lattice. m2Ds (np.ndarray): Lattice coordinate of each node. alpha (float): The amplitude parameter for the Gaussian function, AKA the learning rate. nsize (int): The size of the neighborhood. gaussian (bool, optional): Whether to use Gaussian function or not. Defaults to True. Returns: np.ndarray: The neighborhood function values for each node on the grid. """ dist_2d = np.abs( m2Ds[index_bmu, :] - m2Ds ) # 2d distance between the BMU and the rest of the lattice chebyshev_dist = np.zeros( dist_2d.shape[0] ) # initialize the Chebyshev distance array for i in prange( dist_2d.shape[0] ): # numba max does not have axis argument, otherwise this would be chebyshev_dist = np.max(dist_2d, axis=1) chebyshev_dist[i] = np.max(dist_2d[i, :]) # Define the Gaussian function to calculate the neighborhood function def gauss(dist, A, sigma): """gauss -- Gaussian function""" return A * np.exp(-((dist) ** 2) / (2 * sigma**2)) # if a node on the lattice is in within nsize neighborhood, then h = Gaussian(alpha), else h = 0.0 if gaussian: # use Gaussian function h = gauss(chebyshev_dist, alpha, nsize / 3) else: # otherwise everything within nsize is multiplied by alpha, and everything outside is unchanged h = np.where(chebyshev_dist <= nsize, alpha, 0.0) h[chebyshev_dist > nsize] = ( 0.0 # manually set the values outside the neighborhood to 0 ) return h
[docs] def map_data_to_lattice(self): """ After training, map each data point to the nearest node in the lattice. Returns: np.ndarray[int]: A 2D array with the x and y coordinates of the best matching nodes for each data point. """ print("Begin matching points with nodes", flush=True) data_to_lattice_1d = self.best_match(self.lattice, self.data_array) self.projection_1d = data_to_lattice_1d projection_1d_to_2d = np.reshape( self.projection_1d, (len(self.projection_1d), 1) ) # make it a 2D array projection_2d = self.coordinate(projection_1d_to_2d, self.xdim) self.projection_2d = projection_2d return projection_2d
[docs] def assign_cluster_to_lattice(self, smoothing=None, merge_cost=0.0): """ Assigns clusters to the lattice based on the computed centroids. Args: smoothing (float, optional): Smoothing parameter for computing Umatrix. Defaults to None. merge_cost (float, optional): Cost threshold for merging similar centroids. Defaults to 0.0. Returns: numpy.ndarray: Array representing the assigned clusters for each lattice point. """ if smoothing is not None: # smooth the Umatrix before computing the centroids self.umat = self.compute_umat(smoothing) naive_centroids = self.compute_centroids( False ) # all local minima are centroids centroids = self.merge_similar_centroids( naive_centroids, merge_cost ) # merge similar centroids x = self.xdim y = self.ydim centr_locs = [] # create list of centroid locations for ix in range(x): for iy in range(y): cx = centroids["centroid_x"][ix, iy] cy = centroids["centroid_y"][ix, iy] centr_locs.append((cx, cy)) unique_ids = list(set(centr_locs)) n_clusters = len(unique_ids) print(f"Number of clusters : {n_clusters}", flush=True) print("Centroids: ", unique_ids, flush=True) # mapping = {} clusters = 1000 * np.ones((x, y), dtype=np.int32) for i in range(n_clusters): # mapping[i] = unique_ids[i] for ix in range(x): for iy in range(y): if ( centroids["centroid_x"][ix, iy], centroids["centroid_y"][ix, iy], ) == unique_ids[i]: clusters[ix, iy] = i self.lattice_assigned_clusters = clusters return clusters
@staticmethod @njit(parallel=True) def assign_cluster_to_data( projection_2d: np.ndarray, clusters_on_lattice: np.ndarray ) -> np.ndarray: """ Given a lattice and cluster assignments on that lattice, return the cluster ids of the data (in a 1d array) Args: projection_2d (np.ndarray): 2d array with x-y coordinates of the node associated with each data point clusters_on_lattice (np.ndarray): X x Y matrix of cluster labels on lattice Returns: np.ndarray: cluster_id of each data point """ cluster_id = np.zeros(projection_2d.shape[0], dtype=np.int32) for i in prange(projection_2d.shape[0]): cluster_id[i] = clusters_on_lattice[ int(projection_2d[i, 0]), int(projection_2d[i, 1]) ] return cluster_id @staticmethod @njit(parallel=True) def best_match(lattice: np.ndarray, obs: np.ndarray, full=False) -> np.ndarray: """ Given input vector inp[n,f] (where n is number of different observations, f is number of features per observation), return the best matching node. Args: lattice (np.ndarray): weight values of the lattice obs (np.ndarray): observations (input vectors) full (bool, optional): indicate whether to return first and second best match. Defaults to False. Returns: np.ndarray: return the 1d index of the best-matched node (within the lattice) for each observation """ if full: best_match_node = np.zeros((obs.shape[0], 2)) else: best_match_node = np.zeros((obs.shape[0], 1)) for i in prange(obs.shape[0]): diff = lattice - obs[i] squ = diff**2 s = np.sum(squ, axis=1) if full: # numba does not support argsort, so we record the top two best matches this way o = np.argmin(s) best_match_node[i, 0] = o s[o] = np.max(s) o = np.argmin(s) best_match_node[i, 1] = o else: best_match_node[i] = np.argmin(s) if i % int(obs.shape[0] // 10) == 0: print("i = ", i) return best_match_node @staticmethod @njit(parallel=True) def coordinate(rowix: np.ndarray, xdim: int) -> np.ndarray: """ Convert from a list of row index to an array of xy-coordinates. Args: rowix (np.ndarray): 1d array with the 1d indices of the points of interest (n x 1 matrix) xdim (int): x dimension of the lattice Returns: np.ndarray: array with x and y coordinates of each point in rowix """ len_rowix = len(rowix) coords = np.zeros((len_rowix, 2), dtype=np.int32) for k in prange(len_rowix): coords[k, :] = np.array([rowix[k, 0] % xdim, rowix[k, 0] // xdim]) return coords
[docs] def rowix(self, x, y): """ Convert from a xy-coordinate to a row index. Args: x (int): The x-coordinate of the map. y (int): The y-coordinate of the map. Returns: int: The row index corresponding to the given xy-coordinate. """ rix = x + y * self.xdim return rix
[docs] def node_weight(self, x, y): """ Returns the weight values of a node at (x,y) on the lattice. Args: x (int): x-coordinate of the node. y (int): y-coordinate of the node. Returns: np.ndarray: 1d array of weight values of said node. """ ix = self.rowix(x, y) return self.lattice[ix]
[docs] def compute_centroids(self, explicit=False): """ Compute the centroid for each node in the lattice given a precomputed Umatrix. Args: explicit (bool): Controls the shape of the connected component. Returns: dict: A dictionary containing the matrices with the same x-y dimensions as the original map, containing the centroid x-y coordinates. """ xdim = self.xdim ydim = self.ydim heat = self.umat centroid_x = np.array([[-1] * ydim for _ in range(xdim)]) centroid_y = np.array([[-1] * ydim for _ in range(xdim)]) def find_this_centroid(ix, iy): # recursive function to find the centroid of a point on the map if (centroid_x[ix, iy] > -1) and (centroid_y[ix, iy] > -1): return {"bestx": centroid_x[ix, iy], "besty": centroid_y[ix, iy]} min_val = heat[ix, iy] min_x = ix min_y = iy # (ix, iy) is an inner map element if ix > 0 and ix < xdim - 1 and iy > 0 and iy < ydim - 1: if heat[ix - 1, iy - 1] < min_val: min_val = heat[ix - 1, iy - 1] min_x = ix - 1 min_y = iy - 1 if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix + 1, iy - 1] < min_val: min_val = heat[ix + 1, iy - 1] min_x = ix + 1 min_y = iy - 1 if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy if heat[ix + 1, iy + 1] < min_val: min_val = heat[ix + 1, iy + 1] min_x = ix + 1 min_y = iy + 1 if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 if heat[ix - 1, iy + 1] < min_val: min_val = heat[ix - 1, iy + 1] min_x = ix - 1 min_y = iy + 1 if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # (ix, iy) is bottom left corner elif ix == 0 and iy == 0: if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy if heat[ix + 1, iy + 1] < min_val: min_val = heat[ix + 1, iy + 1] min_x = ix + 1 min_y = iy + 1 if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 # (ix, iy) is bottom right corner elif ix == xdim - 1 and iy == 0: if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 if heat[ix - 1, iy + 1] < min_val: min_val = heat[ix - 1, iy + 1] min_x = ix - 1 min_y = iy + 1 if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # (ix, iy) is top right corner elif ix == xdim - 1 and iy == ydim - 1: if heat[ix - 1, iy - 1] < min_val: min_val = heat[ix - 1, iy - 1] min_x = ix - 1 min_y = iy - 1 if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # (ix, iy) is top left corner elif ix == 0 and iy == ydim - 1: if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix + 1, iy - 1] < min_val: min_val = heat[ix + 1, iy - 1] min_x = ix + 1 min_y = iy - 1 if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy # (ix, iy) is a left side element elif ix == 0 and iy > 0 and iy < ydim - 1: if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix + 1, iy - 1] < min_val: min_val = heat[ix + 1, iy - 1] min_x = ix + 1 min_y = iy - 1 if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy if heat[ix + 1, iy + 1] < min_val: min_val = heat[ix + 1, iy + 1] min_x = ix + 1 min_y = iy + 1 if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 # (ix, iy) is a bottom side element elif ix > 0 and ix < xdim - 1 and iy == 0: if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy if heat[ix + 1, iy + 1] < min_val: min_val = heat[ix + 1, iy + 1] min_x = ix + 1 min_y = iy + 1 if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 if heat[ix - 1, iy + 1] < min_val: min_val = heat[ix - 1, iy + 1] min_x = ix - 1 min_y = iy + 1 if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # (ix, iy) is a right side element elif ix == xdim - 1 and iy > 0 and iy < ydim - 1: if heat[ix - 1, iy - 1] < min_val: min_val = heat[ix - 1, iy - 1] min_x = ix - 1 min_y = iy - 1 if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix, iy + 1] < min_val: min_val = heat[ix, iy + 1] min_x = ix min_y = iy + 1 if heat[ix - 1, iy + 1] < min_val: min_val = heat[ix - 1, iy + 1] min_x = ix - 1 min_y = iy + 1 if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # (ix, iy) is a top side element elif ix > 0 and ix < xdim - 1 and iy == ydim - 1: if heat[ix - 1, iy - 1] < min_val: min_val = heat[ix - 1, iy - 1] min_x = ix - 1 min_y = iy - 1 if heat[ix, iy - 1] < min_val: min_val = heat[ix, iy - 1] min_x = ix min_y = iy - 1 if heat[ix + 1, iy - 1] < min_val: min_val = heat[ix + 1, iy - 1] min_x = ix + 1 min_y = iy - 1 if heat[ix + 1, iy] < min_val: min_val = heat[ix + 1, iy] min_x = ix + 1 min_y = iy if heat[ix - 1, iy] < min_val: min_val = heat[ix - 1, iy] min_x = ix - 1 min_y = iy # if successful # move to the square with the smaller value, i_e_, call # find_this_centroid on this new square # note the RETURNED x-y coords in the centroid_x and # centroid_y matrix at the current location # return the RETURNED x-y coordinates if min_x != ix or min_y != iy: r_val = find_this_centroid(min_x, min_y) # if explicit is set show the exact connected component # otherwise construct a connected componenent where all # nodes are connected to a centrol node if explicit: centroid_x[ix, iy] = min_x centroid_y[ix, iy] = min_y return {"bestx": min_x, "besty": min_y} else: centroid_x[ix, iy] = r_val["bestx"] centroid_y[ix, iy] = r_val["besty"] return r_val else: centroid_x[ix, iy] = ix centroid_y[ix, iy] = iy return {"bestx": ix, "besty": iy} for i in range(xdim): for j in range(ydim): find_this_centroid(i, j) return {"centroid_x": centroid_x, "centroid_y": centroid_y}
[docs] @staticmethod # @njit(parallel=True) # numba does not support dictionary; so cannot parallelize this function def replace_value( centroids: dict[str, np.ndarray], centroid_a: tuple, centroid_b: tuple ) -> dict[str, np.ndarray]: """ Replaces the values of centroid_a with the values of centroid_b in the given centroids dictionary. Args: centroids (dict[str, np.ndarray]): A dictionary containing the centroids. centroid_a (tuple): The coordinates of the centroid to be replaced. centroid_b (tuple): The coordinates of the centroid to replace with. Returns: dict[str, np.ndarray]: The updated centroids dictionary. """ (xdim, ydim) = centroids["centroid_x"].shape for ix in range(xdim): for iy in range(ydim): if ( centroids["centroid_x"][ix, iy] == centroid_a[0] and centroids["centroid_y"][ix, iy] == centroid_a[1] ): centroids["centroid_x"][ix, iy] = centroid_b[0] centroids["centroid_y"][ix, iy] = centroid_b[1] return centroids
[docs] def merge_similar_centroids(self, naive_centroids: np.ndarray, threshold=0.3): """ Merge centroids that are close enough together. Args: naive_centroids (np.ndarray): original centroids before merging threshold (float, optional): Any centroids with pairwise cost less than this threshold is merged. Defaults to 0.3. Returns: np.ndarray: new node map with combined centroids """ heat = self.umat centroids = naive_centroids.copy() unique_centroids = self.get_unique_centroids( centroids ) # the nodes_count dictionary is also created here, so don't remove this line # for each pair of centroids, compute the weighted distance between them via interpolating the umat # if the distance is less than the threshold, combine the centroids cost_between_centroids = [] for i in range(len(unique_centroids["position_x"]) - 1): for j in range(i + 1, len(unique_centroids["position_x"])): a = [ unique_centroids["position_x"][i], unique_centroids["position_y"][i], ] b = [ unique_centroids["position_x"][j], unique_centroids["position_y"][j], ] num_sample = 5 * int(np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2)) x, y = np.linspace(a[0], b[0], num_sample), np.linspace( a[1], b[1], num_sample ) umat_dist = map_coordinates(heat**2, np.vstack((x, y))) total_cost = np.sum(umat_dist) # total_cost /= num_sample cost_between_centroids.append([a, b, total_cost]) # cost_between_centroids.sort(key=lambda x: x[2]) sorted_cost = sorted( cost_between_centroids, key=lambda x: x[2] ) # this ranks all pairwise cost in ascending order # normalize the cost such that the largest cost at each step is always one sorted_cost = [[a, b, c / sorted_cost[-1][2]] for a, b, c in sorted_cost] # combine the centroids recursively until the threshold is reached if sorted_cost[0][2] < threshold: centroid_a = tuple(sorted_cost[0][0]) centroid_b = tuple(sorted_cost[0][1]) # heat_a = heat[centroid_a[0], centroid_a[1]] # heat_b = heat[centroid_b[0], centroid_b[1]] nodes_a = self.nodes_count[centroid_a] nodes_b = self.nodes_count[centroid_b] print(f"Centroid A: {centroid_a}, count: {nodes_a}", flush=True) print(f"Centroid B: {centroid_b}, count: {nodes_b}", flush=True) # print(f"Centroid A: {centroid_a}, Umatrix value: {heat_a}", flush=True) # print(f"Centroid B: {centroid_b}, Umatrix value: {heat_b}", flush=True) print("Merging...", flush=True) replace_a_with_b = False # this method takes the centroid with the larger number of nodes if nodes_a < nodes_b: replace_a_with_b = True # this method takes the centroid with the smaller U-matrix value # if heat_a > heat_b: # replace_a_with_b = True if replace_a_with_b: centroids = self.replace_value(centroids, centroid_a, centroid_b) else: centroids = self.replace_value(centroids, centroid_b, centroid_a) # print("New centroids: \n", centroids, flush=True) centroids = self.merge_similar_centroids(centroids, threshold) else: unique_centroids = self.get_unique_centroids(centroids) # print("Centroids: \n", unique_centroids, flush=True) print( "Number of unique centroids: ", len(unique_centroids["position_x"]), flush=True, ) print("Minimum cost between centroids: ", sorted_cost[0][2], flush=True) return centroids return centroids
[docs] def get_unique_centroids(self, centroids): """ Print out a list of unique centroids given a matrix of centroid locations. Args: centroids: A matrix of the centroid locations in the map. Returns: A dictionary containing the unique x and y positions of the centroids. The dictionary has the following keys: position_x: A list of unique x positions. position_y: A list of unique y positions. """ # get the dimensions of the map xdim = self.xdim ydim = self.ydim xlist = [] ylist = [] centr_locs = [] # create a list of unique centroid positions for ix in range(xdim): for iy in range(ydim): cx = centroids["centroid_x"][ix, iy] cy = centroids["centroid_y"][ix, iy] centr_locs.append((cx, cy)) self.nodes_count = {i: centr_locs.count(i) for i in centr_locs} unique_ids = list(set(centr_locs)) xlist = [x for x, y in unique_ids] ylist = [y for x, y in unique_ids] return {"position_x": xlist, "position_y": ylist}
[docs] def compute_umat(self, smoothing=None): """ Compute the unified distance matrix. Args: smoothing (float, optional): A positive floating point value controlling the smoothing of the umat representation. Defaults to None. Returns: numpy.ndarray: A matrix with the same x-y dimensions as the original map containing the umat values. """ d = euclidean_distances(self.lattice, self.lattice) / (self.xdim * self.ydim) umat = self.compute_heat(d, smoothing) return umat
[docs] def compute_heat(self, d, smoothing=None): """ Compute a heat value map representation of the given distance matrix. Args: d (numpy.ndarray): A distance matrix computed via the 'dist' function. smoothing (float, optional): A positive floating point value controlling the smoothing of the umat representation. Defaults to None. Returns: numpy.ndarray: A matrix with the same x-y dimensions as the original map containing the heat. """ x = self.xdim y = self.ydim heat = np.array([[0.0] * y for _ in range(x)]) if x == 1 or y == 1: sys.exit( "compute_heat: heat map can not be computed for a map \ with a dimension of 1" ) # this function translates our 2-dim lattice coordinates # into the 1-dim coordinates of the lattice def xl(ix, iy): return ix + iy * x # check if the map is larger than 2 x 2 (otherwise it is only corners) if x > 2 and y > 2: # iterate over the inner nodes and compute their umat values for ix in range(1, x - 1): for iy in range(1, y - 1): sum = ( d[xl(ix, iy), xl(ix - 1, iy - 1)] + d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy)] + d[xl(ix, iy), xl(ix + 1, iy + 1)] + d[xl(ix, iy), xl(ix, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 8 # iterate over bottom x axis for ix in range(1, x - 1): iy = 0 sum = ( d[xl(ix, iy), xl(ix + 1, iy)] + d[xl(ix, iy), xl(ix + 1, iy + 1)] + d[xl(ix, iy), xl(ix, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 5 # iterate over top x axis for ix in range(1, x - 1): iy = y - 1 sum = ( d[xl(ix, iy), xl(ix - 1, iy - 1)] + d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 5 # iterate over the left y-axis for iy in range(1, y - 1): ix = 0 sum = ( d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy)] + d[xl(ix, iy), xl(ix + 1, iy + 1)] + d[xl(ix, iy), xl(ix, iy + 1)] ) heat[ix, iy] = sum / 5 # iterate over the right y-axis for iy in range(1, y - 1): ix = x - 1 sum = ( d[xl(ix, iy), xl(ix - 1, iy - 1)] + d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 5 # compute umat values for corners if x >= 2 and y >= 2: # bottom left corner ix = 0 iy = 0 sum = ( d[xl(ix, iy), xl(ix + 1, iy)] + d[xl(ix, iy), xl(ix + 1, iy + 1)] + d[xl(ix, iy), xl(ix, iy + 1)] ) heat[ix, iy] = sum / 3 # bottom right corner ix = x - 1 iy = 0 sum = ( d[xl(ix, iy), xl(ix, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy + 1)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 3 # top left corner ix = 0 iy = y - 1 sum = ( d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy - 1)] + d[xl(ix, iy), xl(ix + 1, iy)] ) heat[ix, iy] = sum / 3 # top right corner ix = x - 1 iy = y - 1 sum = ( d[xl(ix, iy), xl(ix - 1, iy - 1)] + d[xl(ix, iy), xl(ix, iy - 1)] + d[xl(ix, iy), xl(ix - 1, iy)] ) heat[ix, iy] = sum / 3 if smoothing is not None: if smoothing == 0: heat = self.smooth_2d(heat, nrow=x, ncol=y, surface=False) elif smoothing > 0: heat = self.smooth_2d( heat, nrow=x, ncol=y, surface=False, theta=smoothing ) else: sys.exit("compute_heat: bad value for smoothing parameter") return heat
[docs] def list_clusters(self, centroids, unique_centroids): """Get the clusters as a list of lists., not very useful Args: centroids (matrix): A matrix of the centroid locations in the map. unique_centroids (list): A list of unique centroid locations. Returns: list: A list of clusters associated with each unique centroid. """ centroids_x_positions = unique_centroids["position_x"] centroids_y_positions = unique_centroids["position_y"] cluster_list = [] for i in range(len(centroids_x_positions)): cx = centroids_x_positions[i] cy = centroids_y_positions[i] # get the clusters associated with a unique centroid and store it in a list cluster_list.append(self.list_from_centroid(cx, cy, centroids)) return cluster_list
[docs] def list_from_centroid(self, x, y, centroids): """Get all cluster elements associated with one centroid. Args: x (int): The x position of a centroid. y (int): The y position of a centroid. centroids (numpy.ndarray): A matrix of the centroid locations in the map. Returns: list: A list of cluster elements associated with the given centroid. """ centroid_x = x centroid_y = y xdim = self.xdim ydim = self.ydim cluster_list = [] for xi in range(xdim): for yi in range(ydim): cx = centroids["centroid_x"][xi, yi] cy = centroids["centroid_y"][xi, yi] if (cx == centroid_x) and (cy == centroid_y): cweight = self.umat[xi, yi] cluster_list.append(cweight) return cluster_list
[docs] def smooth_2d( self, Y, ind=None, weight_obj=None, grid=None, nrow=64, ncol=64, surface=True, theta=None, ): """ Smooths 2D data using a kernel smoother., internal function, no user-facing aspect Args: Y (array-like): The input data to be smoothed. ind (array-like, optional): The indices of the data to be smoothed. Defaults to None. weight_obj (dict, optional): The weight object used for smoothing. Defaults to None. grid (dict, optional): The grid object used for smoothing. Defaults to None. nrow (int, optional): The number of rows in the grid. Defaults to 64. ncol (int, optional): The number of columns in the grid. Defaults to 64. surface (bool, optional): Flag indicating whether the data represents a surface. Defaults to True. theta (float, optional): The theta value used in the exponential covariance function. Defaults to None. Returns: array-like: The smoothed data. Raises: None """ def exp_cov(x1, x2, theta=2, p=2, distMat=0): x1 = x1 * (1 / theta) x2 = x2 * (1 / theta) distMat = euclidean_distances(x1, x2) distMat = distMat**p return np.exp(-distMat) NN = [[1] * ncol] * nrow grid = {"x": [i for i in range(nrow)], "y": [i for i in range(ncol)]} if weight_obj is None: dx = grid["x"][1] - grid["x"][0] dy = grid["y"][1] - grid["y"][0] m = len(grid["x"]) n = len(grid["y"]) M = 2 * m N = 2 * n xg = [] for i in range(N): for j in range(M): xg.extend([[j, i]]) xg = np.array(xg) center = [] center.append([int(dx * M / 2 - 1), int((dy * N) / 2 - 1)]) out = exp_cov(xg, np.array(center), theta=theta) out = np.transpose(np.reshape(out, (N, M))) temp = np.zeros((M, N)) temp[int(M / 2 - 1)][int(N / 2 - 1)] = 1 wght = np.fft.fft2(out) / (np.fft.fft2(temp) * M * N) weight_obj = {"m": m, "n": n, "N": N, "M": M, "wght": wght} temp = np.zeros((weight_obj["M"], weight_obj["N"])) temp[0:m, 0:n] = Y temp2 = np.fft.ifft2(np.fft.fft2(temp) * weight_obj["wght"]).real[ 0 : weight_obj["m"], 0 : weight_obj["n"] ] temp = np.zeros((weight_obj["M"], weight_obj["N"])) temp[0:m, 0:n] = NN temp3 = np.fft.ifft2(np.fft.fft2(temp) * weight_obj["wght"]).real[ 0 : weight_obj["m"], 0 : weight_obj["n"] ] return temp2 / temp3
[docs] def plot_heat(self, heat, explicit=False, comp=True, merge=False, merge_cost=0.001): """ Plot the heat map of the given data. Args: heat (array-like): The data to be plotted. explicit (bool, optional): A flag indicating whether the connected components are explicit. Defaults to False. comp (bool, optional): A flag indicating whether to plot the connected components. Defaults to True. merge (bool, optional): A flag indicating whether to merge the connected components. Defaults to False. merge_cost (float, optional): The threshold for merging the connected components. Defaults to 0.001. """ x = self.xdim y = self.ydim nobs = self.data_array.shape[0] count = np.array([[0] * y] * x) # need to make sure the map doesn't have a dimension of 1 if x <= 1 or y <= 1: sys.exit("plot_heat: map dimensions too small") heat_tmp = np.squeeze(heat).flatten() # Convert 2D Array to 1D tmp = np.digitize( heat_tmp, np.linspace(heat_tmp.min(), heat_tmp.max(), num=100) ) tmp = np.reshape(tmp, (-1, y)) # Convert 1D Array to 2D tmp_1 = np.array(np.transpose(tmp)) fig, ax = plt.subplots(dpi=200) plt.rcParams["font.size"] = 8 ax.pcolor(tmp_1, cmap=plt.cm.YlOrRd) ax.set_xticks(np.arange(0, x, 5) + 0.5, minor=False) ax.set_yticks(np.arange(0, y, 5) + 0.5, minor=False) plt.xlabel("x") plt.ylabel("y") ax.set_xticklabels(np.arange(0, x, 5), minor=False) ax.set_yticklabels(np.arange(0, y, 5), minor=False) ax.xaxis.set_tick_params(labeltop="on") ax.yaxis.set_tick_params(labelright="on") ax.xaxis.label.set_fontsize(10) ax.yaxis.label.set_fontsize(10) ax.set_aspect("equal") ax.grid(True) # put the connected component lines on the map if comp: # find the centroid for each node on the map centroids = self.compute_centroids(explicit) if merge: # find the unique centroids for the nodes on the map centroids = self.merge_similar_centroids(centroids, merge_cost) unique_centroids = self.get_unique_centroids(centroids) print("Unique centroids : ", unique_centroids) unique_centroids["position_x"] = [ x + 0.5 for x in unique_centroids["position_x"] ] unique_centroids["position_y"] = [ y + 0.5 for y in unique_centroids["position_y"] ] plt.scatter( unique_centroids["position_x"], unique_centroids["position_y"], color="red", s=10, ) # connect each node to its centroid for ix in range(x): for iy in range(y): cx = centroids["centroid_x"][ix, iy] cy = centroids["centroid_y"][ix, iy] plt.plot( [ix + 0.5, cx + 0.5], [iy + 0.5, cy + 0.5], color="grey", linestyle="-", linewidth=1.0, ) # put the labels on the map if available if not (self.labels is None) and (len(self.labels) != 0): self.map_data_to_lattice() # obtain the projection_1d array # count the labels in each map cell for i in range(nobs): nix = self.projection_1d[i] c = self.coordinate( np.reshape(nix, (1, 1)), self.xdim ) # NOTE: slow code # print(c) ix = int(c[0, 0]) iy = int(c[0, 1]) count[ix - 1, iy - 1] = count[ix - 1, iy - 1] + 1 for i in range(nobs): c = self.coordinate( np.reshape(self.projection_1d[i], (1, 1)), self.xdim ) # NOTE: slow code ix = int(c[0, 0]) iy = int(c[0, 1]) # we only print one label per cell if count[ix - 1, iy - 1] > 0: count[ix - 1, iy - 1] = 0 ix = ix - 0.5 iy = iy - 0.5 l = self.labels[i] plt.text(ix + 1, iy + 1, l) plt.show()