====== Modul 3: Automatiserade ML-pipelines ======
===== Översikt =====
I denna modul utforskar vi konceptet med automatiserade ML-pipelines – ryggraden i ett moget MLOps-system. Vi kommer att lära oss hur man designar, implementerar och underhåller skalbar datainsamling, databearbetning, modellträning och evalueringspipelines. Modulen fokuserar på att skapa reproducerbara komponenter som kan orkestreras för att möjliggöra både kontinuerlig träning och robusta driftsättningar.
===== Lärandemål =====
Efter denna modul kommer du att:
* Förstå komponenterna i en end-to-end ML-pipeline
* Kunna designa modulära och återanvändbara pipeline-komponenter
* Implementera databearbetning, träning och evalueringspipelines
* Välja lämpliga orkestreringsverktyg för olika typer av ML-pipelines
* Implementera pipeline-testning och kvalitetskontroll
* Förstå hur man hanterar misslyckade pipeline-körningar
===== 3.1 Grundläggande ML-pipeline-arkitektur =====
==== Definition av en ML-pipeline ====
En ML-pipeline är en sekvens av automatiserade steg som transformerar rådata till en tränad och utvärderad modell, klar för driftsättning. Pipelines möjliggör konsekvent, reproducerbar modellträning och underhåll.
==== Typiska komponenter i en ML-pipeline ====
* **Datainsamling**: Hämtning av data från olika källor
* **Datavalidering**: Kontroll av datakvalitet och schema
* **Dataförbehandling**: Rengöring, normalisering och transformation
* **Feature engineering**: Skapande av meningsfulla features från rådata
* **Datauppdelning**: Uppdelning av data i tränings-, validerings- och testuppsättningar
* **Modellträning**: Träning av modeller med olika algoritmer och hyperparametrar
* **Modellutvärdering**: Utvärdering av modellprestanda mot definierade mätvärden
* **Modellvalidering**: Säkerställande att modellen uppfyller alla krav före driftsättning
* **Modellregistrering**: Registrering av godkända modeller i modellregistret
* **Modellpaketisering**: Förberedelse av modellen för driftsättning
==== Designprinciper för ML-pipelines ====
* **Idempotens**: Samma input ger alltid samma output
* **Modularitet**: Separata, självständiga komponenter
* **Parameterisering**: Konfigurerbara komponenter utan kodändringar
* **Statelessness**: Komponenter förlitar sig inte på bevarad intern status
* **Observerbarhet**: Möjlighet att övervaka och logga pipeline-prestanda
* **Felhantering**: Robust hantering av fel och undantag
* **Skalbarhet**: Förmåga att hantera växande datamängder och komplexitet
===== 3.2 Designa och implementera databearbetningspipelines =====
==== Komponenter i databearbetningspipelines ====
* **Datainsamling**: Batch vs. streaming, datakällor
* **Datakvalitetsvalidering**: Schemavalidering, integritetskontroller
* **Dataförbehandling**: Hantering av saknade värden, uteliggare (outliers)
* **Feature engineering**: Transformation, kodning, normalisering
* **Feature-lagring**: Feature stores, caching-strategier
==== Implementering med TensorFlow Extended (TFX) ====
# Exempel på en TFX-pipeline för databearbetning
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
# Definiera schema
schema = schema_utils.schema_from_feature_spec({
'numeric_feature': tf.io.FixedLenFeature([], tf.float32),
'categorical_feature': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.int64)
})
# Definiera preprocessing_fn
def preprocessing_fn(inputs):
outputs = {}
# Normalisera numeriska features
outputs['numeric_feature'] = tft.scale_to_z_score(inputs['numeric_feature'])
# Konvertera kategoriska features till one-hot encoding
outputs['categorical_feature'] = tft.compute_and_apply_vocabulary(
inputs['categorical_feature'], vocab_filename='categorical_vocab')
# Kopiera label
outputs['label'] = inputs['label']
return outputs
==== Implementering med Scikit-learn pipelines ====
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# Definiera preprocessor
numeric_features = ['age', 'income', 'years_experience']
categorical_features = ['education', 'occupation', 'marital_status']
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# Skapa fullständig pipeline
from sklearn.ensemble import RandomForestClassifier
full_pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', RandomForestClassifier())
])
# Träna pipeline
full_pipeline.fit(X_train, y_train)
==== Datavalidering med Great Expectations eller TensorFlow Data Validation ====
# Exempel med Great Expectations
import great_expectations as ge
# Ladda data som DataFrame
df = ge.read_csv("training_data.csv")
# Definiera förväntningar
df.expect_column_values_to_not_be_null("customer_id")
df.expect_column_values_to_be_between("age", min_value=18, max_value=120)
df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
# Validera data
validation_result = df.validate()
===== 3.3 Designa och implementera träningspipelines =====
==== Komponenter i träningspipelines ====
* **Hyperparameter-tuning**: Grid search, random search, bayesiansk optimering
* **Modellträning**: Träning av olika algoritmer och arkitekturer
* **Cross-validation**: K-fold validering för robustare resultat
* **Checkpointing**: Regelbunden sparning av modellstatus under träning
* **Experiment tracking**: Spårning av träningsparametrar och resultat
==== Implementering med MLflow ====
import mlflow
import mlflow.sklearn
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# Konfigurera MLflow
mlflow.set_experiment("customer_churn_prediction")
# Definiera parameter grid
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [None, 10, 20, 30],
'min_samples_split': [2, 5, 10]
}
# Skapa grid search
grid_search = GridSearchCV(
RandomForestClassifier(),
param_grid,
cv=5,
scoring='f1',
return_train_score=True
)
# Träna med MLflow tracking
with mlflow.start_run():
# Logga parametrar
mlflow.log_param("model_type", "RandomForestClassifier")
mlflow.log_param("cv_folds", 5)
# Träna modell
grid_search.fit(X_train, y_train)
# Logga bästa parametrar
for param, value in grid_search.best_params_.items():
mlflow.log_param(f"best_{param}", value)
# Logga metrics
mlflow.log_metric("best_f1", grid_search.best_score_)
mlflow.log_metric("test_f1", f1_score(y_test, grid_search.predict(X_test)))
# Logga bästa modell
mlflow.sklearn.log_model(grid_search.best_estimator_, "model")
==== Distribuerad träning ====
# Exempel med TensorFlow distribuerad träning
import tensorflow as tf
# Skapa distribution strategi
strategy = tf.distribute.MirroredStrategy()
# Bygg modell inom strategins scope
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
# Träna distribuerad modell
model.fit(
train_dataset,
epochs=10,
validation_data=val_dataset,
callbacks=[
tf.keras.callbacks.ModelCheckpoint(filepath='checkpoints/model-{epoch:02d}'),
tf.keras.callbacks.TensorBoard(log_dir='logs')
]
)
===== 3.4 Designa och implementera evalueringspipelines =====
==== Komponenter i evalueringspipelines ====
* **Modellvalidering**: Kontroll av modellprestanda
* **Prestandamätvärden**: Beräkning och utvärdering av olika mätvärden
* **Tröskelvärden**: Kontroll mot fördefinierade tröskelvärden för godkännande
* **Jämförelse med baseline**: Jämförelse med en referensmodell
* **Bias och rättvisa**: Utvärdering av modellrättvisa mellan grupper
==== Implementering av en modellutvärderingspipeline ====
# Exempel på modellutvärdering med scikit-learn och MLflow
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import mlflow
def evaluate_model(model_uri, X_test, y_test, thresholds):
"""Utvärdera en modell mot definierade tröskelvärden"""
# Ladda modell från MLflow
model = mlflow.sklearn.load_model(model_uri)
# Gör prediktioner
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# Beräkna mätvärden
metrics = {
'accuracy': accuracy_score(y_test, y_pred),
'precision': precision_score(y_test, y_pred),
'recall': recall_score(y_test, y_pred),
'f1': f1_score(y_test, y_pred),
'roc_auc': roc_auc_score(y_test, y_pred_proba)
}
# Kontrollera mot tröskelvärden
passed_validation = True
for metric_name, threshold in thresholds.items():
if metrics[metric_name] < threshold:
passed_validation = False
print(f"Validation failed: {metric_name} = {metrics[metric_name]} < {threshold}")
return passed_validation, metrics
==== Avancerad modellutvärdering ====
* **A/B-testning**: Jämförelse av modeller i produktion
* **Feature importance**: Analys av feature-betydelse
* **Partial Dependence Plots**: Visualisering av modellbeteende
* **SHAP-värden**: Modellförklaring på instansnivå
===== 3.5 Orkestrering av ML-pipelines =====
==== Orkestreringsverktyg ====
* **Apache Airflow**: Arbetsflödesorkestrering baserad på DAGs
* **Kubeflow Pipelines**: Kubernetes-baserad orkestrering för ML
* **Argo Workflows**: Kubernetes-native arbetsflödeshantering
* **Prefect**: Modern arbetsflödesorkestrering
* **Dagster**: Dataorienterad orkestrering
==== Implementering med Apache Airflow ====
# Exempel på Airflow DAG för en ML-pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'mlops',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ml_training_pipeline',
default_args=default_args,
description='End-to-end ML training pipeline',
schedule_interval=timedelta(days=1),
)
# Definiera task-funktioner
def fetch_data(**kwargs):
# Kod för att hämta data
return "data_path"
def validate_data(**kwargs):
data_path = kwargs['ti'].xcom_pull(task_ids='fetch_data')
# Kod för att validera data
return "validated_data_path"
def preprocess_data(**kwargs):
data_path = kwargs['ti'].xcom_pull(task_ids='validate_data')
# Kod för dataförbehandling
return "preprocessed_data_path"
def train_model(**kwargs):
data_path = kwargs['ti'].xcom_pull(task_ids='preprocess_data')
# Kod för modellträning
return "model_path"
def evaluate_model(**kwargs):
model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
# Kod för modellutvärdering
return "evaluation_results"
def register_model(**kwargs):
model_path = kwargs['ti'].xcom_pull(task_ids='train_model')
eval_results = kwargs['ti'].xcom_pull(task_ids='evaluate_model')
# Kod för modellregistrering
return "registered_model_uri"
# Skapa tasks
task_fetch_data = PythonOperator(
task_id='fetch_data',
python_callable=fetch_data,
dag=dag,
)
task_validate_data = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag,
)
task_preprocess_data = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data,
dag=dag,
)
task_train_model = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag,
)
task_evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
dag=dag,
)
task_register_model = PythonOperator(
task_id='register_model',
python_callable=register_model,
dag=dag,
)
# Definiera beroenden
task_fetch_data >> task_validate_data >> task_preprocess_data >> task_train_model >> task_evaluate_model >> task_register_model
==== Implementering med Kubeflow Pipelines ====
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
# Definiera komponenter
@func_to_container_op
def fetch_data() -> str:
# Kod för att hämta data
return "data_path"
@func_to_container_op
def validate_data(data_path: str) -> str:
# Kod för att validera data
return "validated_data_path"
@func_to_container_op
def preprocess_data(data_path: str) -> str:
# Kod för dataförbehandling
return "preprocessed_data_path"
@func_to_container_op
def train_model(data_path: str) -> str:
# Kod för modellträning
return "model_path"
@func_to_container_op
def evaluate_model(model_path: str) -> str:
# Kod för modellutvärdering
return "evaluation_results"
@func_to_container_op
def register_model(model_path: str, eval_results: str) -> str:
# Kod för modellregistrering
return "registered_model_uri"
# Definiera pipeline
@dsl.pipeline(
name='ML Training Pipeline',
description='End-to-end ML training pipeline'
)
def ml_pipeline():
fetch_task = fetch_data()
validate_task = validate_data(fetch_task.output)
preprocess_task = preprocess_data(validate_task.output)
train_task = train_model(preprocess_task.output)
evaluate_task = evaluate_model(train_task.output)
register_task = register_model(train_task.output, evaluate_task.output)
# Kompilera och kör pipeline
kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
===== 3.6 Testning och kvalitetssäkring av pipelines =====
==== Testning av pipeline-komponenter ====
* **Enhetstester**: Testa individuella funktioner
* **Integrationstester**: Testa interaktion mellan komponenter
* **End-to-end-tester**: Testa hela pipelinen
==== Implementering av pipeline-tester ====
import unittest
from unittest.mock import patch
import numpy as np
import pandas as pd
# Exempel på enhetstest för en preprocessor-komponent
class TestPreprocessor(unittest.TestCase):
def setUp(self):
# Skapa test data
self.test_data = pd.DataFrame({
'numeric_feature': [1.0, 2.0, None, 4.0],
'categorical_feature': ['A', 'B', 'A', None]
})
def test_imputation(self):
from my_pipeline.preprocessors import impute_missing_values
# Anropa funktionen
processed_data = impute_missing_values(self.test_data)
# Kontrollera resultat
self.assertFalse(processed_data.isnull().any().any())
self.assertEqual(processed_data.loc[2, 'numeric_feature'], 2.0) # Median
self.assertEqual(processed_data.loc[3, 'categorical_feature'], 'unknown')
def test_scaling(self):
from my_pipeline.preprocessors import scale_numeric_features
# Skapa test data utan saknade värden
test_data = pd.DataFrame({
'numeric_feature': [1.0, 2.0, 3.0, 4.0]
})
# Anropa funktionen
processed_data = scale_numeric_features(test_data)
# Kontrollera resultat
self.assertAlmostEqual(processed_data['numeric_feature'].mean(), 0)
self.assertAlmostEqual(processed_data['numeric_feature'].std(), 1)
==== CI/CD för ML-pipelines ====
* Implementering av Continuous Integration för pipeline-kod
* Automatisk testning vid varje kodändring
* Nightly runs av full pipeline på samplade data
===== Praktiska övningar =====
1. **Implementera databearbetningspipeline**: Skapa en databearbetningspipeline med scikit-learn och DVC
2. **Skapa träningspipeline**: Implementera en modellträningspipeline med MLflow
3. **Orkestrering med Airflow**: Konfigurera en end-to-end ML-pipeline med Airflow
4. **Pipeline-testning**: Skriv enhetstester för pipeline-komponenter
===== Verktygsintroduktion =====
* **Apache Airflow**: Installation och grundläggande användning
* **Kubeflow Pipelines**: Konfigurera Kubeflow på Kubernetes
* **MLflow**: Experimentering och modellspårning
* **TensorFlow Extended**: Komponenter för skalbar ML
===== Läsresurser =====
* "Building Machine Learning Pipelines" - H. Hapke & C. Nelson
* "Data Science on AWS" - C. Fregly & A. Barth
* "TensorFlow Extended (TFX) for Machine Learning" - TensorFlow dokumentation
* "Building Reproducible ML Pipelines" - Google Cloud dokumentation
===== Nyckelinsikter =====
* Automatiserade ML-pipelines är grunden för reproducerbar modellproduktion
* Modularitet och parameterisering möjliggör flexibla och underhållbara pipelines
* Orkestrering är avgörande för att hantera komplexiteten i ML-arbetsflöden
* Testning av pipeline-komponenter är lika viktig som testning av applikationskod
* Val av orkestreringsverktyg beror på organisationens infrastruktur och behov
===== Nästa steg =====
I nästa modul kommer vi att utforska kontinuerlig integrering och driftsättning (CI/CD) för ML, där vi kommer att lära oss hur man automatiserar processen från kodändring till produktion.