データ・フロー委任トークンを使用したOracle Cloud Infrastructureリソースへのアクセス

データ・フロー実行を作成すると、データ・フローによって委任トークンを使用したSparkジョブが自動的に構成されます。このトークンは、ユーザーのかわりにOracle Cloud Infrastructureリソースにアクセスするために使用されます。

これは、データ・フローで実行するSparkジョブが、資格証明を必要とせずにOracle Cloud Infrastructure Object Storageに対して読取りおよび書込みを行うことができる理由です。

トークンは、資格証明なしでOracle Cloud Infrastructure内の他のIAM対応システムにアクセスするためにも使用できます。例:
  • アプリケーションはボールト・サービスにアクセスして、パスワードまたはその他のシークレットを取得できます。
  • NoSQLサービスにアクセスして、Sparkアプリケーションの外部で状態を管理できます。Sparkの実行間で高水位標をトラッキングしたり、分散ロックを作成することが一般的です。NoSQLサービスはこれらの目的に使用できます。
  • 他のコンピューティング・タスクのためにOracle Functionsを起動できます。
  • 電子メール・サービスを使用して電子メールを送信できます。

Oracle Cloud Infrastructureサービスへのコールは、他のSparkコードに自由に組み込むことができます。「ポリシー・リファレンス」ページは、すべてのIAM対応サービスに関する情報を取得するのに適した場所です。

このチュートリアルでは、SparkコードでOracle Cloud Infrastructureクライアントを作成するためのテンプレートについて説明します。これらのクライアントは、データ・フローでの実行時に委任トークンを使用し、データ・フローの外部での実行時にAPIキーを使用します。次の方法を学習します
  1. Javaアプリケーションからの委任トークンの使用
  2. Pythonアプリケーションからの委任トークンの使用

開始する前に

データ・フローの起動を認可されている必要があります。権限は昇格されず、かわりにコールのみが実行されます。Oracle Cloud Infrastructureコンソールで実行できる場合、Sparkジョブはデータ・フローで実行できます。同様に、データ・フローで実行できる場合、SparkジョブはOracle Cloud Infrastructureコンソールで実行できます。

1. Javaアプリケーションからの委任トークンの使用

次のクラスOboTokenClientConfigurator.javaは、認証されたOracle Cloud Infrastructureクライアントを作成します。テンプレート・プロジェクトのベース・ディレクトリから、OboTokenClientConfigurationコードを貼り付ける空のファイルを含むサンプルのソース・ディレクトリ構造を作成します。
$ mkdir -p src/main/java/example/
$ touch src/main/java/example/OboTokenClientConfigurator.java

アプリケーションがローカルで実行されている場合、コードは構成ファイルおよびAPIキーに依存します。この設定の詳細は、「必要なキーとOCID」を参照してください。OboTokenClientConfiguration.javaの例:

package example;
 
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.Region;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.InstancePrincipalsAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcProperties;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.http.DefaultConfigurator;
import com.oracle.bmc.http.signing.internal.Constants;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
 
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.NoSuchElementException;
import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
 
/**
 * Customize the SDK underlying REST client to use the on-behalf-of token when running on
 * Data Flow.
 */
public class OboTokenClientConfigurator implements ClientConfigurator {
 
  // TODO: Set these values for your sepcific OCI environment
  public static final String LOCAL_PROFILE = "DEV"; // TODO <your ~/.oci/config profile>
  private static final String CONFIG_FILE_PATH = ConfigFileReader.DEFAULT_FILE_PATH;
  private static final String CANONICAL_REGION_NAME = Region.US_PHOENIX_1.getRegionId();
 
  private final String delegationTokenPath;
 
  /**
   * Helper function for the Spark Driver to get the token path.
   */
  public static String getDelegationTokenPath() {
    SparkConf conf = new SparkConf();
    try {
      return conf.get("spark.hadoop.fs.oci.client.auth.delegationTokenPath");
    } catch (NoSuchElementException e) {
      return null;
    }
  }
 
  /**
   * Helper function to get the Hadoop configuration for the HDFS BmcFileSystem.
   */
  public static Configuration getConfiguration(Configuration config, String delegationTokenPath) {
    // https://objectstorage.us-phoenix-1.oraclecloud.com
    String domain = "oraclecloud.com";
    String overlayEndpoint = String
        .format("https://objectstorage.%s.%s", CANONICAL_REGION_NAME, domain);
    config.set(BmcProperties.HOST_NAME.getPropertyName(), overlayEndpoint);
    // Data Flow
    if (delegationTokenPath != null) {
      config.set("fs.oci.client.auth.delegationTokenPath", delegationTokenPath);
      config.set(BmcProperties.OBJECT_STORE_CLIENT_CLASS.getPropertyName(),
          "oracle.dfcs.hdfs.DelegationObjectStorageClient");
    } else { // local
      try {
        ConfigFileAuthenticationDetailsProvider provider =
            new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
        config.set(BmcProperties.TENANT_ID.getPropertyName(), provider.getTenantId());
        config.set(BmcProperties.USER_ID.getPropertyName(), provider.getUserId());
        config.set(BmcProperties.FINGERPRINT.getPropertyName(), provider.getFingerprint());
        config.set(BmcProperties.PEM_FILE_PATH.getPropertyName(), provider.getPemFilePath());
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }
    return config;
  }
 
  /**
   * Helper function to get an environment specific authentication provider.
   */
  public static BasicAuthenticationDetailsProvider getAuthProvider(String delegationTokenPath) {
    if (delegationTokenPath == null) { // local
      try {
        return new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }
    // Data Flow
    return InstancePrincipalsAuthenticationDetailsProvider.builder().build();
  }
 
  /**
   * Helper function to get an environment specific <tt>ClientConfigurator</tt>.
   */
  public static ClientConfigurator getConfigurator(String delegationTokenPath) {
    return (delegationTokenPath == null) ? new DefaultConfigurator() : // local
        new OboTokenClientConfigurator(delegationTokenPath); // Data Flow
  }
 
  /**
   * Helper function to get an environment specific working directory.
   */
  public static String getTempDirectory() {
    if (System.getenv("HOME").equals("/home/dataflow")) {
      return "/opt/spark/work-dir/";
    }
    return System.getProperty("java.io.tmpdir");
  }
 
  public OboTokenClientConfigurator(String delegationTokenPath) {
    this.delegationTokenPath = delegationTokenPath;
  }
 
  @Override
  public void customizeBuilder(ClientBuilder builder) {
  }
 
  @Override
  public void customizeClient(Client client) {
    client.register(new _OboTokenRequestFilter());
  }
 
  @Priority(_OboTokenRequestFilter.PRIORITY)
  class _OboTokenRequestFilter implements ClientRequestFilter {
 
    public static final int PRIORITY = Priorities.AUTHENTICATION - 1;
 
    @Override
    public void filter(final ClientRequestContext requestContext) throws IOException {
      String token = new String(Files.readAllBytes(Paths.get(delegationTokenPath)));
      requestContext.getHeaders().putSingle(Constants.OPC_OBO_TOKEN, token);
    }
  }
}
次の例は、Oracle Cloud Infrastructure Object Storageで委任トークンを使用する方法を示しています次のコマンドを実行します:
$ touch src/main/java/example/Example.java
Exmaple.java
package example;
 
import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcFilesystem;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.objectstorage.ObjectStorageClient;
import com.oracle.bmc.objectstorage.requests.GetNamespaceRequest;
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
import com.oracle.bmc.objectstorage.responses.GetNamespaceResponse;
import com.oracle.bmc.objectstorage.transfer.UploadConfiguration;
import com.oracle.bmc.objectstorage.transfer.UploadManager;
import com.oracle.bmc.objectstorage.transfer.UploadManager.UploadRequest;
 
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
 
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
 
// This demo illustrates:
// 1. The delegation token path must be looked up in the Driver and passed to the Executor.
// 2. If the token path is null, assume you are running locally and load an API key.
// 3. Using the HDFS-Connector to create an HDFS FileSystem to connect to Object Storage.
public class Example {
 
  // TODO: Set these values for your sepcific OCI environment
  private static final String NAMESPACE = "<your tenant namespace>";
  private static final String BUCKET_NAME = "output"; // ensure that you create this bucket
 
  private static final String OCI_URI = "oci://" + BUCKET_NAME + "@" + NAMESPACE;
  private static final String SAMPLE_JOB_PATH = "/Example";
  private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
 
  public static void main(String[] args) throws Exception {
    // Get our Spark session.
    SparkConf conf = new SparkConf();
    String master = conf.get("spark.master", "local[*]");
    SparkSession spark = SparkSession.builder().appName("Example").master(master).getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
    String delegationTokenPath = OboTokenClientConfigurator.getDelegationTokenPath();
 
    // write a file to Object Storage using an HDFS FileSystem
    try (final BmcFilesystem fs = new BmcFilesystem())
    {
      fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
          jsc.hadoopConfiguration(), delegationTokenPath));
      fs.delete(new Path(SAMPLE_JOB_PATH), true);
      final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
      output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
      output.close();
    }
 
    // Test adding a Spark file, equivalent to spark.files or --files
    jsc.addFile(OCI_URI + INPUT_FILE); // Do not use this for data files
    // Executor -> read a file
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
    rdd.foreach(item -> executorGetFile());
 
    List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    rdd = jsc.parallelize(collection, 4);
    rdd.foreach(item -> copyFileSample(item, delegationTokenPath));
 
    // Executor -> write a file to Object Storage using an HDFS FileSystem
    rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
    rdd.foreach(item -> fileSystemSample(item, delegationTokenPath));
 
    // ---------------------------------------------------------------------
    // <COPY CODE SNIPPET EXAMPLE #2 HERE>
 
    // ---------------------------------------------------------------------
 
    // ---------------------------------------------------------------------
    // <COPY CODE SNIPPET EXAMPLE #3 HERE>
 
    // ---------------------------------------------------------------------
 
    jsc.stop();
  }
 
  public static void executorGetFile() {
    String filepath = SparkFiles.get("input.dat");
    File file = new File(filepath);
    try {
      String data = IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
      System.out.println("Read a file:\n" + data);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
 
  public static void fileSystemSample(int i, String delegationTokenPath) throws Exception {
    // write a file to Object Storage using a FileSystem
    try (final BmcFilesystem fs = new BmcFilesystem())
    {
      fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
          new Configuration(), delegationTokenPath));
      fs.delete(new Path(SAMPLE_JOB_PATH + "/" + i), true);
      final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
      output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
      output.close();
    }
  }
 
  // The driver needs to pass the delegation token path to executors.
  public static void copyFileSample(int i, String delegationTokenPath)
      throws Exception {
    // Create a file to upload.
    String outputFile = MessageFormat.format("{0}.txt", i);
    String outputPath = "/tmp/" + outputFile;
    Writer wr = new FileWriter(outputPath);
    wr.write(String.valueOf(i));
    wr.close();
 
    AbstractAuthenticationDetailsProvider provider =
        OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
    ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
 
    // Create an object storage client.
    ObjectStorageClient client = ObjectStorageClient.builder()
        .clientConfigurator(configurator)
        .build(provider);
 
    // Look up our namespace.
    GetNamespaceResponse namespaceResponse = client.getNamespace(GetNamespaceRequest.builder().build());
    String namespaceName = namespaceResponse.getValue();
 
    // Upload the file.
    UploadConfiguration uploadConfiguration = UploadConfiguration.builder()
        .allowMultipartUploads(true)
        .allowParallelUploads(true)
        .build();
    UploadManager uploadManager = new UploadManager(client, uploadConfiguration);
    PutObjectRequest request = PutObjectRequest.builder()
        .bucketName(BUCKET_NAME)
        .namespaceName(namespaceName)
        .objectName(outputFile)
        .contentType("text/plain")
        .contentLanguage("EN")
        .contentEncoding("UTF-8")
        .build();
    UploadRequest uploadDetails = UploadRequest.builder(new File(outputPath))
        .allowOverwrite(true)
        .build(request);
    uploadManager.upload(uploadDetails);
  }
}
次のコードを使用して、ボールトに接続できます。example.javaファイルの<COPY CODE SNIPPET EXAMPLE #2 HERE>セクションにコピーします。
<!-- Add the Vault SDK dependency to our example project pom.xml below the Secrets SDK dependency -->
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-vault</artifactId>
      <version>${oci-java-sdk-version}</version>
    </dependency>
 
// ---------------------------------------------------------------------
// Add these imports to the top of the file
import com.oracle.bmc.secrets.SecretsClient;
import com.oracle.bmc.secrets.requests.GetSecretBundleRequest;
import com.oracle.bmc.secrets.responses.GetSecretBundleResponse;
import com.oracle.bmc.secrets.model.Base64SecretBundleContentDetails;
import org.apache.commons.codec.binary.Base64;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
// ---------------------------------------------------------------------
 
// <COPY CODE SNIPPET EXAMPLE #2 HERE>
 
    String passwordOcid = null; // TODO <the vault secret OCID>
 
    BasicAuthenticationDetailsProvider provider =
        OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
    ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
    SecretsClient secretsClient = SecretsClient.builder().clientConfigurator(configurator).build(provider);
 
    // create get secret bundle request
    GetSecretBundleRequest getSecretBundleRequest = GetSecretBundleRequest
        .builder()
        .secretId(passwordOcid)
        .stage(GetSecretBundleRequest.Stage.Current)
        .build();
 
    // get the secret
    GetSecretBundleResponse getSecretBundleResponse = secretsClient.
        getSecretBundle(getSecretBundleRequest);
 
    // get the bundle content details
    Base64SecretBundleContentDetails base64SecretBundleContentDetails =
        (Base64SecretBundleContentDetails) getSecretBundleResponse.
            getSecretBundle().getSecretBundleContent();
 
    // decode the encoded secret
    byte[] secretValueDecoded = Base64.decodeBase64(base64SecretBundleContentDetails.getContent());
    System.out.println("Secret: " + new String(secretValueDecoded, StandardCharsets.UTF_8));
アプリケーションからAutonomous Database for Analytics and Data Warehousingに接続するには、Autonomous Database for Analytics and Data Warehousingウォレットをダウンロードします。次のステップに従います:
  1. Oracle Cloud Infrastructureにログインします。
  2. Autonomous Databaseを検索します。
  3. ご使用のADWインスタンスを選択します。
  4. 「DB接続」をクリックします。
  5. 「ウォレットのダウンロード」をクリックします。
  6. パスワードを入力します。
  7. ウォレットをオブジェクト・ストレージのバケットに保存します。
  8. 次のコードをサンプル・プロジェクトに追加します:
    <!-- Add the ADW dependencies to our example project pom.xml below the Secrets SDK dependency -->
        <!-- Drivers for talking to ADW. Jars need to be deployed using mvn deploy:deploy-file -->
        <dependency>
          <groupId>com.oracle.database.jdbc</groupId>
          <artifactId>ojdbc8</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.jdbc</groupId>
          <artifactId>ucp</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>oraclepki</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_cert</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_core</artifactId>
          <version>18.3.0.0</version>
        </dependency>
     
    // ---------------------------------------------------------------------
    // Add these imports to the top of the file
    import java.util.ArrayList;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.Dataset;
    import java.util.Map;
    import java.util.HashMap;
    import oracle.jdbc.driver.OracleConnection;
    // ---------------------------------------------------------------------
     
        // <COPY CODE SNIPPET EXAMPLE #3 HERE>
     
        // TODO <set these values as appropriate to access your ADW using your ADW wallet
        String walletPath = OCI_URI + "/Wallet_EXAMPLEADW.zip";
        String user = "ADMIN";
        String tnsName = "exampleadw_high"; // this can be found inside of the wallet.zip (unpack it)
        byte[] secretValueDecoded = "example_secret".getBytes();
     
        // Build a 2 row data set to save to ADW.
        // Usually you would load data from CSV/Parquet, this is to keep the example
        // simple.
        List<String[]> stringAsList = new ArrayList<>();
        stringAsList.add(new String[] { "value11", "value21" });
        stringAsList.add(new String[] { "value12", "value22" });
        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
        JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map(RowFactory::create);
        StructType schema = DataTypes
            .createStructType(new StructField[] { DataTypes.createStructField("col1", DataTypes.StringType, false),
                DataTypes.createStructField("col2", DataTypes.StringType, false) });
        Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
     
        // Download the wallet from object storage and distribute it.
        String tmpPath = DataFlowDeployWallet.deployWallet(new URI(OCI_URI), spark.sparkContext(),
            OboTokenClientConfigurator.getConfiguration(jsc.hadoopConfiguration(), delegationTokenPath), walletPath);
     
        // Configure the ADW JDBC URL.
        String jdbcUrl = MessageFormat.format("jdbc:oracle:thin:@{0}?TNS_ADMIN={1}", tnsName, tmpPath);
        System.out.println("JDBC URL " + jdbcUrl);
     
        String password = new String(secretValueDecoded);
     
        // Save data to ADW.
        System.out.println("Saving to ADW");
        Map<String, String> options = new HashMap<String, String>();
        options.put("driver", "oracle.jdbc.driver.OracleDriver");
        options.put("url", jdbcUrl);
        options.put(OracleConnection.CONNECTION_PROPERTY_USER_NAME, user);
        options.put(OracleConnection.CONNECTION_PROPERTY_PASSWORD, password);
        options.put(OracleConnection.CONNECTION_PROPERTY_TNS_ADMIN, tmpPath);
        options.put("dbtable", "sample");
        df.write().format("jdbc").options(options).mode("Overwrite").save();
        System.out.println("Done writing to ADW");
  9. ヘルパー・クラスDataFlowDeployWallet.javaを使用して、ウォレット・ファイルにアクセスします。
    1. 次のコマンドを実行してファイルを作成します:
      $ touch src/main/java/example/DataFlowDeployWallet.java
    2. 次のコードをファイルにコピーします:
      /*
       * Copyright © 2021, Oracle and/or its affiliates.
       * The Universal Permissive License (UPL), Version 1.0
       */
      package example;
       
      import com.oracle.bmc.hdfs.BmcFilesystem;
       
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.spark.SparkContext;
       
      import java.io.BufferedOutputStream;
      import java.io.File;
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.net.URI;
      import java.net.URISyntaxException;
      import java.util.Arrays;
      import java.util.List;
      import java.util.zip.ZipEntry;
      import java.util.zip.ZipInputStream;
       
      /*
       * Helper to deploy a wallet to the Spark cluster.
       *
       * This only needs to be done once and should be done in the Spark driver.
       */
      public class DataFlowDeployWallet {
       
          private static final int BUFFER_SIZE = 4096;
       
          public static String deployWallet(URI oci_uri, SparkContext sc, Configuration configuration, String walletPath)
                  throws IOException, URISyntaxException {
              try (final BmcFilesystem fs = new BmcFilesystem()) {
                  fs.initialize(oci_uri, configuration);
       
                  String tmpPath = downloadAndExtract(fs, new Path(walletPath));
       
                  List<String> walletContents = Arrays.asList("cwallet.sso", "ewallet.p12", "keystore.jks", "ojdbc.properties",
                          "sqlnet.ora", "tnsnames.ora", "truststore.jks");
                  for (String file : walletContents) {
                      sc.addFile(tmpPath + file);
                  }
                  return tmpPath;
              }
          }
       
          private static String downloadAndExtract(BmcFilesystem bmc, Path walletRemotePath)
                  throws IllegalArgumentException, IOException {
              String tmpPath = OboTokenClientConfigurator.getTempDirectory();
              String walletLocal = tmpPath + "wallet.zip";
              bmc.copyToLocalFile(walletRemotePath, new Path(walletLocal));
              unzip(walletLocal, tmpPath);
              return tmpPath;
          }
       
          private static void unzip(String zipFilePath, String destDirectory) throws IOException {
              File destDir = new File(destDirectory);
              if (!destDir.exists()) {
                  destDir.mkdir();
              }
              ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
              ZipEntry entry = zipIn.getNextEntry();
              // iterates over entries in the zip file
              while (entry != null) {
                  String filePath = destDirectory + File.separator + entry.getName();
                  if (!entry.isDirectory()) {
                      // if the entry is a file, extracts it
                      extractFile(zipIn, filePath);
                  } else {
                      // if the entry is a directory, make the directory
                      File dir = new File(filePath);
                      dir.mkdir();
                  }
                  zipIn.closeEntry();
                  entry = zipIn.getNextEntry();
              }
              zipIn.close();
          }
       
          private static void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
              BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
              byte[] bytesIn = new byte[BUFFER_SIZE];
              int read = 0;
              while ((read = zipIn.read(bytesIn)) != -1) {
                  bos.write(bytesIn, 0, read);
              }
              bos.close();
          }
      }

2. Pythonアプリケーションからの委任トークンの使用

次のコードでは、Pythonで認証されたクライアントを作成できます。アプリケーションがローカルで実行されている場合、コードは構成ファイルおよびAPIキーに依存します。設定の詳細は、必要なキーとOCIDを参照してください。
# Helper Functions
def get_token_path(spark):
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    return token_path

def get_authenticated_client(token_path, client):
    import oci
    import os

    if token_path is None:
        # You are running locally, so use our API Key.
        config = oci.config.from_file()
        authenticated_client = client(config)
    else:
        # You are running in Data Flow, so use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client
次に、このコードの使用例を2つ示します:
import oci
token_path = get_token_path(spark)

# Get an object storage client.
object_storage_client = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
...

# Retrieve a password using the secrets client.
import base64
password_ocid = "my_password_ocid"
secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
response = secrets_client.get_secret_bundle(password_ocid)
base64_secret_content = response.data.secret_bundle_content.content
base64_secret_bytes = base64_secret_content.encode("ascii")
base64_message_bytes = base64.b64decode(base64_secret_bytes)
password = base64_message_bytes.decode("ascii")
...

認証済クライアントの構築に使用されるパターンに注意してください。get_authenicated_clientを実行するクライアントのトークン・パスとクラス名を渡すと、認証済クライアントが返されます。このアプローチは、Oracle Cloud Infrastructure Python SDKによって提供される任意のクライアントで機能します。

mapforeachまたはforeachPartition操作にはトークン・パスを指定する必要があります。Sparkエグゼキュータ内でトークン・パスを検索できません。

次の手順

Java SDKには、SparkアプリケーションでのOracle Cloud Infrastructure統合の開発に役立つ多くのがあります。

Oracle Cloud Infrastructure Python SDKには多くのがあり、Oracle Cloud Infrastructure統合をSparkアプリケーションで開発するために使用できます。