I denna modul utforskar vi konceptet med automatiserade ML-pipelines – ryggraden i ett moget MLOps-system. Vi kommer att lära oss hur man designar, implementerar och underhåller skalbar datainsamling, databearbetning, modellträning och evalueringspipelines. Modulen fokuserar på att skapa reproducerbara komponenter som kan orkestreras för att möjliggöra både kontinuerlig träning och robusta driftsättningar.
Efter denna modul kommer du att:
En ML-pipeline är en sekvens av automatiserade steg som transformerar rådata till en tränad och utvärderad modell, klar för driftsättning. Pipelines möjliggör konsekvent, reproducerbar modellträning och underhåll.
# Exempel på en TFX-pipeline för databearbetning import tensorflow as tf import tensorflow_transform as tft from tensorflow_transform.tf_metadata import schema_utils # Definiera schema schema = schema_utils.schema_from_feature_spec({ 'numeric_feature': tf.io.FixedLenFeature([], tf.float32), 'categorical_feature': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64) }) # Definiera preprocessing_fn def preprocessing_fn(inputs): outputs = {} # Normalisera numeriska features outputs['numeric_feature'] = tft.scale_to_z_score(inputs['numeric_feature']) # Konvertera kategoriska features till one-hot encoding outputs['categorical_feature'] = tft.compute_and_apply_vocabulary( inputs['categorical_feature'], vocab_filename='categorical_vocab') # Kopiera label outputs['label'] = inputs['label'] return outputs
from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from sklearn.impute import SimpleImputer from sklearn.preprocessing import StandardScaler, OneHotEncoder # Definiera preprocessor numeric_features = ['age', 'income', 'years_experience'] categorical_features = ['education', 'occupation', 'marital_status'] numeric_transformer = Pipeline(steps=[ ('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler()) ]) categorical_transformer = Pipeline(steps=[ ('imputer', SimpleImputer(strategy='constant', fill_value='missing')), ('onehot', OneHotEncoder(handle_unknown='ignore')) ]) preprocessor = ColumnTransformer( transformers=[ ('num', numeric_transformer, numeric_features), ('cat', categorical_transformer, categorical_features) ]) # Skapa fullständig pipeline from sklearn.ensemble import RandomForestClassifier full_pipeline = Pipeline(steps=[ ('preprocessor', preprocessor), ('classifier', RandomForestClassifier()) ]) # Träna pipeline full_pipeline.fit(X_train, y_train)
# Exempel med Great Expectations import great_expectations as ge # Ladda data som DataFrame df = ge.read_csv("training_data.csv") # Definiera förväntningar df.expect_column_values_to_not_be_null("customer_id") df.expect_column_values_to_be_between("age", min_value=18, max_value=120) df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"]) # Validera data validation_result = df.validate()
import mlflow import mlflow.sklearn from sklearn.model_selection import GridSearchCV from sklearn.ensemble import RandomForestClassifier # Konfigurera MLflow mlflow.set_experiment("customer_churn_prediction") # Definiera parameter grid param_grid = { 'n_estimators': [100, 200, 300], 'max_depth': [None, 10, 20, 30], 'min_samples_split': [2, 5, 10] } # Skapa grid search grid_search = GridSearchCV( RandomForestClassifier(), param_grid, cv=5, scoring='f1', return_train_score=True ) # Träna med MLflow tracking with mlflow.start_run(): # Logga parametrar mlflow.log_param("model_type", "RandomForestClassifier") mlflow.log_param("cv_folds", 5) # Träna modell grid_search.fit(X_train, y_train) # Logga bästa parametrar for param, value in grid_search.best_params_.items(): mlflow.log_param(f"best_{param}", value) # Logga metrics mlflow.log_metric("best_f1", grid_search.best_score_) mlflow.log_metric("test_f1", f1_score(y_test, grid_search.predict(X_test))) # Logga bästa modell mlflow.sklearn.log_model(grid_search.best_estimator_, "model")
# Exempel med TensorFlow distribuerad träning import tensorflow as tf # Skapa distribution strategi strategy = tf.distribute.MirroredStrategy() # Bygg modell inom strategins scope with strategy.scope(): model = tf.keras.Sequential([ tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(1, activation='sigmoid') ]) model.compile( optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'] ) # Träna distribuerad modell model.fit( train_dataset, epochs=10, validation_data=val_dataset, callbacks=[ tf.keras.callbacks.ModelCheckpoint(filepath='checkpoints/model-{epoch:02d}'), tf.keras.callbacks.TensorBoard(log_dir='logs') ] )
# Exempel på modellutvärdering med scikit-learn och MLflow import numpy as np from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score import mlflow def evaluate_model(model_uri, X_test, y_test, thresholds): """Utvärdera en modell mot definierade tröskelvärden""" # Ladda modell från MLflow model = mlflow.sklearn.load_model(model_uri) # Gör prediktioner y_pred = model.predict(X_test) y_pred_proba = model.predict_proba(X_test)[:, 1] # Beräkna mätvärden metrics = { 'accuracy': accuracy_score(y_test, y_pred), 'precision': precision_score(y_test, y_pred), 'recall': recall_score(y_test, y_pred), 'f1': f1_score(y_test, y_pred), 'roc_auc': roc_auc_score(y_test, y_pred_proba) } # Kontrollera mot tröskelvärden passed_validation = True for metric_name, threshold in thresholds.items(): if metrics[metric_name] < threshold: passed_validation = False print(f"Validation failed: {metric_name} = {metrics[metric_name]} < {threshold}") return passed_validation, metrics
# Exempel på Airflow DAG för en ML-pipeline from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'mlops', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': True, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'ml_training_pipeline', default_args=default_args, description='End-to-end ML training pipeline', schedule_interval=timedelta(days=1), ) # Definiera task-funktioner def fetch_data(**kwargs): # Kod för att hämta data return "data_path" def validate_data(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='fetch_data') # Kod för att validera data return "validated_data_path" def preprocess_data(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='validate_data') # Kod för dataförbehandling return "preprocessed_data_path" def train_model(**kwargs): data_path = kwargs['ti'].xcom_pull(task_ids='preprocess_data') # Kod för modellträning return "model_path" def evaluate_model(**kwargs): model_path = kwargs['ti'].xcom_pull(task_ids='train_model') # Kod för modellutvärdering return "evaluation_results" def register_model(**kwargs): model_path = kwargs['ti'].xcom_pull(task_ids='train_model') eval_results = kwargs['ti'].xcom_pull(task_ids='evaluate_model') # Kod för modellregistrering return "registered_model_uri" # Skapa tasks task_fetch_data = PythonOperator( task_id='fetch_data', python_callable=fetch_data, dag=dag, ) task_validate_data = PythonOperator( task_id='validate_data', python_callable=validate_data, dag=dag, ) task_preprocess_data = PythonOperator( task_id='preprocess_data', python_callable=preprocess_data, dag=dag, ) task_train_model = PythonOperator( task_id='train_model', python_callable=train_model, dag=dag, ) task_evaluate_model = PythonOperator( task_id='evaluate_model', python_callable=evaluate_model, dag=dag, ) task_register_model = PythonOperator( task_id='register_model', python_callable=register_model, dag=dag, ) # Definiera beroenden task_fetch_data >> task_validate_data >> task_preprocess_data >> task_train_model >> task_evaluate_model >> task_register_model
import kfp from kfp import dsl from kfp.components import func_to_container_op # Definiera komponenter @func_to_container_op def fetch_data() -> str: # Kod för att hämta data return "data_path" @func_to_container_op def validate_data(data_path: str) -> str: # Kod för att validera data return "validated_data_path" @func_to_container_op def preprocess_data(data_path: str) -> str: # Kod för dataförbehandling return "preprocessed_data_path" @func_to_container_op def train_model(data_path: str) -> str: # Kod för modellträning return "model_path" @func_to_container_op def evaluate_model(model_path: str) -> str: # Kod för modellutvärdering return "evaluation_results" @func_to_container_op def register_model(model_path: str, eval_results: str) -> str: # Kod för modellregistrering return "registered_model_uri" # Definiera pipeline @dsl.pipeline( name='ML Training Pipeline', description='End-to-end ML training pipeline' ) def ml_pipeline(): fetch_task = fetch_data() validate_task = validate_data(fetch_task.output) preprocess_task = preprocess_data(validate_task.output) train_task = train_model(preprocess_task.output) evaluate_task = evaluate_model(train_task.output) register_task = register_model(train_task.output, evaluate_task.output) # Kompilera och kör pipeline kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
import unittest from unittest.mock import patch import numpy as np import pandas as pd # Exempel på enhetstest för en preprocessor-komponent class TestPreprocessor(unittest.TestCase): def setUp(self): # Skapa test data self.test_data = pd.DataFrame({ 'numeric_feature': [1.0, 2.0, None, 4.0], 'categorical_feature': ['A', 'B', 'A', None] }) def test_imputation(self): from my_pipeline.preprocessors import impute_missing_values # Anropa funktionen processed_data = impute_missing_values(self.test_data) # Kontrollera resultat self.assertFalse(processed_data.isnull().any().any()) self.assertEqual(processed_data.loc[2, 'numeric_feature'], 2.0) # Median self.assertEqual(processed_data.loc[3, 'categorical_feature'], 'unknown') def test_scaling(self): from my_pipeline.preprocessors import scale_numeric_features # Skapa test data utan saknade värden test_data = pd.DataFrame({ 'numeric_feature': [1.0, 2.0, 3.0, 4.0] }) # Anropa funktionen processed_data = scale_numeric_features(test_data) # Kontrollera resultat self.assertAlmostEqual(processed_data['numeric_feature'].mean(), 0) self.assertAlmostEqual(processed_data['numeric_feature'].std(), 1)
1. Implementera databearbetningspipeline: Skapa en databearbetningspipeline med scikit-learn och DVC 2. Skapa träningspipeline: Implementera en modellträningspipeline med MLflow 3. Orkestrering med Airflow: Konfigurera en end-to-end ML-pipeline med Airflow 4. Pipeline-testning: Skriv enhetstester för pipeline-komponenter
I nästa modul kommer vi att utforska kontinuerlig integrering och driftsättning (CI/CD) för ML, där vi kommer att lära oss hur man automatiserar processen från kodändring till produktion.