Skip to content

Pipeline 介紹

Kubeflow Pipelines 是一個平台,用於構建和部署基於 Docker 容器的可移植、可擴展的機器學習 (ML) 工作流。

什麼是 Kubeflow 管道?

Kubeflow Pipelines 包括:

  • 用於管理和追踪 experiments、jobs 和 runs 的用戶界面 (UI)。
  • 用於安排多步驟 ML 工作流程的引擎。
  • 用於定義和操作管道和組件的 SDK。
  • 使用 SDK 與系統交互的筆記本。

以下是 Kubeflow 管道的目標:

  • 端到端編排:啟用和簡化機器學習管道的編排。
  • 輕鬆實驗:讓您輕鬆嘗試眾多想法和技術,並管理您的各種試驗/實驗。
  • 易於重用:使您能夠重用組件和管道以快速創建端到端解決方案,而無需每次都重新構建。

Kubeflow Pipelines 可作為 Kubeflow 的核心組件或作為獨立安裝使用。

什麼是管道?

pipeline 是對 ML 工作流的描述,包括工作流中的所有組件以及它們如何以圖形的形式組合。管道包括運行管道所需的輸入(參數)的定義以及每個組件的輸入和輸出。

開發管道後,您可以在 Kubeflow Pipelines UI 上上傳和共享它。

管道組件是一組獨立的用戶代碼,打包為 Docker 鏡像,在管道中執行一個步驟。例如,一個組件可以負責數據預處理、數據轉換、模型訓練等。

請參閱 pipelinecomponent 的概念指南。

管道範例

下面的屏幕截圖和代碼顯示了 xgboost-training-cm.py 管道,它使用 CSV 格式的結構化數據創建 XGBoost 模型。您可以在 GitHub 上查看有關管道的源代碼和其他信息。

管道的運行 execution graph

下面的屏幕截圖顯示了 Kubeflow Pipelines UI 中示例管道的運行時執行圖:

表示管道的 Python 代碼

以下是定義 xgboost-training-cm.py 管道的 Python 代碼的摘錄。你可以在 GitHub 上看到完整的代碼。

@dsl.pipeline(
    name='XGBoost Trainer',
    description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
    output='gs://your-gcs-bucket',
    project='your-gcp-project',
    cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
    region='us-central1',
    train_data='gs://ml-pipeline-playground/sfpd/train.csv',
    eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
    schema='gs://ml-pipeline-playground/sfpd/schema.json',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
    output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'

    # Current GCP pyspark/spark op do not provide outputs as return values, instead,
    # we need to use strings to pass the uri around.
    analyze_output = output_template
    transform_output_train = os.path.join(output_template, 'train', 'part-*')
    transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
    train_output = os.path.join(output_template, 'train_output')
    predict_output = os.path.join(output_template, 'predict_output')

    with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
        project_id=project,
        region=region,
        name=cluster_name
    )):
        _create_cluster_op = dataproc_create_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name,
            initialization_actions=[
              os.path.join(_PYSRC_PREFIX,
                           'initialization_actions.sh'),
            ],
            image_version='1.2'
        )

        _analyze_op = dataproc_analyze_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            schema=schema,
            train_data=train_data,
            output=output_template
        ).after(_create_cluster_op).set_display_name('Analyzer')

        _transform_op = dataproc_transform_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=train_data,
            eval_data=eval_data,
            target=target,
            analysis=analyze_output,
            output=output_template
        ).after(_analyze_op).set_display_name('Transformer')

        _train_op = dataproc_train_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=transform_output_train,
            eval_data=transform_output_eval,
            target=target,
            analysis=analyze_output,
            workers=workers,
            rounds=rounds,
            output=train_output
        ).after(_transform_op).set_display_name('Trainer')

        _predict_op = dataproc_predict_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            data=transform_output_eval,
            model=train_output,
            target=target,
            analysis=analyze_output,
            output=predict_output
        ).after(_train_op).set_display_name('Predictor')

        _cm_op = confusion_matrix_op(
            predictions=os.path.join(predict_output, 'part-*.csv'),
            output_dir=output_template
        ).after(_predict_op)

        _roc_op = roc_op(
            predictions_dir=os.path.join(predict_output, 'part-*.csv'),
            true_class=true_label,
            true_score_column=true_label,
            output_dir=output_template
        ).after(_predict_op)

    dsl.get_pipeline_conf().add_op_transformer(
        gcp.use_gcp_secret('user-gcp-sa'))

下面的部分屏幕截圖顯示了用於啟動管道運行的 Kubeflow Pipelines UI。代碼中的管道定義決定了哪些參數出現在 UI 表單中。管道定義還可以為參數設置默認值:

管道的輸出

以下屏幕截圖顯示了 Kubeflow Pipelines UI 上可見的管道輸出示例。

預測結果:

混淆矩陣:

接受者操作特徵(ROC)曲線:

架構概覽

在高層次上,管道的執行過程如下:

  • Python SDK:您使用 Kubeflow Pipelines domain-specific language (DSL) 來創建組件或指定管道。

  • DSL compiler:DSL 編譯器將管道的 Python 代碼轉換為靜態配置 (YAML)。

  • Pipeline Service:您調用管道服務以創建從靜態配置運行的管道。

  • Kubernetes resources:管道服務調用 Kubernetes API 服務器來創建運行管道所需的 Kubernetes 資源 (CRD)。

  • Orchestration controllers: 一組編排控制器執行完成管道所需的容器。容器在 Kubernetes Pod 中執行。一個控制器範例是 Argo Workflow 控制器,它編排任務驅動的工作流。

  • Artifact storage: Pod 存儲兩種數據:

    • Metadata: 實驗、作業、管道運行和單一標量指標。出於排序和過濾的目的聚合指標數據。 Kubeflow Pipelines 將元數據存儲在 MySQL 數據庫中。
    • Artifacts: Pipeline 打包、視圖和大規模指標(時間序列)。使用大規模指標來調試管道運行或調查單個運行的性能。 Kubeflow Pipelines 將工件存儲在工件存儲中,例如 Minio 服務器或云存儲。

    Info

    MySQL 數據庫和 Minio 服務器都由 Kubernetes PersistentVolume 子系統支持。

  • Persistence agent 與 ML metadata: Pipeline Persistence Agent 監視 Pipeline Service 創建的 Kubernetes 資源,並將這些資源的狀態保存在 ML Metadata Service 中。 Pipeline Persistence Agent 記錄執行的容器集及其輸入和輸出。輸入/輸出由容器參數或數據工件 URI 組成。

  • Pipeline web server: Pipeline Web 服務器從各種服務收集數據以顯示相關視圖:當前運行的管道列表、管道執行的歷史記錄、數據工件列表、單個管道運行的調試信息、單個管道運行的執行狀態。