データ・フロー統合

MLパイプラインのデータ・フロー・サポート機能を使用すると、ユーザーはデータ・フロー・アプリケーションをパイプライン内のステップとして統合できます。

この新機能により、ユーザーはMLパイプラインの他のステップとともにデータ・フロー・アプリケーション(Apache Spark as a Service)の実行をオーケストレーションし、大規模なデータ処理タスクを合理化できます。

データ・フロー・ステップを含むパイプラインが実行されると、そのステップに関連付けられたデータ・フロー・アプリケーションの新しい実行が自動的に作成および管理されます。データ・フロー実行は、パイプライン内の他のステップと同様に処理されます。正常に完了すると、パイプラインは実行を続行し、パイプラインのオーケストレーションの一部として後のステップを開始します。

MLパイプラインでのデータ・フロー・アプリケーションの使用は簡単です。

1. データ・フロー・ステップの追加
MLパイプラインで「データ・フロー」ステップ・タイプを選択します。
2. データ・フロー・アプリケーションの選択
ステップとして実行するデータ・フロー・アプリケーションを選択し、クラスタ・サイズや環境変数などのオプションを構成します。
3. パイプラインの実行
パイプラインの実行を開始します。データ・フロー・ステップに到達すると、関連付けられたアプリケーションが実行されます。完了すると、結果がステップ実行に反映され、パイプラインは次のステップにシームレスに進みます。
この統合により、大規模なデータセットを同じパイプライン内で効率的に処理できるようになり、MLパイプラインによる自動化を維持しながら、OCI Data Flowのスケーラブルなコンピュート能力を活用することで、データ・サイエンティストのワークフローが簡素化されます。

ポリシー

パイプラインとのデータ・フロー統合には、次のポリシーを含めます:
  • データ・フローとパイプラインの統合。
  • OCIサービスへのパイプライン実行アクセス。
  • (オプション)カスタム・ネットワーキング・ポリシー(カスタム・ネットワーキングを使用する場合のみ)。
データ・フローの使用に必要なすべての前提条件は、データ・フロー・ポリシーを参照してください。
ノート

パイプライン実行によってデータ・フロー実行がトリガーされると、リソース・プリンシパルdatasciencepipelinerunが継承されます。したがって、datasciencepipelinerunに権限を付与すると、パイプライン実行によって開始されたデータ・フロー内で実行されているコードにも権限が付与されます。

パイプラインを使用したデータ・フローの構成

適切なポリシーが適用されていることを確認します。

  1. データ・フローを使用するためのパイプライン・ステップを定義するときに、「パイプラインの作成」「データ・フロー・アプリケーションから」を選択します。
  2. 「データフロー・アプリケーションの選択」で、使用するデータ・フロー・アプリケーションを選択します。

    データ・フロー・アプリケーションが別のコンパートメントにある場合は、「コンパートメントの変更」を選択します。

  3. (オプション)「データ・フロー構成」セクションで、「構成」を選択します。

    「データ・フロー構成の構成」パネルで:

    1. ドライバのシェイプとエグゼキュータのシェイプを選択します。
    2. エグゼクタの数を入力します。
    3. (オプション)ログ・バケットを選択します。
    4. (オプション)Spark構成プロパティを追加します。
    5. (オプション)ウェアハウス・バケットURIを指定します。

クイック・スタート・ガイド

これは、データ・フロー・パイプラインを作成するためのステップバイステップ・ガイドです。

  1. データ・フロー・ポリシーのドキュメントに従います。データ・フローを使用する前に必要な初期設定の詳細を示します。
  2. 次のサンプルPythonアプリケーションhello-world.pyをバケットにアップロードします:
    print("======Start======")
    import os
    from pyspark.sql import SparkSession
     
    def in_dataflow():
        if os.environ.get("HOME") == "/home/dataflow":
            return True
        return False
     
    def get_spark():
        if in_dataflow():
            return SparkSession.builder.appName("hello").getOrCreate()
        else:
            return SparkSession.builder.appName("LocalSparkSession").master("local[*]").getOrCreate()
     
    print("======Opening Session======")
    spark = get_spark()
    print("======Application Created======")
    # Test the connection by creating a simple DataFrame
    df = spark.createDataFrame([("Hello",), ("World",)], ["word"])
    print("======Data Frame Created======")
    # Show the DataFrame's content
    df.show()
    print("======Done======")
  3. データ・フロー・ポリシーのステップに従って、ステップ2のPythonアプリケーションを使用してデータ・フロー・アプリケーションを作成します。
  4. データ・フロー・アプリケーションをテストします。
    1. アプリケーションの詳細ページで、「実行」をクリックします。
    2. 必要に応じて、「アプリケーションの実行」パネルで、引数およびパラメータの適用、リソース構成の更新、またはサポートされているSparkプロパティの追加を行います。
    3. 「Run」を選択してアプリケーションを実行します。
    4. (オプション)ログを確認します。実行詳細に移動し、ログを選択します。
  5. パイプラインを作成します。

    パイプラインを作成する前に、パイプライン実行リソースがデータ・フローを使用し、hello-worldアプリケーションでバケットにアクセスできるようにするポリシーがあることを確認してください。詳細は、「パイプライン・ポリシー」を参照してください。

    1. hello-worldデータ・フロー・アプリケーションを使用するステップを使用してパイプラインを作成します。
      1. Data Flow Step Demoなどの名前でパイプラインを作成します。
      2. 「パイプライン・ステップの追加」を選択します。
      3. ステップにStep 1などの名前を指定します。
      4. データ・フロー・アプリケーションを使用するには、「データ・フロー・アプリケーションから」を選択します。
      5. データ・フロー・アプリケーションを選択します。
      6. データ・フロー・アプリケーションが別のコンパートメントにある場合は、「コンパートメントの変更」を選択します。
      7. 「保存」を選択してステップを保存します。
      8. (オプション)ロギングを定義します。
      9. 「作成」を選択して、パイプラインを作成します。
    2. パイプライン・ログの有効化:
      1. パイプラインの詳細に移動します。
      2. 「ログ」リソースを選択します。
      3. ログの有効化
    3. パイプラインの実行:
      1. パイプラインの詳細に移動します。
      2. 「パイプライン実行」リソースを選択します。
      3. 「パイプライン実行の開始」を選択します。
      4. 「開始」を選択します。