
Chapter Introduction: Model Tracking and Retraining

Our model is now in production. Hooray! A distinguishing feature of machine learning systems is that they generally decay over time. This concept is sometimes difficult to grasp for those not initiated into data science. A machine learning system is not like a website; we can't just prop it up and let it be. We need to monitor it closely, and we also need to retrain our model.

Data science projects are never done, barring getting canceled. Like a car, our model needs maintenance. Retraining your model on new data might be like your regular oil change. Occasionally you might need a bigger repair, like replacing a gasket. In machine learning, this might be akin to completely changing models (e.g. from Random Forest to XGBoost). Fortunately, some of this can be automated via AWS Batch and our CI/CD pipeline. We can schedule our model to retrain and be released into production. We can leverage our testing mechanisms to ensure a "bad" model does not go into production.

You might ask: How often should we retrain our model? My answer: it depends. The decision is contingent on how stationary the environment is. For a stock trading model, frequent retrains are likely warranted (e.g. daily or multiple times per day). For our customer churn model, the environment is very likely not as dynamic. That said, we probably don't want to go many, many months without retraining the model. Over the course of months, multiple items within the business or broader environment might change. Likewise, as months go by, we might see new types of customers the model did not experience during training, or churn rates for certain types of customers might systematically change due to broader marketing efforts. Using only a rule of thumb, my professional opinion would be to retrain the churn model every few weeks or even every few months. We likely would not experience enough day-to-day change to warrant more frequent retrains.

We could also trigger a model retrain based on underlying conditions. For one, we could track predicted vs. actual outcomes and have the model retrain once performance drops below a certain threshold; a minimal sketch of this idea is shown below. In environments with quick feedback mechanisms, this can work well. What's more, we could track the business impact of our model over time and trigger a retrain when this impact starts to decay. This can be a bit reactive, as it might take time for business impact to shift or be measured.
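
To make the performance-based trigger concrete, here is a minimal sketch. It assumes we can join recent predictions to their eventually-observed outcomes in a DataFrame with hypothetical predicted_probability and actual_churn columns; the function name, threshold, and retraining job submission are illustrative, not part of our actual codebase.

import pandas as pd
from sklearn.metrics import roc_auc_score


def should_retrain(scored_df: pd.DataFrame, auc_threshold: float = 0.70) -> bool:
    # scored_df holds one row per prediction, with the model's predicted probability
    # and the churn outcome we eventually observed for that client
    auc = roc_auc_score(scored_df['actual_churn'], scored_df['predicted_probability'])
    return auc < auc_threshold


# hypothetical usage in a scheduled job:
# recent_df = pd.read_sql(recent_scored_predictions_query, mysql_connection)
# if should_retrain(recent_df):
#     submit_retraining_job()  # e.g. kick off an AWS Batch job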

We have other related options. We can assume we would be getting new churns on a somewhat regular basis. These would represent updates to our training data. As these updates come in, we could see if they notably shift the relationship between the target and the features. For example, clients with a certain creative_id might start churning at a much higher rate. This is known as concept drift. Likewise, we might also experience dataset shift, which is when the distribution of a feature changes over time. For instance, the profile_score's mean and standard deviation might shift much higher. We can monitor both concept and dataset shift and take action accordingly.

Given all these options, what option should we choose? Again, it depends on the environment and overall modeling problem. That said, I posit the following relationship exists generally.

Model Retraining Taxonomy

Data shift leads to concept shift. Concept shift will produce declines in predictive performance, which will in turn hamper business impact. Our retraining methodology should generally follow this path as well. If we retrain based on data shift, we can prevent concept shift and its downstream effects on predictive performance and business impact. Detecting data shift is our first line of defense. The higher we move up the taxonomy, the closer we are to losing business impact. We typically don't want to wait until we see degradations in business impact to retrain our model. An additional and important concept is proactive retrains: that is, retraining our model on a schedule in the hopes of catching data shift at the earliest stages. This can often be a wise strategy. That said, we don't have to go crazy with retrains! We'll actually cover that issue more later in this chapter.

Thus far, we have mostly focused on the retraining piece of the equation. However, being able to retrain and knowing when you must retrain relates back to the issue of tracking. Fundamentally, we must know how well our model is performing in production. In other words, is what is actually happening consistent with what the model is predicting?

Current Payload Logging Process

Currently, we log input and output payloads to two locations: S3 and MySQL. This might seem duplicative, but good reason exists. S3 is highly, highly available. It will basically always be available, so it's a wonderful failsafe. However, MySQL is our ultimate destination, so it also makes sense to log directly to this location since we don't really care about response times in this case. (Responding in a few seconds is fine for our project, though this would be slow by industry standards). The issue is that, while MySQL is highly available, I have experienced database outages on multiple occasions for a variety of reasons. Therefore, we want a backup plan: S3. Of note, our MySQL table uses a JSON datatype for the column that stores input and output payloads. This means we can simply add and subtract fields from our payloads without breaking the logging.
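
For reference, below is a sketch of what such a table definition might look like, expressed with SQLAlchemy. The column names mirror the uid, logging_timestamp, and input_output_payloads fields used throughout this chapter, but the exact DDL in your environment may differ.

import sqlalchemy
from sqlalchemy import Column, DateTime, MetaData, String, Table

metadata = MetaData(schema='churn_model')
model_logs = Table(
    'model_logs', metadata,
    Column('uid', String(36)),
    Column('logging_timestamp', DateTime),
    # a JSON column lets payload fields come and go without schema migrations
    Column('input_output_payloads', sqlalchemy.types.JSON),
)

# engine = sqlalchemy.create_engine('mysql+pymysql://user:password@host/churn_model')
# metadata.create_all(engine)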

ETL Logs into MySQL

Let's say we could only log to S3 from our application (which we might do to speed up our response times!), but we still wanted our logs to end up in MySQL. We could run the following as an ETL (extract, transform, and load) job on AWS Batch to accomplish this goal.

import pandas as pd
import boto3
import os
import json
import glob
import sqlalchemy
import shutil

from ds_helpers import db, aws


def download_folder_from_s3(bucket_name, directory):
    """Downloads the contents of an S3 bucket (or a prefix within it) locally."""
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    if directory == '/':
        directory_name = f's3_{bucket_name}'
        if not os.path.exists(directory_name):
            os.makedirs(directory_name)
        for s3_object in bucket.objects.all():
            bucket.download_file(s3_object.key, os.path.join(directory_name, s3_object.key))
    else:
        for s3_object in bucket.objects.filter(Prefix=directory):
            if not os.path.exists(os.path.dirname(s3_object.key)):
                os.makedirs(os.path.dirname(s3_object.key))
            bucket.download_file(s3_object.key, s3_object.key)


def insert_records(directory, table_name, schema_name, db_secret_name):
    """Reads the downloaded payload files and appends them to the MySQL logs table."""
    main_df = pd.DataFrame()
    for path in glob.glob(f'{directory}/*'):
        with open(path, 'r') as file:
            payload_dict = json.loads(file.read().replace("\'", "\""))
        uid = payload_dict.get('input').get('uid')
        logging_timestamp = payload_dict.get('output').get('logging_timestamp')
        temp_df = pd.DataFrame({'uid': [uid],
                                'logging_timestamp': [logging_timestamp],
                                'input_output_payloads': [payload_dict]})
        main_df = main_df.append(temp_df)
    main_df.to_sql(name=table_name, schema=schema_name, con=db.connect_to_mysql(
        aws.get_secrets_manager_secret(db_secret_name),
        ssl_path='data/rds-ca-2019-root.pem'), dtype={'input_output_payloads': sqlalchemy.types.JSON},
        if_exists='append', index=False)


def main(bucket_name, directory, table_name, schema_name, db_secret_name):
    local_directory = f's3_{bucket_name}'
    try:
        download_folder_from_s3(bucket_name, directory)
        insert_records(local_directory, table_name, schema_name, db_secret_name)
    finally:
        # clean up the local copy of the payloads regardless of success or failure
        if os.path.exists(local_directory):
            shutil.rmtree(local_directory)


if __name__ == "__main__":
    main('churn-model-data-science-logs', '/', 'model_logs', 'churn_model', 'churn-model-mysql')
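
The script above reloads every payload in the bucket on each run, which becomes wasteful as the bucket grows. The variation below only processes payloads logged after our most recent insert into churn_model.model_logs: it looks up the latest logging_timestamp in MySQL, runs an Athena query over the S3 logs to find the uids of newer payloads, downloads just those files, and appends them to the table. It assumes an Athena table (ds_churn_logs) over the logs bucket and an athena-churn-results output bucket already exist.
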
import pandas as pd
import boto3
import os
import json
import glob
import sqlalchemy
import shutil
from time import sleep

from ds_helpers import db, aws


def get_most_recent_db_insert(db_secret_name: str) -> str:
    """
    Gets the most recent logging_timestamp inserted into the churn_model.model_logs table.

    :param db_secret_name: name of the Secrets Manager secret for DB credentials
    :returns: most recent logging_timestamp
    """
    query = '''
    select max(logging_timestamp) as max_insert
    from churn_model.model_logs;
    '''
    df = pd.read_sql(query, db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                                ssl_path='data/rds-ca-2019-root.pem'))
    max_insert = df['max_insert'][0]
    return max_insert


def run_athena_query(client, start_timestamp):
    """Kicks off an Athena query to find the uids of payloads logged after start_timestamp."""
    response = client.start_query_execution(
        QueryString=f'''select input.uid from ds_churn_logs where output.logging_timestamp >= TIMESTAMP '{start_timestamp}';''',
        QueryExecutionContext={
            'Database': 'churn_model'
        },
        ResultConfiguration={
            'OutputLocation': 's3://athena-churn-results/'
        }
    )
    return response


def get_athena_file_name(client, execution_response):
    """Polls the Athena query until it succeeds and returns the S3 key of the results file."""
    execution_id = execution_response['QueryExecutionId']
    state = 'RUNNING'
    while state != 'SUCCEEDED':
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            s3_file_name = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            s3_file_name = s3_file_name.split("/", 3)[3]
            return s3_file_name
        sleep(1)


def get_uids_to_download(file_name):
    """Downloads the Athena results file and extracts the uids of new payloads."""
    aws.download_file_from_s3(file_name, 'athena-churn-results')
    df = pd.read_csv(file_name)
    uids = df['uid'].tolist()
    os.remove(file_name)
    return uids


def download_new_payloads(uids, local_directory):
    """Downloads each new payload file from S3 into a local directory."""
    if not os.path.exists(local_directory):
        os.makedirs(local_directory)
    for uid in uids:
        aws.download_file_from_s3(f'{uid}.json', 'churn-model-data-science-logs')
        os.replace(f'{uid}.json', os.path.join(local_directory, f'{uid}.json'))


def insert_records(directory, table_name, schema_name, db_secret_name):
    """Reads the downloaded payload files and appends them to the MySQL logs table."""
    main_df = pd.DataFrame()
    for path in glob.glob(f'{directory}/*'):
        with open(path, 'r') as file:
            payload_dict = json.loads(file.read().replace("\'", "\""))
        uid = payload_dict.get('input').get('uid')
        logging_timestamp = payload_dict.get('output').get('logging_timestamp')
        temp_df = pd.DataFrame({'uid': [uid],
                                'logging_timestamp': [logging_timestamp],
                                'input_output_payloads': [payload_dict]})
        main_df = main_df.append(temp_df)
    main_df.to_sql(name=table_name, schema=schema_name, con=db.connect_to_mysql(
        aws.get_secrets_manager_secret(db_secret_name),
        ssl_path='data/rds-ca-2019-root.pem'), dtype={'input_output_payloads': sqlalchemy.types.JSON},
        if_exists='append', index=False)


def main(local_payloads_directory, db_secret_name, table_name, schema_name):
    try:
        client = boto3.client('athena')
        max_logging_timestamp = get_most_recent_db_insert(db_secret_name)
        athena_response = run_athena_query(client, max_logging_timestamp)
        athena_results_file_name = get_athena_file_name(client, athena_response)
        uids_to_download = get_uids_to_download(athena_results_file_name)
        download_new_payloads(uids_to_download, local_payloads_directory)
        insert_records(local_payloads_directory, table_name, schema_name, db_secret_name)
    finally:
        # clean up the local payloads directory whether or not the job succeeded
        if os.path.exists(local_payloads_directory):
            shutil.rmtree(local_payloads_directory)


if __name__ == "__main__":
    main('temp_payloads', 'churn-model-mysql', 'model_logs', 'churn_model')

ETL Logs from S3 into DynamoDB

Let's say we only wanted to log payloads to S3 and wanted to move them over in batch to DynamoDB. We could leverage the following script. However, a notable limitation exists if we want to run many batches over time (which is likely). We ideally want to only insert the records created after our last table insert. Unfortunately, DynamoDB doesn't give us an easy way to efficiently discover the most recent record based on the logging_timestamp. We could pull down every record and find the value in pandas, but as our table grows, this isn't a great option; a quick sketch of that fallback appears after the script below.

import boto3
import os
import glob
import json
from time import sleep
from decimal import Decimal


def download_folder_from_s3(bucket_name, directory):
    """Downloads the contents of an S3 bucket (or a prefix within it) locally."""
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    if directory == '/':
        directory_name = f's3_{bucket_name}'
        if not os.path.exists(directory_name):
            os.makedirs(directory_name)
        for s3_object in bucket.objects.all():
            bucket.download_file(s3_object.key, os.path.join(directory_name, s3_object.key))
    else:
        for s3_object in bucket.objects.filter(Prefix=directory):
            if not os.path.exists(os.path.dirname(s3_object.key)):
                os.makedirs(os.path.dirname(s3_object.key))
            bucket.download_file(s3_object.key, s3_object.key)


def create_dynamo_table(table_name, key_field):
    """Creates a DynamoDB table keyed on key_field."""
    dynamodb = boto3.resource('dynamodb')
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': key_field,
                'KeyType': 'HASH'
            },
        ],
        AttributeDefinitions=[
            {
                'AttributeName': key_field,
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 10,
            'WriteCapacityUnits': 10
        }
    )
    # give the new table a moment to become active before we write to it
    sleep(5)


def insert_records(directory, table_name):
    """Inserts each downloaded payload file as an item in the DynamoDB table."""
    dynamodb = boto3.resource('dynamodb')
    dynamo_table = dynamodb.Table(table_name)
    for path in glob.glob(f'{directory}/*'):
        with open(path, 'r') as file:
            # DynamoDB cannot store Python floats, so parse them as Decimals
            payload_dict = json.loads(json.dumps(json.loads(file.read().replace("\'", "\""))), parse_float=Decimal)
        uid = payload_dict.get('input').get('uid')
        uid_dict = dict()
        uid_dict['uid'] = uid
        uid_dict.update(payload_dict)
        dynamo_table.put_item(Item=uid_dict)


def main(s3_bucket, directory, dynamo_table, key_field, create_table=True):
    download_folder_from_s3(s3_bucket, directory)
    if create_table:
        create_dynamo_table(dynamo_table, key_field)
    insert_records(f's3_{s3_bucket}', dynamo_table)


if __name__ == "__main__":
    main('churn-model-data-science-logs', '/', 'churn_logs', 'uid')
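
To illustrate the limitation mentioned above, here is a minimal sketch of the pull-everything-down fallback. It assumes the items stored in DynamoDB keep the nested output.logging_timestamp field from our payloads; the function name is hypothetical.

import boto3
import pandas as pd


def get_most_recent_logging_timestamp(table_name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    response = table.scan()
    items = response['Items']
    # keep paginating until the scan has covered the entire table; this reads every
    # item, so it gets slower and more expensive as the table grows
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    df = pd.json_normalize(items)
    return df['output.logging_timestamp'].max()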

Email Push Notifications

Sentry will alert us of errors in our application. However, it will not alert us to anomalies or other behavior of which we might want to be aware. For this purpose, we might set up an AWS Batch job that queries our logs table and triggers an email when a certain condition occurs. This might run every, say, 10 minutes. Below is a script that will simply count the number of requests and calculate the average predicted probability in a time period. If the count is above a certain threshold, an email will be triggered. Likewise, we could change the criteria a bit and have this be a daily report or something along those lines.

import pandas as pd
import pytz
import datetime
import yagmail
import ast

from ds_helpers import aws, db


def get_start_timestamp(minute_lookback):
    now_cst = datetime.datetime.strptime(datetime.datetime.now(pytz.timezone('US/Central')).strftime(
        '%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')
    start_timestamp = now_cst - datetime.timedelta(minutes=minute_lookback)
    return start_timestamp


def query_logs_table(db_secret_name, start_timestamp):
    query = f'''
    select JSON_EXTRACT(input_output_payloads, "$.output.prediction") as prediction
    FROM churn_model.model_logs
    where logging_timestamp >= '{start_timestamp}';
    '''
    df = pd.read_sql(query, db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                                ssl_path='data/rds-ca-2019-root.pem'))
    return df


def count_observation(df):
    return len(df)


def determine_should_email_be_sent(count, count_threshold):
    if count >= count_threshold:
        return True
    else:
        return False


def get_mean_prediction(df):
    return df['prediction'].astype(float).mean()


def send_email(email_secret, report_timestamp, mean_prediction, prediction_count):
    email_dict = aws.get_secrets_manager_secret(email_secret)
    username = email_dict.get('username')
    password = email_dict.get('password')
    yag = yagmail.SMTP(username, password)
    recipients = email_dict.get('recipients')
    recipients = ast.literal_eval(recipients)
    subject = f'Prediction Report since {report_timestamp}'
    contents = f'Number of predictions during window: {prediction_count}. \n Average prediction: {mean_prediction}.'
    yag.send(to=recipients, subject=subject, contents=contents)


def main(db_secret_name, minute_lookback, count_trigger_threshold, email_secret):
    start_timestamp = get_start_timestamp(minute_lookback)
    logs_df = query_logs_table(db_secret_name, start_timestamp)
    prediction_count = count_observation(logs_df)
    should_send_email = determine_should_email_be_sent(prediction_count, count_trigger_threshold)
    if should_send_email:
        mean_prediction = get_mean_prediction(logs_df)
        send_email(email_secret, start_timestamp, mean_prediction, prediction_count)


if __name__ == "__main__":
    main(db_secret_name='churn-model-mysql', minute_lookback=30, count_trigger_threshold=5, email_secret='churn-email')

SMS Push Notifications

Rather than an email, we might want to shoot off a text message. Twilio makes it quite easy to send text messages. Follow this tutorial to get started. We can then use the following function to fire off text messages.

import os

from twilio.rest import Client

account_sid = os.environ['TWILIO_ACCOUNT_SID']
auth_token = os.environ['TWILIO_AUTH_TOKEN']
client = Client(account_sid, auth_token)


def main(body, recipient_number):
    message = client.messages.create(
        to=recipient_number,
        from_=os.environ['TWILIO_FROM_NUMBER'],
        body=body
    )

Likewise, we can easily adjust our email notification script to be more general and send an SMS notification instead. A sketch of that adjustment is below.
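
This is a minimal sketch, assuming the helper functions from the two scripts above live in modules named notification_helpers and sms; those module names (and the recipient number) are hypothetical, so adjust the imports to match your project layout.

from notification_helpers import (get_start_timestamp, query_logs_table, count_observation,
                                  determine_should_email_be_sent, get_mean_prediction)
from sms import main as send_sms


def main(db_secret_name, minute_lookback, count_trigger_threshold, recipient_number):
    start_timestamp = get_start_timestamp(minute_lookback)
    logs_df = query_logs_table(db_secret_name, start_timestamp)
    prediction_count = count_observation(logs_df)
    # reuse the same count-based trigger, but fire a text instead of an email
    if determine_should_email_be_sent(prediction_count, count_trigger_threshold):
        mean_prediction = get_mean_prediction(logs_df)
        body = (f'Predictions since {start_timestamp}: {prediction_count}. '
                f'Average prediction: {mean_prediction}.')
        send_sms(body, recipient_number)


if __name__ == "__main__":
    main(db_secret_name='churn-model-mysql', minute_lookback=30, count_trigger_threshold=5,
         recipient_number='+15555555555')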

Tracking Data Shift

Data shift is a crucial topic in machine learning. If the distributions of our features are different compared to when we trained our model, a retrain might be warranted. Without it, our model might not know how to best handle certain features. We can employ a univariate approach to track data shift. If we detect data shift, we can automatically trigger a model retrain.

import pandas as pd
import numpy as np
import os
import joblib
from scipy.stats import ks_2samp, chisquare

from ds_helpers import aws, db
from app_settings import MODEL_1_PATH, MODEL_FEATURES


def extract_model_uid_from_path(model_path):
    return model_path.split('/')[1]


def get_query_start_timestamp(model_uid, db_conn):
    query = f'''
    select training_timestamp
    from churn_model.model_meta_data
    where model_uid = '{model_uid}';
    '''
    df = pd.read_sql(query, db_conn)
    start_timestamp = df['training_timestamp'][0]
    return start_timestamp


def extract_production_data(start_timestamp, model_uid, db_conn):
    query = f'''
    select JSON_EXTRACT(input_output_payloads, "$.input.*") as "values",
    JSON_KEYS(input_output_payloads, "$.input") as "keys"
    from (
        select * from churn_model.model_logs
        where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_1'
        and JSON_EXTRACT(input_output_payloads, "$.output.model_1_path") = '{model_uid}'
        union
        select * from churn_model.model_logs
        where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_2'
        and JSON_EXTRACT(input_output_payloads, "$.output.model_2_path") = '{model_uid}'
    ) model_output
    where logging_timestamp >= '{start_timestamp}';'''
    df = pd.read_sql(query, db_conn)
    columns = df['keys'][0]
    columns = columns.strip('][').split(', ')
    columns = [c.replace('"', '') for c in columns]
    df.drop('keys', 1, inplace=True)
    df['values'] = df['values'].str.replace('[', '').str.replace(']', '')
    df = df['values'].str.split(',', expand=True)
    df.columns = columns
    df.drop(['uid', 'url', 'endpoint'], 1, inplace=True)
    for col in list(df):
        df[col] = df[col].str.replace('"', '')
        df[col] = df[col].str.strip()
        try:
            df[col] = df[col].astype(float)
        except ValueError:
            pass
    return df


def recreate_data_used_for_training(model_uid):
    path = os.path.join(model_uid, 'data')
    aws.download_folder_from_s3('churn-model-data-science-modeling', path)
    x_train = joblib.load(os.path.join(path, 'x_train.pkl'))
    x_train.reset_index(inplace=True, drop=True)
    x_test = joblib.load(os.path.join(path, 'x_test.pkl'))
    x_test.reset_index(inplace=True, drop=True)
    x_df = pd.concat([x_train, x_test], axis=0)
    x_df = x_df[MODEL_FEATURES]
    return x_df


def calculate_ks_statistic(training_df, production_df, feature):
    try:
        ks_result = ks_2samp(training_df[feature], production_df[feature])
        return ks_result[1]
    except KeyError:
        return 0.00


def prep_category_for_chi_squared(training_df, production_df, feature):
    try:
        training_feature_grouped = pd.DataFrame(training_df.groupby(feature)[feature].count())
        training_feature_grouped.columns = ['train_count']
        training_feature_grouped['train_count'] = (training_feature_grouped['train_count'] /
                                                   training_feature_grouped['train_count'].sum()) * 1_000
        training_feature_grouped['train_count'] = training_feature_grouped['train_count'] + 1
        training_feature_grouped.reset_index(inplace=True)
    except KeyError:
        training_feature_grouped = pd.DataFrame({'train_count': [], feature: []})
    try:
        production_feature_grouped = pd.DataFrame(production_df.groupby(feature)[feature].count())
        production_feature_grouped.columns = ['prod_count']
        production_feature_grouped['prod_count'] = (production_feature_grouped['prod_count'] /
                                                    production_feature_grouped['prod_count'].sum()) * 1_000
        production_feature_grouped['prod_count'] = production_feature_grouped['prod_count'] + 1
        production_feature_grouped.reset_index(inplace=True)
    except KeyError:
        production_feature_grouped = pd.DataFrame({'prod_count': [], feature: []})
    merged_df = pd.merge(training_feature_grouped, production_feature_grouped, how='outer', on=feature)
    merged_df.fillna(0, inplace=True)
    merged_df['train_count'] = merged_df['train_count'].astype(int)
    merged_df['prod_count'] = merged_df['prod_count'].astype(int)
    return merged_df


def calculate_chi_squared_statistic(training_series, production_series):
    chi_result = chisquare(f_obs=production_series, f_exp=training_series)
    return chi_result[1]


def main(model_path, db_secret_name, p_value_cutoff):
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                  ssl_path=os.path.join('data', 'rds-ca-2019-root.pem'))
    model_uid = extract_model_uid_from_path(model_path)
    query_start_time = get_query_start_timestamp(model_uid, db_conn)
    production_df = extract_production_data(query_start_time, model_uid, db_conn)
    original_training_df = recreate_data_used_for_training(model_uid)
    cat_production_df = production_df.select_dtypes(include='object')
    num_production_df = production_df.select_dtypes(exclude='object')
    cat_training_df = original_training_df.select_dtypes(include='object')
    num_training_df = original_training_df.select_dtypes(exclude='object')
    cat_columns = set(list(cat_production_df) + list(cat_training_df))
    num_columns = set(list(num_production_df) + list(num_training_df))
    main_drift_df = pd.DataFrame()
    for cat_col in cat_columns:
        temp_chi_squared_df = prep_category_for_chi_squared(cat_training_df, cat_production_df, cat_col)
        p_value = calculate_chi_squared_statistic(temp_chi_squared_df['train_count'],
                                                  temp_chi_squared_df['prod_count'])
        temp_drift_df = pd.DataFrame({'feature': [cat_col], 'p_value': [p_value]})
        main_drift_df = main_drift_df.append(temp_drift_df)
    for num_col in num_columns:
        p_value = calculate_ks_statistic(num_training_df, num_production_df, num_col)
        temp_drift_df = pd.DataFrame({'feature': [num_col], 'p_value': [p_value]})
        main_drift_df = main_drift_df.append(temp_drift_df)
    main_drift_df['shift_occurred'] = np.where(main_drift_df['p_value'] <= p_value_cutoff, True, False)
    main_drift_df['p_value_cutoff'] = p_value_cutoff
    db.write_dataframe_to_database(main_drift_df, 'churn_model', 'data_shift', db_conn)


if __name__ == "__main__":
    main(model_path=MODEL_1_PATH, db_secret_name='churn-model-mysql', p_value_cutoff=0.05)

Tracking Concept Shift

Concept shift is related to yet distinct from data shift. Concept shift refers to a change in the actual underlying relationship between our target and predictors. For example, activity_score was predictive, but an environmental or technological change has made it uninformative.

How does one track concept shift? With data shift, we can look at each feature and run some handy statistical tests. However, concept shift is a multivariate problem rather than a univariate one. For such a problem, we can turn to our old friend machine learning. We will pull the copy of our training data from S3, extract the input payloads from our production application, concatenate the two, and see if we can predict which rows are from production. If our test set score is above a given threshold, we ascertain that concept shift has occurred. If it isn't, we assert concept shift has not occurred.

import pandas as pd
import numpy as np
import os
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector as selector
from sklearn.impute import SimpleImputer
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import GridSearchCV
from sklearn.feature_extraction import DictVectorizer
from sklearn.preprocessing import FunctionTransformer

from ds_helpers import aws, db
from helpers.model_helpers import create_train_test_split, create_x_y_split, FeaturesToDict, fill_missing_values
from app_settings import MODEL_1_PATH, MODEL_FEATURES


def extract_model_uid_from_path(model_path):
    return model_path.split('/')[1]


def get_query_start_timestamp(model_uid, db_conn):
    query = f'''
    select training_timestamp
    from churn_model.model_meta_data
    where model_uid = '{model_uid}';
    '''
    df = pd.read_sql(query, db_conn)
    start_timestamp = df['training_timestamp'][0]
    return start_timestamp


def extract_production_data(start_timestamp, model_uid, db_conn):
    query = f'''
    select JSON_EXTRACT(input_output_payloads, "$.input.*") as "values",
    JSON_KEYS(input_output_payloads, "$.input") as "keys"
    from (
        select * from churn_model.model_logs
        where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_1'
        and JSON_EXTRACT(input_output_payloads, "$.output.model_1_path") = '{model_uid}'
        union
        select * from churn_model.model_logs
        where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_2'
        and JSON_EXTRACT(input_output_payloads, "$.output.model_2_path") = '{model_uid}'
    ) model_output
    where logging_timestamp >= '{start_timestamp}';'''
    df = pd.read_sql(query, db_conn)
    columns = df['keys'][0]
    columns = columns.strip('][').split(', ')
    columns = [c.replace('"', '') for c in columns]
    df.drop('keys', 1, inplace=True)
    df['values'] = df['values'].str.replace('[', '').str.replace(']', '')
    df = df['values'].str.split(',', expand=True)
    df.columns = columns
    df.drop(['uid', 'url', 'endpoint'], 1, inplace=True)
    for col in list(df):
        df[col] = df[col].str.replace('"', '')
        df[col] = df[col].str.strip()
        try:
            df[col] = df[col].astype(float)
        except ValueError:
            pass
    return df


def recreate_data_used_for_training(model_uid):
    path = os.path.join(model_uid, 'data')
    aws.download_folder_from_s3('churn-model-data-science-modeling', path)
    x_train = joblib.load(os.path.join(path, 'x_train.pkl'))
    x_train.reset_index(inplace=True, drop=True)
    x_test = joblib.load(os.path.join(path, 'x_test.pkl'))
    x_test.reset_index(inplace=True, drop=True)
    x_df = pd.concat([x_train, x_test], axis=0)
    x_df = x_df[MODEL_FEATURES]
    return x_df


def create_training_data(original_training_df, production_df):
    # label original training rows 0 and production rows 1; the classifier then
    # tries to tell the two apart
    original_training_df['target'] = 0
    production_df['target'] = 1
    training_df = pd.concat([original_training_df, production_df], axis=0)
    training_df.reset_index(inplace=True, drop=True)
    training_df.drop('acquired_date', 1, inplace=True)
    training_df = training_df.fillna(value=np.nan)
    x, y = create_x_y_split(training_df, 'target')
    x_train, x_test, y_train, y_test = create_train_test_split(x, y)
    return x_train, x_test, y_train, y_test


def train_model(x_train, y_train):
    param_grid = {
        'model__max_depth': [5, 10, 15],
        'model__min_samples_leaf': [1, 3],
        'model__max_features': ['log2', 'sqrt']
    }
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean'))
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', FunctionTransformer(fill_missing_values, validate=False,
                                        kw_args={'fill_value': 'unknown'})),
        ('dict_creator', FeaturesToDict()),
        ('dict_vectorizer', DictVectorizer(sparse=False)),
    ])
    preprocessor = ColumnTransformer(
        transformers=[
            ('numeric_transformer', numeric_transformer, selector(dtype_include='number')),
            ('categorical_transformer', categorical_transformer, selector(dtype_exclude='number'))
        ]
    )
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('model', RandomForestClassifier())
    ])
    search = GridSearchCV(pipeline, param_grid=param_grid, scoring='roc_auc', n_jobs=-1, cv=3)
    search.fit(x_train, y_train)
    best_pipeline = search.best_estimator_
    return best_pipeline


def evaluate_model(estimator, x_test, y_test):
    predictions = estimator.predict_proba(x_test)
    roc_auc = roc_auc_score(y_test, predictions[:, 1])
    return roc_auc


def determine_if_concept_shift_has_occurred(score, threshold):
    if score >= threshold:
        return True
    else:
        return False


def main(model_path, db_secret_name, scoring_threshold):
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                  ssl_path=os.path.join('data', 'rds-ca-2019-root.pem'))
    model_uid = extract_model_uid_from_path(model_path)
    query_start_time = get_query_start_timestamp(model_uid, db_conn)
    production_df = extract_production_data(query_start_time, model_uid, db_conn)
    original_training_df = recreate_data_used_for_training(model_uid)
    x_train, x_test, y_train, y_test = create_training_data(original_training_df, production_df)
    pipeline = train_model(x_train, y_train)
    model_score = evaluate_model(pipeline, x_test, y_test)
    shift_occurred = determine_if_concept_shift_has_occurred(model_score, scoring_threshold)
    insert_statement = f'''
    INSERT INTO churn_model.concept_shift (shift_occurred, metric_used, scoring_threshold, model_score, model_uid)
    VALUES ({shift_occurred}, 'roc_auc', {scoring_threshold}, {model_score}, '{model_uid}');
    '''
    db_conn.execute(insert_statement)
    print(f'Has concept shift occurred: {shift_occurred}')


if __name__ == "__main__":
    main(model_path=MODEL_1_PATH, db_secret_name='churn-model-mysql', scoring_threshold=0.55)

Building a Model Tracking Dashboard with Streamlit

We've built some nifty tools to help us track our model. However, it might be nice to visualize such items and have access to them on demand. For that task, we can use our old buddy Streamlit. When we're building a model tracking dashboard, we should cover the following items at minimum.

  • Business Impact. This is why we use machine learning! At the end of the day, we must be able to quantify the impact our model is making. In our case, this might involve estimating how many clients our model has helped save. How would we do this? In our case, this might involve comparing churn rates for clients given the "real" model vs. clients given the heuristic model. We can then multiply the number of clients "saved" by some lifetime value metric to get the bottom-line financial impact. What's more, we need to track this impact over time to capture any decays. One note: if we expect clients to be scored by our machine learning app multiple times, we must ensure an individual client always receives either the "real" model or the heuristic model.
  • Predictive Performance. Do the model's predictions align with actual outcomes? I submit we care about tracking the model's calibration for this piece.
  • Data Shift. As we've talked about, data shift is our first line of defense against declines in model performance. We could easily add a section for concept shift as well.

Most of the data you need about your models should be in your MySQL logs table. Depending on your situation, you might need data from a downstream system to understand how the models' predictions were used. Of course, you'll need to tie in data from your systems of record to understand the ultimate outcome (e.g. did the customer end up churning?). Below is a sample of what a model tracking dashboard might look like.

[Screenshot: data drift view of the model tracking dashboard]
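
As a starting point in code, a minimal Streamlit sketch of the data shift piece might look like the following. It assumes the churn_model.data_shift table written by the drift script earlier in this chapter; the page layout and exact queries are illustrative only.

import pandas as pd
import streamlit as st

from ds_helpers import aws, db


def load_data_shift_results(db_secret_name):
    # pull the per-feature drift test results written by our data shift job
    query = 'select feature, p_value, shift_occurred from churn_model.data_shift;'
    conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                               ssl_path='data/rds-ca-2019-root.pem')
    return pd.read_sql(query, conn)


st.title('Churn Model Tracking Dashboard')
st.header('Data Shift')
shift_df = load_data_shift_results('churn-model-mysql')
st.dataframe(shift_df)
st.bar_chart(shift_df.set_index('feature')['p_value'])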

Automatic Model Retrains and Deployments

We don't want to release every new model manually; where possible, we want to automate this process. When we retrain our model, we have two options: we can refit our existing model on new data or perform a completely new search for a "best" model. The former option is comparatively straightforward. The latter perhaps should never be automated. However, when we're refitting our model, we can do so relatively quickly and easily.

First, we need a simple retraining script. This script doesn't check for anything like data shift, but such a mechanism would be easy to build. At the moment, this is a "proactive" retraining process.

import joblib
import os
import numpy as np
from zipfile import ZipFile

from ds_helpers import aws
from helpers.model_helpers import create_x_y_split
from data.db import get_training_data
from modeling.config import TARGET


def main(s3_bucket):
    """
    Takes a model from S3, retrains it on new data, and uploads it to S3.

    :param s3_bucket: S3 bucket where the model lives
    """
    df = get_training_data()
    df = df.drop(['client_id', 'id', 'meta__inserted_at'], 1)
    df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0)
    x, y = create_x_y_split(df, TARGET)
    aws.download_file_from_s3('model.zip', s3_bucket)
    os.makedirs('original_model')
    with ZipFile('model.zip', 'r') as zip_file:
        zip_file.extractall('original_model')
    original_model = joblib.load(os.path.join('original_model', 'model.pkl'))
    retrained_model = original_model.fit(x, y)
    joblib.dump(retrained_model, 'model.pkl')
    with ZipFile('model.zip', 'w') as zip_file:
        zip_file.write('model.pkl')
    aws.upload_file_to_s3('model.zip', s3_bucket)


if __name__ == "__main__":
    main('insert-s3-bucket-name')

We can set the script to run on AWS Batch on whatever schedule we like. However, how do we release this new model into production? We need to amend the CodePipeline we built with Pulumi. We will add a second source step: an S3 bucket. Whenever an upload hits our s3_object_key, the CI/CD build will kick off. The CI/CD build will pull 1) code from the most recent git commit to the source CodeCommit repo and 2) the most recent file in the source S3 bucket. The file in the S3 bucket will, of course, be our pickled model that will be uploaded by retrain.py.

import pulumi_aws as aws


def main(codecommit_repository_name, codecommit_branch_name, codecommit_repo_arn, ecr_arn, build_role_name,
         build_project_name, network_interface_region, network_interface_owner_id, private_subnet_arn,
         build_s3_bucket_arn, build_vpc_id, private_subnet_id, build_security_group_id, pipeline_role_name,
         pipeline_name, pipeline_s3_bucket_name, pipeline_policy_name, pipeline_s3_bucket_arn, stage_ecs_cluster_name,
         stage_ecs_service_name, prod_ecs_cluster_name, prod_ecs_service_name, source_s3_bucket_arn,
         source_s3_bucket_name, s3_object_key):
    """
    Creates a CodePipeline to deploy an ECS application. The following structure is employed:
    - CodeCommit source
    - S3 source
    - CodeBuild build
    - Stage Deploy
    - Manual Approval
    - Prod Deploy
    Appropriate roles and policies are created and used.
    :param pipeline_role_name: name for the execution role to run the pipeline
    :type pipeline_role_name: str
    :param pipeline_name: name of the pipeline
    :type pipeline_name: str
    :param pipeline_s3_bucket_name: name of the s3 bucket
    :type pipeline_s3_bucket_name: str
    :param codecommit_repository_name: name of the git repo
    :type codecommit_repository_name: str
    :param codecommit_branch_name: name of the git branch
    :type codecommit_branch_name: str
    :param build_project_name: name of the build project
    :type build_project_name: str
    :param pipeline_policy_name: name for the pipeline policy
    :type pipeline_policy_name: str
    :param pipeline_s3_bucket_arn: arn of the s3 bucket
    :type pipeline_s3_bucket_arn: str
    :param stage_ecs_cluster_name: name of the staging ECS cluster
    :type stage_ecs_cluster_name: str
    :param stage_ecs_service_name: name of the staging ECS service
    :type stage_ecs_service_name: str
    :param prod_ecs_cluster_name: name of the production ECS cluster
    :type prod_ecs_cluster_name: str
    :param prod_ecs_service_name: name of the production ECS service
    :type prod_ecs_service_name: str
    :param source_s3_bucket_arn: arn of the S3 bucket used in the source step
    :type source_s3_bucket_arn: str
    :param source_s3_bucket_name: name of the S3 bucket used in the source step
    :type source_s3_bucket_name: str
    :param s3_object_key: name of the S3 file to trigger the pipeline
    :type s3_object_key: str
    """
    codebuild_role = aws.iam.Role(
        build_role_name,
        assume_role_policy="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "codebuild.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }""")

    role_policy = aws.iam.RolePolicy(
        f"{build_role_name}_role_policy",
        role=codebuild_role.name,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Resource": [
                        "*"
                    ],
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterface",
                        "ec2:DescribeDhcpOptions",
                        "ec2:DescribeNetworkInterfaces",
                        "ec2:DeleteNetworkInterface",
                        "ec2:DescribeSubnets",
                        "ec2:DescribeSecurityGroups",
                        "ec2:DescribeVpcs"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterfacePermission"
                    ],
                    "Resource": [
                        "arn:aws:ec2:{network_interface_region}:{network_interface_owner_id}:network-interface/*"
                    ],
                    "Condition": {{
                        "StringEquals": {{
                            "ec2:Subnet": [
                                "{private_subnet_arn}"
                            ],
                            "ec2:AuthorizedService": "codebuild.amazonaws.com"
                        }}
                    }}
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "*"
                    ],
                    "Resource": [
                        "{build_s3_bucket_arn}",
                        "{build_s3_bucket_arn}/*",
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*",
                        "{source_s3_bucket_arn}",
                        "{source_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:GetRegistryPolicy",
                        "ecr:DescribeRegistry",
                        "ecr:GetAuthorizationToken",
                        "ecr:DeleteRegistryPolicy",
                        "ecr:PutRegistryPolicy",
                        "ecr:PutReplicationConfiguration"
                    ],
                    "Resource": [
                        "*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:*"
                    ],
                    "Resource": [
                        "{ecr_arn}"
                    ]
                }}
            ]
        }}""")

    codebuild_project = aws.codebuild.Project(
        build_project_name,
        name=build_project_name,
        description=f"{build_project_name}_codebuild_project",
        build_timeout=15,
        queued_timeout=15,
        service_role=codebuild_role.arn,
        artifacts=aws.codebuild.ProjectArtifactsArgs(
            type="CODEPIPELINE",
        ),
        environment=aws.codebuild.ProjectEnvironmentArgs(
            compute_type="BUILD_GENERAL1_SMALL",
            image="aws/codebuild/standard:3.0",
            type="LINUX_CONTAINER",
            image_pull_credentials_type="CODEBUILD",
            privileged_mode=True
        ),
        logs_config=aws.codebuild.ProjectLogsConfigArgs(
            cloudwatch_logs=aws.codebuild.ProjectLogsConfigCloudwatchLogsArgs(
                group_name="log-group",
                stream_name="log-stream",
            ),
            s3_logs=aws.codebuild.ProjectLogsConfigS3LogsArgs(
                status="ENABLED",
                location=build_s3_bucket_arn,
            ),
        ),
        source=aws.codebuild.ProjectSourceArgs(
            type="CODEPIPELINE",
        ),
        vpc_config=aws.codebuild.ProjectVpcConfigArgs(
            vpc_id=build_vpc_id,
            subnets=[
                private_subnet_id,
            ],
            security_group_ids=[
                build_security_group_id,
            ],
        ),
    )

    codepipeline_role = aws.iam.Role(
        pipeline_role_name,
        assume_role_policy="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "codepipeline.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }""")

    codepipeline = aws.codepipeline.Pipeline(
        pipeline_name,
        name=pipeline_name,
        role_arn=codepipeline_role.arn,
        artifact_store=aws.codepipeline.PipelineArtifactStoreArgs(
            location=pipeline_s3_bucket_name,
            type="S3",
        ),
        stages=[
            aws.codepipeline.PipelineStageArgs(
                name="Source",
                actions=[
                    aws.codepipeline.PipelineStageActionArgs(
                        name="CodeSource",
                        category="Source",
                        owner="AWS",
                        provider="CodeCommit",
                        version="1",
                        output_artifacts=["SourceArtifact"],
                        namespace='SourceVariables',
                        run_order=1,
                        configuration={
                            "RepositoryName": codecommit_repository_name,
                            "BranchName": codecommit_branch_name,
                        },
                    ),
                    aws.codepipeline.PipelineStageActionArgs(
                        name="FileSource",
                        category="Source",
                        owner="AWS",
                        provider="S3",
                        version="1",
                        output_artifacts=["file"],
                        run_order=1,
                        configuration={
                            "S3Bucket": source_s3_bucket_name,
                            "S3ObjectKey": s3_object_key,
                            "PollForSourceChanges": "false",
                        },
                    ),
                ],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="Build",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="Build",
                    category="Build",
                    owner="AWS",
                    provider="CodeBuild",
                    input_artifacts=["SourceArtifact"],
                    output_artifacts=["BuildArtifact"],
                    namespace='BuildVariables',
                    version="1",
                    run_order=1,
                    configuration={
                        "ProjectName": build_project_name,
                    },
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="StageDeploy",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="StageDeploy",
                    category="Deploy",
                    owner="AWS",
                    provider="ECS",
                    input_artifacts=["BuildArtifact"],
                    version="1",
                    configuration={
                        "ClusterName": stage_ecs_cluster_name,
                        "ServiceName": stage_ecs_service_name
                    },
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="ManualApproval",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="ManualApproval",
                    category="Approval",
                    owner="AWS",
                    provider="Manual",
                    version="1",
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="ProdDeploy",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="ProdDeploy",
                    category="Deploy",
                    owner="AWS",
                    provider="ECS",
                    input_artifacts=["BuildArtifact"],
                    version="1",
                    configuration={
                        "ClusterName": prod_ecs_cluster_name,
                        "ServiceName": prod_ecs_service_name,
                    },
                )],
            ),
        ])

    codepipeline_policy = aws.iam.RolePolicy(
        pipeline_policy_name,
        role=codepipeline_role.id,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": [
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*",
                        "{source_s3_bucket_arn}",
                        "{source_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": "{codecommit_repo_arn}"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "codebuild:BatchGetBuilds",
                        "codebuild:StartBuild"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecs:*"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Action": [
                        "iam:PassRole"
                    ],
                    "Effect": "Allow",
                    "Resource": "*",
                    "Condition": {{
                        "StringLike": {{
                            "iam:PassedToService": [
                                "ecs-tasks.amazonaws.com"
                            ]
                        }}
                    }}
                }}
            ]
        }}""")

Likewise, we need to update our buildspec.yml file to pull our model into our Docker image. Our build process will now download the zipped model from the S3 bucket and unzip it to model.pkl. The Docker build step will pick up this new model.

version: 0.2
phases:
  install:
    runtime-versions:
      python: 3.8
      docker: 18
    commands:
      - nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 --storage-driver=overlay&
      - timeout 15 sh -c "until docker info; do echo .; sleep 1; done"
  pre_build:
    commands:
      - echo installing libraries and running unit tests...
      - pip install -r requirements.txt
      - echo pulling files from s3...
      - rm model.pkl
      - aws s3 cp s3://insert-s3-bucket-name/model.zip .
      - unzip model.zip
      - python -m pytest tests/tests.py
      - echo logging into ecr...
      - $(aws ecr get-login --no-include-email --region us-west-2)
      - REPOSITORY_URI=INSERT_ECR_URI
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION)
      - IMAGE_TAG=${COMMIT_HASH:=latest}
  build:
    commands:
      - echo building docker image...
      - docker build -t $REPOSITORY_URI:latest .
      - docker tag $REPOSITORY_URI:latest $REPOSITORY_URI:$IMAGE_TAG
  post_build:
    commands:
      - echo pushing docker image...
      - docker push $REPOSITORY_URI:latest
      - docker push $REPOSITORY_URI:$IMAGE_TAG
      - printf '[{"name":"taskdef-name","imageUri":"%s"}]' $REPOSITORY_URI:latest > taskdefinition.json
artifacts:
  files:
    - '**/*'
  base_directory: base_location
  name: build-artifacts
  discard-paths: no

Finally, we need to update app_settings.py and app.py. In app_settings.py, we simply need to remove MODEL_PATH. In app.py, we can also remove the import of MODEL_PATH. In our initialize_app function, we simply need to load model.pkl. For local testing, we will obviously need a local copy of a model.pkl file; we can manually pull the most recent upload from S3. Don't worry: our buildspec.yml will remove any model.pkl file pushed into CodeCommit, making sure we use the desired S3 model. Likewise, if we are using S3 to house our models, we might list .pkl files in our .gitignore.

def initialize_app():
    """
    Initializes our Flask application.
    - creates a Flask app object
    - sets AWS keys for uploading payloads to S3
    - retrieves and sets the application config
    - integrates with Sentry for error reporting
    - sets up a background scheduler to refresh the config every 3,600 seconds
    - loads the trained model and sets it as a global object
    """
    app = Flask(__name__)
    if ENVIRONMENT != 'local':
        sentry_sdk.init(
            dsn=SENTRY_DSN,
            integrations=[FlaskIntegration()],
            traces_sample_rate=1.0
        )
    config_dict = retrieve_app_config(DB_SCHEMA, make_mysql_connection(DATABASE_SECRET), ENVIRONMENT)
    for key, value in config_dict.items():
        app.config[key] = value
    scheduler = BackgroundScheduler()
    scheduler.add_job(func=hit_config_refresh_endpoint, trigger="interval", seconds=3_600)
    scheduler.start()
    global model
    model = joblib.load('model.pkl')
    return app

A Case Study on Model Retrains

I've seen some people who are retrain-crazy. Let's retrain constantly! This may sound cool, but it's often overkill. In the script below, I load in a churn model and a small test set (4,000 observations). I record baseline evaluation scores on this test set. Subsequently, I change the distribution of one or two numeric features at varying levels and record the effect. (Of course, drift can happen in categorical features, too). This simulates drift. The results are captured in the screenshot below.

[Screenshot: evaluation metrics for each simulated drift scenario]

In our first scenario, we change all profile_scores by 1%. This is actually a fairly big change since we are impacting 100% of observations in some way. We can see that the impact on our predictive power is fairly small. We start getting more noticeable declines at a 5% movement of all profile_scores. When we move to a multivariate approach, our first scenario changes the profile_score and average_stars by 1% in 10 observations. We see basically no decline in predictive performance in this scenario. We have to shift the data quite a bit to start seeing notable declines. Of note, we see some noise in the log loss for the multivariate scenarios since we are affecting random rows, some of which might not be impacted much by the selected features.

import joblib
import pandas as pd
import numpy as np
from sklearn.metrics import log_loss
from random import shuffle
from copy import deepcopy

from modeling.evaluate import calculate_probability_lift

np.random.seed(10)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)


def run_base_case(model, y_test, x_test):
    preds_df = pd.DataFrame(model.predict_proba(x_test), columns=['0_prob', '1_prob'])
    ll = log_loss(y_test, preds_df['1_prob'])
    proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '')
    df = pd.DataFrame({
        'scenario': ['base_case'],
        'log_loss': [ll],
        'proba_lift': [proba_lift]
    })
    return df


def run_univariate_drift_scenario(model, y_test, x_test, column, factor):
    local_x_test = deepcopy(x_test)
    local_x_test[column] = local_x_test[column] * factor
    preds_df = pd.DataFrame(model.predict_proba(local_x_test), columns=['0_prob', '1_prob'])
    preds_df = preds_df.reset_index(drop=True)
    ll = log_loss(y_test, preds_df['1_prob'])
    proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '')
    df = pd.DataFrame({
        'scenario': [f'{column}_{factor}_shift'],
        'log_loss': [ll],
        'proba_lift': [proba_lift]
    })
    return df


def run_multivariate_drift_scenario(model, y_test, x_test, column_list, change_obs, factor, scenario_name):
    local_x_test = deepcopy(x_test)
    indices = list(local_x_test.index)
    shuffle(indices)
    indices = indices[0:change_obs]
    for column in column_list:
        for index in indices:
            local_x_test.loc[index, column] = local_x_test.loc[index, column] * factor
    preds_df = pd.DataFrame(model.predict_proba(local_x_test), columns=['0_prob', '1_prob'])
    preds_df = preds_df.reset_index(drop=True)
    ll = log_loss(y_test, preds_df['1_prob'])
    proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '')
    df = pd.DataFrame({
        'scenario': [scenario_name],
        'log_loss': [ll],
        'proba_lift': [proba_lift]
    })
    return df


def main():
    x_test = joblib.load('modeling/random_forest_202107252202562870520500/data/x_test.pkl')
    y_test = joblib.load('modeling/random_forest_202107252202562870520500/data/y_test.pkl')
    model = joblib.load('modeling/random_forest_202107252202562870520500/model/model.pkl')
    x_test = x_test.reset_index(drop=True)
    y_test = y_test.reset_index(drop=True)
    main_df = pd.DataFrame()
    main_df = pd.concat([main_df, run_base_case(model, y_test, x_test)], axis=0)
    main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.01)], axis=0)
    main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.03)], axis=0)
    main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.05)], axis=0)
    main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.10)], axis=0)
    main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test,
                                                                  ['profile_score', 'average_stars'], 10,
                                                                  1.01, 'small_multivariate')], axis=0)
    main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test,
                                                                  ['profile_score', 'average_stars'], 500,
                                                                  1.05, 'moderate_multivariate')], axis=0)
    main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test,
                                                                  ['profile_score', 'average_stars'],
                                                                  1_500, 1.10, 'strong_multivariate')], axis=0)
    main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test,
                                                                  ['profile_score', 'average_stars'],
                                                                  4_000, 2.0, 'very_strong_multivariate')], axis=0)
    return main_df


if __name__ == "__main__":
    comparison_df = main()
    print(comparison_df)

What does all this mean? The message is mixed. We clearly don't need to retrain every time we get a new observation. However, we can experience some degradation in our model even with small drift. Even though this won't tank our model, we would like to avoid such a case. Therefore, we should try to retrain proactively or at early signs of drift if possible. That said, even with some drift, our model can still work.

We have to understand our environment. How likely is drift to occur? How much drift is likely to occur? If our environment is static, we need to worry less about retraining frequently. If our data is not changing, both in terms of data and concept drift, our model will work well. Retraining becomes a proactive step. This point is important because retraining frequently may be costly in certain cases, such as when assembling the requisite data is difficult.
