MLOps

[MLOps] 19. Pyfunc 사용하기 (feat.커스텀 모델 만들기)

mlslly 2024. 4. 30. 18:08

* 프로그래머스의 마키나락스 MLOPS 강의를 참고하여 작성함

 

이전 포스팅에서 MLflow에 모델을 저장하고 다시 불러와 사용하는 법에 대해서 알아보았다.

모델을 로드시 Sklearn, 혹은 Pyfunc 모듈을 활용할 수 있었는데, 

Sklearn 모듈은 주로 Sklearn 전용 모델을 저장 및 활용할 수 있었고, 그 외의 작업들, 모든 다른 유형의 모델 저장에 대해서는 Pyfunc를 활용할 수 있다는 점을 배웠다. 이전 포스팅은 아래 참고.

https://ysryuu.tistory.com/38

 

[MLOps] 18. MLflow에서 모델 불러오기 실습

* 프로그래머스의 마키나락스 MLOPS 강의를 참고하여 작성함 이제 저장한 모델을 불러와서 불러온 데이터로 predict 해보도..

ysryuu.tistory.com

 

Native Framework vs Custom Model

Sklearn은 머신러닝 프레임워크 중 하나이며,

이 경우와 같이 MLflow에서 지원하는 네이티브 프레임워크들이 있다. (네이티브 지원) 

MLflow에서 지원하는 frameworks 들은 이 링크에서 확인 가능하다. (아래 사진은 전체 리스트 아님) 

https://mlflow.org/docs/latest/models.html#built-in-model-flavors

 

MLflow Models — MLflow 2.12.1 documentation

The pmdarima model flavor enables logging of pmdarima models in MLflow format via the mlflow.pmdarima.save_model() and mlflow.pmdarima.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing t

mlflow.org

 

그러나 네이티브가 지원되는 프레임워크가 아닌 다른 프레임워크를 사용하고 싶을 때,

혹은 모델만을 저장하는 것이 아니라 여러 전후처리 단계를 포함하여 모델과 함께 저장하고 싶은 경우 등, 모델을 커스터마이징하여 저장하고자 하는 경우가 있을 것이다.

ex.  preprocess.dill, model.dill, postprocess.dill 를 하나의 모델로 저장

 


커스텀 모델 코드 작성 

예시 코드 )

아래와 같은 모델이 있다고 하자. 이 경우에 predict를 정의한 부분에서, 

최종 pred 값을 0,1,2라는 예측값이 아닌 문자열 항목들로 대치하는 후처리가 추가되었다. 

이런 경우에 기존 sklearn등의 native 프레임워크로는 해당 모델이 저장되지 않는다.

class MyModel: 
    def __init__(self, model) -> None:
        self.model = model

    def predict(self, X):   
        X_pred = self.model.predict(X)
        x_pred_df = pd.Series(X_pred).map(
            {0:'virginica',1:'setosa', 2:'versicolor'}
        )
        return x_pred_df

 

1. 모델 클래스 코드 

그렇다면 이제 본격적으로 모델 코드를 작성해보자

import dill 
import textwrap
import mlflow

class MyModel: 
    def __init__(self, clf) -> None:
        self.clf = clf

    def predict(self, X):   
        X_pred = self.clf.predict(X)
        x_pred_df = pd.Series(X_pred).map(
            {0:'virginica',1:'setosa', 2:'versicolor'}
        )
        return x_pred_df

# my custom model    
my_model = MyModel(clf)

 

2. 모델 저장 및 불러오기 코드

# save model
with open('model.dill','wb') as f : 
    dill.dump(my_model,f)

# load model
with open('loader.py','w') as f : 
    f.write(
        textwrap.dedent(
            '''
                import dill
                import os

                def _load_pyfunc(path):
                    if os.path.isdir(path):
                        path= os.path.join(path, 'model.dill')
                    
                    with open(path,'rb') as f : 
                        return dill.load(f)
            '''
        )
    )

 

3. 모델 정보 

# 모델 정보 
mlflow.pyfunc.log_model(
    artifact_path = 'my_model', 
    data_path = 'model.dill',
    loader_module = 'loader',
    code_path = ['loader.py']
)

 

 


 

최종 실행 파일 

<custom_train.py>

import os
import uuid
import textwrap

import optuna
import mlflow
import dill
import pandas as pd
from minio import Minio
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

UNIQUE_PREFIX = str(uuid.uuid4())[:8]
BUCKET_NAME = "raw-data"
OBJECT_NAME = "iris"

os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://0.0.0.0:9000"
os.environ["MLFLOW_TRACKING_URI"] = "http://0.0.0.0:5001"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"


class MyModel:
    def __init__(self, clf):
        self.clf = clf

    def predict(self, X):
        X_pred = self.clf.predict(X)
        X_pred_df = pd.Series(X_pred).map({0: "virginica", 1: "setosa", 2: "versicolor"})
        return X_pred_df


def download_data():
    #
    # minio client
    #
    url = "0.0.0.0:9000"
    access_key = "minio"
    secret_key = "miniostorage"
    client = Minio(url, access_key=access_key, secret_key=secret_key, secure=False)

    #
    # data download
    #
    object_stat = client.stat_object(BUCKET_NAME, OBJECT_NAME)
    data_version_id = object_stat.version_id
    client.fget_object(BUCKET_NAME, OBJECT_NAME, file_path="download_data.csv")
    return data_version_id


def load_data():
    data_version_id = download_data()
    df = pd.read_csv("download_data.csv")
    X, y = df.drop(columns=["target"]), df["target"]
    data_dict = {"data": X, "target": y, "version_id": data_version_id}
    return data_dict


def objective(trial):
    #
    # suggest new parameter
    #
    trial.suggest_int("n_estimators", 100, 1000, step=100)
    trial.suggest_int("max_depth", 3, 10)

    run_name = f"{UNIQUE_PREFIX}-{trial.number}"
    with mlflow.start_run(run_name=run_name):
        #
        # log parameter
        #
        mlflow.log_params(trial.params)

        #
        # load data
        #
        data_dict = load_data()
        mlflow.log_param("bucket_name", BUCKET_NAME)
        mlflow.log_param("object_name", OBJECT_NAME)
        mlflow.log_param("version_id", data_dict["version_id"])
        X, y = data_dict["data"], data_dict["target"]
        X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.3, random_state=2024)

        #
        # train model
        #
        clf = RandomForestClassifier(
            n_estimators=trial.params["n_estimators"], max_depth=trial.params["max_depth"], random_state=2024
        )
        clf.fit(X_train, y_train)

        #
        # evaluate train model
        #
        y_pred = clf.predict(X_valid)
        acc_score = accuracy_score(y_valid, y_pred)

        #
        # log metrics
        #
        mlflow.log_metric("accuracy", acc_score)
    return acc_score


def train_best_model(params):
    run_name = f"{UNIQUE_PREFIX}-best-model"
    with mlflow.start_run(run_name=run_name):
        #
        # log parameter
        #
        mlflow.log_params(params)

        #
        # load data
        #
        data_dict = load_data()
        mlflow.log_param("bucket_name", BUCKET_NAME)
        mlflow.log_param("object_name", OBJECT_NAME)
        mlflow.log_param("version_id", data_dict["version_id"])
        X, y = data_dict["data"], data_dict["target"]
        #
        # train model
        #
        clf = RandomForestClassifier(
            n_estimators=params["n_estimators"], max_depth=params["max_depth"], random_state=2024
        )
        clf.fit(X, y)
        #
        # my custom model
        #
        my_model = MyModel(clf)
        #
        # save model
        #
        with open("model.dill", "wb") as f:
            dill.dump(my_model, f)

        with open("loader.py", "w") as f:
            f.write(
                textwrap.dedent(
                    """
import os
import dill

def _load_pyfunc(path):
    if os.path.isdir(path):
        path = os.path.join(path, "model.dill")
    
    with open(path, "rb") as f:
        return dill.load(f)
"""
                )
            )

        mlflow.pyfunc.log_model(
            artifact_path="my_model",
            data_path="model.dill",
            loader_module="loader",
            code_path=["loader.py"],
        )
        return clf


if __name__ == "__main__":
    #
    # set mlflow
    #
    study_name = "hpo-tutorial"
    mlflow.set_experiment(study_name)

    # study
    sampler = optuna.samplers.RandomSampler(seed=2024)
    study = optuna.create_study(sampler=sampler, study_name=study_name, direction="maximize")

    # optimize
    study.optimize(objective, n_trials=5)

    best_params = study.best_params
    best_clf = train_best_model(best_params)

 


pyfunc 모델 저장하기 

1. 필요 패키지 설치

$ pip3 install dill

 

2. 파일 실행

$ python3 custom_train.py

 

3. 모델 저장 확인 

 

MLflow 사이트 (0.0.0.0:5001)에 가보면, hpo-tutorial 실험에 5개의 모델과,

best model이 pyfunc 로 저장되어있는 것을 확인할 수 있음. 

 

pyfunc 모델 불러와 추론하기 

이전 포스팅과 마찬가지로, pyfunc모델의 run_id를 확인 후 model_load.py 파일을 실행한다. 

(이때 sklearn과 관련한 코드들은 삭제 혹은 주석처리) 

$ python3 load_model.py --run-id 85abf0aca7454e4e84b0c9f48e23827e

 

 

결과를 아래와 같이 확인 가능하며, custom model로 추론한 값들이 문자열로 잘 변환된 것을 확인할 수 있다.!