Modul 3: Automatiserade ML-pipelines
Översikt
I denna modul utforskar vi konceptet med automatiserade ML-pipelines – ryggraden i ett moget MLOps-system. Vi kommer att lära oss hur man designar, implementerar och underhåller skalbar datainsamling, databearbetning, modellträning och evalueringspipelines. Modulen fokuserar på att skapa reproducerbara komponenter som kan orkestreras för att möjliggöra både kontinuerlig träning och robusta driftsättningar.
Lärandemål
Efter denna modul kommer du att:
- Förstå komponenterna i en end-to-end ML-pipeline
- Kunna designa modulära och återanvändbara pipeline-komponenter
- Implementera databearbetning, träning och evalueringspipelines
- Välja lämpliga orkestreringsverktyg för olika typer av ML-pipelines
- Implementera pipeline-testning och kvalitetskontroll
- Förstå hur man hanterar misslyckade pipeline-körningar
3.1 Grundläggande ML-pipeline-arkitektur
Definition av en ML-pipeline
En ML-pipeline är en sekvens av automatiserade steg som transformerar rådata till en tränad och utvärderad modell, klar för driftsättning. Pipelines möjliggör konsekvent, reproducerbar modellträning och underhåll.
Typiska komponenter i en ML-pipeline
- Datainsamling: Hämtning av data från olika källor
- Datavalidering: Kontroll av datakvalitet och schema
- Dataförbehandling: Rengöring, normalisering och transformation
- Feature engineering: Skapande av meningsfulla features från rådata
- Datauppdelning: Uppdelning av data i tränings-, validerings- och testuppsättningar
- Modellträning: Träning av modeller med olika algoritmer och hyperparametrar
- Modellutvärdering: Utvärdering av modellprestanda mot definierade mätvärden
- Modellvalidering: Säkerställande att modellen uppfyller alla krav före driftsättning
- Modellregistrering: Registrering av godkända modeller i modellregistret
- Modellpaketisering: Förberedelse av modellen för driftsättning
Designprinciper för ML-pipelines
- Idempotens: Samma input ger alltid samma output
- Modularitet: Separata, självständiga komponenter
- Parameterisering: Konfigurerbara komponenter utan kodändringar
- Statelessness: Komponenter förlitar sig inte på bevarad intern status
- Observerbarhet: Möjlighet att övervaka och logga pipeline-prestanda
- Felhantering: Robust hantering av fel och undantag
- Skalbarhet: Förmåga att hantera växande datamängder och komplexitet
3.2 Designa och implementera databearbetningspipelines
Komponenter i databearbetningspipelines
- Datainsamling: Batch vs. streaming, datakällor
- Datakvalitetsvalidering: Schemavalidering, integritetskontroller
- Dataförbehandling: Hantering av saknade värden, uteliggare (outliers)
- Feature engineering: Transformation, kodning, normalisering
- Feature-lagring: Feature stores, caching-strategier
Implementering med TensorFlow Extended (TFX)
# Exempel på en TFX-pipeline för databearbetning import tensorflow as tf import tensorflow_transform as tft from tensorflow_transform.tf_metadata import schema_utils # Definiera schema schema = schema_utils.schema_from_feature_spec({ 'numeric_feature': tf.io.FixedLenFeature([], tf.float32), 'categorical_feature': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64) }) # Definiera preprocessing_fn def preprocessing_fn(inputs): outputs = {} # Normalisera numeriska features outputs['numeric_feature'] = tft.scale_to_z_score(inputs['numeric_feature']) # Konvertera kategoriska features till one-hot encoding outputs['categorical_feature'] = tft.compute_and_apply_vocabulary( inputs['categorical_feature'], vocab_filename='categorical_vocab') # Kopiera label outputs['label'] = inputs['label'] return outputs
Implementering med Scikit-learn pipelines
from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from sklearn.impute import SimpleImputer from sklearn.preprocessing import StandardScaler, OneHotEncoder # Definiera preprocessor numeric_features = ['age', 'income', 'years_experience'] categorical_features = ['education', 'occupation', 'marital_status'] numeric_transformer = Pipeline(steps=[ ('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler()) ]) categorical_transformer = Pipeline(steps=[ ('imputer', SimpleImputer(strategy='constant', fill_value='missing')), ('onehot', OneHotEncoder(handle_unknown='ignore')) ]) preprocessor = ColumnTransformer( transformers=[ ('num', numeric_transformer, numeric_features), ('cat', categorical_transformer, categorical_features) ]) # Skapa fullständig pipeline from sklearn.ensemble import RandomForestClassifier full_pipeline = Pipeline(steps=[ ('preprocessor', preprocessor), ('classifier', RandomForestClassifier()) ]) # Träna pipeline full_pipeline.fit(X_train, y_train)
Datavalidering med Great Expectations eller TensorFlow Data Validation
# Exempel med Great Expectations import great_expectations as ge # Ladda data som DataFrame df = ge.read_csv("training_data.csv") # Definiera förväntningar df.expect_column_values_to_not_be_null("customer_id") df.expect_column_values_to_be_between("age", min_value=18, max_value=120) df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"]) # Validera data validation_result = df.validate()
3.3 Designa och implementera träningspipelines
Komponenter i träningspipelines
- Hyperparameter-tuning: Grid search, random search, bayesiansk optimering
- Modellträning: Träning av olika algoritmer och arkitekturer
- Cross-validation: K-fold validering för robustare resultat
- Checkpointing: Regelbunden sparning av modellstatus under träning
- Experiment tracking: Spårning av träningsparametrar och resultat
Implementering med MLflow
import mlflow import mlflow.sklearn from sklearn.model_selection import GridSearchCV from sklearn.ensemble import RandomForestClassifier # Konfigurera MLflow mlflow.set_experiment("customer_churn_prediction") # Definiera parameter grid param_grid = { 'n_estimators': [100, 200, 300], 'max_depth': [None, 10, 20, 30], 'min_samples_split': [2, 5, 10] } # Skapa grid search grid_search = GridSearchCV( RandomForestClassifier(), param_grid, cv=5, scoring='f1', return_train_score=True ) # Träna med MLflow tracking with mlflow.start_run(): # Logga parametrar mlflow.log_param("model_type", "RandomForestClassifier") mlflow.log_param("cv_folds", 5) # Träna modell grid_search.fit(X_train, y_train) # Logga bästa parametrar for param, value in grid_search.best_params_.items(): mlflow.log_param(f"best_{param}", value) # Logga metrics mlflow.log_metric("best_f1", grid_search.best_score_) mlflow.log_metric("test_f1", f1_score(y_test, grid_search.predict(X_test))) # Logga bästa modell mlflow.sklearn.log_model(grid_search.best_estimator_, "model")
Distribuerad träning
# Exempel med TensorFlow distribuerad träning import tensorflow as tf # Skapa distribution strategi strategy = tf.distribute.MirroredStrategy() # Bygg modell inom strategins scope with strategy.scope(): model = tf.keras.Sequential([ tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(1, activation='sigmoid') ]) model.compile( optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'] ) # Träna distribuerad modell model.fit( train_dataset, epochs=10, validation_data=val_dataset, callbacks=[ tf.keras.callbacks.ModelCheckpoint(filepath='checkpoints/model-{epoch:02d}'), tf.keras.callbacks.TensorBoard(log_dir='logs') ] )
3.4 Designa och implementera evalueringspipelines
Komponenter i evalueringspipelines
- Modellvalidering: Kontroll av modellprestanda
- Prestandamätvärden: Beräkning och utvärdering av olika mätvärden
- Tröskelvärden: Kontroll mot fördefinierade tröskelvärden för godkännande
- Jämförelse med baseline: Jämförelse med en referensmodell
- Bias och rättvisa: Utvärdering av modellrättvisa mellan grupper
Implementering av en modellutvärderingspipeline
# Exempel på modellutvärdering med scikit-learn och MLflow import numpy as np from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score import mlflow def evaluate_model(model_uri, X_test, y_test, thresholds): """Utvärdera en modell mot definierade tröskelvärden""" # Ladda modell från MLflow model = mlflow.sklearn.load_model(model_uri) # Gör prediktioner y_pred = model.predict(X_test) y_pred_proba = model.predict_proba(X_test)[:, 1] # Beräkna mätvärden metrics = { 'accuracy': accuracy_score(y_test, y_pred), 'precision': precision_score(y_test, y_pred), 'recall': recall_score(y_test, y_pred), 'f1': f1_score(y_test, y_pred), 'roc_auc': roc_auc_score(y_test, y_pred_proba) } # Kontrollera mot tröskelvärden passed_validation = True for metric_name, threshold in thresholds.items(): if metrics[metric_name] < threshold: passed_validation = False print(f"Validation failed: {metric_name} = {metrics[metric_name]} < {threshold}") return passed_validation, metrics
Avancerad modellutvärdering
- A/B-testning: Jämförelse av modeller i produktion
- Feature importance: Analys av feature-betydelse
- Partial Dependence Plots: Visualisering av modellbeteende
- SHAP-värden: Modellförklaring på instansnivå
3.5 Orkestrering av ML-pipelines
Orkestreringsverktyg
- Apache Airflow: Arbetsflödesorkestrering baserad på DAGs
- Kubeflow Pipelines: Kubernetes-baserad orkestrering för ML
- Argo Workflows: Kubernetes-native arbetsflödeshantering
- Prefect: Modern arbetsflödesorkestrering
- Dagster: Dataorienterad orkestrering
Implementering med Apache Airflow
# Exempel på Airflow DAG för en ML-pipeline from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'mlops', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': True, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'ml_training_pipeline', default_args=default_args, description='End-to-end ML training pipeline', schedule_interval=timedelta(days=1), ) # Definiera task-funktioner def fetch_data(**kwargs): # Kod för att hämta data return "data_path" def validate_data(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='fetch_data') # Kod för att validera data return "validated_data_path" def preprocess_data(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='validate_data') # Kod för dataförbehandling return "preprocessed_data_path" def train_model(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='preprocess_data') # Kod för modellträning return "model_path" def evaluate_model(**kwargs): model_path = kwargs['ti'].xcom_pull(task_ids='train_model') # Kod för modellutvärdering return "evaluation_results" def register_model(**kwargs): model_path = kwargs['ti'].xcom_pull(task_ids='train_model') eval_results = kwargs['ti'].xcom_pull(task_ids='evaluate_model') # Kod för modellregistrering return "registered_model_uri" # Skapa tasks task_fetch_data = PythonOperator( task_id='fetch_data', python_callable=fetch_data, dag=dag, ) task_validate_data = PythonOperator( task_id='validate_data', python_callable=validate_data, dag=dag, ) task_preprocess_data = PythonOperator( task_id='preprocess_data', python_callable=preprocess_data, dag=dag, ) task_train_model = PythonOperator( task_id='train_model', python_callable=train_model, dag=dag, ) task_evaluate_model = PythonOperator( task_id='evaluate_model', python_callable=evaluate_model, dag=dag, ) task_register_model = PythonOperator( task_id='register_model', python_callable=register_model, dag=dag, ) # Definiera beroenden task_fetch_data >> task_validate_data >> task_preprocess_data >> task_train_model >> task_evaluate_model >> task_register_model
Implementering med Kubeflow Pipelines
import kfp from kfp import dsl from kfp.components import func_to_container_op # Definiera komponenter @func_to_container_op def fetch_data() -> str: # Kod för att hämta data return "data_path" @func_to_container_op def validate_data(data_path: str) -> str: # Kod för att validera data return "validated_data_path" @func_to_container_op def preprocess_data(data_path: str) -> str: # Kod för dataförbehandling return "preprocessed_data_path" @func_to_container_op def train_model(data_path: str) -> str: # Kod för modellträning return "model_path" @func_to_container_op def evaluate_model(model_path: str) -> str: # Kod för modellutvärdering return "evaluation_results" @func_to_container_op def register_model(model_path: str, eval_results: str) -> str: # Kod för modellregistrering return "registered_model_uri" # Definiera pipeline @dsl.pipeline( name='ML Training Pipeline', description='End-to-end ML training pipeline' ) def ml_pipeline(): fetch_task = fetch_data() validate_task = validate_data(fetch_task.output) preprocess_task = preprocess_data(validate_task.output) train_task = train_model(preprocess_task.output) evaluate_task = evaluate_model(train_task.output) register_task = register_model(train_task.output, evaluate_task.output) # Kompilera och kör pipeline kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
3.6 Testning och kvalitetssäkring av pipelines
Testning av pipeline-komponenter
- Enhetstester: Testa individuella funktioner
- Integrationstester: Testa interaktion mellan komponenter
- End-to-end-tester: Testa hela pipelinen
Implementering av pipeline-tester
import unittest from unittest.mock import patch import numpy as np import pandas as pd # Exempel på enhetstest för en preprocessor-komponent class TestPreprocessor(unittest.TestCase): def setUp(self): # Skapa test data self.test_data = pd.DataFrame({ 'numeric_feature': [1.0, 2.0, None, 4.0], 'categorical_feature': ['A', 'B', 'A', None] }) def test_imputation(self): from my_pipeline.preprocessors import impute_missing_values # Anropa funktionen processed_data = impute_missing_values(self.test_data) # Kontrollera resultat self.assertFalse(processed_data.isnull().any().any()) self.assertEqual(processed_data.loc[2, 'numeric_feature'], 2.0) # Median self.assertEqual(processed_data.loc[3, 'categorical_feature'], 'unknown') def test_scaling(self): from my_pipeline.preprocessors import scale_numeric_features # Skapa test data utan saknade värden test_data = pd.DataFrame({ 'numeric_feature': [1.0, 2.0, 3.0, 4.0] }) # Anropa funktionen processed_data = scale_numeric_features(test_data) # Kontrollera resultat self.assertAlmostEqual(processed_data['numeric_feature'].mean(), 0) self.assertAlmostEqual(processed_data['numeric_feature'].std(), 1)
CI/CD för ML-pipelines
- Implementering av Continuous Integration för pipeline-kod
- Automatisk testning vid varje kodändring
- Nightly runs av full pipeline på samplade data
Praktiska övningar
1. Implementera databearbetningspipeline: Skapa en databearbetningspipeline med scikit-learn och DVC 2. Skapa träningspipeline: Implementera en modellträningspipeline med MLflow 3. Orkestrering med Airflow: Konfigurera en end-to-end ML-pipeline med Airflow 4. Pipeline-testning: Skriv enhetstester för pipeline-komponenter
Verktygsintroduktion
- Apache Airflow: Installation och grundläggande användning
- Kubeflow Pipelines: Konfigurera Kubeflow på Kubernetes
- MLflow: Experimentering och modellspårning
- TensorFlow Extended: Komponenter för skalbar ML
Läsresurser
- ”Building Machine Learning Pipelines” - H. Hapke & C. Nelson
- ”Data Science on AWS” - C. Fregly & A. Barth
- ”TensorFlow Extended (TFX) for Machine Learning” - TensorFlow dokumentation
- ”Building Reproducible ML Pipelines” - Google Cloud dokumentation
Nyckelinsikter
- Automatiserade ML-pipelines är grunden för reproducerbar modellproduktion
- Modularitet och parameterisering möjliggör flexibla och underhållbara pipelines
- Orkestrering är avgörande för att hantera komplexiteten i ML-arbetsflöden
- Testning av pipeline-komponenter är lika viktig som testning av applikationskod
- Val av orkestreringsverktyg beror på organisationens infrastruktur och behov
Nästa steg
I nästa modul kommer vi att utforska kontinuerlig integrering och driftsättning (CI/CD) för ML, där vi kommer att lära oss hur man automatiserar processen från kodändring till produktion.