Source code for quickclus.QuickClus

#Libraries-----------------------------------------------------------------------------------
import logging
import warnings

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns
import plotly.express as px

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, RobustScaler, PowerTransformer, StandardScaler, QuantileTransformer

import umap

import hdbscan
import optuna

from sklearn.base import BaseEstimator, ClassifierMixin

from quickclus.utils import *

#Logs
logger = logging.getLogger("quickclus")
logger.setLevel(logging.ERROR)
sh = logging.StreamHandler()
sh.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"),
)
logger.addHandler(sh)


[docs]class QuickClus(BaseEstimator, ClassifierMixin): """QuickClus Creates UMAP embeddings and HDSCAN clusters from a pandas DataFrame with mixed data Parameters ---------- random_state : int, default = None Random State for both UMAP and numpy.random. If set to None UMAP will run in Numba in multicore mode but results may vary between runs. Setting a seed may help to offset the stochastic nature of UMAP by setting it with fixed random seed. n_neighbors: int, default = 15 Level of neighbors for UMAP. Setting this higher will generate higher densities at the expense of requiring more computational complexity. min_cluster_size: int, default = 15 Minimum Cluster size for HDBSCAN. The minimum number of points from which a cluster needs to be formed. min_samples : int, default = None Samples used for HDBSCAN. The larger this is set the more noise points get declared and the more restricted clusters become to only dense areas. If None, min_samples = min_cluster_size threshold_combine_rare_levels: float, default = 0.02 To avoid an excessive increase in dimensionality when transforming categorical variables into-one hot encoding, rare levels can be combined. This value indicates the minimum proportion of a category that should not be combined into "other". n_components: int, default = None Number of components for UMAP. These are dimensions to reduce the data down to. Ideally, this needs to be a value that preserves all the information to form meaningful clusters. Default is the logarithm of total number of features. imputer_strategy_numerical: str, default = "mean" Imputation strategy for numerical variables. The values can be: "mean", "median", "most_frequent" scaler_type_numerical: str, default = "standard" Scaler strategy for numerical variables. The values can be: "robust" (RobustScaler), "standard" (StandardScaler) transformation_type_numerical: str, default = "power" Scaler strategy for numerical variables. The values can be: "power" (PowerTransformer), "quantile" (QuantileTransformer) umap_combine_method: str, default = "intersection" Method by which to combine embeddings spaces. Options include: intersection, union, contrast, intersection_union_mapper The latter combines both the intersection and union of the embeddings. See: https://umap-learn.readthedocs.io/en/latest/composing_models.html n_neighbors_intersection_union: int, default = None Level of neighbors for UMAP to use to combine umaps embeddings if umap_combine_method = "intersection_union_mapper" If None, n_neighbors_intersection_union = n_neighbors verbose: bool, defualt = False Level of verbosity to print when fitting and predicting. Setting to False will only show Warnings that appear. """ def __init__(self, random_state: int = None, n_neighbors: int = 15, min_cluster_size: int = 15, min_samples: int = None, threshold_combine_rare_levels: float = 0.00, n_components: int = None, scaler_type_numerical: str = "standard", imputer_strategy_numerical: str = "mean", transformation_type_numerical: str = "power", umap_combine_method: str = "intersection", n_neighbors_intersection_union: int = None, verbose: bool = False, ): self.random_state = random_state self.n_neighbors = n_neighbors self.min_cluster_size = min_cluster_size self.min_samples = min_samples self.threshold_combine_rare_levels = threshold_combine_rare_levels self.n_components = n_components self.scaler_type_numerical = scaler_type_numerical self.imputer_strategy_numerical = imputer_strategy_numerical self.transformation_type_numerical = transformation_type_numerical self.umap_combine_method = umap_combine_method self.n_neighbors_intersection_union = n_neighbors_intersection_union if verbose: logger.setLevel(logging.DEBUG) self.verbose = True else: logger.setLevel(logging.ERROR) self.verbose = False # supress deprecation warnings # see: https://stackoverflow.com/questions/54379418 def noop(*args, **kargs): pass warnings.warn = noop if isinstance(random_state, int): np.random.seed(seed = random_state) else: logger.info("No random seed passed, running UMAP in Numba") if min_samples is None: self.min_samples = min_cluster_size logger.info("No min_samples passed, using min_samples = min_cluster_size") if (n_neighbors_intersection_union is None) & (umap_combine_method == "intersection_union_mapper"): self.n_neighbors_intersection_union = n_neighbors logger.info("No n_neighbors_intersection_union passed, using n_neighbors_intersection_union = n_neighbors")
[docs] def fit(self, df: pd.DataFrame) -> None: """ Fit function for call UMAP and HDBSCAN Parameters ---------- df : pandas DataFrame DataFrame object with named columns of categorical and numerics Returns ------- Fitted: None Fitted UMAPs and HDBSCAN """ check_is_df(df) if not isinstance(self.n_components, int): self.n_components = int(round(np.log(df.shape[1]))) logger.info("Extracting categorical features") self.categorical_ = self._extract_categorical_data(df) #If the dataset has categorical columns: if self.categorical_.shape[1] > 0: logger.info("Preprocessing categorical features") self._preprocess_categorical_data() logger.info("Transforming categorical features into UMAP") self._transform_categorical_umap() else: logger.info("No categorical features in the dataset") logger.info("Extracting numerical features") self.numerical_ = self._extract_numerical_data(df) #If the dataset has categorical columns: if self.numerical_.shape[1] > 0: logger.info("Preprocessing categorical features") self._preprocess_numerical_data() logger.info("Transforming numerical features into UMAP") self._transform_numerical_umap() else: logger.info("No numerical features in the dataset") #Combine the data logger.info("Mapping/Combining Embeddings") if (self.numerical_.shape[1] > 0) & (self.categorical_.shape[1] > 0): self._combine_umap_data() elif (self.numerical_.shape[1] > 0): self.umap_combined = self.umap_numerical_ elif self.categorical_.shape[1] > 0: self.umap_combined = self.umap_categorical_ else: raise TypeError("No numerical or categorical data were found") logger.info("Fitting HDBSCAN...") self._fit_hdbscan()
def _extract_categorical_data(self, data): """ Extracts the categorical data from the dataframe Parameters ---------- data : pandas DataFrame DataFrame object with named columns of categorical and numerics Returns ------- categorical_data: pandas DataFrame pandas DataFrame with the categorical variables """ #Select only the categorical columns categorical_data = data.select_dtypes(exclude = ["float", "int", "datetime"]) return categorical_data def _preprocess_categorical_data(self): """ Preprocess the categorical data: Rare level combination, na imputation with the mode and one hot encoding Parameters ---------- self.categorical_ : pandas DataFrame pandas DataFrame with categorical features self.threshold_combine_rare_levels : float Minimum proportion of a category to not be combined Returns ------- self.preprocessed_categorical_: numpy.array numpy array with the preprocessed categorical data """ #Combine rare levels into "other" if self.threshold_combine_rare_levels > 0: for category in self.categorical_.columns: self.categorical_[category] = self.categorical_[category].\ mask(self.categorical_[category].map(self.categorical_[category].\ value_counts(normalize = True)) <= self.threshold_combine_rare_levels, 'Other') #Use a simple imputer with the mode and one hot encoding imputer_cat = SimpleImputer(strategy = "most_frequent") one_hot = OneHotEncoder(categories = "auto", handle_unknown = "ignore") #Create the pipeline and transform the data categorical_pipeline = Pipeline([("imputer", imputer_cat), ("one_hot", one_hot)]) preprocessed_cat = categorical_pipeline.fit_transform(self.categorical_) self.preprocessed_categorical_ = preprocessed_cat return self def _transform_categorical_umap(self): """ Transforms the preprocessed categorical data into a umap embedding Parameters ---------- self.preprocessed_categorical_ : scipy.sparse.csr.csr_matrix matrix with preprocessed categorical data self.n_neighbors : int number of neighbors UMAP self.n_components: int number of components UMAP self.random_state: int seed Returns ------- self.umap_categorical_: umap.umap_.UMAP categorical umap embedding """ #TODO: In some cases dice doesn't work. Check why. logger.info(f"Preprocessed categorical data shape: {self.preprocessed_categorical_.shape}") try: categorical_umap = umap.UMAP( metric = "dice", n_neighbors = self.n_neighbors, n_components = self.n_components, min_dist = 0.0, random_state = self.random_state, ).fit(self.preprocessed_categorical_) logger.info("Metric used for categorical data: dice") except: categorical_umap = umap.UMAP( metric = "jaccard", n_neighbors = self.n_neighbors, n_components = self.n_components, min_dist = 0.0, random_state = self.random_state, ).fit(self.preprocessed_categorical_) logger.info("Metric used for categorical data: jaccard") self.umap_categorical_ = categorical_umap return self def _extract_numerical_data(self, data): """ Extracts the numerical data from the dataframe Parameters ---------- data : pandas DataFrame DataFrame object with named columns of categorical and numerics Returns ------- numerical_data: pandas DataFrame pandas DataFrame with the numerical variables """ numerical_data = data.select_dtypes(include = ["float", "int"]) return numerical_data def _preprocess_numerical_data(self): """ Preprocess of numerical data: na imputation, scaler, and transformation Parameters ---------- self.numerical_ : pandas DataFrame pandas DataFrame with numerical features self.imputer_strategy_numerical: str imputation strategy, 'mean', 'median', 'most_frequent' self.scaler_type_numerical: str scaler type, 'standard' or 'robust' self.transformation_type_numerical: str transformation type, 'power' or 'quantile' Returns ------- self.preprocessed_numerical_: numpy.array numpy array with the preprocessed numerical data """ #Imputer imputer_numeric = SimpleImputer(strategy = self.imputer_strategy_numerical) #Scaler if self.scaler_type_numerical == "robust": scaler_numeric = RobustScaler() elif self.scaler_type_numerical == "standard": scaler_numeric = StandardScaler() else: raise Exception("Select a valid scaler") #Transformation if self.transformation_type_numerical == "power": transform_numeric = PowerTransformer() elif self.transformation_type_numerical == "quantile": transform_numeric = QuantileTransformer() else: raise Exception("Select a valid transformation type") #Pipeline numerical_pipeline = Pipeline([("imputer", imputer_numeric), ("scaler", scaler_numeric), ("transform", transform_numeric)]) self.preprocessed_numerical_ = numerical_pipeline.fit_transform(self.numerical_) return self def _transform_numerical_umap(self): """ Transforms the preprocessed numerical data into a umap embedding Parameters ---------- self.preprocessed_numerical_: scipy.sparse.csr.csr_matrix matrix with preprocessed numerical data self.n_neighbors: int number of neighbors UMA self.n_components: int number of components UMAP self.random_state: int seed Returns ------- self.umap_numerical_: umap.umap_.UMAP umap embedding """ numerical_umap = umap.UMAP( metric = "l2", n_neighbors = self.n_neighbors, n_components = self.n_components, min_dist = 0.0, random_state = self.random_state, ).fit(self.preprocessed_numerical_) self.umap_numerical_ = numerical_umap return self def _combine_umap_data(self): """ Combines the numerical and categorical data embeddings Parameters ---------- self.umap_numerical_ : umap.umap_.UMAP numerical data embedding self.umap_categorical_: umap.umap_.UMAP categorical data embedding self.umap_combine_method: str method to combine the embeddings (intersection/union/contrast/intersection_union_mapper) self.n_neighbors_intersection_union: int if umap_combine_method = intersection_union_mapper, number of components UMAP self.n_components: int if umap_combine_method = intersection_union_mapper, number of components UMAP self.random_state: int seed self.preprocessed_num: scipy.sparse.csr.csr_matrix if umap_combine_method = intersection_union_mapper, matrix with preprocessed numerical data Returns ------- self.umap_combined : umap.umap_.UMAP combined umap """ logger.info(f"Numerical data embedding shape: {self.umap_numerical_.embedding_.shape}") logger.info(f"Categorical data embedding shape: {self.umap_categorical_.embedding_.shape}") if self.umap_combine_method == "intersection": umap_combined = self.umap_numerical_ * self.umap_categorical_ elif self.umap_combine_method == "union": umap_combined = self.umap_numerical_ + self.umap_categorical_ elif self.umap_combine_method == "contrast": umap_combined = self.umap_numerical_ - self.umap_categorical_ elif self.umap_combine_method == "intersection_union_mapper": intersection_mapper = umap.UMAP( random_state = self.random_state, n_neighbors = self.n_neighbors_intersection_union, n_components = self.n_components, min_dist = 0.0, ).fit(self.preprocessed_numerical_) umap_combined = intersection_mapper * ( self.umap_numerical_ + self.umap_categorical_ ) else: raise KeyError("Select valid UMAP combine method") self.umap_combined = umap_combined return self def _fit_hdbscan(self): """ Fits a hdbscan model to the embedding Parameters ---------- self.min_cluster_size : int min_cluster_size of the hdbscan model self.min_samples: int min_samples of the hdbscan model self.umap_combined: umap.umap_.UMAP combined umap embedding (numerical + categorical) Returns ------- self.hdbscan_: hdbscan.hdbscan_.HDBSCAN hdbscan model """ hdb_model = hdbscan.HDBSCAN(min_cluster_size = self.min_cluster_size, min_samples = self.min_samples, gen_min_span_tree = True).fit(self.umap_combined.embedding_) self.hdbscan_ = hdb_model return self #Visualization
[docs] def plot_condensed_tree(self): """ Plots the condensed tree of the model Parameters ---------- self.hdb_model: hdbscan model Returns ------- None """ plt.figure(figsize = (10, 8), dpi = 80) _ = self.hdbscan_.condensed_tree_.plot( select_clusters = True, selection_palette = sns.color_palette("deep", np.unique(self.hdbscan_.labels_).shape[0]), )
[docs] def plot_embedding_labels(self): """ Plots a jointplot with the model's labels Parameters ---------- self.hdb_model: hdbscan model self.umap_embedding: data's umap embedding Returns ------- None """ if self.umap_combined.embedding_.shape[1] > 1: _ = sns.jointplot( x = self.umap_combined.embedding_[:, 0], y = self.umap_combined.embedding_[:, 1], hue = self.hdbscan_.labels_, kind = "kde", palette = "viridis" ) else: print("The embedding has only 1 dimension, increase it to plot the results")
[docs] def plot_2d_labels(self, plot_lib = "matploblib", data = None): """ Plot the first two dimensions of the final embedding with the final clusters. Parameters ---------- plot_lib: str plot library to use (plotly, matplotlib) data: pd.Dataframe pandas dataframe with the original data to show in the plot. Only used if plot_lib = "plotly" Returns ------- fig: figure plotly fig or matplotlib """ if self.umap_combined.embedding_.shape[1] > 2: print("Plotting only the first two dimensions.") if self.umap_combined.embedding_.shape[1] >= 2: if plot_lib == "plotly": aux_df = data.copy() aux_df["UMAP_1"] = self.umap_combined.embedding_[:, 0] aux_df["UMAP_2"] = self.umap_combined.embedding_[:, 1] if "Cluster" not in aux_df.columns: aux_df["Cluster"] = self.hdbscan_.labels_ aux_df["Cluster"] = aux_df["Cluster"].astype("str") custom_columns = [c for c in aux_df.columns if c not in ["UMAP_1", "UMAP_2"]] custom_hover = [f"{c}: %{{customdata[{i}]}}" for i, c in enumerate(custom_columns)] fig = px.scatter(aux_df, x = "UMAP_1", y = "UMAP_2", color = "Cluster", custom_data = custom_columns, height=800, width=800) fig.update_traces( hovertemplate="<br>".join(custom_hover) ) else: fig, ax = plt.subplots(figsize=(10, 8)) unique_labels = np.unique(self.hdbscan_.labels_) colors = cm.rainbow(np.linspace(0, 1, len(unique_labels))) for i, label in enumerate(unique_labels): mask_label = self.hdbscan_.labels_ == label ax.scatter(self.umap_combined.embedding_[:,0][mask_label], self.umap_combined.embedding_[:,1][mask_label], color=colors[i], alpha=0.5, s=6, label=label) if unique_labels.shape[0] <= 20: ax.legend(fontsize=8, markerscale=1) ax.set_xticks([]) ax.set_yticks([]) return fig else: print("This function needs at least 2 dimensions.")
[docs] def plot_3d_labels(self, data): """ Plot the first three dimensions of the final embedding with the final clusters. Parameters ---------- data: pd.Dataframe pandas dataframe with the original data to show in the plot. Returns ------- fig: plotly.graph_objs._figure.Figure plotly fig """ if self.umap_combined.embedding_.shape[1] > 3: print("Plotting only the first three dimensions.") if self.umap_combined.embedding_.shape[1] >= 3: aux_df = data.copy() aux_df["UMAP_1"] = self.umap_combined.embedding_[:, 0] aux_df["UMAP_2"] = self.umap_combined.embedding_[:, 1] aux_df["UMAP_3"] = self.umap_combined.embedding_[:, 2] if "Cluster" not in aux_df.columns: aux_df["Cluster"] = self.hdbscan_.labels_ aux_df["Cluster"] = aux_df["Cluster"].astype("str") custom_columns = [c for c in aux_df.columns if c not in ["UMAP_1", "UMAP_2", "UMAP_3"]] custom_hover = [f"{c}: %{{customdata[{i}]}}" for i, c in enumerate(custom_columns)] fig = px.scatter_3d(aux_df, x = "UMAP_1", y = "UMAP_2", z = "UMAP_3", color = "Cluster", custom_data = custom_columns, height=800, width=800) fig.update_traces( hovertemplate="<br>".join(custom_hover) ) return fig else: print("This function needs at least 3 dimensions.")
#data visualization
[docs] def assing_results(self, data): """ Assings hdb_model's labels to the original data Parameters ---------- data : pandas.DataFrame Original pandas DataFrame Returns ------- results: pandas.DataFrame new pandas dataframe with the calculated clusters """ results = data.copy() results["Cluster"] = self.hdbscan_.labels_ return results
[docs] def cluster_summary(self, results_df, metric = "mean", include_cat = False): """ Creates a cluster's summary of the numerical and/or categorical features Parameters ---------- results_df : pandas.DataFrame pandas dataframe with a cluster column metric: str, default = "mean" metric to use in the summary (mean/median/max/min) include_cat: bool, default = False include the mode of the categorical variables Returns ------- df_summary: pandas.DataFrame New dataframe with the summary """ #Profile by cluster numerics = results_df.select_dtypes(include = [int, float]).drop(["Cluster"], 1).columns.tolist() prop_cluster = results_df["Cluster"].value_counts(dropna = False, normalize = True).to_frame().rename(columns = {"Cluster": "data_prop"}) count_cluster = results_df["Cluster"].value_counts(dropna = False, normalize = False).to_frame().rename(columns = {"Cluster": "data_count"}) if metric == "mean": summary_data = results_df[numerics + ["Cluster"]].groupby(["Cluster"]).mean() elif metric == "median": summary_data = results_df[numerics + ["Cluster"]].groupby(["Cluster"]).median() elif metric == "max": summary_data = results_df[numerics + ["Cluster"]].groupby(["Cluster"]).max() elif metric == "min": summary_data = results_df[numerics + ["Cluster"]].groupby(["Cluster"]).min() else: print("Select a valid metric: mean, median, max, min") return None #merge all the results df_summary = prop_cluster.merge(count_cluster, left_index = True, right_index = True).merge(summary_data, left_index = True, right_index = True) if include_cat: #Calculates the mode of the categorical variables categoricals_col = results_df.select_dtypes(exclude = ["float", "int", "datetime"]).columns.tolist() df_summary_cat = results_df[categoricals_col + ["Cluster"]].groupby(["Cluster"]).agg(pd.Series.mode) df_summary = df_summary.merge(df_summary_cat, left_index = True, right_index = True) return df_summary
[docs] def describe_cluster(self, results_df, clusters = [0], columns_analyze_numerical = [], columns_analyze_categorical = [], metric = "mean"): """ Describes the selected clusters Parameters ---------- results_df : pandas.DataFrame pandas dataframe with a cluster column clusters: list list with clusters to describe (int) columns_analyze_numerical: list list of numerical columns to describe columns_analyze_categorical: list list of categorical columns to describe metric: str, default = "mean" metric to use in the summary of numerical columns (mean/median) Returns ------- None: None """ total_rows = results_df.shape[0] analysis_clusters = {} #Iterate over the selected_clusters for selected_cluster in clusters: text_cluster_analysis = "" cluster_results = results_df[results_df["Cluster"] == selected_cluster] #print(f"Analysis of cluster {selected_cluster}:") text_cluster_analysis += f"Analysis of cluster {selected_cluster}:\n" rows_cluster = cluster_results.shape[0] percentage_cluster = (rows_cluster/total_rows) * 100 #print(f"The cluster {selected_cluster} has {rows_cluster} rows ({percentage_cluster:.2f}% of total).") text_cluster_analysis += f"Cluster {selected_cluster} has {rows_cluster} rows ({percentage_cluster:.2f}% of the total).\n" #Analyze the numerical columns if exists if len(columns_analyze_numerical) > 0: if metric == "mean": for n_col in columns_analyze_numerical: col_mean = results_df[n_col].mean() col_cluster = cluster_results[n_col].mean() variation = (col_cluster - col_mean)/col_mean #print(f"The average {n_col} in the dataset is {col_mean:.2f} and in the cluster {selected_cluster} is {col_cluster:.2f} ({variation * 100:+.1f}%).") text_cluster_analysis += f"The average {n_col} in the dataset is {col_mean:.2f}, and in cluster {selected_cluster} is {col_cluster:.2f} ({variation * 100:+.1f}%).\n" elif metric == "median": for n_col in columns_analyze_numerical: col_median = results_df[n_col].median() col_cluster = cluster_results[n_col].median() variation = (col_cluster - col_median)/col_median #print(f"The median {n_col} in the dataset is {col_median:.2f} and in the cluster {selected_cluster} is {col_cluster:.2f} ({variation * 100:+.1f}%).") text_cluster_analysis += f"The median {n_col} in the dataset is {col_median:.2f} and in the cluster {selected_cluster} is {col_cluster:.2f} ({variation * 100:+.1f}%).\n" else: print("Select a valid metric (mean/median)") #Analyze the categorical columns if exists if len(columns_analyze_categorical) > 0: for n_col in columns_analyze_categorical: col_mode = results_df[n_col].value_counts(normalize = True, dropna = False).sort_values(ascending = False) col_mode_cat, col_mode_value = col_mode.index[0], col_mode[0] * 100 col_cluster = cluster_results[n_col].value_counts(normalize = True, dropna = False).sort_values(ascending = False) clus_mode_cat, clus_mode_value = col_cluster.index[0], col_cluster[0] * 100 #print(f"The most common value of the column {n_col} in the dataset is {col_mode_cat} ({col_mode_value:.1f}%) and in the cluster {selected_cluster} is {clus_mode_cat} ({clus_mode_value:.1f}%).") text_cluster_analysis += f"The most common value of the column {n_col} in the dataset is {col_mode_cat} ({col_mode_value:.1f}%) and in the cluster {selected_cluster} is {clus_mode_cat} ({clus_mode_value:.1f}%).\n" #print("\n") analysis_clusters[selected_cluster] = text_cluster_analysis return analysis_clusters
#Optimize model
[docs] def tune_model(self, n_trials = 100, min_cluster_start = 0.01, min_cluster_end = 0.15, min_samples_start = 0.01, min_samples_end = 0.15, max_epsilon = None): """ Tunes a hdbscan model maximizing the DBCV score (https://www.dbs.ifi.lmu.de/~zimek/publications/SDM2014/DBCV.pdf) Parameters ---------- n_trials : int, default = 100 number of iterations min_cluster_start: float, default = 0.01 lowest value of min_cluster of the search space (proportion of data) min_cluster_end: float, default = 0.15 highest value of min_cluster of the search space (proportion of data) min_samples_start: float, default = 0.01 lowest value of min_samples of the search space (proportion of data) min_samples_end: float, default = 0.15 highest value of min_samples of the search space (proportion of data) max_epsilon: float, default = None If a value is provided, an optimal epsilon is searched between 0 and max_epsilon Returns ------- None: optimized hdbscan """ n_rows = self.umap_combined.embedding_.shape[0] #Check values: assert 0 <= min_cluster_start <= 1, "min_cluster_start must be between 0 and 1" assert 0 <= min_cluster_end <= 1, "min_cluster_end must be between 0 and 1" assert 0 <= min_samples_start <= 1, "min_samples_start must be between 0 and 1" assert 0 <= min_samples_end <= 1, "min_samples_end must be between 0 and 1" min_cluster_start_value = n_rows * min_cluster_start min_cluster_end_value = n_rows * min_cluster_end min_samples_start_value = n_rows * min_samples_start min_samples_end_value = n_rows * min_samples_end # 1. Define an objective function to be maximized. def objective(trial): # 2. Suggest values for the hyperparameters using a trial object. min_cluster_number = trial.suggest_int("min_cluster", min_cluster_start_value, min_cluster_end_value, log = False) min_samples_number = trial.suggest_int("min_samples", min_samples_start_value, min_samples_end_value, log = False) if max_epsilon is not None: cluster_selection_epsilon_number = trial.suggest_float("cluster_selection_epsilon", 0, max_epsilon) hdb_model = hdbscan.HDBSCAN(min_cluster_size = min_cluster_number, min_samples = min_samples_number, cluster_selection_epsilon = cluster_selection_epsilon_number, gen_min_span_tree = True).fit(self.umap_combined.embedding_) else: hdb_model = hdbscan.HDBSCAN(min_cluster_size = min_cluster_number, min_samples = min_samples_number, gen_min_span_tree = True).fit(self.umap_combined.embedding_) score = hdb_model.relative_validity_ return score # 3. Create a study object and optimize the objective function. study = optuna.create_study(direction = 'maximize') study.optimize(objective, n_trials = n_trials) #Train the model with the optimized parameters best_model_params = study.best_params print("Best parameters: ", best_model_params) if max_epsilon is not None: hdb_model = hdbscan.HDBSCAN(min_cluster_size = best_model_params['min_cluster'], min_samples = best_model_params['min_samples'], cluster_selection_epsilon = best_model_params['cluster_selection_epsilon'], gen_min_span_tree = True).fit(self.umap_combined.embedding_) else: hdb_model = hdbscan.HDBSCAN(min_cluster_size = best_model_params['min_cluster'], min_samples = best_model_params['min_samples'], gen_min_span_tree = True).fit(self.umap_combined.embedding_) self.hdbscan_ = hdb_model return self