Modul 3: Automatiserade ML-pipelines

I denna modul utforskar vi konceptet med automatiserade ML-pipelines – ryggraden i ett moget MLOps-system. Vi kommer att lära oss hur man designar, implementerar och underhåller skalbar datainsamling, databearbetning, modellträning och evalueringspipelines. Modulen fokuserar på att skapa reproducerbara komponenter som kan orkestreras för att möjliggöra både kontinuerlig träning och robusta driftsättningar.

Efter denna modul kommer du att:

  • Förstå komponenterna i en end-to-end ML-pipeline
  • Kunna designa modulära och återanvändbara pipeline-komponenter
  • Implementera databearbetning, träning och evalueringspipelines
  • Välja lämpliga orkestreringsverktyg för olika typer av ML-pipelines
  • Implementera pipeline-testning och kvalitetskontroll
  • Förstå hur man hanterar misslyckade pipeline-körningar

En ML-pipeline är en sekvens av automatiserade steg som transformerar rådata till en tränad och utvärderad modell, klar för driftsättning. Pipelines möjliggör konsekvent, reproducerbar modellträning och underhåll.

  • Datainsamling: Hämtning av data från olika källor
  • Datavalidering: Kontroll av datakvalitet och schema
  • Dataförbehandling: Rengöring, normalisering och transformation
  • Feature engineering: Skapande av meningsfulla features från rådata
  • Datauppdelning: Uppdelning av data i tränings-, validerings- och testuppsättningar
  • Modellträning: Träning av modeller med olika algoritmer och hyperparametrar
  • Modellutvärdering: Utvärdering av modellprestanda mot definierade mätvärden
  • Modellvalidering: Säkerställande att modellen uppfyller alla krav före driftsättning
  • Modellregistrering: Registrering av godkända modeller i modellregistret
  • Modellpaketisering: Förberedelse av modellen för driftsättning
  • Idempotens: Samma input ger alltid samma output
  • Modularitet: Separata, självständiga komponenter
  • Parameterisering: Konfigurerbara komponenter utan kodändringar
  • Statelessness: Komponenter förlitar sig inte på bevarad intern status
  • Observerbarhet: Möjlighet att övervaka och logga pipeline-prestanda
  • Felhantering: Robust hantering av fel och undantag
  • Skalbarhet: Förmåga att hantera växande datamängder och komplexitet
  • Datainsamling: Batch vs. streaming, datakällor
  • Datakvalitetsvalidering: Schemavalidering, integritetskontroller
  • Dataförbehandling: Hantering av saknade värden, uteliggare (outliers)
  • Feature engineering: Transformation, kodning, normalisering
  • Feature-lagring: Feature stores, caching-strategier
# Exempel på en TFX-pipeline för databearbetning
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
 
# Definiera schema
schema = schema_utils.schema_from_feature_spec({
    'numeric_feature': tf.io.FixedLenFeature([], tf.float32),
    'categorical_feature': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64)
})
 
# Definiera preprocessing_fn
def preprocessing_fn(inputs):
    outputs = {}
 
    # Normalisera numeriska features
    outputs['numeric_feature'] = tft.scale_to_z_score(inputs['numeric_feature'])
 
    # Konvertera kategoriska features till one-hot encoding
    outputs['categorical_feature'] = tft.compute_and_apply_vocabulary(
        inputs['categorical_feature'], vocab_filename='categorical_vocab')
 
    # Kopiera label
    outputs['label'] = inputs['label']
 
    return outputs
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
 
# Definiera preprocessor
numeric_features = ['age', 'income', 'years_experience']
categorical_features = ['education', 'occupation', 'marital_status']
 
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])
 
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])
 
# Skapa fullständig pipeline
from sklearn.ensemble import RandomForestClassifier
 
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
])
 
# Träna pipeline
full_pipeline.fit(X_train, y_train)
# Exempel med Great Expectations
import great_expectations as ge
 
# Ladda data som DataFrame
df = ge.read_csv("training_data.csv")
 
# Definiera förväntningar
df.expect_column_values_to_not_be_null("customer_id")
df.expect_column_values_to_be_between("age", min_value=18, max_value=120)
df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
 
# Validera data
validation_result = df.validate()
  • Hyperparameter-tuning: Grid search, random search, bayesiansk optimering
  • Modellträning: Träning av olika algoritmer och arkitekturer
  • Cross-validation: K-fold validering för robustare resultat
  • Checkpointing: Regelbunden sparning av modellstatus under träning
  • Experiment tracking: Spårning av träningsparametrar och resultat
import mlflow
import mlflow.sklearn
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
 
# Konfigurera MLflow
mlflow.set_experiment("customer_churn_prediction")
 
# Definiera parameter grid
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10]
}
 
# Skapa grid search
grid_search = GridSearchCV(
    RandomForestClassifier(),
    param_grid,
    cv=5,
    scoring='f1',
    return_train_score=True
)
 
# Träna med MLflow tracking
with mlflow.start_run():
    # Logga parametrar
    mlflow.log_param("model_type", "RandomForestClassifier")
    mlflow.log_param("cv_folds", 5)
 
    # Träna modell
    grid_search.fit(X_train, y_train)
 
    # Logga bästa parametrar
    for param, value in grid_search.best_params_.items():
        mlflow.log_param(f"best_{param}", value)
 
    # Logga metrics
    mlflow.log_metric("best_f1", grid_search.best_score_)
    mlflow.log_metric("test_f1", f1_score(y_test, grid_search.predict(X_test)))
 
    # Logga bästa modell
    mlflow.sklearn.log_model(grid_search.best_estimator_, "model")
# Exempel med TensorFlow distribuerad träning
import tensorflow as tf
 
# Skapa distribution strategi
strategy = tf.distribute.MirroredStrategy()
 
# Bygg modell inom strategins scope
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
 
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
 
# Träna distribuerad modell
model.fit(
    train_dataset,
    epochs=10,
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint(filepath='checkpoints/model-{epoch:02d}'),
        tf.keras.callbacks.TensorBoard(log_dir='logs')
    ]
)
  • Modellvalidering: Kontroll av modellprestanda
  • Prestandamätvärden: Beräkning och utvärdering av olika mätvärden
  • Tröskelvärden: Kontroll mot fördefinierade tröskelvärden för godkännande
  • Jämförelse med baseline: Jämförelse med en referensmodell
  • Bias och rättvisa: Utvärdering av modellrättvisa mellan grupper
# Exempel på modellutvärdering med scikit-learn och MLflow
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import mlflow
 
def evaluate_model(model_uri, X_test, y_test, thresholds):
    """Utvärdera en modell mot definierade tröskelvärden"""
    # Ladda modell från MLflow
    model = mlflow.sklearn.load_model(model_uri)
 
    # Gör prediktioner
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
 
    # Beräkna mätvärden
    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'f1': f1_score(y_test, y_pred),
        'roc_auc': roc_auc_score(y_test, y_pred_proba)
    }
 
    # Kontrollera mot tröskelvärden
    passed_validation = True
    for metric_name, threshold in thresholds.items():
        if metrics[metric_name] < threshold:
            passed_validation = False
            print(f"Validation failed: {metric_name} = {metrics[metric_name]} < {threshold}")
 
    return passed_validation, metrics
  • A/B-testning: Jämförelse av modeller i produktion
  • Feature importance: Analys av feature-betydelse
  • Partial Dependence Plots: Visualisering av modellbeteende
  • SHAP-värden: Modellförklaring på instansnivå
  • Apache Airflow: Arbetsflödesorkestrering baserad på DAGs
  • Kubeflow Pipelines: Kubernetes-baserad orkestrering för ML
  • Argo Workflows: Kubernetes-native arbetsflödeshantering
  • Prefect: Modern arbetsflödesorkestrering
  • Dagster: Dataorienterad orkestrering
# Exempel på Airflow DAG för en ML-pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
 
default_args = {
    'owner': 'mlops',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
 
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training pipeline',
    schedule_interval=timedelta(days=1),
)
 
# Definiera task-funktioner
def fetch_data(**kwargs):
    # Kod för att hämta data
    return "data_path"
 
def validate_data(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    # Kod för att validera data
    return "validated_data_path"
 
def preprocess_data(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='validate_data')
    # Kod för dataförbehandling
    return "preprocessed_data_path"
 
def train_model(**kwargs):
    data_path = kwargs['ti'].xcom_pull(task_ids='preprocess_data')
    # Kod för modellträning
    return "model_path"
 
def evaluate_model(**kwargs):
    model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
    # Kod för modellutvärdering
    return "evaluation_results"
 
def register_model(**kwargs):
    model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
    eval_results = kwargs['ti'].xcom_pull(task_ids='evaluate_model')
    # Kod för modellregistrering
    return "registered_model_uri"
 
# Skapa tasks
task_fetch_data = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)
 
task_validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)
 
task_preprocess_data = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)
 
task_train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)
 
task_evaluate_model = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag,
)
 
task_register_model = PythonOperator(
    task_id='register_model',
    python_callable=register_model,
    dag=dag,
)
 
# Definiera beroenden
task_fetch_data >> task_validate_data >> task_preprocess_data >> task_train_model >> task_evaluate_model >> task_register_model
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
 
# Definiera komponenter
@func_to_container_op
def fetch_data() -> str:
    # Kod för att hämta data
    return "data_path"
 
@func_to_container_op
def validate_data(data_path: str) -> str:
    # Kod för att validera data
    return "validated_data_path"
 
@func_to_container_op
def preprocess_data(data_path: str) -> str:
    # Kod för dataförbehandling
    return "preprocessed_data_path"
 
@func_to_container_op
def train_model(data_path: str) -> str:
    # Kod för modellträning
    return "model_path"
 
@func_to_container_op
def evaluate_model(model_path: str) -> str:
    # Kod för modellutvärdering
    return "evaluation_results"
 
@func_to_container_op
def register_model(model_path: str, eval_results: str) -> str:
    # Kod för modellregistrering
    return "registered_model_uri"
 
# Definiera pipeline
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline():
    fetch_task = fetch_data()
    validate_task = validate_data(fetch_task.output)
    preprocess_task = preprocess_data(validate_task.output)
    train_task = train_model(preprocess_task.output)
    evaluate_task = evaluate_model(train_task.output)
    register_task = register_model(train_task.output, evaluate_task.output)
 
# Kompilera och kör pipeline
kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
  • Enhetstester: Testa individuella funktioner
  • Integrationstester: Testa interaktion mellan komponenter
  • End-to-end-tester: Testa hela pipelinen
import unittest
from unittest.mock import patch
import numpy as np
import pandas as pd
 
# Exempel på enhetstest för en preprocessor-komponent
class TestPreprocessor(unittest.TestCase):
    def setUp(self):
        # Skapa test data
        self.test_data = pd.DataFrame({
            'numeric_feature': [1.0, 2.0, None, 4.0],
            'categorical_feature': ['A', 'B', 'A', None]
        })
 
    def test_imputation(self):
        from my_pipeline.preprocessors import impute_missing_values
 
        # Anropa funktionen
        processed_data = impute_missing_values(self.test_data)
 
        # Kontrollera resultat
        self.assertFalse(processed_data.isnull().any().any())
        self.assertEqual(processed_data.loc[2, 'numeric_feature'], 2.0)  # Median
        self.assertEqual(processed_data.loc[3, 'categorical_feature'], 'unknown')
 
    def test_scaling(self):
        from my_pipeline.preprocessors import scale_numeric_features
 
        # Skapa test data utan saknade värden
        test_data = pd.DataFrame({
            'numeric_feature': [1.0, 2.0, 3.0, 4.0]
        })
 
        # Anropa funktionen
        processed_data = scale_numeric_features(test_data)
 
        # Kontrollera resultat
        self.assertAlmostEqual(processed_data['numeric_feature'].mean(), 0)
        self.assertAlmostEqual(processed_data['numeric_feature'].std(), 1)
  • Implementering av Continuous Integration för pipeline-kod
  • Automatisk testning vid varje kodändring
  • Nightly runs av full pipeline på samplade data

1. Implementera databearbetningspipeline: Skapa en databearbetningspipeline med scikit-learn och DVC 2. Skapa träningspipeline: Implementera en modellträningspipeline med MLflow 3. Orkestrering med Airflow: Konfigurera en end-to-end ML-pipeline med Airflow 4. Pipeline-testning: Skriv enhetstester för pipeline-komponenter

  • Apache Airflow: Installation och grundläggande användning
  • Kubeflow Pipelines: Konfigurera Kubeflow på Kubernetes
  • MLflow: Experimentering och modellspårning
  • TensorFlow Extended: Komponenter för skalbar ML
  • ”Building Machine Learning Pipelines” - H. Hapke & C. Nelson
  • ”Data Science on AWS” - C. Fregly & A. Barth
  • ”TensorFlow Extended (TFX) for Machine Learning” - TensorFlow dokumentation
  • ”Building Reproducible ML Pipelines” - Google Cloud dokumentation
  • Automatiserade ML-pipelines är grunden för reproducerbar modellproduktion
  • Modularitet och parameterisering möjliggör flexibla och underhållbara pipelines
  • Orkestrering är avgörande för att hantera komplexiteten i ML-arbetsflöden
  • Testning av pipeline-komponenter är lika viktig som testning av applikationskod
  • Val av orkestreringsverktyg beror på organisationens infrastruktur och behov

I nästa modul kommer vi att utforska kontinuerlig integrering och driftsättning (CI/CD) för ML, där vi kommer att lära oss hur man automatiserar processen från kodändring till produktion.