Our model is now in production. Hooray! One distinguishing trait of machine learning systems is that they generally decay over time. This concept is sometimes difficult to grasp for those new to data science. A machine learning system is not like a website; we can't just stand it up and let it be. We need to monitor it closely, and we also need to retrain our model.
Data science projects are never done, barring getting canceled. Like a car, our model needs maintenance. Retraining your model on new data might be like your regular oil change. Occasionally you might need a bigger repair, like replacing a gasket. In machine learning, this might be akin to completely changing models (e.g. from Random Forest to XGBoost). Fortunately, some of this can be automated via AWS Batch and our CI/CD pipeline. We can schedule our model to retrain and be released into production. We can leverage our testing mechanisms to ensure a "bad" model does not go into production.
You might ask: how often should we retrain our model? My answer: it depends. The decision is contingent on how stationary the environment is. For stock trading models, frequent retrains are likely warranted (e.g. daily or multiple times per day). For our customer churn model, the environment is very likely not as dynamic. That said, we probably don't want to go many months without retraining the model. Over the course of months, multiple aspects of the business or broader environment might change. Likewise, as months go by, we might see new types of customers the model did not experience during training, or churn rates for certain types of customers might systematically change due to broader marketing efforts. Using only a rule of thumb, my professional opinion would be to retrain the churn model every few weeks or every few months. We likely would not experience enough day-to-day change to warrant more frequent retrains.
We could trigger a model retrain based on underlying conditions. For one, we could track predicted vs. actual outcomes and have the model retrain once performance drops below a certain threshold. In environments with quick feedback mechanisms, this can work well. What's more, we could track the business impact of our model over time and trigger a retrain when this impact starts to decay. This can be a bit reactive, as it might take time for business impact to shift or be measured.
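To make the first option concrete, below is a minimal sketch of a performance-based trigger. It assumes a hypothetical churn_model.outcomes table that stores the eventual churn label for each uid; that table, the function names, and the 0.70 threshold are illustrative assumptions rather than part of our actual schema. The sketch joins recent predictions to observed outcomes and flags a retrain when ROC AUC drops below the threshold.

import pandas as pd
from sklearn.metrics import roc_auc_score
from ds_helpers import aws, db


def recent_roc_auc(db_secret_name, start_timestamp):
    # Pull recent predictions (with their uids) out of the JSON payload column.
    predictions_query = f'''
    select JSON_UNQUOTE(JSON_EXTRACT(input_output_payloads, "$.input.uid")) as uid,
    JSON_EXTRACT(input_output_payloads, "$.output.prediction") as prediction
    from churn_model.model_logs
    where logging_timestamp >= '{start_timestamp}';
    '''
    # churn_model.outcomes is a hypothetical table holding the observed churn labels.
    outcomes_query = 'select uid, churned from churn_model.outcomes;'
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                  ssl_path='data/rds-ca-2019-root.pem')
    predictions_df = pd.read_sql(predictions_query, db_conn)
    outcomes_df = pd.read_sql(outcomes_query, db_conn)
    scored_df = predictions_df.merge(outcomes_df, on='uid', how='inner')
    return roc_auc_score(scored_df['churned'], scored_df['prediction'].astype(float))


def retrain_needed(db_secret_name, start_timestamp, auc_threshold=0.70):
    # Trigger a retrain once recent performance falls below the chosen threshold.
    return recent_roc_auc(db_secret_name, start_timestamp) < auc_threshold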
We have other related options. We can assume we would be getting new churns on a somewhat regular basis. These would represent updates to our training data. As these updates come in, we could see if they notably shift the relationship between the target and the features. For example, clients with a certain creative_id start churning at a much higher rate. This is known as concept drift. Likewise, we might also experience dataset shift, which is when the distribution of a feature changes over time. For instance, the profile_score's mean and standard deviation might shift much higher. We can monitor both concept and dataset shift and take action accordingly.
Given all these options, which should we choose? Again, it depends on the environment and the overall modeling problem. That said, I posit the following relationship generally holds.
Data shift leads to concept shift. Concept shift produces declines in predictive performance, which in turn hamper business impact. Our retraining methodology should generally follow this path as well. If we retrain based on data shift, we can prevent concept shift and its downstream effects on predictive and business performance. Detecting data shift is our first line of defense. The higher we move up this taxonomy, the closer we are to losing business impact. We typically don't want to wait until we see degradations in business impact to retrain our model. An additional and important concept is the proactive retrain: we retrain our model on a schedule in the hopes of catching data shift at its earliest stages. This can often be a wise strategy. That said, we don't have to go crazy with retrains! We'll cover that issue in more detail later in this chapter.
Thus far, we have mostly focused on the retraining piece of the equation. However, being able to retrain and knowing when we must retrain both relate back to the issue of tracking. Fundamentally, we must know how well our model is performing in production. In other words, is what is actually happening consistent with what the model is predicting?
Currently, we log input and output payloads to two locations: S3 and MySQL. This might seem duplicative, but there is good reason for it. S3 is highly, highly available; it will essentially always accept our writes, so it's a wonderful failsafe. However, MySQL is our ultimate destination, so it also makes sense to log directly to this location since we don't particularly care about response times in this case. (Responding in a few seconds is fine for our project, though this would be slow by industry standards.) The issue is that, while MySQL is highly available, I have experienced database outages on multiple occasions for a variety of reasons. Therefore, we want a backup plan: S3. Of note, our MySQL table uses the JSON datatype for the column that stores input and output payloads. This means we can add and subtract fields from our payloads without breaking the logging.
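For reference, a minimal sketch of the DDL for such a table might look like the following. The column names mirror the ones used by the ETL scripts later in this chapter; the exact types and lengths are assumptions, and your actual table may carry additional bookkeeping columns.

from ds_helpers import aws, db

# Sketch of the logging table: a uid, a timestamp, and a JSON column for the payloads.
CREATE_MODEL_LOGS_TABLE = '''
create table if not exists churn_model.model_logs (
    uid varchar(100),
    logging_timestamp timestamp,
    input_output_payloads json
);
'''


def create_model_logs_table(db_secret_name):
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                  ssl_path='data/rds-ca-2019-root.pem')
    db_conn.execute(CREATE_MODEL_LOGS_TABLE)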
Let's say we could only log to S3 from our application (which we might do to speed up our response times!), but we still wanted our logs to end up in MySQL. We could run the following as an ETL (extract, transform, and load) job on AWS Batch to accomplish this goal.
import pandas as pd | |
import boto3 | |
import os | |
import json | |
import glob | |
import sqlalchemy | |
import shutil | |
from ds_helpers import db, aws | |
def download_folder_from_s3(bucket_name, directory): | |
s3_resource = boto3.resource('s3') | |
bucket = s3_resource.Bucket(bucket_name) | |
if directory == '/': | |
directory_name = f's3_{bucket_name}' | |
if not os.path.exists(directory_name): | |
os.makedirs(directory_name) | |
for s3_object in bucket.objects.all(): | |
bucket.download_file(s3_object.key, os.path.join(directory_name, s3_object.key)) | |
else: | |
for s3_object in bucket.objects.filter(Prefix=directory): | |
if not os.path.exists(os.path.dirname(s3_object.key)): | |
os.makedirs(os.path.dirname(s3_object.key)) | |
bucket.download_file(s3_object.key, s3_object.key) | |
def insert_records(directory, table_name, schema_name, db_secret_name): | |
main_df = pd.DataFrame() | |
for path in glob.glob(f'{directory}/*'): | |
with open(path, 'r') as file: | |
payload_dict = json.loads(file.read().replace("\'", "\"")) | |
uid = payload_dict.get('input').get('uid') | |
logging_timestamp = payload_dict.get('output').get('logging_timestamp') | |
temp_df = pd.DataFrame({'uid': [uid], | |
'logging_timestamp': [logging_timestamp], | |
'input_output_payloads': [payload_dict]}) | |
main_df = main_df.append(temp_df) | |
main_df.to_sql(name=table_name, schema=schema_name, con=db.connect_to_mysql( | |
aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path='data/rds-ca-2019-root.pem'), dtype={'input_output_payloads': sqlalchemy.types.JSON}, | |
if_exists='append', index=False) | |
def main(bucket_name, directory, table_name, schema_name, db_secret_name): | |
local_directory = f's3_{bucket_name}' | |
try: | |
download_folder_from_s3(bucket_name, directory) | |
insert_records(local_directory, table_name, schema_name, db_secret_name) | |
finally: | |
if os.path.exists(local_directory): | |
shutil.rmtree(local_directory) | |
if __name__ == "__main__": | |
main('churn-model-data-science-logs', '/', 'model_logs', 'churn_model', 'churn-model-mysql') |
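The script above moves every payload in the bucket on each run, which becomes wasteful as our logs accumulate. The variation below only moves payloads that have not yet been inserted: it queries MySQL for the most recent logging_timestamp already in the table, uses Athena (via a ds_churn_logs table defined over the logging bucket) to find the uids of payloads logged after that point, downloads only those payload files from S3, and appends them to MySQL.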
import pandas as pd | |
import boto3 | |
import os | |
import json | |
import glob | |
import sqlalchemy | |
import shutil | |
from time import sleep | |
from ds_helpers import db, aws | |
def get_most_recent_db_insert(db_secret_name: str) -> str: | |
""" | |
Gets the most recent logging_timestamp inserted into the churn_model.model_logs table | |
:param db_secret_name: name of the Secrets Manager secret for DB credentials | |
:returns: most recent logging_timestamp | |
""" | |
query = ''' | |
select max(logging_timestamp) as max_insert | |
from churn_model.model_logs; | |
''' | |
df = pd.read_sql(query, db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path='data/rds-ca-2019-root.pem')) | |
max_insert = df['max_insert'][0] | |
return max_insert | |
def run_athena_query(client, start_timestamp): | |
response = client.start_query_execution( | |
QueryString=f'''select input.uid from ds_churn_logs where output.logging_timestamp >= TIMESTAMP '{start_timestamp}';''', | |
QueryExecutionContext={ | |
'Database': 'churn_model' | |
}, | |
ResultConfiguration={ | |
'OutputLocation': 's3://athena-churn-results/' | |
} | |
) | |
return response | |
def get_athena_file_name(client, execution_response): | |
execution_id = execution_response['QueryExecutionId'] | |
state = 'RUNNING' | |
    while state not in ('FAILED', 'CANCELLED'):
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            s3_file_name = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            s3_file_name = s3_file_name.split("/", 3)[3]
            return s3_file_name
        sleep(1)
    # Surface failed or cancelled queries instead of polling forever.
    raise RuntimeError(f'Athena query {execution_id} ended in state {state}')
def get_uids_to_download(file_name): | |
aws.download_file_from_s3(file_name, 'athena-churn-results') | |
df = pd.read_csv(file_name) | |
uids = df['uid'].tolist() | |
os.remove(file_name) | |
return uids | |
def download_new_payloads(uids, local_directory): | |
if not os.path.exists(local_directory): | |
os.makedirs(local_directory) | |
for uid in uids: | |
aws.download_file_from_s3(f'{uid}.json', 'churn-model-data-science-logs') | |
os.replace(f'{uid}.json', os.path.join(local_directory, f'{uid}.json')) | |
def insert_records(directory, table_name, schema_name, db_secret_name): | |
main_df = pd.DataFrame() | |
for path in glob.glob(f'{directory}/*'): | |
with open(path, 'r') as file: | |
payload_dict = json.loads(file.read().replace("\'", "\"")) | |
uid = payload_dict.get('input').get('uid') | |
logging_timestamp = payload_dict.get('output').get('logging_timestamp') | |
temp_df = pd.DataFrame({'uid': [uid], | |
'logging_timestamp': [logging_timestamp], | |
'input_output_payloads': [payload_dict]}) | |
main_df = main_df.append(temp_df) | |
main_df.to_sql(name=table_name, schema=schema_name, con=db.connect_to_mysql( | |
aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path='data/rds-ca-2019-root.pem'), dtype={'input_output_payloads': sqlalchemy.types.JSON}, | |
if_exists='append', index=False) | |
def main(local_payloads_directory, db_secret_name, table_name, schema_name): | |
try: | |
client = boto3.client('athena') | |
max_logging_timestamp = get_most_recent_db_insert(db_secret_name) | |
athena_response = run_athena_query(client, max_logging_timestamp) | |
athena_results_file_name = get_athena_file_name(client, athena_response) | |
uids_to_download = get_uids_to_download(athena_results_file_name) | |
download_new_payloads(uids_to_download, local_payloads_directory) | |
insert_records(local_payloads_directory, table_name, schema_name, db_secret_name) | |
    finally:
        if os.path.exists(local_payloads_directory):
            shutil.rmtree(local_payloads_directory)
if __name__ == "__main__": | |
main('temp_payloads', 'churn-model-mysql', 'model_logs', 'churn_model') |
Let's say we only wanted to log payloads to S3 and wanted to move them over in batch to DynamoDB. We could leverage the following script. However, a notable limitation exists if we want to run many batches over time (which is likely): we ideally want to insert only the records created after our last table insert. Unfortunately, DynamoDB doesn't give us an easy way to efficiently find the most recent record based on logging_timestamp. We could pull down every record and find the value in pandas (a sketch of that naive approach follows the script), but as our table grows, this isn't a great option.
import boto3 | |
import os | |
import glob | |
import json | |
from time import sleep | |
from decimal import Decimal | |
def download_folder_from_s3(bucket_name, directory): | |
s3_resource = boto3.resource('s3') | |
bucket = s3_resource.Bucket(bucket_name) | |
if directory == '/': | |
directory_name = f's3_{bucket_name}' | |
if not os.path.exists(directory_name): | |
os.makedirs(directory_name) | |
for s3_object in bucket.objects.all(): | |
bucket.download_file(s3_object.key, os.path.join(directory_name, s3_object.key)) | |
else: | |
for s3_object in bucket.objects.filter(Prefix=directory): | |
if not os.path.exists(os.path.dirname(s3_object.key)): | |
os.makedirs(os.path.dirname(s3_object.key)) | |
bucket.download_file(s3_object.key, s3_object.key) | |
def create_dynamo_table(table_name, key_field): | |
dynamodb = boto3.resource('dynamodb') | |
dynamodb.create_table( | |
TableName=table_name, | |
KeySchema=[ | |
{ | |
'AttributeName': key_field, | |
'KeyType': 'HASH' | |
}, | |
], | |
AttributeDefinitions=[ | |
{ | |
'AttributeName': key_field, | |
'AttributeType': 'S' | |
} | |
], | |
ProvisionedThroughput={ | |
'ReadCapacityUnits': 10, | |
'WriteCapacityUnits': 10 | |
} | |
) | |
sleep(5) | |
def insert_records(directory, table_name): | |
dynamodb = boto3.resource('dynamodb') | |
dynamo_table = dynamodb.Table(table_name) | |
for path in glob.glob(f'{directory}/*'): | |
with open(path, 'r') as file: | |
payload_dict = json.loads(json.dumps(json.loads(file.read().replace("\'", "\""))), parse_float=Decimal) | |
uid = payload_dict.get('input').get('uid') | |
uid_dict = dict() | |
uid_dict['uid'] = uid | |
uid_dict.update(payload_dict) | |
dynamo_table.put_item(Item=uid_dict) | |
def main(s3_bucket, directory, dynamo_table, key_field, create_table=True):
    # Download the payloads regardless; only create the table when requested.
    download_folder_from_s3(s3_bucket, directory)
    if create_table:
        create_dynamo_table(dynamo_table, key_field)
    insert_records(f's3_{s3_bucket}', dynamo_table)
if __name__ == "__main__": | |
main('churn-model-data-science-logs', '/', 'churn_logs', 'uid') |
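To see why finding the latest insert is painful, here is a sketch of the naive fallback mentioned before the script: scan the entire table (DynamoDB paginates scans, so we loop on LastEvaluatedKey) and compute the most recent logging_timestamp in pandas. This assumes the logging_timestamp strings sort chronologically, and its cost grows with the table, which is exactly the problem.

import boto3
import pandas as pd


def get_most_recent_dynamo_timestamp(table_name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    # Scan the full table, following LastEvaluatedKey until all pages are read.
    items = []
    response = table.scan()
    items.extend(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    df = pd.DataFrame(items)
    # logging_timestamp lives in the nested output payload written by insert_records.
    return df['output'].apply(lambda payload: payload.get('logging_timestamp')).max()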
Sentry will alert us to errors in our application. However, it will not alert us to anomalies or other behavior we might want to know about. For this purpose, we might set up an AWS Batch job that queries our logs table and triggers an email when a certain condition occurs. This might run every, say, 10 minutes. Below is a script that counts the number of requests and calculates the average predicted probability in a time period. If the count is above a certain threshold, an email is triggered. Likewise, we could change the criteria a bit and have this be a daily report or the like.
import pandas as pd | |
import pytz | |
import datetime | |
import yagmail | |
import ast | |
from ds_helpers import aws, db | |
def get_start_timestamp(minute_lookback): | |
now_cst = datetime.datetime.strptime(datetime.datetime.now(pytz.timezone('US/Central')).strftime( | |
'%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') | |
start_timestamp = now_cst - datetime.timedelta(minutes=minute_lookback) | |
return start_timestamp | |
def query_logs_table(db_secret_name, start_timestamp): | |
query = f''' | |
select JSON_EXTRACT(input_output_payloads, "$.output.prediction") as prediction | |
FROM churn_model.model_logs | |
where logging_timestamp >= '{start_timestamp}'; | |
''' | |
df = pd.read_sql(query, db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path='data/rds-ca-2019-root.pem')) | |
return df | |
def count_observation(df): | |
return len(df) | |
def determine_should_email_be_sent(count, count_threshold): | |
if count >= count_threshold: | |
return True | |
else: | |
return False | |
def get_mean_prediction(df): | |
return df['prediction'].astype(float).mean() | |
def send_email(email_secret, report_timestamp, mean_prediction, prediction_count): | |
email_dict = aws.get_secrets_manager_secret(email_secret) | |
username = email_dict.get('username') | |
password = email_dict.get('password') | |
yag = yagmail.SMTP(username, password) | |
recipients = email_dict.get('recipients') | |
recipients = ast.literal_eval(recipients) | |
subject = f'Prediction Report since {report_timestamp}' | |
contents = f'Number of predictions during window: {prediction_count}. \n Average prediction: {mean_prediction}.' | |
yag.send(to=recipients, subject=subject, contents=contents) | |
def main(db_secret_name, minute_lookback, count_trigger_threshold, email_secret): | |
start_timestamp = get_start_timestamp(minute_lookback) | |
logs_df = query_logs_table(db_secret_name, start_timestamp) | |
prediction_count = count_observation(logs_df) | |
should_send_email = determine_should_email_be_sent(prediction_count, count_trigger_threshold) | |
if should_send_email: | |
mean_prediction = get_mean_prediction(logs_df) | |
send_email(email_secret, start_timestamp, mean_prediction, prediction_count) | |
if __name__ == "__main__": | |
main(db_secret_name='churn-model-mysql', minute_lookback=30, count_trigger_threshold=5, email_secret='churn-email') |
Rather than an email, we might want to shoot off a text message. Twilio makes it quite easy to send text messages; follow this tutorial to get started. We can then use the following function to fire off text messages.
import os | |
from twilio.rest import Client | |
account_sid = os.environ['TWILIO_ACCOUNT_SID'] | |
auth_token = os.environ['TWILIO_AUTH_TOKEN'] | |
client = Client(account_sid, auth_token) | |
def main(body, recipient_number): | |
message = client.messages.create( | |
to=recipient_number, | |
from_=os.environ['TWILIO_FROM_NUMBER'], | |
body=body | |
) | |
Likewise, we can easily adjust our email notification script to be more general and send an SMS notification instead.
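For example, assuming we save the Twilio snippet as a helper and rename its main function to something like send_text_message, the monitoring script's main function could fire off an SMS rather than an email; the rest of the monitoring logic stays the same.

def main(db_secret_name, minute_lookback, count_trigger_threshold, recipient_number):
    # Same monitoring logic as before, but the alert goes out as a text message.
    start_timestamp = get_start_timestamp(minute_lookback)
    logs_df = query_logs_table(db_secret_name, start_timestamp)
    prediction_count = count_observation(logs_df)
    should_send_alert = determine_should_email_be_sent(prediction_count, count_trigger_threshold)
    if should_send_alert:
        mean_prediction = get_mean_prediction(logs_df)
        body = (f'Prediction report since {start_timestamp}: {prediction_count} predictions, '
                f'average prediction {mean_prediction:.3f}.')
        send_text_message(body, recipient_number)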
Data shift is a crucial topic in machine learning. If the distributions of our features are different compared to when we trained our model, a retrain might be warranted. Without it, our model might not know how to best handle certain features. We can employ a univariate approach to track data shift. If we detect data shift, we can automatically trigger a model retrain.
import pandas as pd | |
import numpy as np | |
import os | |
import joblib | |
from scipy.stats import ks_2samp, chisquare | |
from ds_helpers import aws, db | |
from app_settings import MODEL_1_PATH, MODEL_FEATURES | |
def extract_model_uid_from_path(model_path): | |
return model_path.split('/')[1] | |
def get_query_start_timestamp(model_uid, db_conn): | |
query = f''' | |
select training_timestamp | |
from churn_model.model_meta_data | |
where model_uid = '{model_uid}'; | |
''' | |
df = pd.read_sql(query, db_conn) | |
start_timestamp = df['training_timestamp'][0] | |
return start_timestamp | |
def extract_production_data(start_timestamp, model_uid, db_conn): | |
query = f''' | |
select JSON_EXTRACT(input_output_payloads, "$.input.*") as "values", | |
JSON_KEYS(input_output_payloads, "$.input") as "keys" | |
from ( | |
select * from churn_model.model_logs | |
where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_1' | |
and JSON_EXTRACT(input_output_payloads, "$.output.model_1_path") = '{model_uid}' | |
union | |
select * from churn_model.model_logs | |
where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_2' | |
and JSON_EXTRACT(input_output_payloads, "$.output.model_2_path") = '{model_uid}' | |
) model_output | |
where logging_timestamp >= '{start_timestamp}';''' | |
df = pd.read_sql(query, db_conn) | |
columns = df['keys'][0] | |
columns = columns.strip('][').split(', ') | |
columns = [c.replace('"', '') for c in columns] | |
df.drop('keys', 1, inplace=True) | |
df['values'] = df['values'].str.replace('[', '').str.replace(']', '') | |
df = df['values'].str.split(',', expand=True) | |
df.columns = columns | |
df.drop(['uid', 'url', 'endpoint'], 1, inplace=True) | |
for col in list(df): | |
df[col] = df[col].str.replace('"', '') | |
df[col] = df[col].str.strip() | |
try: | |
df[col] = df[col].astype(float) | |
except ValueError: | |
pass | |
return df | |
def recreate_data_used_for_training(model_uid): | |
path = os.path.join(model_uid, 'data') | |
aws.download_folder_from_s3('churn-model-data-science-modeling', path) | |
x_train = joblib.load(os.path.join(path, 'x_train.pkl')) | |
x_train.reset_index(inplace=True, drop=True) | |
x_test = joblib.load(os.path.join(path, 'x_test.pkl')) | |
x_test.reset_index(inplace=True, drop=True) | |
x_df = pd.concat([x_train, x_test], axis=0) | |
x_df = x_df[MODEL_FEATURES] | |
return x_df | |
def calculate_ks_statistic(training_df, production_df, feature): | |
try: | |
ks_result = ks_2samp(training_df[feature], production_df[feature]) | |
return ks_result[1] | |
except KeyError: | |
return 0.00 | |
def prep_category_for_chi_squared(training_df, production_df, feature): | |
try: | |
training_feature_grouped = pd.DataFrame(training_df.groupby(feature)[feature].count()) | |
training_feature_grouped.columns = ['train_count'] | |
training_feature_grouped['train_count'] = (training_feature_grouped['train_count'] / | |
training_feature_grouped['train_count'].sum()) * 1_000 | |
training_feature_grouped['train_count'] = training_feature_grouped['train_count'] + 1 | |
training_feature_grouped.reset_index(inplace=True) | |
except KeyError: | |
training_feature_grouped = pd.DataFrame({'train_count': [], feature: []}) | |
try: | |
production_feature_grouped = pd.DataFrame(production_df.groupby(feature)[feature].count()) | |
production_feature_grouped.columns = ['prod_count'] | |
production_feature_grouped['prod_count'] = (production_feature_grouped['prod_count'] / | |
production_feature_grouped['prod_count'].sum()) * 1_000 | |
production_feature_grouped['prod_count'] = production_feature_grouped['prod_count'] + 1 | |
production_feature_grouped.reset_index(inplace=True) | |
except KeyError: | |
production_feature_grouped = pd.DataFrame({'prod_count': [], feature: []}) | |
merged_df = pd.merge(training_feature_grouped, production_feature_grouped, how='outer', on=feature) | |
merged_df.fillna(0, inplace=True) | |
merged_df['train_count'] = merged_df['train_count'].astype(int) | |
merged_df['prod_count'] = merged_df['prod_count'].astype(int) | |
return merged_df | |
def calculate_chi_squared_statistic(training_series, production_series): | |
chi_result = chisquare(f_obs=production_series, f_exp=training_series) | |
return chi_result[1] | |
def main(model_path, db_secret_name, p_value_cutoff): | |
db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path=os.path.join('data', 'rds-ca-2019-root.pem')) | |
model_uid = extract_model_uid_from_path(model_path) | |
query_start_time = get_query_start_timestamp(model_uid, db_conn) | |
production_df = extract_production_data(query_start_time, model_uid, db_conn) | |
original_training_df = recreate_data_used_for_training(model_uid) | |
cat_production_df = production_df.select_dtypes(include='object') | |
num_production_df = production_df.select_dtypes(exclude='object') | |
cat_training_df = original_training_df.select_dtypes(include='object') | |
num_training_df = original_training_df.select_dtypes(exclude='object') | |
cat_columns = set(list(cat_production_df) + list(cat_training_df)) | |
num_columns = set(list(num_production_df) + list(num_training_df)) | |
main_drift_df = pd.DataFrame() | |
for cat_col in cat_columns: | |
temp_chi_squared_df = prep_category_for_chi_squared(cat_training_df, cat_production_df, cat_col) | |
p_value = calculate_chi_squared_statistic(temp_chi_squared_df['train_count'], | |
temp_chi_squared_df['prod_count']) | |
temp_drift_df = pd.DataFrame({'feature': [cat_col], 'p_value': [p_value]}) | |
main_drift_df = main_drift_df.append(temp_drift_df) | |
for num_col in num_columns: | |
p_value = calculate_ks_statistic(num_training_df, num_production_df, num_col) | |
temp_drift_df = pd.DataFrame({'feature': [num_col], 'p_value': [p_value]}) | |
main_drift_df = main_drift_df.append(temp_drift_df) | |
main_drift_df['shift_occurred'] = np.where(main_drift_df['p_value'] <= p_value_cutoff, True, False) | |
main_drift_df['p_value_cutoff'] = p_value_cutoff | |
db.write_dataframe_to_database(main_drift_df, 'churn_model', 'data_shift', db_conn) | |
if __name__ == "__main__": | |
main(model_path=MODEL_1_PATH, db_secret_name='churn-model-mysql', p_value_cutoff=0.05) |
Concept shift is related to but distinct from data shift. Concept shift refers to a change in the actual underlying relationship between our target and our predictors. For example, activity_score used to be predictive, but an environmental or technological change has made it uninformative.
How does one track concept shift? With data shift, we can look at each feature and run some handy statistical tests. Concept shift, however, is a multivariate problem rather than a univariate one. For such a problem, we can turn to our old friend machine learning. We will pull the copy of our training data from S3, extract the input payloads from our production application, concatenate the two, and see if we can predict which rows came from production. If our test-set score is above a given threshold, we ascertain that concept shift has occurred. If it is not, we assert that concept shift has not occurred.
import pandas as pd | |
import numpy as np | |
import os | |
import joblib | |
from sklearn.ensemble import RandomForestClassifier | |
from sklearn.pipeline import Pipeline | |
from sklearn.compose import ColumnTransformer, make_column_selector as selector | |
from sklearn.impute import SimpleImputer | |
from sklearn.metrics import roc_auc_score | |
from sklearn.model_selection import GridSearchCV | |
from sklearn.feature_extraction import DictVectorizer | |
from sklearn.preprocessing import FunctionTransformer | |
from ds_helpers import aws, db | |
from helpers.model_helpers import create_train_test_split, create_x_y_split, FeaturesToDict, fill_missing_values | |
from app_settings import MODEL_1_PATH, MODEL_FEATURES | |
def extract_model_uid_from_path(model_path): | |
return model_path.split('/')[1] | |
def get_query_start_timestamp(model_uid, db_conn): | |
query = f''' | |
select training_timestamp | |
from churn_model.model_meta_data | |
where model_uid = '{model_uid}'; | |
''' | |
df = pd.read_sql(query, db_conn) | |
start_timestamp = df['training_timestamp'][0] | |
return start_timestamp | |
def extract_production_data(start_timestamp, model_uid, db_conn): | |
query = f''' | |
select JSON_EXTRACT(input_output_payloads, "$.input.*") as "values", | |
JSON_KEYS(input_output_payloads, "$.input") as "keys" | |
from ( | |
select * from churn_model.model_logs | |
where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_1' | |
and JSON_EXTRACT(input_output_payloads, "$.output.model_1_path") = '{model_uid}' | |
union | |
select * from churn_model.model_logs | |
where JSON_EXTRACT(input_output_payloads, "$.output.model_used") = 'model_2' | |
and JSON_EXTRACT(input_output_payloads, "$.output.model_2_path") = '{model_uid}' | |
) model_output | |
where logging_timestamp >= '{start_timestamp}';''' | |
df = pd.read_sql(query, db_conn) | |
columns = df['keys'][0] | |
columns = columns.strip('][').split(', ') | |
columns = [c.replace('"', '') for c in columns] | |
df.drop('keys', 1, inplace=True) | |
df['values'] = df['values'].str.replace('[', '').str.replace(']', '') | |
df = df['values'].str.split(',', expand=True) | |
df.columns = columns | |
df.drop(['uid', 'url', 'endpoint'], 1, inplace=True) | |
for col in list(df): | |
df[col] = df[col].str.replace('"', '') | |
        df[col] = df[col].str.strip()
        try:
            df[col] = df[col].astype(float)
        except ValueError:
            pass
return df | |
def recreate_data_used_for_training(model_uid): | |
path = os.path.join(model_uid, 'data') | |
aws.download_folder_from_s3('churn-model-data-science-modeling', path) | |
x_train = joblib.load(os.path.join(path, 'x_train.pkl')) | |
x_train.reset_index(inplace=True, drop=True) | |
x_test = joblib.load(os.path.join(path, 'x_test.pkl')) | |
x_test.reset_index(inplace=True, drop=True) | |
x_df = pd.concat([x_train, x_test], axis=0) | |
x_df = x_df[MODEL_FEATURES] | |
return x_df | |
def create_training_data(original_training_df, production_df): | |
original_training_df['target'] = 0 | |
production_df['target'] = 1 | |
training_df = pd.concat([original_training_df, production_df], axis=0) | |
training_df.reset_index(inplace=True, drop=True) | |
training_df.drop('acquired_date', 1, inplace=True) | |
training_df = training_df.fillna(value=np.nan) | |
x, y = create_x_y_split(training_df, 'target') | |
x_train, x_test, y_train, y_test = create_train_test_split(x, y) | |
return x_train, x_test, y_train, y_test | |
def train_model(x_train, y_train): | |
param_grid = { | |
'model__max_depth': [5, 10, 15], | |
        'model__min_samples_leaf': [1, 3],
'model__max_features': ['log2', 'sqrt'] | |
} | |
numeric_transformer = Pipeline(steps=[ | |
('imputer', SimpleImputer(strategy='mean')) | |
]) | |
categorical_transformer = Pipeline(steps=[ | |
('imputer', FunctionTransformer(fill_missing_values, validate=False, | |
kw_args={'fill_value': 'unknown'})), | |
('dict_creator', FeaturesToDict()), | |
('dict_vectorizer', DictVectorizer(sparse=False)), | |
]) | |
preprocessor = ColumnTransformer( | |
transformers=[ | |
('numeric_transformer', numeric_transformer, selector(dtype_include='number')), | |
('categorical_transformer', categorical_transformer, selector(dtype_exclude='number')) | |
] | |
) | |
pipeline = Pipeline(steps=[ | |
('preprocessor', preprocessor), | |
('model', RandomForestClassifier()) | |
]) | |
search = GridSearchCV(pipeline, param_grid=param_grid, scoring='roc_auc', n_jobs=-1, cv=3) | |
search.fit(x_train, y_train) | |
best_pipeline = search.best_estimator_ | |
return best_pipeline | |
def evaluate_model(estimator, x_test, y_test): | |
predictions = estimator.predict_proba(x_test) | |
roc_auc = roc_auc_score(y_test, predictions[:, 1]) | |
return roc_auc | |
def determine_if_concept_shift_has_occurred(score, threshold): | |
if score >= threshold: | |
return True | |
else: | |
return False | |
def main(model_path, db_secret_name, scoring_threshold): | |
db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name), | |
ssl_path=os.path.join('data', 'rds-ca-2019-root.pem')) | |
model_uid = extract_model_uid_from_path(model_path) | |
query_start_time = get_query_start_timestamp(model_uid, db_conn) | |
    production_df = extract_production_data(query_start_time, model_uid, db_conn)
original_training_df = recreate_data_used_for_training(model_uid) | |
x_train, x_test, y_train, y_test = create_training_data(original_training_df, production_df) | |
pipeline = train_model(x_train, y_train) | |
model_score = evaluate_model(pipeline, x_test, y_test) | |
    shift_occurred = determine_if_concept_shift_has_occurred(model_score, scoring_threshold)
insert_statement = f''' | |
INSERT INTO churn_model.concept_shift (shift_occurred, metric_used, scoring_threshold, model_score, model_uid) | |
VALUES ({shift_occurred}, 'roc_auc', {scoring_threshold}, {model_score}, '{model_uid}'); | |
''' | |
db_conn.execute(insert_statement) | |
print(f'Has concept shift occurred: {shift_occurred}') | |
if __name__ == "__main__": | |
main(model_path=MODEL_1_PATH, db_secret_name='churn-model-mysql', scoring_threshold=0.55) | |
We've built some nifty tools to help us track our model. However, it would be nice to visualize these items and have access to them on demand. For that task, we can use our old buddy Streamlit. When we're building a model tracking dashboard, we should, at minimum, cover the kinds of signals we've been tracking in this chapter.
Most of the data you need about your models should be in your MySQL logs table. Depending on your situation, you might need data from a downstream system to understand how the models' predictions were used. Of course, you'll need to tie in data from your systems of record to understand the ultimate outcome (e.g. did the customer end up churning?). Below is a sample of what a model tracking dashboard might look like.
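As a starting point, here is a minimal Streamlit sketch that charts daily prediction volume and the average predicted probability from our logs table. The query and secret name mirror those used elsewhere in this chapter; the choice of charts is illustrative, and you would extend the dashboard with drift results and outcome data as they become available.

import pandas as pd
import streamlit as st
from ds_helpers import aws, db


def load_logs(db_secret_name):
    # Pull timestamps and predictions out of the JSON payload column.
    query = '''
    select logging_timestamp,
    JSON_EXTRACT(input_output_payloads, "$.output.prediction") as prediction
    from churn_model.model_logs;
    '''
    df = pd.read_sql(query, db.connect_to_mysql(aws.get_secrets_manager_secret(db_secret_name),
                                                ssl_path='data/rds-ca-2019-root.pem'))
    df['prediction'] = df['prediction'].astype(float)
    df['logging_date'] = pd.to_datetime(df['logging_timestamp']).dt.date
    return df


def main():
    st.title('Churn Model Tracking Dashboard')
    logs_df = load_logs('churn-model-mysql')
    daily_df = logs_df.groupby('logging_date').agg(prediction_count=('prediction', 'size'),
                                                   mean_prediction=('prediction', 'mean'))
    st.subheader('Daily prediction volume')
    st.bar_chart(daily_df['prediction_count'])
    st.subheader('Daily average predicted probability')
    st.line_chart(daily_df['mean_prediction'])


if __name__ == "__main__":
    main()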
Each time we want to release a new model, we don't want to do it manually; we want to automate this process where possible. When we retrain our model, we have two options: refit our existing model on new data, or perform a completely new search for a "best" model. The former is comparatively straightforward; the latter should perhaps never be fully automated. Fortunately, refitting our existing model is something we can do relatively quickly and easily.
First, we need a simple retraining script. This script doesn't check for anything like data shift, but such a mechanism would be easy to build. At the moment, this is a "proactive" retraining process.
import joblib
import os
import numpy as np
from zipfile import ZipFile
from ds_helpers import aws
from helpers.model_helpers import create_x_y_split
from data.db import get_training_data
from modeling.config import TARGET
def main(s3_bucket): | |
""" | |
Takes a model from S3, retrains it on new data, and uploads it to S3. | |
:param s3_bucket: S3 bucket where the model lives | |
""" | |
df = get_training_data() | |
    df = df.drop(['client_id', 'id', 'meta__inserted_at'], 1)
    df[TARGET] = np.where(df[TARGET] == 'yes', 1, 0)
    x, y = create_x_y_split(df, TARGET)
aws.download_file_from_s3('model.zip', s3_bucket) | |
os.makedirs('original_model') | |
with ZipFile('model.zip', 'r') as zip_file: | |
zip_file.extractall('original_model') | |
original_model = joblib.load(os.path.join('original_model', 'model.pkl')) | |
retrained_model = original_model.fit(x, y) | |
joblib.dump(retrained_model, 'model.pkl') | |
with ZipFile('model.zip', 'w') as zip_file: | |
zip_file.write('model.pkl') | |
aws.upload_file_to_s3('model.zip', s3_bucket) | |
if __name__ == "__main__":
    main('insert-s3-bucket-name')  # replace with the S3 bucket that stores model.zip
We can set the script to run on AWS Batch on whatever schedule we like. However, how do we release this new model into production? We need to amend the CodePipeline we built with Pulumi. We will add a second source step: an S3 bucket. Whenever an upload hits our s3_object_key, the CI/CD build will kick off. The CI/CD build will pull 1) the code from the most recent git commit to the source CodeCommit repo and 2) the most recent file in the source S3 bucket. The file in the S3 bucket will, of course, be our pickled model uploaded by retrain.py.
import pulumi_aws as aws | |
def main(codecommit_repository_name, codecommit_branch_name, codecommit_repo_arn, ecr_arn, build_role_name, | |
build_project_name, network_interface_region, network_interface_owner_id, private_subnet_arn, | |
build_s3_bucket_arn, build_vpc_id, private_subnet_id, build_security_group_id, pipeline_role_name, | |
pipeline_name, pipeline_s3_bucket_name, pipeline_policy_name, pipeline_s3_bucket_arn, stage_ecs_cluster_name, | |
stage_ecs_service_name, prod_ecs_cluster_name, prod_ecs_service_name, source_s3_bucket_arn, | |
source_s3_bucket_name, s3_object_key): | |
""" | |
Creates a CodePipeline to deploy an ECS application. The following structure is employed: | |
- GitHub source | |
- S3 source | |
- CodeBuild build | |
- Stage Deploy | |
- Manual Approval | |
- Prod Deploy | |
Appropriate roles and policies are created and used. | |
:param pipeline_role_name: name for the execution role to run the pipeline | |
:type pipeline_role_name: str | |
:param pipeline_name: name of the pipeline | |
:type pipeline_name: str | |
:param pipeline_s3_bucket_name: name of the s3 bucket | |
:type pipeline_s3_bucket_name: str | |
    :param codecommit_repository_name: name of the git repo
    :type codecommit_repository_name: str
    :param codecommit_branch_name: name of the git branch
    :type codecommit_branch_name: str
:param build_project_name: name of the build project | |
:type build_project_name: str | |
:param pipeline_policy_name: name for the pipeline policy | |
:type pipeline_policy_name: str | |
:param pipeline_s3_bucket_arn: arn of the s3 bucket | |
:type pipeline_s3_bucket_arn: str | |
:param stage_ecs_cluster_name: name of the staging ECS cluster | |
:type stage_ecs_cluster_name: str | |
:param stage_ecs_service_name: name of the staging ECS service | |
:type stage_ecs_service_name: str | |
:param prod_ecs_cluster_name: name of the production ECS cluster | |
:type prod_ecs_cluster_name: str | |
:param prod_ecs_service_name: name of the production ECS service | |
:type prod_ecs_service_name: str | |
:param source_s3_bucket_arn: arn of the S3 bucket used in the source step | |
:type source_s3_bucket_arn: str | |
:param source_s3_bucket_name: name of the S3 bucket used in the source step | |
:type source_s3_bucket_name: str | |
:param s3_object_key: name of the S3 file to trigger the pipeline | |
:type s3_object_key: str | |
""" | |
codebuild_role = aws.iam.Role(build_role_name, assume_role_policy="""{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Effect": "Allow", | |
"Principal": { | |
"Service": "codebuild.amazonaws.com" | |
}, | |
"Action": "sts:AssumeRole" | |
} | |
] | |
} | |
""") | |
role_policy = aws.iam.RolePolicy(f"{build_role_name}_role_policy", | |
role=codebuild_role.name, | |
policy=f"""{{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{{ | |
"Effect": "Allow", | |
"Resource": [ | |
"*" | |
], | |
"Action": [ | |
"logs:CreateLogGroup", | |
"logs:CreateLogStream", | |
"logs:PutLogEvents" | |
] | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"ec2:CreateNetworkInterface", | |
"ec2:DescribeDhcpOptions", | |
"ec2:DescribeNetworkInterfaces", | |
"ec2:DeleteNetworkInterface", | |
"ec2:DescribeSubnets", | |
"ec2:DescribeSecurityGroups", | |
"ec2:DescribeVpcs" | |
], | |
"Resource": "*" | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"ec2:CreateNetworkInterfacePermission" | |
], | |
"Resource": [ | |
"arn:aws:ec2:{network_interface_region}:{network_interface_owner_id}:network-interface/*" | |
], | |
"Condition": {{ | |
"StringEquals": {{ | |
"ec2:Subnet": [ | |
"{private_subnet_arn}" | |
], | |
"ec2:AuthorizedService": "codebuild.amazonaws.com" | |
}} | |
}} | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"*" | |
], | |
"Resource": [ | |
"{build_s3_bucket_arn}", | |
"{build_s3_bucket_arn}/*", | |
"{pipeline_s3_bucket_arn}", | |
"{pipeline_s3_bucket_arn}/*", | |
"{source_s3_bucket_arn}", | |
"{source_s3_bucket_arn}/*" | |
] | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"ecr:GetRegistryPolicy", | |
"ecr:DescribeRegistry", | |
"ecr:GetAuthorizationToken", | |
"ecr:DeleteRegistryPolicy", | |
"ecr:PutRegistryPolicy", | |
"ecr:PutReplicationConfiguration" | |
], | |
"Resource": [ | |
"*" | |
] | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"ecr:*" | |
], | |
"Resource": [ | |
"{ecr_arn}" | |
] | |
}} | |
] | |
}} | |
""") | |
codebuild_project = aws.codebuild.Project(build_project_name, | |
name=build_project_name, | |
description=f"{build_project_name}_codebuild_project", | |
build_timeout=15, | |
queued_timeout=15, | |
service_role=codebuild_role.arn, | |
artifacts=aws.codebuild.ProjectArtifactsArgs( | |
type="CODEPIPELINE", | |
), | |
environment=aws.codebuild.ProjectEnvironmentArgs( | |
compute_type="BUILD_GENERAL1_SMALL", | |
image="aws/codebuild/standard:3.0", | |
type="LINUX_CONTAINER", | |
image_pull_credentials_type="CODEBUILD", | |
privileged_mode=True | |
), | |
logs_config=aws.codebuild.ProjectLogsConfigArgs( | |
cloudwatch_logs=aws.codebuild.ProjectLogsConfigCloudwatchLogsArgs( | |
group_name="log-group", | |
stream_name="log-stream", | |
), | |
s3_logs=aws.codebuild.ProjectLogsConfigS3LogsArgs( | |
status="ENABLED", | |
location=build_s3_bucket_arn, | |
), | |
), | |
source=aws.codebuild.ProjectSourceArgs( | |
type="CODEPIPELINE", | |
), | |
vpc_config=aws.codebuild.ProjectVpcConfigArgs( | |
vpc_id=build_vpc_id, | |
subnets=[ | |
private_subnet_id, | |
], | |
security_group_ids=[ | |
build_security_group_id, | |
], | |
), | |
) | |
codepipeline_role = aws.iam.Role(pipeline_role_name, assume_role_policy="""{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Effect": "Allow", | |
"Principal": { | |
"Service": "codepipeline.amazonaws.com" | |
}, | |
"Action": "sts:AssumeRole" | |
} | |
] | |
} | |
""") | |
codepipeline = aws.codepipeline.Pipeline(pipeline_name, | |
name=pipeline_name, | |
role_arn=codepipeline_role.arn, | |
artifact_store=aws.codepipeline.PipelineArtifactStoreArgs( | |
location=pipeline_s3_bucket_name, | |
type="S3", | |
), | |
stages=[ | |
aws.codepipeline.PipelineStageArgs( | |
name="Source", | |
actions=[ | |
aws.codepipeline.PipelineStageActionArgs( | |
name="CodeSource", | |
category="Source", | |
owner="AWS", | |
provider="CodeCommit", | |
version="1", | |
output_artifacts=["SourceArtifact"], | |
namespace='SourceVariables', | |
run_order=1, | |
configuration={ | |
"RepositoryName": codecommit_repository_name, | |
"BranchName": codecommit_branch_name, | |
}, | |
), | |
aws.codepipeline.PipelineStageActionArgs( | |
name="FileSource", | |
category="Source", | |
owner="AWS", | |
provider="S3", | |
version="1", | |
output_artifacts=["file"], | |
run_order=1, | |
configuration={ | |
"S3Bucket": source_s3_bucket_name, | |
"S3ObjectKey": s3_object_key, | |
"PollForSourceChanges": "false", | |
}, | |
), | |
], | |
), | |
aws.codepipeline.PipelineStageArgs( | |
name="Build", | |
actions=[aws.codepipeline.PipelineStageActionArgs( | |
name="Build", | |
category="Build", | |
owner="AWS", | |
provider="CodeBuild", | |
input_artifacts=["SourceArtifact"], | |
output_artifacts=["BuildArtifact"], | |
namespace='BuildVariables', | |
version="1", | |
run_order=1, | |
configuration={ | |
"ProjectName": build_project_name, | |
}, | |
)], | |
), | |
aws.codepipeline.PipelineStageArgs( | |
name="StageDeploy", | |
actions=[aws.codepipeline.PipelineStageActionArgs( | |
name="StageDeploy", | |
category="Deploy", | |
owner="AWS", | |
provider="ECS", | |
input_artifacts=["BuildArtifact"], | |
version="1", | |
configuration={ | |
"ClusterName": stage_ecs_cluster_name, | |
"ServiceName": stage_ecs_service_name | |
}, | |
)], | |
), | |
aws.codepipeline.PipelineStageArgs( | |
name="ManualApproval", | |
actions=[aws.codepipeline.PipelineStageActionArgs( | |
name="ManualApproval", | |
category="Approval", | |
owner="AWS", | |
provider="Manual", | |
version="1", | |
)], | |
), | |
aws.codepipeline.PipelineStageArgs( | |
name="ProdDeploy", | |
actions=[aws.codepipeline.PipelineStageActionArgs( | |
name="ProdDeploy", | |
category="Deploy", | |
owner="AWS", | |
provider="ECS", | |
input_artifacts=["BuildArtifact"], | |
version="1", | |
configuration={ | |
"ClusterName": prod_ecs_cluster_name, | |
"ServiceName": prod_ecs_service_name, | |
}, | |
)], | |
), | |
]) | |
codepipeline_policy = aws.iam.RolePolicy(pipeline_policy_name, | |
role=codepipeline_role.id, | |
policy=f"""{{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{{ | |
"Effect": "Allow", | |
"Action": "*", | |
"Resource": [ | |
"{pipeline_s3_bucket_arn}", | |
"{pipeline_s3_bucket_arn}/*", | |
"{source_s3_bucket_arn}", | |
"{source_s3_bucket_arn}/*" | |
] | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": "*", | |
"Resource": "{codecommit_repo_arn}" | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"codebuild:BatchGetBuilds", | |
"codebuild:StartBuild" | |
], | |
"Resource": "*" | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"ecs:*" | |
], | |
"Resource": "*" | |
}}, | |
{{ | |
"Action": [ | |
"iam:PassRole" | |
], | |
"Effect": "Allow", | |
"Resource": "*", | |
"Condition": {{ | |
"StringLike": {{ | |
"iam:PassedToService": [ | |
"ecs-tasks.amazonaws.com" | |
] | |
}} | |
}} | |
}} | |
] | |
}}""" | |
) | |
Likewise, we need to update our buildspec.yml file to pull our model into our Docker image. Our build process will now download the model archive, model.zip, from the S3 bucket and unzip it to obtain model.pkl. The Docker build step will pick up this new model.
version: 0.2 | |
phases: | |
install: | |
runtime-versions: | |
python: 3.8 | |
docker: 18 | |
commands: | |
- nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 --storage-driver=overlay& | |
- timeout 15 sh -c "until docker info; do echo .; sleep 1; done" | |
pre_build: | |
commands: | |
- echo installing libraries and running unit tests... | |
- pip install -r requirements.txt | |
- echo pulling files from s3... | |
- rm model.pkl | |
- aws s3 cp s3://insert-s3-bucket-name/model.zip . | |
- unzip model.zip | |
- python -m pytest tests/tests.py | |
- echo logging into ecr... | |
- $(aws ecr get-login --no-include-email --region us-west-2) | |
- REPOSITORY_URI=INSERT_ECR_URI | |
- COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION) | |
- IMAGE_TAG=${COMMIT_HASH:=latest} | |
build: | |
commands: | |
- echo building docker image... | |
- docker build -t $REPOSITORY_URI:latest . | |
- docker tag $REPOSITORY_URI:latest $REPOSITORY_URI:$IMAGE_TAG | |
post_build: | |
commands: | |
- echo pushing docker image... | |
- docker push $REPOSITORY_URI:latest | |
- docker push $REPOSITORY_URI:$IMAGE_TAG | |
- printf '[{"name":"taskdef-name","imageUri":"%s"}]' $REPOSITORY_URI:latest > taskdefinition.json | |
artifacts: | |
files: | |
- '**/*' | |
base_directory: base_location | |
name: build-artifacts | |
discard-paths: no |
Finally, we need to update app_settings.py and app.py. In app_settings.py, we simply need to remove MODEL_PATH. In app.py, we can also remove the import of MODEL_PATH. In our initialize_app function, we simply need to load model.pkl. For local testing, we will obviously need a local copy of a model.pkl file; we can manually pull the most recent upload from S3. Don't worry: our buildspec.yml will remove any model.pkl file pushed into CodeCommit, ensuring we use the desired S3 model. Likewise, if we are using S3 to house our models, we might add .pkl files to our .gitignore.
def initialize_app(): | |
""" | |
Initializes our Flask application. | |
- creates a Flask app object | |
- sets AWS keys for uploading payloads to S3 | |
- retrieves and sets the application config | |
- integrates with Sentry for error reporting | |
    - sets up a background scheduler to refresh the config every 3,600 seconds
- loads the trained model and sets it as a global object | |
""" | |
app = Flask(__name__) | |
if ENVIRONMENT != 'local': | |
sentry_sdk.init( | |
dsn=SENTRY_DSN, | |
integrations=[FlaskIntegration()], | |
traces_sample_rate=1.0 | |
) | |
config_dict = retrieve_app_config(DB_SCHEMA, make_mysql_connection(DATABASE_SECRET), ENVIRONMENT) | |
for key, value in config_dict.items(): | |
app.config[key] = value | |
scheduler = BackgroundScheduler() | |
scheduler.add_job(func=hit_config_refresh_endpoint, trigger="interval", seconds=3_600) | |
scheduler.start() | |
global model | |
model = joblib.load('model.pkl') | |
return app | |
I've seen some people who are retrain-crazy: let's retrain constantly! This may sound cool, but it's often overkill. In the script below, I load a churn model and a small test set (4,000 observations). I record baseline evaluation scores on this test set. Subsequently, I change the distribution of one or two numeric features at varying levels and record the effect. (Of course, drift can happen in categorical features, too.) This simulates drift. The results are captured in the screenshot below.
In our first scenario, we change all profile_score values by 1%. This is actually a fairly big change since we are impacting 100% of observations in some way. We can see that the impact on our predictive power is fairly small. We start getting more noticeable declines at a 5% movement of all profile_score values. When we move to a multivariate approach, our first scenario changes profile_score and average_stars by 1% in 10 observations. We see basically no decline in predictive performance in this scenario. We have to shift the data substantially before we start seeing notable declines. Of note, we see a bit of noise in the log loss for the multivariate scenarios since we are affecting random rows, some of which might not be impacted much by the selected features.
import joblib | |
import pandas as pd | |
import numpy as np | |
from sklearn.metrics import log_loss | |
from random import shuffle | |
from copy import deepcopy | |
from modeling.evaluate import calculate_probability_lift | |
np.random.seed(10) | |
pd.set_option('display.max_rows', None) | |
pd.set_option('display.max_columns', None) | |
pd.set_option('display.width', None) | |
def run_base_case(model, y_test, x_test): | |
preds_df = pd.DataFrame(model.predict_proba(x_test), columns=['0_prob', '1_prob']) | |
ll = log_loss(y_test, preds_df['1_prob']) | |
proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '') | |
df = pd.DataFrame({ | |
'scenario': ['base_case'], | |
'log_loss': [ll], | |
'proba_lift': [proba_lift] | |
}) | |
return df | |
def run_univariate_drift_scenario(model, y_test, x_test, column, factor): | |
local_x_test = deepcopy(x_test) | |
local_x_test[column] = local_x_test[column] * factor | |
preds_df = pd.DataFrame(model.predict_proba(local_x_test), columns=['0_prob', '1_prob']) | |
preds_df = preds_df.reset_index(drop=True) | |
ll = log_loss(y_test, preds_df['1_prob']) | |
proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '') | |
df = pd.DataFrame({ | |
'scenario': [f'{column}_{factor}_shift'], | |
'log_loss': [ll], | |
'proba_lift': [proba_lift] | |
}) | |
return df | |
def run_multivariate_drift_scenario(model, y_test, x_test, column_list, change_obs, factor, scenario_name): | |
local_x_test = deepcopy(x_test) | |
indices = list(local_x_test.index) | |
shuffle(indices) | |
indices = indices[0:change_obs] | |
for column in column_list: | |
for index in indices: | |
local_x_test[column][index] = local_x_test[column][index] * factor | |
preds_df = pd.DataFrame(model.predict_proba(local_x_test), columns=['0_prob', '1_prob']) | |
preds_df = preds_df.reset_index(drop=True) | |
ll = log_loss(y_test, preds_df['1_prob']) | |
proba_lift = calculate_probability_lift(y_test, preds_df['1_prob'], '') | |
df = pd.DataFrame({ | |
'scenario': [scenario_name], | |
'log_loss': [ll], | |
'proba_lift': [proba_lift] | |
}) | |
return df | |
def main(): | |
x_test = joblib.load('modeling/random_forest_202107252202562870520500/data/x_test.pkl') | |
y_test = joblib.load('modeling/random_forest_202107252202562870520500/data/y_test.pkl') | |
model = joblib.load('modeling/random_forest_202107252202562870520500/model/model.pkl') | |
x_test = x_test.reset_index(drop=True) | |
y_test = y_test.reset_index(drop=True) | |
main_df = pd.DataFrame() | |
main_df = pd.concat([main_df, run_base_case(model, y_test, x_test)], axis=0) | |
main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.01)], axis=0) | |
main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.03)], axis=0) | |
main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.05)], axis=0) | |
main_df = pd.concat([main_df, run_univariate_drift_scenario(model, y_test, x_test, 'profile_score', 1.10)], axis=0) | |
main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test, | |
['profile_score', 'average_stars'], 10, | |
1.01, 'small_multivariate')], axis=0) | |
main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test, | |
['profile_score', 'average_stars'], 500, | |
1.05, 'moderate_multivariate')], axis=0) | |
main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test, | |
['profile_score', 'average_stars'], | |
1_500, 1.10, 'strong_multivariate')], axis=0) | |
main_df = pd.concat([main_df, run_multivariate_drift_scenario(model, y_test, x_test, | |
['profile_score', 'average_stars'], | |
4_000, 2.0, 'very_strong_multivariate')], axis=0) | |
return main_df | |
if __name__ == "__main__": | |
comparison_df = main() | |
print(comparison_df) |
What does all this mean? The message is mixed. We clearly don't need to retrain every time we get a new observation. However, we can experience some degradation in our model even with small drift. Even though this won't tank our model, we would like to avoid such a case. Therefore, we should try to retrain proactively or at early signs of drift if possible. That said, even with some drift, our model can still work.
We have to understand our environment. How likely is drift to occur? How much drift is likely to occur? If our environment is static, we need to worry less about retraining frequently. If our data is not shifting, in terms of either data or concept drift, our model will keep working well, and retraining becomes a purely proactive step. This point is important because retraining frequently may be costly in certain cases, such as when assembling the requisite data is difficult.