Innehållsförteckning

Modul 3: Automatiserade ML-pipelines

Översikt

I denna modul utforskar vi konceptet med automatiserade ML-pipelines – ryggraden i ett moget MLOps-system. Vi kommer att lära oss hur man designar, implementerar och underhåller skalbar datainsamling, databearbetning, modellträning och evalueringspipelines. Modulen fokuserar på att skapa reproducerbara komponenter som kan orkestreras för att möjliggöra både kontinuerlig träning och robusta driftsättningar.

Lärandemål

Efter denna modul kommer du att:

3.1 Grundläggande ML-pipeline-arkitektur

Definition av en ML-pipeline

En ML-pipeline är en sekvens av automatiserade steg som transformerar rådata till en tränad och utvärderad modell, klar för driftsättning. Pipelines möjliggör konsekvent, reproducerbar modellträning och underhåll.

Typiska komponenter i en ML-pipeline

Designprinciper för ML-pipelines

3.2 Designa och implementera databearbetningspipelines

Komponenter i databearbetningspipelines

Implementering med TensorFlow Extended (TFX)

# Exempel på en TFX-pipeline för databearbetning
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
 
# Definiera schema
schema = schema_utils.schema_from_feature_spec({
    'numeric_feature': tf.io.FixedLenFeature([], tf.float32),
    'categorical_feature': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64)
})
 
# Definiera preprocessing_fn
def preprocessing_fn(inputs):
    outputs = {}
 
    # Normalisera numeriska features
    outputs['numeric_feature'] = tft.scale_to_z_score(inputs['numeric_feature'])
 
    # Konvertera kategoriska features till one-hot encoding
    outputs['categorical_feature'] = tft.compute_and_apply_vocabulary(
        inputs['categorical_feature'], vocab_filename='categorical_vocab')
 
    # Kopiera label
    outputs['label'] = inputs['label']
 
    return outputs

Implementering med Scikit-learn pipelines

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
 
# Definiera preprocessor
numeric_features = ['age', 'income', 'years_experience']
categorical_features = ['education', 'occupation', 'marital_status']
 
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])
 
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])
 
# Skapa fullständig pipeline
from sklearn.ensemble import RandomForestClassifier
 
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
])
 
# Träna pipeline
full_pipeline.fit(X_train, y_train)

Datavalidering med Great Expectations eller TensorFlow Data Validation

# Exempel med Great Expectations
import great_expectations as ge
 
# Ladda data som DataFrame
df = ge.read_csv("training_data.csv")
 
# Definiera förväntningar
df.expect_column_values_to_not_be_null("customer_id")
df.expect_column_values_to_be_between("age", min_value=18, max_value=120)
df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
 
# Validera data
validation_result = df.validate()

3.3 Designa och implementera träningspipelines

Komponenter i träningspipelines

Implementering med MLflow

import mlflow
import mlflow.sklearn
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
 
# Konfigurera MLflow
mlflow.set_experiment("customer_churn_prediction")
 
# Definiera parameter grid
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10]
}
 
# Skapa grid search
grid_search = GridSearchCV(
    RandomForestClassifier(),
    param_grid,
    cv=5,
    scoring='f1',
    return_train_score=True
)
 
# Träna med MLflow tracking
with mlflow.start_run():
    # Logga parametrar
    mlflow.log_param("model_type", "RandomForestClassifier")
    mlflow.log_param("cv_folds", 5)
 
    # Träna modell
    grid_search.fit(X_train, y_train)
 
    # Logga bästa parametrar
    for param, value in grid_search.best_params_.items():
        mlflow.log_param(f"best_{param}", value)
 
    # Logga metrics
    mlflow.log_metric("best_f1", grid_search.best_score_)
    mlflow.log_metric("test_f1", f1_score(y_test, grid_search.predict(X_test)))
 
    # Logga bästa modell
    mlflow.sklearn.log_model(grid_search.best_estimator_, "model")

Distribuerad träning

# Exempel med TensorFlow distribuerad träning
import tensorflow as tf
 
# Skapa distribution strategi
strategy = tf.distribute.MirroredStrategy()
 
# Bygg modell inom strategins scope
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
 
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
 
# Träna distribuerad modell
model.fit(
    train_dataset,
    epochs=10,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint(filepath='checkpoints/model-{epoch:02d}'),
        tf.keras.callbacks.TensorBoard(log_dir='logs')
    ]
)

3.4 Designa och implementera evalueringspipelines

Komponenter i evalueringspipelines

Implementering av en modellutvärderingspipeline

# Exempel på modellutvärdering med scikit-learn och MLflow
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import mlflow
 
def evaluate_model(model_uri, X_test, y_test, thresholds):
    """Utvärdera en modell mot definierade tröskelvärden"""
    # Ladda modell från MLflow
    model = mlflow.sklearn.load_model(model_uri)
 
    # Gör prediktioner
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
 
    # Beräkna mätvärden
    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'f1': f1_score(y_test, y_pred),
        'roc_auc': roc_auc_score(y_test, y_pred_proba)
    }
 
    # Kontrollera mot tröskelvärden
    passed_validation = True
    for metric_name, threshold in thresholds.items():
        if metrics[metric_name] < threshold:
            passed_validation = False
            print(f"Validation failed: {metric_name} = {metrics[metric_name]} < {threshold}")
 
    return passed_validation, metrics

Avancerad modellutvärdering

3.5 Orkestrering av ML-pipelines

Orkestreringsverktyg

Implementering med Apache Airflow

# Exempel på Airflow DAG för en ML-pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
 
default_args = {
    'owner': 'mlops',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
 
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training pipeline',
    schedule_interval=timedelta(days=1),
)
 
# Definiera task-funktioner
def fetch_data(**kwargs):
    # Kod för att hämta data
    return "data_path"
 
def validate_data(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    # Kod för att validera data
    return "validated_data_path"
 
def preprocess_data(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='validate_data')
    # Kod för dataförbehandling
    return "preprocessed_data_path"
 
def train_model(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='preprocess_data')
    # Kod för modellträning
    return "model_path"
 
def evaluate_model(**kwargs):
    model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
    # Kod för modellutvärdering
    return "evaluation_results"
 
def register_model(**kwargs):
    model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
    eval_results = kwargs['ti'].xcom_pull(task_ids='evaluate_model')
    # Kod för modellregistrering
    return "registered_model_uri"
 
# Skapa tasks
task_fetch_data = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)
 
task_validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)
 
task_preprocess_data = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)
 
task_train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)
 
task_evaluate_model = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag,
)
 
task_register_model = PythonOperator(
    task_id='register_model',
    python_callable=register_model,
    dag=dag,
)
 
# Definiera beroenden
task_fetch_data >> task_validate_data >> task_preprocess_data >> task_train_model >> task_evaluate_model >> task_register_model

Implementering med Kubeflow Pipelines

import kfp
from kfp import dsl
from kfp.components import func_to_container_op
 
# Definiera komponenter
@func_to_container_op
def fetch_data() -> str:
    # Kod för att hämta data
    return "data_path"
 
@func_to_container_op
def validate_data(data_path: str) -> str:
    # Kod för att validera data
    return "validated_data_path"
 
@func_to_container_op
def preprocess_data(data_path: str) -> str:
    # Kod för dataförbehandling
    return "preprocessed_data_path"
 
@func_to_container_op
def train_model(data_path: str) -> str:
    # Kod för modellträning
    return "model_path"
 
@func_to_container_op
def evaluate_model(model_path: str) -> str:
    # Kod för modellutvärdering
    return "evaluation_results"
 
@func_to_container_op
def register_model(model_path: str, eval_results: str) -> str:
    # Kod för modellregistrering
    return "registered_model_uri"
 
# Definiera pipeline
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline():
    fetch_task = fetch_data()
    validate_task = validate_data(fetch_task.output)
    preprocess_task = preprocess_data(validate_task.output)
    train_task = train_model(preprocess_task.output)
    evaluate_task = evaluate_model(train_task.output)
    register_task = register_model(train_task.output, evaluate_task.output)
 
# Kompilera och kör pipeline
kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

3.6 Testning och kvalitetssäkring av pipelines

Testning av pipeline-komponenter

Implementering av pipeline-tester

import unittest
from unittest.mock import patch
import numpy as np
import pandas as pd
 
# Exempel på enhetstest för en preprocessor-komponent
class TestPreprocessor(unittest.TestCase):
    def setUp(self):
        # Skapa test data
        self.test_data = pd.DataFrame({
            'numeric_feature': [1.0, 2.0, None, 4.0],
            'categorical_feature': ['A', 'B', 'A', None]
        })
 
    def test_imputation(self):
        from my_pipeline.preprocessors import impute_missing_values
 
        # Anropa funktionen
        processed_data = impute_missing_values(self.test_data)
 
        # Kontrollera resultat
        self.assertFalse(processed_data.isnull().any().any())
        self.assertEqual(processed_data.loc[2, 'numeric_feature'], 2.0)  # Median
        self.assertEqual(processed_data.loc[3, 'categorical_feature'], 'unknown')
 
    def test_scaling(self):
        from my_pipeline.preprocessors import scale_numeric_features
 
        # Skapa test data utan saknade värden
        test_data = pd.DataFrame({
            'numeric_feature': [1.0, 2.0, 3.0, 4.0]
        })
 
        # Anropa funktionen
        processed_data = scale_numeric_features(test_data)
 
        # Kontrollera resultat
        self.assertAlmostEqual(processed_data['numeric_feature'].mean(), 0)
        self.assertAlmostEqual(processed_data['numeric_feature'].std(), 1)

CI/CD för ML-pipelines

Praktiska övningar

1. Implementera databearbetningspipeline: Skapa en databearbetningspipeline med scikit-learn och DVC 2. Skapa träningspipeline: Implementera en modellträningspipeline med MLflow 3. Orkestrering med Airflow: Konfigurera en end-to-end ML-pipeline med Airflow 4. Pipeline-testning: Skriv enhetstester för pipeline-komponenter

Verktygsintroduktion

Läsresurser

Nyckelinsikter

Nästa steg

I nästa modul kommer vi att utforska kontinuerlig integrering och driftsättning (CI/CD) för ML, där vi kommer att lära oss hur man automatiserar processen från kodändring till produktion.