The essence of machine learning (ML) involves fitting a mathematical function to define the relationship between a target variable and a set of predictor features. In applied data science, the end purpose of such a function is to predict the outcome of new, never-before-seen cases. The applications of ML are wide-ranging and well-documented. The goal of this chapter is not to explain the inner workings of ML algorithms. Myriad resources exist to accomplish that task. Rather, this chapter will show how to effectively implement and tune popular ML algorithms.
The two most common types of supervised ML problems are classification and regression. Regression algorithms predict a continuous outcome, such as the price of a stock or the sale price of a house. A classification model predicts a discrete label, such as whether someone will opt out of an email campaign or whether someone will buy a product. When building a classification model, we oftentimes prefer to predict the probability of an observation being in a given class. For example, we might predict the probability of a person opting out of an email campaign. More on predicting reliable probabilities in chapter 11.
Our customer churn model is a classification task. We will develop a model to predict the probability of a person canceling their monthly subscription. This model will be the core of what we do. Without it, we have nothing that is actionable!
When building a machine learning model, we want to train it on a certain set of data (training set) and validate it on a distinct, separate set of data (testing set). Completely holding out a portion of data is vital. Checking the predictive power on our test set gives us an idea of how our model will generalize to brand new observations in production. Analyzing the predictive performance on our test set is also key to assessing the strengths and weaknesses of our models and to selecting the final model(s) we want to put into production. We will discuss this topic more in chapter 12.
For our problem, we can randomly assign observations to our test set. Generally, we assign somewhere between 10-30% of our observations to the test set. Setting the percentage is a mix of art and science. Oftentimes, we'll rely on a rule of thumb that we have employed for similar projects in the past (e.g. 25% goes to the test set). The size of our test set should be influenced by the size of our overall dataset. We need a test set that is large enough to be reliable. If we only have, say, 10 observations in our test set, most of our evaluation statistics won't be reliable (to note, the situation is a bit different for time-series problems). If we have a huge dataset, we can be fine assigning a smaller percentage of observations to our test set (e.g. 10%). However, if our dataset is smaller, we likely need a larger percentage. That said, we still need ample training data. A balance exists - and finding it can oftentimes be an art. However, we can use learning curves to help inform our decision, which we will cover in an upcoming section.
To create a train-test split, we can use the following function, which will be placed in helpers/model_helpers.py.
from sklearn.model_selection import train_test_split


def create_train_test_split(x, y, test_size=0.25):
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size, random_state=19)
    return x_train, x_test, y_train, y_test
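As a quick usage sketch, assuming churn_df is a pandas dataframe that contains our churn target column, we could call this helper as follows.

from helpers.model_helpers import create_train_test_split

# split features and target, then hold out 25% of observations for the test set
y = churn_df['churn']
x = churn_df.drop('churn', axis=1)
x_train, x_test, y_train, y_test = create_train_test_split(x, y, test_size=0.25)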
Cross validation is a core concept in machine learning. Simply put, cross validation splits our training data into new training and testing sets in a rotating fashion. To begin this process, we select the number of folds (referred to as k) we want in our cross validation scheme. To keep it simple, we'll use k=3 in this example. With k=3, we randomly split our data into three folds. We then fit three separate models, with each fold being used as the testing set one time.
Why would we want to use cross validation? The main reason is that it can give us a better idea of how our model will generalize to new data. Rather than scoring our model on only one holdout set of data, we do so on multiple sets. It's possible that our model may score well on a certain holdout set simply by chance. Lucky splits happen. By performing the holdout evaluation multiple times and taking the average, we get a better view of the actual error we might expect "in the wild". Cross validation is crucial. It gives us a fuller picture and more confidence that model evaluation results are not a function of a lucky or unlucky random split.
We primarily use cross validation when tuning the hyperparameters of our model. As a simple example, let's say we want to see if a max_depth of 10 or 15 performs better for our Random Forest algorithm. If we used k=3, we would build three models, as described above, with the max_depth set to 10 and take the average score on the testing sets (remember, each fold is used for testing one time). We would then build three models with the max_depth set to 15 and take the average score on the testing sets. The max_depth setting with the best average score on the testing sets would be declared the winner. Again, by engineering multiple test sets, we have more confidence in the evaluation scores we get.
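To make the mechanics concrete, below is a minimal sketch of the max_depth comparison described above using scikit-learn's cross_val_score with k=3; it assumes x_train and y_train are already numeric (i.e. fully preprocessed).

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# compare two candidate max_depth values using 3-fold cross validation
for max_depth in [10, 15]:
    model = RandomForestClassifier(max_depth=max_depth)
    # each fold serves as the testing set exactly once; we average the three scores
    scores = cross_val_score(model, x_train, y_train, cv=3, scoring='roc_auc')
    print(f'max_depth={max_depth}: mean ROC AUC = {scores.mean():.4f}')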
The above process is elegant. It preserves the integrity of our full testing set, which we discussed in the last section. The testing set should never be used to try hyperparameter combinations. Rather, we should always leverage cross validation to repurpose our training set to tune hyperparameters. Cross validation helps us make better use of our data.
What's a good number for k? Well, it depends. A value of 3 is the floor - don't go any lower than that. You might see values as high as 12 or 13 but rarely anything higher. The size of your training set plays a huge role. If you have a small training set and you set k to a high value, your testing sets will be small and the results less reliable. Likewise, setting k to a high value will increase the time it takes to tune your models. If you have a huge dataset, you'd probably be good setting k to something like 5, knowing you'll have good-sized test sets. No "right" answer exists. Through experience, you'll find what works well for your modeling tasks.
A learning curve shows us the diminishing returns of adding more training data. Logically, our algorithm will cease improving at some point as we add more data. This approximate point will be highly contingent on the problem at hand. We can use such a point to help inform roughly how large our training set should be. Fortunately, we can leverage scikit-learn to plot our learning curve. Let's create utilities/learning_curves.py and add the following code to it. Recall the utilities directory houses ancillary, one-off processes.
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve
from sklearn.ensemble import RandomForestClassifier
from data.data import make_mysql_connection
from helpers.model_helpers import make_directories_if_not_exists

IMAGES_PATH = 'utilities/output_images'


def generate_learning_curve(model, df, target, train_sizes_list, cv_times, scoring, file_path):
    y = df[target]
    x = df.drop(target, axis=1)
    train_sizes, train_scores, validation_scores = learning_curve(model, x, y, train_sizes=train_sizes_list,
                                                                  cv=cv_times, scoring=scoring, n_jobs=-1)
    train_scores = train_scores.mean(axis=1)
    validation_scores = validation_scores.mean(axis=1)
    plt.plot(train_sizes, train_scores, label='Training score')
    plt.plot(train_sizes, validation_scores, label='Validation score')
    plt.ylabel('Score')
    plt.xlabel('Training Set Size')
    plt.title('Learning Curve')
    plt.legend()
    plt.savefig(file_path)
    plt.clf()


def main():
    churn_df = pd.read_sql('''select * from churn_model.churn_data;''', make_mysql_connection('churn-model-mysql'))
    churn_df['churn'] = np.where(churn_df['churn'].str.startswith('y'), 1, 0)
    churn_df.drop(['id', 'meta__inserted_at', 'client_id', 'acquired_date'], axis=1, inplace=True)
    churn_df.fillna(value=0, inplace=True)
    churn_df = pd.get_dummies(churn_df, dummy_na=True)
    model = RandomForestClassifier(max_depth=10, min_samples_leaf=5)
    train_sizes_list = [0.2, 0.4, 0.6, 0.8, 1.0]
    make_directories_if_not_exists([IMAGES_PATH])
    generate_learning_curve(model, churn_df, 'churn', train_sizes_list, 3, 'roc_auc',
                            os.path.join(IMAGES_PATH, 'learning_curve.png'))


if __name__ == "__main__":
    main()
We see that our training score goes down and then stabilizes. The higher scores on less data are likely due to statistical noise. With small sample sizes, we should have less confidence in our scores. However, we care more about the validation score. We see that it generally goes up as we increase the training set size. (We're using ROC AUC as our scoring metric, and we want this metric to be as high as possible). This finding means we should have as large of a training set as we can - but we still need to balance having a testing set that is large enough to be stable. To note, for our learning curve, we used an un-tuned model and bare-bones preprocessing. We'll get better performance with improved tuning and cleaning. Also, we are clearly overfitting the training set, as our training and validation scores differ quite noticeably.
In previous chapters, we have mentioned our modeling pipeline multiple times. This is where much of the magic happens: our preprocessing and modeling steps get defined and ordered. Please review the pipeline below. We will then discuss each step in turn.
from sklearn.pipeline import Pipeline | |
from sklearn.compose import ColumnTransformer, make_column_selector as selector | |
from sklearn.impute import SimpleImputer | |
from sklearn.preprocessing import StandardScaler, FunctionTransformer | |
from sklearn.feature_selection import SelectPercentile, f_classif, chi2, VarianceThreshold | |
from sklearn.feature_extraction import DictVectorizer | |
from helpers.model_helpers import clip_feature_bounds, drop_features, convert_column_to_datetime, fill_missing_values, \ | |
extract_month_from_date, convert_month_to_quarter, extract_year_from_date, create_ratio_column, \ | |
ensure_features_are_standardized, TakeLog, CombineCategoryLevels, FeaturesToDict | |
from helpers.constants import MONTH_TO_QUARTER_DICT | |
from modeling.config import FEATURES_TO_DROP, CATEGORICAL_FILL_VALUE, FEATURE_DTYPE_MAPPING | |
def get_pipeline(model): | |
""" | |
Generates a scikit-learn modeling pipeline with model as the final step. | |
:param model: instantiated model | |
:returns: scikit-learn pipeline | |
""" | |
numeric_transformer = Pipeline(steps=[ | |
('mouse_movement_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'mouse_movement', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('propensity_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'propensity_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('completeness_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'completeness_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('profile_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'profile_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('average_stars_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'average_stars', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('ratio_creator', FunctionTransformer(create_ratio_column, validate=False, | |
kw_args={'col1': 'profile_score', 'col2': 'activity_score'})), | |
('log_creator', TakeLog()), | |
('dict_creator', FeaturesToDict()), | |
('dict_vectorizer', DictVectorizer(sparse=False)), | |
('imputer', SimpleImputer(strategy='mean')), | |
('scaler', StandardScaler()), | |
('feature_selector', SelectPercentile(f_classif)), | |
]) | |
categorical_transformer = Pipeline(steps=[ | |
('date_transformer', FunctionTransformer(convert_column_to_datetime, validate=False, | |
kw_args={'feature': 'acquired_date'})), | |
('month_extractor', FunctionTransformer(extract_month_from_date, validate=False, | |
kw_args={'date_col': 'acquired_date'})), | |
('quarter_extractor', FunctionTransformer(convert_month_to_quarter, validate=False, | |
kw_args={'month_col': 'month', | |
'mapping_dict': MONTH_TO_QUARTER_DICT})), | |
('year_extractor', FunctionTransformer(extract_year_from_date, validate=False, | |
kw_args={'date_col': 'acquired_date'})), | |
('date_dropper', FunctionTransformer(drop_features, validate=False, | |
kw_args={'feature_list': FEATURES_TO_DROP})), | |
('imputer', FunctionTransformer(fill_missing_values, validate=False, | |
kw_args={'fill_value': CATEGORICAL_FILL_VALUE})), | |
('category_combiner', CombineCategoryLevels()), | |
('dict_creator', FeaturesToDict()), | |
('dict_vectorizer', DictVectorizer(sparse=False)), | |
('feature_selector', SelectPercentile(chi2)), | |
]) | |
preprocessor = ColumnTransformer( | |
transformers=[ | |
('numeric_transformer', numeric_transformer, selector(dtype_include='number')), | |
('categorical_transformer', categorical_transformer, selector(dtype_exclude='number')) | |
], | |
remainder='passthrough', | |
) | |
pipeline = Pipeline(steps=[ | |
('data_mapper', FunctionTransformer(ensure_features_are_standardized, validate=False, | |
kw_args={'feature_mapping': FEATURE_DTYPE_MAPPING})), | |
('preprocessor', preprocessor), | |
('variance_thresholder', VarianceThreshold()), | |
('model', model) | |
]) | |
return pipeline |
At a high level, we have four main components to our pipeline.
Let's first review our numeric_transformer.
Let's now review our categorical_transformer.
Whew! That's a lot. Our next object is the preprocessor. This part is simple: we apply the numeric_transformer to our numeric columns, and we apply our categorical_transformer to our categorical columns.
Lastly, we create our main pipeline.
One item you might notice is that we are applying FeaturesToDict and DictVectorizer twice. Why is this? In the numeric_transformer, the imputer step turns our dataframe into a numpy array, and we lose our column names. The DictVectorizer has a feature_names_ attribute, which is the way we can take a snapshot of our feature names before we lose them in the pipeline. Likewise, in the categorical_transformer, we use the DictVectorizer to one-hot encode our features. The DictVectorizer will tell us the names of all of our dummied features.
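As a small, standalone illustration (separate from our pipeline code) of why the DictVectorizer is handy for bookkeeping, consider the toy records below.

from sklearn.feature_extraction import DictVectorizer

# toy records: one numeric feature and one categorical feature
records = [
    {'xp_points': 10.0, 'device_type': 'mobile'},
    {'xp_points': 25.0, 'device_type': 'desktop'},
]

dict_vectorizer = DictVectorizer(sparse=False)
feature_array = dict_vectorizer.fit_transform(records)

# feature_names_ preserves the column names, including the one-hot encoded levels
print(dict_vectorizer.feature_names_)
# e.g. ['device_type=desktop', 'device_type=mobile', 'xp_points']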
Our target (churn or not churn) is imbalanced: far fewer people churn than do not churn. In some select cases, we might benefit from artificially balancing our classes. We can only know through experimentation. Some important cautions exist when we want to re-balance our target class. First, we are forced to either remove data or artificially create data. There are smart ways we can accomplish either task, but that doesn't mean those are risk-free actions. Second, since we often care about predicting calibrated probabilities, messing with our class balance can hinder our calibration. Caveat emptor.
As alluded to, we can either remove observations (undersampling) or create synthetic observations (oversampling). Though altering our class balance can be risky for the reasons described, there could be cases where we must undersample. This might occur when we have tons of data but only limited computational resources and the class imbalance is severe. That said, we need to be careful where we perform the undersampling or oversampling. We must only perform this type of operation on the training folds during cross validation. If we undersample or oversample any data that is used for testing or validation, we will not get an accurate view of how our model will generalize in a production setting. You don't undersample or oversample in production! In the same vein, you don't undersample or oversample test data.
Unfortunately, we cannot perform undersampling or oversampling in a scikit-learn pipeline. However, we can do so in an imbalanced-learn pipeline. To implement undersampling of the negative (majority) class, we can make the following changes.
In modeling/pipeline.py, remove the following import: from sklearn.pipeline import Pipeline. In its stead, include the following import statements.
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import RandomUnderSampler
Further, we'll need to add the following tuple as the first item in our pipeline.
('undersampling', RandomUnderSampler(sampling_strategy=create_undersampling_dict))
create_undersampling_dict is our undersampling strategy, which must be passed as a callable and must return a dictionary. The function is defined in helpers/model_helpers.py, so we'll also need to import it in modeling/pipeline.py. It simply randomly eliminates 50% of the negative observations.
from collections import Counter
def create_undersampling_dict(y):
"""
Creates a dictionary for undersampling the negative class
:param y: the y target series
:returns: dictionary
"""
counter = Counter(y)
neg_count = counter.get(0)
neg_adjusted_count = int(neg_count * 0.5)
pos_count = counter.get(1)
return {0: neg_adjusted_count, 1: pos_count}
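To see what this sampling strategy produces, here is a small, hypothetical example of calling the function directly.

import pandas as pd

# hypothetical target series: 8 negative observations and 2 positive observations
y = pd.Series([0, 0, 0, 0, 0, 0, 0, 0, 1, 1])

print(create_undersampling_dict(y))
# {0: 4, 1: 2} -> keep 4 of the 8 negative observations and all positive observations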
After implementing these changes, if we run modeling/train.py, we will be training our models using undersampling. Again, this can be a risky choice as it can mess with proper calibration, so we need to be careful and closely inspect the effects on our model.
Right now, we are imputing missing values in numeric columns with the mean. As discussed in the last chapter, we have a few alternatives. One of them is to use scikit-learn's iterative imputer, which employs regression to fill in missing values. For example, a column with missing values becomes the y target series, and all other columns are the predictors. A model is fit using this data and then employed to predict the missing values in y. This approach can be implemented with only a few lines of code. In modeling/pipeline.py, remove the following import: from sklearn.impute import SimpleImputer. Instead, include the following import statements.
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
Then, all we have to do is change the imputer step in our numeric_transformer.
('imputer', IterativeImputer())
If we run modeling/train.py, we'll train our models with our new, fancy imputation strategy.
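If you'd like to sanity check the new imputer in isolation before kicking off a full training run, a minimal sketch on a toy array looks like the following.

import numpy as np
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

# toy array with a missing value in the second column
toy_array = np.array([
    [1.0, 2.0],
    [2.0, 4.0],
    [3.0, np.nan],
])

imputer = IterativeImputer()
# the missing entry is filled with a regression-based estimate rather than the column mean
print(imputer.fit_transform(toy_array))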
Our current pipeline is using one-hot encoding for the categorical columns. One alternative, discussed in the previous chapter, is mean target encoding. To implement this method, we shall use the Leave One Out Encoder from the category_encoders library. We make the following changes to our pipeline. We still use the DictVectorizer to capture our column names before our data turns into an array.
from category_encoders.leave_one_out import LeaveOneOutEncoder | |
from sklearn.pipeline import Pipeline | |
from sklearn.compose import ColumnTransformer, make_column_selector as selector | |
from sklearn.impute import SimpleImputer | |
from sklearn.preprocessing import StandardScaler, FunctionTransformer | |
from sklearn.feature_selection import SelectPercentile, f_classif, chi2, VarianceThreshold | |
from sklearn.feature_extraction import DictVectorizer | |
from helpers.model_helpers import clip_feature_bounds, drop_features, convert_column_to_datetime, fill_missing_values, \ | |
extract_month_from_date, convert_month_to_quarter, extract_year_from_date, create_ratio_column, \ | |
ensure_features_are_standardized, TakeLog, CombineCategoryLevels, FeaturesToDict | |
from helpers.constants import MONTH_TO_QUARTER_DICT | |
from modeling.config import FEATURES_TO_DROP, CATEGORICAL_FILL_VALUE, FEATURE_DTYPE_MAPPING | |
def get_pipeline(model): | |
""" | |
Generates a scikit-learn modeling pipeline with model as the final step. | |
:param model: instantiated model | |
:returns: scikit-learn pipeline | |
""" | |
numeric_transformer = Pipeline(steps=[ | |
('mouse_movement_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'mouse_movement', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('propensity_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'propensity_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('completeness_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'completeness_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('profile_score_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'profile_score', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('average_stars_clipper', FunctionTransformer(clip_feature_bounds, validate=False, | |
kw_args={'feature': 'average_stars', 'cutoff': 0, | |
'new_amount': 0, 'clip_type': 'lower'})), | |
('ratio_creator', FunctionTransformer(create_ratio_column, validate=False, | |
kw_args={'col1': 'profile_score', 'col2': 'activity_score'})), | |
('log_creator', TakeLog()), | |
('dict_creator', FeaturesToDict()), | |
('dict_vectorizer', DictVectorizer(sparse=False)), | |
('imputer', SimpleImputer(strategy='mean')), | |
('scaler', StandardScaler()), | |
('feature_selector', SelectPercentile(f_classif)), | |
]) | |
categorical_transformer = Pipeline(steps=[ | |
('date_transformer', FunctionTransformer(convert_column_to_datetime, validate=False, | |
kw_args={'feature': 'acquired_date'})), | |
('month_extractor', FunctionTransformer(extract_month_from_date, validate=False, | |
kw_args={'date_col': 'acquired_date'})), | |
('quarter_extractor', FunctionTransformer(convert_month_to_quarter, validate=False, | |
kw_args={'month_col': 'month', | |
'mapping_dict': MONTH_TO_QUARTER_DICT})), | |
('year_extractor', FunctionTransformer(extract_year_from_date, validate=False, | |
kw_args={'date_col': 'acquired_date'})), | |
('date_dropper', FunctionTransformer(drop_features, validate=False, | |
kw_args={'feature_list': FEATURES_TO_DROP})), | |
('imputer', FunctionTransformer(fill_missing_values, validate=False, | |
kw_args={'fill_value': CATEGORICAL_FILL_VALUE})), | |
('category_combiner', CombineCategoryLevels()), | |
('leave_one_out_encoder', LeaveOneOutEncoder()), | |
('dict_creator', FeaturesToDict()), | |
('dict_vectorizer', DictVectorizer(sparse=False)), | |
('feature_selector', SelectPercentile(chi2)), | |
]) | |
preprocessor = ColumnTransformer( | |
transformers=[ | |
('numeric_transformer', numeric_transformer, selector(dtype_include='number')), | |
('categorical_transformer', categorical_transformer, selector(dtype_exclude='number')) | |
], | |
remainder='passthrough', | |
) | |
pipeline = Pipeline(steps=[ | |
('data_mapper', FunctionTransformer(ensure_features_are_standardized, validate=False, | |
kw_args={'feature_mapping': FEATURE_DTYPE_MAPPING})), | |
('preprocessor', preprocessor), | |
('variance_thresholder', VarianceThreshold()), | |
('model', model) | |
]) | |
return pipeline |
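If leave-one-out encoding is new to you, the following standalone toy example (separate from our pipeline) shows what the encoder does to a single categorical column.

import pandas as pd
from category_encoders.leave_one_out import LeaveOneOutEncoder

x = pd.DataFrame({'device_type': ['mobile', 'mobile', 'desktop', 'desktop']})
y = pd.Series([1, 0, 1, 1])

encoder = LeaveOneOutEncoder(cols=['device_type'])
# during fit_transform, each row receives the mean target of its category, excluding the row itself
encoded = encoder.fit_transform(x, y)
print(encoded)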
In the next few sections, we will provide brief conceptual overviews of popular machine learning models. We will subsequently see how to implement them.
scikit-learn has a number of useful models. We will discuss three of them: Logistic Regression, Random Forest, and Extra Trees. The goal of this book is not to explain the mechanics of machine learning models, so we will only briefly review each model.
XGBoost, CatBoost, and LightGBM are popular gradient boosting algorithms. All are decision-tree based by default. In the last section, we discussed Random Forest and Extra Trees algorithms. Those are called bagging algorithms, which create ensembles of trees. Each tree in a forest can be constructed in parallel. Boosting algorithms, like those mentioned in this section, are run sequentially. Subsequent trees focus on observations that previous trees struggled to correctly predict.
This triad of boosting models is state of the art. They are generally similar in their performance, though exceptions certainly exist. There are some differences in how they work under the hood; you can explore those with a quick Google search. To note, there are also scikit-learn implementations of gradient boosting, and they also work quite well.
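For reference, instantiating the three classifiers is quite similar since all expose a scikit-learn compatible interface; the hyperparameter values below are placeholders, and this assumes xgboost, lightgbm, and catboost are installed.

from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier

# any of these could be dropped into our pipeline as the model step
xgboost_model = XGBClassifier(n_estimators=100, max_depth=5, learning_rate=0.1)
lightgbm_model = LGBMClassifier(n_estimators=100, max_depth=5, learning_rate=0.1)
catboost_model = CatBoostClassifier(n_estimators=100, max_depth=5, learning_rate=0.1, verbose=0)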
We can create our own model ensemble by having individual models vote, which can be implemented via scikit-learn's VotingClassifier. For example, we could average the predicted probabilities of, say, a Logistic Regression and a Random Forest. This average would then be our final prediction.
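A minimal sketch of soft voting with scikit-learn's VotingClassifier might look like the following; the specific estimators and settings are simply placeholders.

from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression

# voting='soft' averages the predicted probabilities of the individual models
voting_model = VotingClassifier(
    estimators=[
        ('log_reg', LogisticRegression(max_iter=1000)),
        ('random_forest', RandomForestClassifier()),
    ],
    voting='soft'
)
# voting_model can then be passed into get_pipeline() like any other model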
One note: there is a bug that prevents VotingClassifier from working with the CalibratedClassifierCV (we discuss the latter and its importance later in this chapter and then extensively in chapter 11). To get this combination to work, we actually need to change the scikit-learn source code. First, this is a risky thing to do because we may screw up something unintentionally. Second, while changing source code is fairly easy on your local machine, doing so in a production Docker image created in a CI/CD pipeline run on a remote server would be a pain. Frankly, waiting until scikit-learn fixes the issue is probably the best course of action. However, if we really want to get our VotingClassifier to work with the CalibratedClassifierCV, we need to change line 510 of scikit-learn's calibration module (in version 0.24.2) from "elif method_name == 'predict_proba':" to "elif method_name == 'predict_proba' or '_predict_proba':".

Additionally, we could stack our models, which is related to voting. Instead of averaging our predicted probabilities from a series of models, we feed them into another machine learning model, called a meta estimator or final estimator. The meta estimator produces a final prediction based on the inputs of the other models.
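Likewise, here is a sketch of stacking with scikit-learn's StackingClassifier; by default, the final_estimator is a Logistic Regression, though we could supply any classifier, and the base estimators below are placeholders.

from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression

# the base models' predictions become the features for the final estimator
stacking_model = StackingClassifier(
    estimators=[
        ('random_forest', RandomForestClassifier()),
        ('log_reg', LogisticRegression(max_iter=1000)),
    ],
    final_estimator=LogisticRegression()
)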
Let's now see how we might go about implementing the models described above. We'll need to introduce new code.
In data/db.py, we include a function to retrieve our training data from our MySQL tables. We'll cover the rest of the functions in future chapters.
import pandas as pd | |
import pathlib | |
import os | |
import sqlalchemy | |
from ds_helpers import db, aws | |
from cachetools import cached, TTLCache | |
from helpers.app_helpers import get_current_timestamp | |
SSL_PATH = os.path.join(pathlib.Path(__file__).parent.absolute(), 'rds-ca-2019-root.pem') | |
def make_mysql_connection(db_secret_name): | |
""" | |
Establishes a connection with MySQL | |
:param db_secret_name: Secrets Manager secret name with DB creds | |
:returns: sqlalchemy connection | |
""" | |
return db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name), ssl_path=SSL_PATH) | |
def get_training_data(db_conn): | |
""" | |
Retrieves the training data. | |
""" | |
return pd.read_sql('''select * from churn_model.churn_data;''', db_conn) | |
def log_model_metadata(model_uid, schema, db_conn): | |
""" | |
Logs model metadata to a database table. | |
:param model_uid: model uid | |
:param schema: schema name | |
:param db_conn: database connection | |
""" | |
df = pd.DataFrame({ | |
'training_timestamp': [get_current_timestamp()], | |
'model_uid': [model_uid] | |
}) | |
df.to_sql(name='model_metadata', schema=schema, con=db_conn, if_exists='append', index=False) | |
def retrieve_app_config(schema, db_conn, environment): | |
""" | |
Retrieves the application configuration from a database table. | |
:param schema: name of the MySQL schema | |
:param db_conn: database connection | |
:param environment: ENVIRONMENT env variable
:return: dictionary with config keys and values | |
""" | |
if environment in ['STAGE', 'LOCAL']: | |
table_name = 'stage_config' | |
else: | |
table_name = 'prod_config' | |
query = f''' | |
select config_key, config_value | |
from {schema}.{table_name} | |
where meta__inserted_at = (select max(meta__inserted_at) from {schema}.{table_name}) | |
;''' | |
df = pd.read_sql(query, db_conn) | |
df = df.set_index('config_key') | |
df_dict = df.to_dict().get('config_value') | |
return df_dict | |
@cached(cache=TTLCache(maxsize=1, ttl=86_400)) | |
def get_client_ltv_table(db_conn): | |
""" | |
Gets the LTV for every client_id. | |
:param db_conn: database connection | |
:returns: pandas dataframe | |
""" | |
query = ''' | |
select client_id, ltv | |
from churn_model.client_ltv; | |
''' | |
df = pd.read_sql(query, db_conn) | |
return df | |
def get_hashed_password_for_username(username, db_conn): | |
""" | |
Gets the hashed_password for username. | |
:param username: a username | |
:param db_conn: database connection | |
:returns: hashed password as string | |
""" | |
query = f''' | |
select password | |
from churn_model.app_credentials | |
where username = '{username}'; | |
''' | |
df = pd.read_sql(query, db_conn) | |
return df['password'][0] | |
def log_payloads_to_mysql(input_payload, output_payload, table_name, schema_name, db_secret_name): | |
""" | |
Logs input and output payloads to MySQL.
:param input_payload: input payload | |
:param output_payload: output_payload | |
:param table_name: name of MySQL table | |
:param schema_name: name of MySQL schema | |
:param db_secret_name: Secrets Manager secret with DB creds | |
""" | |
uid = input_payload.get('uid') | |
logging_timestamp = output_payload.get('logging_timestamp') | |
new_input_payload = dict() | |
new_input_payload['input'] = input_payload | |
new_output_payload = dict() | |
new_output_payload['output'] = output_payload | |
payload_dict = {**new_input_payload, **new_output_payload} | |
df = pd.DataFrame({ | |
'uid': [uid], | |
'logging_timestamp': [logging_timestamp], | |
'input_output_payloads': [payload_dict] | |
}) | |
df.to_sql(name=table_name, schema=schema_name, con=make_mysql_connection(db_secret_name), | |
dtype={'input_output_payloads': sqlalchemy.types.JSON}, if_exists='append', index=False) | |
def log_feature_importance_to_mysql(df, schema, db_conn): | |
"""" | |
Logs feature importance scores to the feature_score table. This table is created by the stored procedure | |
GenerateModelPerformanceTables. | |
:param df: pandas dataframe containing all the columns expected by the feature_score table | |
:param schema: name of the schema | |
:param db_conn: database connection | |
""" | |
db.write_dataframe_to_database(df, schema, 'feature_score', db_conn) |
The second file is modeling/config.py. This is where we include global variables and configurations for training models. Of note, this location is where we define the models we will train along with the associated parameter spaces we will optimize (more on this point in later sections). Likewise, we define a series of metrics we will calculate on our test set. Additionally, you might notice we use named tuples throughout this script. Why? Because they are more readable; they allow us to assign keyword arguments rather than just reference index locations.
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier | |
from xgboost import XGBClassifier | |
from sklearn.calibration import CalibratedClassifierCV | |
from sklearn.metrics import log_loss, f1_score, roc_auc_score, brier_score_loss, balanced_accuracy_score | |
from collections import namedtuple | |
from hyperopt import hp | |
TARGET = 'churn' | |
FEATURES_TO_DROP = ['acquired_date'] | |
TEST_SET_PERCENTAGE = 0.20 | |
CLASS_CUTOFF = 0.50 | |
CV_SCORER = 'neg_log_loss' | |
CV_FOLDS = 5 | |
CATEGORICAL_FILL_VALUE = 'missing' | |
CALIBRATION_PLOT_BINS = [10, 25, 50, 100] | |
LOGGING_S3_BUCKET = 'churn-model-data-science-modeling' | |
DB_SECRET_NAME = 'churn-model-mysql' | |
DB_SCHEMA_NAME = 'churn_model' | |
LOG_TO_DB = True | |
DROP_COL_SCORER = log_loss | |
DROP_COL_SCORER_STRING = 'neg_log_loss' | |
DROP_COL_SCORING_TYPE = 'probability' | |
DROP_COL_HIGHER_IS_BETTER = True | |
EXPLANATION_SAMPLE_N = 10_000 | |
USE_SHAP_KERNEL = False | |
FEATURE_DTYPE_MAPPING = { | |
'activity_score': float, | |
'propensity_score': float, | |
'profile_score_new': float, | |
'completeness_score': float, | |
'xp_points': float, | |
'profile_score': float, | |
'portfolio_score': float, | |
'mouse_movement': float, | |
'average_stars': float, | |
'ad_target_group': str, | |
'marketing_message': str, | |
'device_type': str, | |
'all_star_group': str, | |
'mouse_x': str, | |
'coupon_code': str, | |
'ad_engagement_group': str, | |
'user_group': str, | |
'browser_type': str, | |
'email_code': str, | |
'marketing_creative': str, | |
'secondary_user_group': str, | |
'promotion_category': str, | |
'marketing_campaign': str, | |
'mouse_y': str, | |
'marketing_channel': str, | |
'marketing_creative_sub': str, | |
'site_level': str, | |
'acquired_date': str | |
} | |
ENGINEERING_PARAM_GRID = { | |
'preprocessor__numeric_transformer__log_creator__take_log': hp.choice( | |
'preprocessor__numeric_transformer__log_creator__take_log', ['yes', 'no']), | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': hp.choice( | |
'preprocessor__categorical_transformer__category_combiner__combine_categories', ['yes', 'no']), | |
'preprocessor__categorical_transformer__feature_selector__percentile': hp.uniformint( | |
'preprocessor__categorical_transformer__feature_selector__percentile', 1, 100), | |
'preprocessor__numeric_transformer__feature_selector__percentile': hp.uniformint( | |
'preprocessor__numeric_transformer__feature_selector__percentile', 1, 100), | |
} | |
FOREST_PARAM_GRID = { | |
'model__base_estimator__max_depth': hp.uniformint('model__base_estimator__max_depth', 3, 16), | |
'model__base_estimator__min_samples_leaf': hp.uniform('model__base_estimator__min_samples_leaf', 0.001, 0.01), | |
'model__base_estimator__max_features': hp.choice('model__base_estimator__max_features', ['log2', 'sqrt']), | |
} | |
XGBOOST_PARAM_GRID = { | |
'model__base_estimator__learning_rate': hp.uniform('model__base_estimator__learning_rate', 0.01, 0.5),
'model__base_estimator__n_estimators': hp.randint('model__base_estimator__n_estimators', 75, 150), | |
'model__base_estimator__max_depth': hp.randint('model__base_estimator__max_depth', 3, 16), | |
'model__base_estimator__min_child_weight': hp.uniformint('model__base_estimator__min_child_weight', 2, 16), | |
} | |
model_named_tuple = namedtuple('model_config', ['model_name', 'model', 'param_space', 'iterations'])
MODEL_TRAINING_LIST = [ | |
model_named_tuple(model_name='random_forest', model=CalibratedClassifierCV(base_estimator=RandomForestClassifier()), | |
param_space=FOREST_PARAM_GRID, iterations=15), | |
model_named_tuple(model_name='extra_trees', model=CalibratedClassifierCV(base_estimator=ExtraTreesClassifier()), | |
param_space=FOREST_PARAM_GRID, iterations=15), | |
model_named_tuple(model_name='xgboost', model=CalibratedClassifierCV(base_estimator=XGBClassifier()), | |
param_space=XGBOOST_PARAM_GRID, iterations=15), | |
] | |
evaluation_named_tuple = namedtuple('model_evaluation', ['evaluation_column', 'scorer_callable', 'metric_name'])
MODEL_EVALUATION_LIST = [ | |
evaluation_named_tuple(evaluation_column='1_prob', scorer_callable=log_loss, metric_name='log_loss'), | |
evaluation_named_tuple(evaluation_column='1_prob', scorer_callable=brier_score_loss, | |
metric_name='brier_score_loss'), | |
evaluation_named_tuple(evaluation_column='1_prob', scorer_callable=roc_auc_score, metric_name='roc_auc'), | |
evaluation_named_tuple(evaluation_column='predicted_class', scorer_callable=f1_score, metric_name='f1'), | |
evaluation_named_tuple(evaluation_column='predicted_class', scorer_callable=balanced_accuracy_score, | |
metric_name='balanced_accuracy_score'), | |
] |
You might have noticed that our models are wrapped in the CalibratedClassifierCV. Doing so is how we improve the calibration of our models. We, therefore, are actually tuning the base_estimator in the CalibratedClassifierCV, hence the parameters we have specified in our parameter search grids. You'll also see examples throughout this chapter that do not include the CalibratedClassifierCV. We shall discuss the CalibratedClassifierCV more in the next chapter, and you can decide if you want to use this methodology during your hyperparameter search or not. We illustrate how to do so in this chapter, though, as it is often an overlooked topic that can be confusing without examples. Should you opt to not use the CalibratedClassifierCV, you simply need to remove that call from the MODEL_TRAINING_LIST and remove the "__base_estimator__" from your parameter search dictionaries. Toggling back and forth isn't too difficult, fortunately.
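For illustration, if we opted not to use the CalibratedClassifierCV during the hyperparameter search, the corresponding entries in modeling/config.py might look like the sketch below; it reuses the imports and model_named_tuple already defined in that file.

# parameter names now reference the model step directly rather than a wrapped base_estimator
FOREST_PARAM_GRID = {
    'model__max_depth': hp.uniformint('model__max_depth', 3, 16),
    'model__min_samples_leaf': hp.uniform('model__min_samples_leaf', 0.001, 0.01),
    'model__max_features': hp.choice('model__max_features', ['log2', 'sqrt']),
}

MODEL_TRAINING_LIST = [
    model_named_tuple(model_name='random_forest', model=RandomForestClassifier(),
                      param_space=FOREST_PARAM_GRID, iterations=15),
]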
We previously discussed voting and stacking models. If we wanted to write model configs for such models, here is what we would do. To note, the parameter grids are set for something called Randomized Search for simplicity (which will be explained shortly).
# additional imports needed beyond those already in modeling/config.py
from lightgbm import LGBMClassifier
from scipy.stats import randint, uniform
from sklearn.ensemble import StackingClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression

VOTING_PARAM_GRID = {
    'model__base_estimator__log_reg__C': uniform(scale=10),
    'model__base_estimator__random_forest__max_depth': randint(3, 31),
    'model__base_estimator__random_forest__min_samples_leaf': uniform(0.0001, 0.1),
    'model__base_estimator__random_forest__max_features': ['log2', 'sqrt'],
    'model__base_estimator__xgboost__learning_rate': uniform(0.01, 0.5),
    'model__base_estimator__xgboost__n_estimators': randint(50, 200),
    'model__base_estimator__xgboost__max_depth': randint(3, 31),
    'model__base_estimator__xgboost__min_child_weight': randint(1, 16),
}

STACKING_PARAM_GRID = {
    'model__base_estimator__random_forest__max_depth': randint(3, 31),
    'model__base_estimator__random_forest__min_samples_leaf': uniform(0.0001, 0.1),
    'model__base_estimator__random_forest__max_features': ['log2', 'sqrt'],
    'model__base_estimator__light_gbm__learning_rate': uniform(0.01, 0.5),
    'model__base_estimator__light_gbm__n_estimators': randint(50, 200),
    'model__base_estimator__light_gbm__max_depth': randint(3, 31),
    'model__base_estimator__light_gbm__min_data_in_leaf': randint(1, 101),
    'model__base_estimator__light_gbm__num_leaves': randint(10, 101),
}

model_named_tuple = namedtuple('model_config', ['model_name', 'model', 'param_space', 'iterations'])

MODEL_TRAINING_LIST = [
    model_named_tuple(model_name='voting_classifier', model=CalibratedClassifierCV(base_estimator=VotingClassifier(
        estimators=[
            ('log_reg', LogisticRegression(solver='sag')),
            ('random_forest', RandomForestClassifier()),
            ('xgboost', XGBClassifier())
        ], voting='soft')), param_space=VOTING_PARAM_GRID, iterations=15),
    model_named_tuple(model_name='stacking_classifier', model=CalibratedClassifierCV(base_estimator=StackingClassifier(
        estimators=[
            ('random_forest', RandomForestClassifier()),
            ('light_gbm', LGBMClassifier(verbose=False))
        ])), param_space=STACKING_PARAM_GRID, iterations=15),
]
Another file is modeling/model.py. This is where we place our function for training a machine learning model. Again, more on this point in upcoming sections.
import pandas as pd | |
from sklearn.calibration import CalibratedClassifierCV | |
from sklearn.model_selection import cross_val_score | |
from hyperopt import fmin, tpe, Trials, space_eval | |
from helpers.model_helpers import save_pipeline, save_cv_scores | |
from data.db import log_model_metadata | |
def train_model(x_train, y_train, get_pipeline_function, model_uid, model, param_space, iterations, cv_strategy, | |
cv_scoring, static_param_space, db_schema_name=None, db_conn=None, log_to_db=False): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, and saves the serialized model into the | |
MODELS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_uid: model uid | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param iterations: number of trial to search for optimal hyperparameters | |
:param cv_strategy: cross validation strategy | |
:param cv_scoring: scoring method used for cross validation | |
:param static_param_space: parameter search space valid for all models (e.g. feature engineering) | |
:param db_schema_name: database schema to log metrics to | |
:param db_conn: database connection | |
:param log_to_db: Boolean of whether or not to log model info to a database | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_uid}...') | |
pipeline = get_pipeline_function(model) | |
if static_param_space: | |
param_space.update(static_param_space) | |
cv_scores_df = pd.DataFrame() | |
def _model_objective(params): | |
pipeline.set_params(**params) | |
score = cross_val_score(pipeline, x_train, y_train, cv=cv_strategy, scoring=cv_scoring, n_jobs=-1) | |
temp_cv_scores_df = pd.DataFrame(score) | |
temp_cv_scores_df = temp_cv_scores_df.reset_index() | |
temp_cv_scores_df['index'] = 'fold_' + temp_cv_scores_df['index'].astype(str) | |
temp_cv_scores_df = temp_cv_scores_df.T | |
temp_cv_scores_df = temp_cv_scores_df.add_prefix('fold_') | |
temp_cv_scores_df = temp_cv_scores_df.iloc[1:] | |
temp_cv_scores_df['mean'] = temp_cv_scores_df.mean(axis=1) | |
temp_cv_scores_df['std'] = temp_cv_scores_df.std(axis=1) | |
temp_params_df = pd.DataFrame(params, index=list(range(0, len(params) + 1))) | |
temp_cv_scores_df = pd.concat([temp_params_df, temp_cv_scores_df], axis=1) | |
temp_cv_scores_df = temp_cv_scores_df.dropna() | |
nonlocal cv_scores_df | |
cv_scores_df = cv_scores_df.append(temp_cv_scores_df) | |
return 1 - score.mean() | |
trials = Trials() | |
best = fmin(_model_objective, param_space, algo=tpe.suggest, max_evals=iterations, trials=trials) | |
best_params = space_eval(param_space, best) | |
cv_scores_df = cv_scores_df.sort_values(by=['mean'], ascending=False) | |
cv_scores_df = cv_scores_df.reset_index(drop=True) | |
cv_scores_df = cv_scores_df.reset_index() | |
cv_scores_df = cv_scores_df.rename(columns={'index': 'ranking'}) | |
save_cv_scores(cv_scores_df, model_uid, 'cv_scores') | |
pipeline.set_params(**best_params) | |
pipeline.fit(x_train, y_train) | |
save_pipeline(pipeline, model_uid, 'model') | |
if log_to_db: | |
log_model_metadata(model_uid, db_schema_name, db_conn) | |
return pipeline | |
def calibrate_fitted_model(pipeline, model_uid, x_validation, y_validation, calibration_method='sigmoid'): | |
""" | |
Trains a CalibratedClassifierCV using the fitted model from the pipeline as the base_estimator.
:param pipeline: Fitted pipeline | |
:param model_uid: model uid | |
:param x_validation: x validation set | |
:param y_validation: y validation set | |
:param calibration_method: the calibration method to use; either sigmoid or isotonic; default is sigmoid | |
:returns: sklearn pipeline with the model step as a fitted CalibratedClassifierCV | |
""" | |
model = pipeline.named_steps['model'] | |
pipeline.steps.pop(len(pipeline) - 1) | |
x_validation = pipeline.transform(x_validation) | |
calibrated_model = CalibratedClassifierCV(model, cv='prefit', method=calibration_method) | |
calibrated_model.fit(x_validation, y_validation) | |
pipeline.steps.append(['model', calibrated_model]) | |
save_pipeline(pipeline, model_uid, 'model') | |
return pipeline |
Lastly, we also import code from helpers/model_helpers.py. Many of these functions were reviewed in Chapter 9. Additional functions, such as how we save our pipeline, can be found in the book's GitHub repo.
The final file we'll introduce at this time is modeling/train.py. This is where the action happens - we train our models. Basically, we do the following: 1) grab our training data, 2) create our train-test split, and 3) loop over every model in MODEL_TRAINING_LIST, which will train, evaluate, and explain a series of machine learning models. We'll cover the evaluation functions you see in chapter 12. Likewise, we shall cover model explanation in chapter 13.
import numpy as np | |
import time | |
import os | |
from ds_helpers import aws | |
from data.db import get_training_data, make_mysql_connection | |
from helpers.model_helpers import create_x_y_split, create_train_test_split, create_uid, save_data_in_model_directory | |
from modeling.config import MODEL_EVALUATION_LIST, CV_SCORER, TARGET, CLASS_CUTOFF, CALIBRATION_PLOT_BINS, CV_FOLDS, \ | |
TEST_SET_PERCENTAGE, MODEL_TRAINING_LIST, ENGINEERING_PARAM_GRID, DB_SECRET_NAME, DB_SCHEMA_NAME, \ | |
LOGGING_S3_BUCKET, LOG_TO_DB, DROP_COL_SCORER, DROP_COL_SCORER_STRING, DROP_COL_SCORING_TYPE, \ | |
DROP_COL_HIGHER_IS_BETTER, EXPLANATION_SAMPLE_N, USE_SHAP_KERNEL | |
from modeling.model import train_model | |
from modeling.pipeline import get_pipeline | |
from modeling.evaluate import run_omnibus_model_evaluation | |
from modeling.explain import run_omnibus_model_explanation | |
def create_training_and_testing_data(target, test_set_percentage, db_conn): | |
""" | |
Creates training and testing data for modeling. | |
:param target: name of the target | |
:param test_set_percentage: percentage of observations for the test set | |
:param db_conn: database connection | |
:returns: x_train, x_test, y_train, y_test | |
""" | |
print('creating training and testing data...') | |
df = get_training_data(db_conn) | |
df = df.drop(['client_id', 'id', 'meta__inserted_at'], 1) | |
df[TARGET] = np.where(df[target] == 'yes', 1, 0) | |
x, y = create_x_y_split(df, target) | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y, test_set_percentage) | |
return x_train, x_test, y_train, y_test | |
def train_and_evaluate_model(x_train, x_test, y_train, y_test, model_training_list, cv_strategy, cv_scoring, | |
static_param_space, class_cutoff, target, evaluation_list, calibration_bins, | |
drop_col_scorer, drop_col_scorer_string, drop_col_scoring_type, drop_col_higher_is_better, | |
explanation_sample_n, use_shap_kernel, s3_logging_bucket, db_schema_name=None, | |
db_conn=None, log_to_db=None): | |
""" | |
Trains, evaluates, and explains a series of machine learning models. | |
:param x_train: x train | |
:param x_test: x test | |
:param y_train: y train | |
:param y_test: y test | |
:param model_training_list: list of named tuples containing model configurations; the following tuple elements are | |
required: model_name, model, param_space, iterations | |
:param cv_strategy: cross validation strategy | |
:param cv_scoring: scoring strategy for cross validation | |
:param static_param_space: param space valid for every model | |
:param class_cutoff: probability percentage to be classified in the position class | |
:param target: name of the target | |
:param evaluation_list: list of named tuples containing model evaluation configurations: the following tuple | |
elements are required: evaluation_column, scorer_callable, metric_name | |
:param calibration_bins: list of calibration bins to show | |
:param drop_col_scorer: scikit-learn scoring function for drop col model | |
:param drop_col_scorer_string: scoring metric in the form of a string (e.g. 'neg_log-loss') for drop col model | |
:param drop_col_scoring_type: either class or probability for drop col model | |
:param drop_col_higher_is_better: Boolean of whether or not a higher score is better (e.g. roc auc vs. log loss) for | |
drop col model | |
:param explanation_sample_n: number of observations to include when performing feature explanation | |
:param use_shap_kernel: Boolean of whether or not to use the SHAP kernel explainer | |
:param s3_logging_bucket: S3 bucket in which to store the model output | |
:param db_schema_name: name of the schema for logging model results | |
:param db_conn: database connection | |
:param log_to_db: Boolean of whether or not to log results to the database | |
""" | |
for model in model_training_list: | |
loop_start_time = time.time() | |
model_uid = create_uid(base_string=model.model_name) | |
save_data_in_model_directory(model_uid=model_uid, x_train=x_train, x_test=x_test, y_train=y_train, | |
y_test=y_test) | |
best_pipeline = train_model(x_train=x_train, y_train=y_train, get_pipeline_function=get_pipeline, | |
model_uid=model_uid, model=model.model, param_space=model.param_space, | |
iterations=model.iterations, cv_strategy=cv_strategy, cv_scoring=cv_scoring, | |
static_param_space=static_param_space, db_schema_name=db_schema_name, | |
db_conn=db_conn, log_to_db=log_to_db) | |
run_omnibus_model_evaluation(estimator=best_pipeline, model_uid=model_uid, x_test=x_test, y_test=y_test, | |
class_cutoff=class_cutoff, target=target, evaluation_list=evaluation_list, | |
calibration_bins=calibration_bins, db_schema_name=db_schema_name, db_conn=db_conn, | |
log_to_db=log_to_db) | |
run_omnibus_model_explanation(estimator=best_pipeline, model_uid=model_uid, x_test=x_test, y_test=y_test, | |
x_train=x_train, y_train=y_train, drop_col_scorer=drop_col_scorer, | |
drop_col_scorer_string=drop_col_scorer_string, | |
drop_col_scoring_type=drop_col_scoring_type, | |
drop_col_higher_is_better=drop_col_higher_is_better, | |
sample_n=explanation_sample_n, use_shap_kernel=use_shap_kernel, | |
db_schema_name=db_schema_name, db_conn=db_conn, log_to_db=log_to_db) | |
print(f'uploading {model_uid} directory to S3...') | |
aws.upload_directory_to_s3(local_directory=os.path.join('modeling', model_uid), bucket=s3_logging_bucket) | |
print(f'--- {time.time() - loop_start_time} seconds to train {model_uid} ---')
def main(target, test_set_percentage, model_training_list, cv_strategy, cv_scoring, static_param_space, class_cutoff, | |
evaluation_list, calibration_bins, drop_col_scorer, drop_col_scorer_string, drop_col_scoring_type, | |
drop_col_higher_is_better, explanation_sample_n, use_shap_kernel, s3_logging_bucket, db_schema_name, log_to_db, | |
db_secret_name): | |
""" | |
Main execution function. | |
:param target: name of the target | |
:param test_set_percentage: percentage of observations for the test set | |
:param model_training_list: list of named tuples containing model configurations; the following tuple elements are | |
required: model_name, model, param_space, iterations | |
:param cv_strategy: cross validation strategy | |
:param cv_scoring: scoring strategy for cross validation | |
:param static_param_space: param space valid for every model | |
:param class_cutoff: probability percentage to be classified in the position class | |
:param target: name of the target | |
:param evaluation_list: list of named tuples containing model evaluation configurations: the following tuple | |
elements are required: evaluation_column, scorer_callable, metric_name | |
:param calibration_bins: list of calibration bins to show | |
:param drop_col_scorer: scikit-learn scoring function for drop col model | |
:param drop_col_scorer_string: scoring metric in the form of a string (e.g. 'neg_log-loss') for drop col model | |
:param drop_col_scoring_type: either class or probability for drop col model | |
:param drop_col_higher_is_better: Boolean of whether or not a higher score is better (e.g. roc auc vs. log loss) for | |
drop col model | |
:param explanation_sample_n: number of observations to include when performing feature explanation | |
:param use_shap_kernel: Boolean of whether or not to use the SHAP kernel explainer | |
:param s3_logging_bucket: S3 bucket in which to store the model output | |
:param db_schema_name: name of the schema for logging model results | |
:param log_to_db: Boolean of whether or not to log results to the database | |
:param db_secret_name: Secrets Manager secret with database credentials | |
""" | |
db_conn = make_mysql_connection(db_secret_name) | |
x_train, x_test, y_train, y_test = create_training_and_testing_data(target, test_set_percentage, db_conn) | |
train_and_evaluate_model(x_train, x_test, y_train, y_test, model_training_list, cv_strategy, cv_scoring, | |
static_param_space, class_cutoff, target, evaluation_list, calibration_bins, | |
drop_col_scorer, drop_col_scorer_string, drop_col_scoring_type, drop_col_higher_is_better, | |
explanation_sample_n, use_shap_kernel, s3_logging_bucket, db_schema_name, db_conn, | |
log_to_db) | |
if __name__ == "__main__": | |
script_start_time = time.time() | |
main( | |
target=TARGET, | |
test_set_percentage=TEST_SET_PERCENTAGE, | |
model_training_list=MODEL_TRAINING_LIST, | |
cv_strategy=CV_FOLDS, | |
cv_scoring=CV_SCORER, | |
static_param_space=ENGINEERING_PARAM_GRID, | |
class_cutoff=CLASS_CUTOFF, | |
evaluation_list=MODEL_EVALUATION_LIST, | |
calibration_bins=CALIBRATION_PLOT_BINS, | |
drop_col_scorer= DROP_COL_SCORER, | |
drop_col_scorer_string=DROP_COL_SCORER_STRING, | |
drop_col_scoring_type=DROP_COL_SCORING_TYPE, | |
drop_col_higher_is_better=DROP_COL_HIGHER_IS_BETTER, | |
explanation_sample_n=EXPLANATION_SAMPLE_N, | |
use_shap_kernel=USE_SHAP_KERNEL, | |
s3_logging_bucket=LOGGING_S3_BUCKET, | |
db_schema_name=DB_SCHEMA_NAME, | |
log_to_db=LOG_TO_DB, | |
db_secret_name=DB_SECRET_NAME | |
) | |
print("--- {} seconds for script to run ---".format(time.time() - script_start_time)) |
Given the size of our training set, our models will take a while to train. The number of CPUs on your machine and their power also play a huge role in the training time. If your machine isn't up to snuff, you can potentially take a random sample of the data. Running the code on EC2 is also an option. Another option is to not train using the CalibratedClassifierCV. This is the main culprit for slowing down the training in our script. Instead, you could perform post-hoc calibration on a validation set. Don't sweat it: we'll cover more on calibration options in chapter 11.
Of note, to ensure your computer doesn't sleep while you're training models that might take some time, you can issue the following command from the terminal (on macOS).
$ caffeinate
Grid search is the most basic form of hyperparameter tuning. It is a brute-force methodology: we try every combination of parameters from a specified grid. This can be thorough but, of course, potentially time-consuming. We also risk biasing our hyperparameter tuning by the parameter grid we construct. We could easily provide a grid with too few options or one based on biased heuristics. For example, below is a sample parameter grid for optimizing the C regularization and fit_intercept parameters in a Logistic Regression. The provided grid will construct 8 different models, pairing each C value with each fit_intercept option.
# the C values below are illustrative; the grid pairs four C values with two fit_intercept options
{'C': [0.1, 1, 10, 100],
 'fit_intercept': [True, False]}
But what if the best value of C is 0.5? We wouldn't discover it. We could develop a more extensive grid to combat this issue, but we have better options. One option is Randomized Search. Rather than searching over all the options in a grid, we randomly search over a distribution. For this to be effective, we need 1) to search a relevant distribution and 2) to run the optimization for enough iterations to find a suitable combination of parameters. Here is how our grid might look for Randomized Search.
# C is now sampled from a uniform distribution between 0 and 10 (scipy.stats.uniform)
{'C': uniform(loc=0, scale=10),
 'fit_intercept': [True, False]}
In the above grid, we randomly search over a uniform distribution with a range of 0 to 10. Given enough iterations, we will increase our odds of finding the best C value paired with the best fit_intercept value. However, the drawback is that this process is random. We might waste time trying combinations of parameters that simply aren't promising. Don't worry: we'll cover more alternative options in subsequent sections.
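As a rough sketch of how such a randomized search could be run with scikit-learn's RandomizedSearchCV, consider the following; the iteration count is illustrative, and x_train and y_train are assumed to be fully preprocessed.

from scipy.stats import uniform
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV

param_distributions = {
    'C': uniform(loc=0, scale=10),
    'fit_intercept': [True, False],
}

# sample 25 parameter combinations, scoring each with 3-fold cross validation
search = RandomizedSearchCV(LogisticRegression(max_iter=1000), param_distributions, n_iter=25, cv=3,
                            scoring='roc_auc', random_state=19)
search.fit(x_train, y_train)
print(search.best_params_)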
Generally, I don't recommend using Grid Search unless we have a specific reason to try out certain, exact parameter combinations. I also generally do not recommend leveraging Randomized Search since better options exist.
Some of the code samples in the remainder of this section do not exactly match the final structure of modeling/model.py. They are close, but not exact. However, they could easily be edited to match exactly or more closely.
In the previous section, we discussed the drawbacks of Randomized Search for tuning model parameters. The big issue is that it will waste time trying parameter combinations that, based on evidence from previous trials, have little to no promise. An alternative is Successive Halving, which is like Randomized Search but better in some important respects. After a set of random trials, a certain percentage of poor-performing configurations is thrown out. The early models are also fit on only a small amount of resources (i.e. training data). The remaining configurations are then the basis for the next set of random trials, and more "resources" are allowed for training and evaluation. This process repeats until the best set of parameters is discovered. Implementing Successive Halving in scikit-learn is pretty easy. We need to update modeling/model.py to the following, which really amounts to changing only a couple of lines.
import joblib | |
import os | |
import pandas as pd | |
from sklearn.experimental import enable_halving_search_cv | |
from sklearn.model_selection import HalvingRandomSearchCV | |
from modeling.config import MODELS_DIRECTORY, DIAGNOSTICS_DIRECTORY, ENGINEERING_PARAM_GRID | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
param_space.update(ENGINEERING_PARAM_GRID) | |
search = HalvingRandomSearchCV(pipeline, param_distributions=param_space, n_candidates=1_000, scoring=scoring, | |
cv=cv_times, n_jobs=-1, verbose=10, random_state=19) | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
cv_results = pd.DataFrame(search.cv_results_).sort_values(by=['rank_test_score'], ascending=False) | |
joblib.dump(best_pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
cv_results.to_csv(os.path.join(model_name, DIAGNOSTICS_DIRECTORY, f'{model_name}_cv_results.csv'), index=False) | |
return best_pipeline |
Successive Halving is an effective algorithm, though it does have drawbacks. If we have a large number of potential model configurations, we may inadvertently kill off some promising configurations that need more time and data to converge (i.e. they get lost in the shuffle). Likewise, if we have a small number of potential model configurations, some poor configurations may be given too many resources and too much time (i.e. the algorithm has to look somewhere). An alternative to Successive Halving is Hyperband. This algorithm extends Successive Halving by considering several possible numbers of model configurations, given a set budget. Think of Hyperband as a smarter version of Successive Halving - rather than applying a brute-force rule for the number of models to try and resources to allocate, Hyperband adaptively determines these amounts. Another extension is BOHB, short for Bayesian Optimization and Hyperband. In this algorithm, Hyperband is used to determine how many configurations to try during the optimization routine. However, rather than using random sampling to select the exact model configurations, Bayesian Optimization is leveraged, which selects configurations to explore based on previous results.
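To make the resource-allocation idea concrete, here is a toy sketch (not library code) of one Successive Halving schedule; the starting candidate count, budget, and halving factor are arbitrary assumptions. Hyperband essentially runs several such schedules with different starting points and keeps the best result.
# Toy Successive Halving schedule: keep the best half of candidates each round
# while doubling the per-candidate training budget.
n_candidates, budget = 16, 1_000  # hypothetical: 16 configurations, 1,000 training rows each
while n_candidates >= 1:
    print(f'{n_candidates} candidate(s) trained on {budget} rows each')
    n_candidates //= 2
    budget *= 2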
We can, fortunately, implement both Hyperband and BOHB pretty easily. We first need to install hpbandster-sklearn. This library provides a drop-in replacement for RandomizedSearchCV, called HpBandSterSearchCV. The default optimizer is BOHB, though we could pass in "hyperband" if we wanted to use plain Hyperband. Let's see what modeling/model.py now looks like.
import joblib | |
import os | |
import pandas as pd | |
from hpbandster_sklearn import HpBandSterSearchCV | |
from modeling.config import MODELS_DIRECTORY, DIAGNOSTICS_DIRECTORY | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
search = HpBandSterSearchCV(pipeline, param_distributions=param_space, n_iter=n_trials, scoring=scoring, | |
cv=cv_times, n_jobs=-1, verbose=10, random_state=19) | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
cv_results = pd.DataFrame(search.cv_results_).sort_values(by=['rank_test_score'], ascending=False) | |
joblib.dump(best_pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
cv_results.to_csv(os.path.join(model_name, DIAGNOSTICS_DIRECTORY, f'{model_name}_cv_results.csv'), index=False) | |
return best_pipeline |
Likewise, we need to change modeling/config.py. For specifying our parameter search space, we're best served by using ConfigSpace when leveraging hpbandster-sklearn. Here is what that might look like if we want to train a Random Forest.
import ConfigSpace as cs | |
import ConfigSpace.hyperparameters as csh | |
from copy import deepcopy | |
from sklearn.ensemble import RandomForestClassifier | |
BASE_PARAM_GRID = cs.ConfigurationSpace(seed=42) | |
BASE_PARAM_GRID.add_hyperparameter(csh.CategoricalHyperparameter( | |
'preprocessor__numeric_transformer__log_creator__take_log', ['yes', 'no'])) | |
BASE_PARAM_GRID.add_hyperparameter(csh.CategoricalHyperparameter( | |
'preprocessor__categorical_transformer__category_combiner__combine_categories', ['yes', 'no'])) | |
BASE_PARAM_GRID.add_hyperparameter(csh.UniformIntegerHyperparameter( | |
'feature_selector__percentile', 1, 100)) | |
FOREST_PARAM_GRID = deepcopy(BASE_PARAM_GRID) | |
FOREST_PARAM_GRID.add_hyperparameter(csh.UniformIntegerHyperparameter('model__base_estimator__max_depth', 3, 30)) | |
FOREST_PARAM_GRID.add_hyperparameter(csh.UniformFloatHyperparameter('model__base_estimator__min_samples_leaf', | |
0.0001, 0.01)) | |
FOREST_PARAM_GRID.add_hyperparameter(csh.CategoricalHyperparameter('model__base_estimator__max_features', | |
['log2', 'sqrt'])) | |
MODEL_TRAINING_DICT = { | |
'random_forest': [RandomForestClassifier(n_estimators=500), FOREST_PARAM_GRID, 10] | |
} |
We have yet another option: Bayesian hyperparameter tuning, which we briefly introduced in the previous section. This methodology updates its search space based on the results of previous trials, spending more time searching promising parts of the distribution. We can implement Bayesian hyperparameter tuning with the scikit-optimize library.
Let's first see what a parameter grid for, say, a Random Forest would look like.
FOREST_PARAM_GRID = { | |
'model__base_estimator__max_depth': (3, 30), | |
'model__base_estimator__min_samples_leaf': (0.0001, 0.1, 'uniform'), | |
'model__base_estimator__max_features': ['log2', 'sqrt'], | |
} |
Let's also review how our training function would now look.
import joblib | |
import os | |
import pandas as pd | |
from skopt import BayesSearchCV | |
from modeling.config import MODELS_DIRECTORY, DIAGNOSTICS_DIRECTORY, ENGINEERING_PARAM_GRID | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
param_space.update(ENGINEERING_PARAM_GRID) | |
search = BayesSearchCV(pipeline, search_spaces=param_space, n_iter=n_trials, scoring=scoring, cv=cv_times, | |
n_jobs=-1, verbose=10) | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
cv_results = pd.DataFrame(search.cv_results_).sort_values(by=['rank_test_score'], ascending=False) | |
joblib.dump(best_pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
cv_results.to_csv(os.path.join(model_name, DIAGNOSTICS_DIRECTORY, f'{model_name}_cv_results.csv'), index=False) | |
return best_pipeline |
The wonderful tour of hyperparameter optimization techniques continues with Tree-structured Parzen Estimators. One of the most common implementations is Hyperopt. The idea is similar to Bayesian Optimization. Bayesian Optimization attempts to model the validation loss given the hyperparameters, whereas a Parzen Estimator attempts to model the hyperparameters given the validation loss. Here is the deal with Hyperopt: we don't get a drop-in replacement for RandomizedSearchCV. Fortunately, we can still use the hyperopt library pretty easily.
When using hyperopt, we need to use distributions provided by the library. Likewise, the first argument to each hp function is a string label for the parameter.
from hyperopt import hp | |
ENGINEERING_PARAM_GRID = { | |
'preprocessor__numeric_transformer__log_creator__take_log': hp.choice( | |
'preprocessor__numeric_transformer__log_creator__take_log', ['yes', 'no']), | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': hp.choice( | |
'preprocessor__categorical_transformer__category_combiner__combine_categories', ['yes', 'no']), | |
'preprocessor__categorical_transformer__feature_selector__percentile': hp.uniformint( | |
'preprocessor__categorical_transformer__feature_selector__percentile', 1, 100), | |
'preprocessor__numeric_transformer__feature_selector__percentile': hp.uniformint( | |
'preprocessor__numeric_transformer__feature_selector__percentile', 1, 100), | |
} | |
FOREST_PARAM_GRID = { | |
'model__base_estimator__max_depth': hp.uniformint('model__base_estimator__max_depth', 3, 16), | |
'model__base_estimator__min_samples_leaf': hp.uniform('model__base_estimator__min_samples_leaf', 0.001, 0.01), | |
'model__base_estimator__max_features': hp.choice('model__base_estimator__max_features', ['log2', 'sqrt']), | |
} |
You'll notice that our training function changes a bit. We define a function we want to minimize as hyperopt can only minimize functions. Since we actually want to maximize our scoring metric, we can simply minimize 1 - our scoring metric. After we have found the best parameters, we then refit our pipeline on the entire training set.
import pandas as pd | |
import joblib | |
import os | |
from sklearn.model_selection import cross_val_score | |
from hyperopt import fmin, tpe, Trials, space_eval | |
def train_model(x_train, y_train, get_pipeline_function, model_uid, model, param_space, iterations, cv_strategy, | |
cv_scoring, static_param_space): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, and saves the serialized model. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_uid: model uid | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param iterations: number of trials to search for optimal hyperparameters | |
:param cv_strategy: cross validation strategy | |
:param cv_scoring: scoring method used for cross validation | |
:param static_param_space: parameter search space valid for all models (e.g. feature engineering) | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_uid}...') | |
pipeline = get_pipeline_function(model) | |
if static_param_space: | |
param_space.update(static_param_space) | |
cv_scores_df = pd.DataFrame() | |
def _model_objective(params): | |
pipeline.set_params(**params) | |
score = cross_val_score(pipeline, x_train, y_train, cv=cv_strategy, scoring=cv_scoring, n_jobs=-1) | |
temp_cv_scores_df = pd.DataFrame(score) | |
temp_cv_scores_df = temp_cv_scores_df.reset_index() | |
temp_cv_scores_df['index'] = 'fold_' + temp_cv_scores_df['index'].astype(str) | |
temp_cv_scores_df = temp_cv_scores_df.T | |
temp_cv_scores_df = temp_cv_scores_df.add_prefix('fold_') | |
temp_cv_scores_df = temp_cv_scores_df.iloc[1:] | |
temp_cv_scores_df['mean'] = temp_cv_scores_df.mean(axis=1) | |
temp_cv_scores_df['std'] = temp_cv_scores_df.std(axis=1) | |
temp_params_df = pd.DataFrame(params, index=list(range(0, len(params) + 1))) | |
temp_cv_scores_df = pd.concat([temp_params_df, temp_cv_scores_df], axis=1) | |
temp_cv_scores_df = temp_cv_scores_df.dropna() | |
nonlocal cv_scores_df | |
cv_scores_df = pd.concat([cv_scores_df, temp_cv_scores_df]) | |
return 1 - score.mean() | |
trials = Trials() | |
best = fmin(_model_objective, param_space, algo=tpe.suggest, max_evals=iterations, trials=trials) | |
best_params = space_eval(param_space, best) | |
cv_scores_df = cv_scores_df.sort_values(by=['mean'], ascending=False) | |
cv_scores_df = cv_scores_df.reset_index(drop=True) | |
cv_scores_df = cv_scores_df.reset_index() | |
cv_scores_df = cv_scores_df.rename(columns={'index': 'ranking'}) | |
cv_scores_df.to_csv(os.path.join(model_uid, 'diagnostics', 'cv_scores', 'cv_scores.csv'), index=False) | |
pipeline.set_params(**best_params) | |
pipeline.fit(x_train, y_train) | |
joblib.dump(pipeline, os.path.join(model_uid, 'models', 'model.pkl'), compress=3) | |
return pipeline |
Optuna is another library we can use for hyperparameter optimization. Below is an example of how to use the optuna library to implement a Tree-structured Parzen Estimator. To note, given how optuna is set up, there really isn't a great way to use our current modeling structure. We instead have to author a tightly-coupled function as opposed to a more modular one.
import joblib | |
import os | |
import optuna | |
from sklearn.model_selection import cross_val_score | |
from optuna.samplers import TPESampler | |
from modeling.config import MODELS_DIRECTORY | |
def train_random_forest(x_train, y_train, get_pipeline_function, model_name, model, n_trials, cv_times, | |
scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, and saves the serialized model into the | |
MODELS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
def objective(trial): | |
from copy import deepcopy | |
trial_pipe = deepcopy(pipeline) | |
preprocessor__numeric_transformer__log_creator__take_log = trial.suggest_categorical( | |
'preprocessor__numeric_transformer__log_creator__take_log', ['yes', 'no']) | |
preprocessor__categorical_transformer__category_combiner__combine_categories = trial.suggest_categorical( | |
'preprocessor__categorical_transformer__category_combiner__combine_categories', ['yes', 'no']) | |
feature_selector__percentile = int(trial.suggest_discrete_uniform('feature_selector__percentile', 1, 100, 1)) | |
model__max_depth = int(trial.suggest_discrete_uniform('model__max_depth', 3, 30, 1)) | |
model__min_samples_leaf = trial.suggest_uniform('model__min_samples_leaf', 0.0001, 0.01) | |
model__max_features = trial.suggest_categorical('model__max_features', ['log2', 'sqrt']) | |
params = { | |
'preprocessor__numeric_transformer__log_creator__take_log': | |
preprocessor__numeric_transformer__log_creator__take_log, | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': | |
preprocessor__categorical_transformer__category_combiner__combine_categories, | |
'feature_selector__percentile': feature_selector__percentile, | |
'model__max_depth': model__max_depth, | |
'model__min_samples_leaf': model__min_samples_leaf, | |
'model__max_features': model__max_features | |
} | |
print(params) | |
trial_pipe.set_params(**params) | |
cv_score = cross_val_score(trial_pipe, x_train, y_train, n_jobs=-1, cv=cv_times, scoring=scoring).mean() | |
print(cv_score) | |
return cv_score | |
study = optuna.create_study(direction='maximize', sampler=TPESampler()) | |
study.optimize(objective, n_trials=n_trials) | |
trial = study.best_trial | |
pipeline.set_params(**trial.params) | |
pipeline.fit(x_train, y_train) | |
joblib.dump(pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
return pipeline |
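The training script below then wires this Optuna-based function into the rest of our workflow. As noted earlier, it is close to, though not an exact match for, the final structure of our code.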
import os | |
import joblib | |
import numpy as np | |
import time | |
from sklearn.ensemble import RandomForestClassifier | |
from data.db import get_training_data | |
from helpers.model_helpers import create_x_y_split, create_train_test_split, create_model_uid, \ | |
make_directories_if_not_exists, upload_model_directory_to_s3 | |
from modeling.config import MODEL_TRAINING_DICT, MODEL_EVALUATION_LIST, CV_SCORER, TARGET, MODELS_DIRECTORY, \ | |
DIAGNOSTICS_DIRECTORY, DATA_DIRECTORY, CLASS_CUTOFF, CALIBRATION_BINS, CV_TIMES | |
from modeling.model import train_random_forest | |
from modeling.pipelines import get_pipeline | |
from modeling.evaluate import run_omnibus_model_evaluation | |
def train(): | |
""" | |
Trains and evaluates machine learning models. | |
""" | |
script_start_time = time.time() | |
df = get_training_data() | |
df = df.drop(labels=['client_id', 'id', 'meta__inserted_at'], axis=1) | |
df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0) | |
x, y = create_x_y_split(df, TARGET) | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y) | |
model_name = 'random_forest' | |
model_name = create_model_uid(model_name) | |
model_directories = [os.path.join(model_name, MODELS_DIRECTORY), | |
os.path.join(model_name, DIAGNOSTICS_DIRECTORY), | |
os.path.join(model_name, DATA_DIRECTORY)] | |
make_directories_if_not_exists(model_directories) | |
joblib.dump(x_train, os.path.join(model_name, DATA_DIRECTORY, 'x_train.pkl'), compress=3) | |
joblib.dump(x_test, os.path.join(model_name, DATA_DIRECTORY, 'x_test.pkl'), compress=3) | |
joblib.dump(y_train, os.path.join(model_name, DATA_DIRECTORY, 'y_train.pkl'), compress=3) | |
joblib.dump(y_test, os.path.join(model_name, DATA_DIRECTORY, 'y_test.pkl'), compress=3) | |
pipeline = train_random_forest(x_train, y_train, get_pipeline, model_name, RandomForestClassifier(), | |
10, CV_TIMES, CV_SCORER) | |
run_omnibus_model_evaluation(pipeline, model_name, x_test, y_test, CLASS_CUTOFF, TARGET, MODEL_EVALUATION_LIST, | |
CALIBRATION_BINS, calibrated=False) | |
upload_model_directory_to_s3(model_name) | |
print("--- {} seconds for script to run ---" .format(time.time() - script_start_time)) | |
if __name__ == "__main__": | |
train() |
ultraopt is another Tree-structured Parzen Estimator implementation, specifically an Embedding-Tree-Parzen-Estimator (ETPE). It can also be combined with Hyperband.
When using ultraopt, we need to configure our parameter search space as follows.
TREE_HDL = { | |
"preprocessor__numeric_transformer__log_creator__take_log": {"_type": "choice", "_value": ["yes", "no"], | |
"_default": "yes"}, | |
"preprocessor__categorical_transformer__category_combiner__combine_categories": {"_type": "choice", | |
"_value": ["yes", "no"], | |
"_default": "yes"}, | |
"feature_selector__percentile": {"_type": "int_uniform", "_value": [1, 100], "_default": 50}, | |
"model__max_features": {"_type": "choice", "_value": ["sqrt", "log2"], "_default": "sqrt"}, | |
"model__min_samples_leaf": {"_type": "uniform", "_value": [0.0001, 0.01], "_default": 0.001}, | |
"model__max_depth": {"_type": "uniform", "_value": [3, 31], "_default": 3} | |
} |
Likewise, our train_model function becomes the following.
import joblib | |
import os | |
from sklearn.calibration import CalibratedClassifierCV | |
from sklearn.model_selection import cross_val_score | |
from ultraopt.hdl import layering_config | |
from ultraopt import fmin | |
from ultraopt.multi_fidelity import HyperBandIterGenerator | |
from modeling.config import MODELS_DIRECTORY | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
def _evaluate(config): | |
local_pipe = pipeline.set_params(**layering_config(config)) | |
return 1 - float(cross_val_score(local_pipe, x_train, y_train, scoring=scoring, cv=cv_times, n_jobs=-1).mean()) | |
hb = HyperBandIterGenerator(min_budget=1/4, max_budget=1, eta=2) | |
result = fmin(eval_func=_evaluate, config_space=param_space, optimizer="ETPE", n_iterations=n_trials, | |
multi_fidelity_iter_generator=hb) | |
best_config = result.best_config | |
pipeline.set_params(**best_config) | |
pipeline.fit(x_train, y_train) | |
joblib.dump(pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
return pipeline | |
SMAC3 combines Bayesian Optimization with an aggressive racing mechanism that decides which of two parameter configurations performs better.
To use SMAC3, we need to define a ConfigSpace.
from smac.configspace import ConfigurationSpace | |
from ConfigSpace.hyperparameters import CategoricalHyperparameter, UniformFloatHyperparameter, \ | |
UniformIntegerHyperparameter | |
tree_cs = ConfigurationSpace() | |
take_log = CategoricalHyperparameter("preprocessor__numeric_transformer__log_creator__take_log", ["yes", "no"]) | |
combine_categories = \ | |
CategoricalHyperparameter("preprocessor__categorical_transformer__category_combiner__combine_categories" , | |
["yes", "no"]) | |
feature_selector = UniformIntegerHyperparameter("feature_selector__percentile", 1, 100, default_value=50) | |
max_features = CategoricalHyperparameter("model__max_features", ["sqrt", "log2"]) | |
min_samples_in_leaf = UniformFloatHyperparameter("model__min_samples_leaf", 0.0001, 0.01, default_value=0.001) | |
max_depth = UniformIntegerHyperparameter("model__max_depth", 3, 31, default_value=10) | |
tree_cs.add_hyperparameters([take_log, combine_categories, feature_selector, max_features, min_samples_in_leaf, | |
max_depth]) |
Our training function then becomes the below. Getting the best parameters is a bit of a pain, but it can be done with some work.
import joblib | |
import os | |
import numpy as np | |
import json | |
from sklearn.model_selection import cross_val_score | |
from smac.facade.smac_hpo_facade import SMAC4HPO | |
from smac.scenario.scenario import Scenario | |
from modeling.config import MODELS_DIRECTORY | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
def _evaluate(cfg): | |
local_pipe = pipeline.set_params(**cfg) | |
return 1 - float(cross_val_score(local_pipe, x_train, y_train, scoring=scoring, cv=cv_times, n_jobs=1).mean()) | |
scenario = Scenario({ | |
"run_obj": "quality", | |
"runcount-limit": n_trials, | |
"cs": param_space, | |
"deterministic": "true", | |
}) | |
smac = SMAC4HPO(scenario=scenario, rng=np.random.RandomState(42), tae_runner=_evaluate) | |
value = smac.get_tae_runner().run(param_space.get_default_configuration(), 1)[1] | |
try: | |
incumbent = smac.optimize() | |
finally: | |
incumbent = smac.solver.incumbent | |
output_dir = smac.output_dir | |
with open(os.path.join(output_dir, 'traj_aclib2.json'), 'r') as file: | |
json_strs = file.readlines() | |
lowest_cost = 100_000_000 | |
best_parameters = [] | |
for json_str in json_strs: | |
json_str = json_str.replace("'", '') | |
local_json = json.loads(json_str) | |
cost = local_json.get('cost') | |
if cost < lowest_cost: | |
lowest_cost = cost | |
best_parameters = local_json.get('incumbent') | |
parameter_dict = dict() | |
for parameter in best_parameters: | |
param_split = parameter.split('=') | |
parameter_dict[param_split[0]] = param_split[1] | |
for k, v in parameter_dict.items(): | |
try: | |
parameter_dict[k] = float(v) | |
except: | |
pass | |
pipeline.set_params(**parameter_dict) | |
pipeline.fit(x_train, y_train) | |
joblib.dump(pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
return pipeline |
In the previous sections, we reviewed some powerful hyperparameter tuning methodologies. A neat library called tune-sklearn provides an abstraction for some of the methodologies we've discussed. This capability is exposed via the TuneSearchCV class. This can accept multiple values in the search_optimization argument, such as "bayesian", "hyperopt", "bohb", and "optuna". For our parameter grid, a tuple represents a distribution, and a list represents categorical choices. Let's take a look at the adjustments we would need to make to train a Random Forest.
from sklearn.ensemble import RandomForestClassifier | |
ENGINEERING_PARAM_GRID = { | |
'preprocessor__numeric_transformer__log_creator__take_log': ['yes', 'no'], | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': ['yes', 'no'], | |
'feature_selector__percentile': (1, 100) | |
} | |
FOREST_PARAM_GRID = { | |
'model__max_depth': (3, 31), | |
'model__min_samples_leaf': (0.0001, 0.01), | |
'model__max_features': ['log2', 'sqrt'] | |
} | |
MODEL_TRAINING_DICT = { | |
'random_forest': [RandomForestClassifier(n_estimators=500), FOREST_PARAM_GRID, 50] | |
} |
import joblib | |
import os | |
import pandas as pd | |
from tune_sklearn import TuneSearchCV | |
from modeling.config import MODELS_DIRECTORY, DIAGNOSTICS_DIRECTORY, ENGINEERING_PARAM_GRID | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
param_space.update(ENGINEERING_PARAM_GRID) | |
search = TuneSearchCV(pipeline, param_distributions=param_space, n_trials=n_trials, scoring=scoring, cv=cv_times, | |
verbose=2, n_jobs=-1, search_optimization='bohb') | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
cv_results = pd.DataFrame(search.cv_results_).sort_values(by=['rank_test_score'], ascending=False) | |
joblib.dump(best_pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
cv_results.to_csv(os.path.join(model_name, DIAGNOSTICS_DIRECTORY, f'{model_name}_cv_results.csv'), index=False) | |
return best_pipeline |
We can also perform Random Search with an early stopping mechanism: if a trial is not showing better performance, it is stopped. We do have to change our parameter grids.
from sklearn.ensemble import RandomForestClassifier | |
from scipy.stats import uniform, randint | |
ENGINEERING_PARAM_GRID = { | |
'preprocessor__numeric_transformer__log_creator__take_log': ['yes', 'no'], | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': ['yes', 'no'], | |
'feature_selector__percentile': randint(1, 100) | |
} | |
FOREST_PARAM_GRID = { | |
'model__max_depth': randint(3, 31), | |
'model__min_samples_leaf': uniform(0.0001, 0.01), | |
'model__max_features': ['log2', 'sqrt'] | |
} | |
MODEL_TRAINING_DICT = { | |
'random_forest': [RandomForestClassifier(n_estimators=500), FOREST_PARAM_GRID, 10] | |
} |
import joblib | |
import os | |
import pandas as pd | |
from tune_sklearn import TuneSearchCV | |
from modeling.config import MODELS_DIRECTORY, DIAGNOSTICS_DIRECTORY, ENGINEERING_PARAM_GRID | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, saves the serialized model into the | |
MODELS_DIRECTORY, and saves the cross validation results as a csv into the DIAGNOSTICS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
param_space.update(ENGINEERING_PARAM_GRID) | |
search = TuneSearchCV(pipeline, param_distributions=param_space, n_trials=n_trials, scoring=scoring, cv=cv_times, | |
verbose=2, n_jobs=-1, search_optimization='random', early_stopping='MedianStoppingRule') | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
cv_results = pd.DataFrame(search.cv_results_).sort_values(by=['rank_test_score'], ascending=False) | |
joblib.dump(best_pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
cv_results.to_csv(os.path.join(model_name, DIAGNOSTICS_DIRECTORY, f'{model_name}_cv_results.csv'), index=False) | |
return best_pipeline |
To note, depending on the selected search_optimization, the CalibratedClassifierCV may not be supported.
Another framework for training and optimizing machine learning models is Ray Tune. The implementation below will probably remind you of the code required to use Optuna or Hyperopt. A benefit of using something like Ray Tune is access to additional hyperparameter tuning techniques. In addition to Hyperband and BOHB, Ray Tune provides ASHA, an asynchronous version of Hyperband, which offers better parallelism and stronger elimination of poor, straggling fits. Ray Tune also provides the MedianStoppingRule, which we used in the last section; this stops a trial if its performance is below the median of other trials at a similar point in time. Additionally, Ray Tune gives us the opportunity to use Population-Based Training (PBT). This methodology starts by training a slew of models in parallel. Occasionally, low-performing models clone the state of top performers, and a random permutation is added to the cloned configuration.
Let's see how we could use Ray Tune with ASHA.
from ray import tune | |
from sklearn.ensemble import RandomForestClassifier | |
ENGINEERING_PARAM_GRID = { | |
'preprocessor__numeric_transformer__log_creator__take_log': tune.choice(['yes', 'no']), | |
'preprocessor__categorical_transformer__category_combiner__combine_categories': tune.choice(['yes', 'no']), | |
'feature_selector__percentile': tune.randint(1, 100) | |
} | |
FOREST_PARAM_GRID = { | |
'model__max_depth': tune.randint(3, 31), | |
'model__min_samples_leaf': tune.uniform(0.0001, 0.01), | |
'model__max_features': tune.choice(['log2', 'sqrt']) | |
} | |
MODEL_TRAINING_DICT = { | |
'random_forest': [RandomForestClassifier(n_estimators=500), FOREST_PARAM_GRID, 10] | |
} |
import joblib | |
import os | |
import pandas as pd | |
from ray import tune | |
from ray.tune.schedulers import ASHAScheduler | |
from sklearn.model_selection import cross_val_score | |
from modeling.config import MODELS_DIRECTORY, ENGINEERING_PARAM_GRID | |
def train_model(x_train, y_train, get_pipeline_function, model_name, model, param_space, n_trials, cv_times, scoring): | |
""" | |
Trains a machine learning model, optimizes the hyperparameters, and saves the serialized model into the | |
MODELS_DIRECTORY. | |
:param x_train: x_train dataframe | |
:param y_train: y_train series | |
:param get_pipeline_function: callable that takes model to produce a scikit-learn pipeline | |
:param model_name: name of the model | |
:param model: instantiated model | |
:param param_space: the distribution of hyperparameters to search over | |
:param n_trials: number of trials to search for optimal hyperparameters | |
:param cv_times: number of cross validation folds | |
:param scoring: scoring method used for cross validation | |
:returns: scikit-learn pipeline | |
""" | |
print(f'training {model_name}...') | |
pipeline = get_pipeline_function(model) | |
param_space.update(ENGINEERING_PARAM_GRID) | |
def _objective(config): | |
pipeline.set_params(**config) | |
cv_score = cross_val_score(pipeline, x_train, y_train, cv=cv_times, scoring=scoring).mean() | |
tune.report(neg_log_loss=cv_score, done=True) | |
pd.set_option('display.max_rows', 500) | |
pd.set_option('display.max_columns', 500) | |
pd.set_option('display.width', 1000) | |
analysis = tune.run( | |
_objective, | |
mode='max', | |
metric='neg_log_loss', | |
config=param_space, | |
num_samples=n_trials, | |
scheduler=ASHAScheduler()) | |
pipeline.set_params(**analysis.best_config) | |
pipeline.fit(x_train, y_train) | |
joblib.dump(pipeline, os.path.join(model_name, MODELS_DIRECTORY, f'{model_name}.pkl'), compress=3) | |
return pipeline |
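As a hedged aside, swapping in the other Ray Tune schedulers mentioned above is mostly a matter of changing the scheduler argument. Note that Population-Based Training expects trials that report a metric repeatedly over time, so the single-report objective in the sketch above would need reworking before PBT adds value; the mutation dictionary below is purely illustrative.
from ray import tune
from ray.tune.schedulers import MedianStoppingRule, PopulationBasedTraining

# Stop a trial whose result falls below the median of other trials at a similar point in time.
median_scheduler = MedianStoppingRule()
# PBT periodically clones top performers and perturbs the cloned hyperparameters.
pbt_scheduler = PopulationBasedTraining(time_attr='training_iteration',
                                        hyperparam_mutations={'model__min_samples_leaf': tune.uniform(0.0001, 0.01)})
# analysis = tune.run(_objective, mode='max', metric='neg_log_loss', config=param_space,
#                     num_samples=n_trials, scheduler=median_scheduler)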
Automated ML (Auto ML) allows us to throw a training set into a framework and get a predictive model. We don't have to define the models we want to try nor the parameter grids. This can be a useful starting point. We'll likely want more control over our final model, but an Auto ML framework can crunch through a ton of different options, provide us with ideas, and give us a good starting point for more modeling. TPOT is an easy-to-use Auto ML framework that uses genetic algorithms to find the "best" model. We can control the optimization through the generations and population_size parameters. The former determines the number of iterations to run the optimization; within each generation, population_size models are fit. The default values are 100 generations with a population_size of 100, for a total of roughly 10,000 different models. Simply for experimentation, I set lower values for generations and population_size along with a max run time in the example below. In reality, you'll want to let the optimization run longer. Auto ML can oftentimes run for hours or even days, depending on the size of the data and the complexity of the problem. When TPOT is finished, it will output a Python script with the best model parameters from the optimization process. This is quite handy, as it can provide inspiration and a basis for more experimentation. Of note, TPOT expects the training data to already be cleaned and ready for modeling. Most of our cleaning and engineering is in our scikit-learn pipeline, so we remove some of the latter steps in the pipeline and apply our main preprocessing to the data. Unfortunately, TPOT will not optimize our preprocessing parameters.
import numpy as np | |
from tpot import TPOTClassifier | |
from data.db import get_training_data | |
from helpers.model_helpers import create_x_y_split, create_train_test_split | |
from modeling.config import TARGET, CV_SCORER, CV_TIMES | |
from modeling.pipelines import get_pipeline | |
def main(): | |
df = get_training_data() | |
df = df.drop(labels=['client_id', 'id', 'meta__inserted_at'], axis=1) | |
df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0) | |
x, y = create_x_y_split(df, TARGET) | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y) | |
pipeline = get_pipeline(model=None) | |
pipeline.steps.pop(len(pipeline) - 1) | |
pipeline.steps.pop(len(pipeline) - 1) | |
pipeline.steps.pop(len(pipeline) - 1) | |
x_train = pipeline.fit_transform(x_train) | |
x_test = pipeline.transform(x_test) | |
tpot = TPOTClassifier(generations=5, population_size=50, scoring=CV_SCORER, cv=CV_TIMES, early_stop=3, | |
max_time_mins=10, verbosity=3, random_state=42, n_jobs=-1) | |
tpot.fit(x_train, y_train) | |
print(tpot.score(x_test, y_test)) | |
tpot.export('tpot_pipeline.py') | |
if __name__ == "__main__": | |
main() |
Another Auto ML option we have is auto-sklearn. As with TPOT, we need to apply any necessary data cleaning functions. We can then throw our data into the AutoSklearnClassifier. Simply for experimentation, I only allow the system to try different options for five minutes in the example below. What is beneficial about auto-sklearn is that it also tries different preprocessing techniques, such as the select percentile class we also use in our scikit-learn pipeline. In fact, we would be better served by removing some, if not most, of our preprocessing steps entirely, including the imputer or even how we encode categorical features, as those pieces are also covered by auto-sklearn.
import numpy as np | |
from autosklearn.classification import AutoSklearnClassifier | |
from autosklearn.metrics import log_loss | |
from data.db import get_training_data | |
from helpers.model_helpers import create_x_y_split, create_train_test_split | |
from modeling.config import TARGET | |
from modeling.pipelines import get_pipeline | |
def main(): | |
df = get_training_data() | |
df = df.drop(labels=['client_id', 'id', 'meta__inserted_at'], axis=1) | |
df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0) | |
x, y = create_x_y_split(df, TARGET) | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y) | |
pipeline = get_pipeline(model=None) | |
pipeline.steps.pop(len(pipeline) - 1) | |
pipeline.steps.pop(len(pipeline) - 1) | |
pipeline.steps.pop(len(pipeline) - 1) | |
x_train = pipeline.fit_transform(x_train) | |
x_test = pipeline.transform(x_test) | |
automl = AutoSklearnClassifier(time_left_for_this_task=300, n_jobs=-1, metric=log_loss) | |
automl.fit(x_train, y_train) | |
print(automl.show_models()) | |
print() | |
predictions = automl.predict_proba(x_test) | |
print(log_loss(y_test, predictions[:, 1])) | |
if __name__ == "__main__": | |
main() | |
MLBox is yet another option for Auto ML. Here is how we can use it.
import numpy as np | |
from mlbox.preprocessing import * | |
from mlbox.optimisation import * | |
from mlbox.prediction import * | |
from data.db import get_training_data | |
from modeling.config import TARGET | |
def main(): | |
df = get_training_data() | |
df = df.drop(labels=['client_id', 'id', 'meta__inserted_at'], axis=1) | |
df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0) | |
train_df = df.sample(frac=0.7, random_state=200) | |
test_df = df.drop(train_df.index) | |
train_df.to_csv('train_df.csv', index=False) | |
test_df.to_csv('test_df.csv', index=False) | |
rd = Reader(sep=',') | |
df = rd.train_test_split(['train_df.csv', 'test_df.csv'], TARGET) | |
opt = Optimiser(scoring='neg_log_loss', n_folds=5) | |
opt.evaluate(None, df) | |
if __name__ == "__main__": | |
main() |
Luigi is a framework that allows us to build and manage a complex series of tasks. If we think about it, training a machine learning model is certainly a complex series of tasks! Therefore, we might consider leveraging Luigi. Below is a simple implementation of training ML models using Luigi. We could clearly expand and improve it, though the implementation should suffice to illustrate Luigi. Basically, we have four Luigi tasks, coded as classes: GetData, TrainRandomForest, TrainExtraTrees, and TrainModels. By looking at the code, we can get a pretty good idea of what's going on.
Let's bring this all together. GetData runs first, followed by TrainRandomForest and TrainExtraTrees. The training is over once a specific text file exists in our working directory.
However, what happens when one of our steps fails? This is where the beauty of Luigi kicks in. For example, if we run the script and get an error in TrainRandomForest, we will not have to re-execute GetData and TrainExtraTrees when we reboot the script. Luigi picks up where it left off on a task-by-task basis. This is super useful for long-running tasks. For instance, if GetData is quite time-intensive, we don't want to re-run it unless necessary. With Luigi, we don't have to worry about coding logic around GetData in cases of downstream failure. Rather, Luigi looks to see if the LocalTarget exists and makes a decision from there.
import luigi | |
import joblib | |
import numpy as np | |
from datetime import date | |
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier | |
from tune_sklearn import TuneSearchCV | |
from sklearn.metrics import log_loss | |
from data.db import get_training_data | |
from helpers.model_helpers import create_x_y_split, create_train_test_split | |
from modeling.config import TARGET, FOREST_PARAM_GRID, ENGINEERING_PARAM_GRID | |
from modeling.pipelines import get_pipeline | |
class GetData(luigi.Task): | |
def output(self): | |
return luigi.LocalTarget( | |
path='y_test.pkl' | |
) | |
def run(self): | |
print('getting data...') | |
df = get_training_data() | |
df = df.drop(labels=['client_id', 'id', 'meta__inserted_at'], axis=1) | |
df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0) | |
x, y = create_x_y_split(df, TARGET) | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y) | |
joblib.dump(x_train, 'x_train.pkl', compress=3) | |
joblib.dump(x_test, 'x_test.pkl', compress=3) | |
joblib.dump(y_train, 'y_train.pkl', compress=3) | |
joblib.dump(y_test, 'y_test.pkl', compress=3) | |
class TrainRandomForest(luigi.Task): | |
def requires(self): | |
return GetData() | |
def output(self): | |
return luigi.LocalTarget( | |
path=(str(date.today()) + '_random_forest.pkl') | |
) | |
def run(self): | |
print('training random forest...') | |
x_train = joblib.load('x_train.pkl') | |
y_train = joblib.load('y_train.pkl') | |
x_test = joblib.load('x_test.pkl') | |
y_test = joblib.load('y_test.pkl') | |
pipeline = get_pipeline(RandomForestClassifier()) | |
FOREST_PARAM_GRID.update(ENGINEERING_PARAM_GRID) | |
search = TuneSearchCV(pipeline, param_distributions=FOREST_PARAM_GRID, n_trials=50, scoring='neg_log_loss', | |
cv=5, verbose=2, n_jobs=-1, search_optimization='random', | |
early_stopping='MedianStoppingRule') | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
predictions = best_pipeline.predict_proba(x_test) | |
ll_score = log_loss(y_test, predictions[:, 1]) | |
print(ll_score) | |
joblib.dump(best_pipeline, str(date.today()) + '_random_forest.pkl') | |
class TrainExtraTrees(luigi.Task): | |
def requires(self): | |
return GetData() | |
def output(self): | |
return luigi.LocalTarget( | |
path=(str(date.today()) + '_extra_trees.pkl') | |
) | |
def run(self): | |
print('training extra trees...') | |
x_train = joblib.load('x_train.pkl') | |
y_train = joblib.load('y_train.pkl') | |
x_test = joblib.load('x_test.pkl') | |
y_test = joblib.load('y_test.pkl') | |
pipeline = get_pipeline(ExtraTreesClassifier()) | |
FOREST_PARAM_GRID.update(ENGINEERING_PARAM_GRID) | |
search = TuneSearchCV(pipeline, param_distributions=FOREST_PARAM_GRID, n_trials=50, scoring='neg_log_loss', | |
cv=5, verbose=2, n_jobs=-1, search_optimization='random', | |
early_stopping='MedianStoppingRule') | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
predictions = best_pipeline.predict_proba(x_test) | |
ll_score = log_loss(y_test, predictions[:, 1]) | |
print(ll_score) | |
joblib.dump(best_pipeline, str(date.today()) + '_extra_trees.pkl') | |
class TrainModels(luigi.Task): | |
def requires(self): | |
return TrainExtraTrees(), TrainRandomForest() | |
def output(self): | |
return luigi.LocalTarget( | |
path=(str(date.today()) + '.txt') | |
) | |
def run(self): | |
with open(str(date.today()) + '.txt', 'w') as file: | |
file.write('luigi tasks complete') | |
if __name__ == '__main__': | |
luigi.run() |
To run our Luigi tasks, we issue the following command from our directory root.
$ python3 modeling/luigi_ml.py --local-scheduler TrainModels
An Abstract Base Class (ABC) allows us to define a common Application Programming Interface (API). If we want to enforce certain standards for building machine learning models, we could use an ABC. Here is a skeletal framework that could easily be expanded with real code.
from abc import ABC, abstractmethod | |
class MLModelBase(ABC): | |
""" An abstract base class for ML modeling """ | |
def __init__(self, model, model_name): | |
self.model = model | |
self.model_name = model_name | |
@property | |
@abstractmethod | |
def expected_output(self): | |
raise NotImplementedError() | |
@abstractmethod | |
def train(self): | |
pass | |
@abstractmethod | |
def make_prediction(self): | |
pass | |
@abstractmethod | |
def explain_predictions(self): | |
pass | |
class MyModel(MLModelBase): | |
""" Sample Implementation """ | |
expected_output = range(0, 1) | |
def train(self): | |
print('training...') | |
def make_prediction(self): | |
print('predicting...') | |
def explain_predictions(self): | |
print('explaining...') | |
if __name__ == "__main__": | |
model = MyModel('model', 'model_name') | |
print(model.expected_output) | |
print(model.model) | |
model.train() | |
model.make_prediction() | |
model.explain_predictions() |
MLModelBase is our ABC, which defines our ML training blueprint. Any class that inherits from MLModelBase must implement methods called train, make_prediction, and explain_predictions. It must also have a property titled expected_output. The MyModel class does, in fact, inherit from MLModelBase and follows the prescribed structure. If MyModel did not implement a train method, our code would fail with a TypeError as soon as we tried to instantiate the class.
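As a quick illustrative sketch of that enforcement (the IncompleteModel class is hypothetical and exists only to show the failure):
class IncompleteModel(MLModelBase):
    """ Forgets to implement train """
    expected_output = range(0, 1)

    def make_prediction(self):
        print('predicting...')

    def explain_predictions(self):
        print('explaining...')

# IncompleteModel('model', 'model_name') raises:
# TypeError: Can't instantiate abstract class IncompleteModel with abstract method train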