Accessing Oracle Cloud Infrastructure Resources Using the Data Flow Delegation Token
When you create a Data Flow run, Data Flow automatically configures your Spark job with a delegation token. This token is used to access Oracle Cloud Infrastructure resources on your behalf.
This is why a Spark job running in Data Flow can read from and write to Oracle Cloud Infrastructure Object Storage without needing credentials. Object Storage is not the only service the token gives you access to; for example:
- The application can access the Vault service to retrieve passwords or other secrets.
- It can access the NoSQL service to manage state outside the Spark application; tracking a high-water mark across Spark runs or creating a distributed lock are common needs, and the NoSQL service suits both.
- It can invoke Oracle Functions for other compute tasks.
- It can send email through the Email Delivery service.
Calls to Oracle Cloud Infrastructure services can be embedded freely in the rest of your Spark code, as the sketch below illustrates. The Policy Reference page is a good place to find information about every IAM-enabled service.
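For instance, a hypothetical helper that invokes an Oracle Function could look roughly like the following. It relies on the OboTokenClientConfigurator class built later in this topic; the invoke endpoint and function OCID are placeholders, and the exact Functions request and response accessors can vary between OCI Java SDK versions.
package example;
import com.oracle.bmc.functions.FunctionsInvokeClient;
import com.oracle.bmc.functions.requests.InvokeFunctionRequest;
import com.oracle.bmc.functions.responses.InvokeFunctionResponse;
import org.apache.commons.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
// Hypothetical sketch: invoke an Oracle Function on behalf of the user running the job.
public class InvokeFunctionSketch {
    public static String invoke(String delegationTokenPath, String payload) throws Exception {
        // Build a Functions client that signs requests with the delegation token on Data Flow,
        // or with an API key when running locally.
        FunctionsInvokeClient functions = FunctionsInvokeClient.builder()
            .clientConfigurator(OboTokenClientConfigurator.getConfigurator(delegationTokenPath))
            .build(OboTokenClientConfigurator.getAuthProvider(delegationTokenPath));
        functions.setEndpoint("https://<your-function-invoke-endpoint>"); // placeholder
        InvokeFunctionResponse response = functions.invokeFunction(InvokeFunctionRequest.builder()
            .functionId("ocid1.fnfunc.oc1..<your-function-ocid>") // placeholder
            .invokeFunctionBody(new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8)))
            .build());
        // Read the function's response body.
        String result = IOUtils.toString(response.getInputStream(), StandardCharsets.UTF_8);
        functions.close();
        return result;
    }
}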
Before You Begin
You must be authorized to launch Data Flow. Your privileges are not elevated; calls are simply made on your behalf. If something can be done in the Oracle Cloud Infrastructure Console, the Spark job can do it in Data Flow; likewise, if it can be done in Data Flow, the Spark job can do it from the Oracle Cloud Infrastructure Console.
1. Using the Delegation Token from a Java Application
OboTokenClientConfigurator.java creates authenticated Oracle Cloud Infrastructure clients. From the base directory of the template project, create the sample source directory structure containing an empty file into which to paste the OboTokenClientConfigurator code.
$ mkdir -p src/main/java/example/
$ touch src/main/java/example/OboTokenClientConfigurator.java
When the application runs locally, this code relies on an OCI configuration file and an API key. For details on that setup, see Required Keys and OCIDs. Example OboTokenClientConfigurator.java:
package example;
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.Region;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.InstancePrincipalsAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcProperties;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.http.DefaultConfigurator;
import com.oracle.bmc.http.signing.internal.Constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.NoSuchElementException;
import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
/**
* Customize the SDK underlying REST client to use the on-behalf-of token when running on
* Data Flow.
*/
public class OboTokenClientConfigurator implements ClientConfigurator {
// TODO: Set these values for your specific OCI environment
public static final String LOCAL_PROFILE = "DEV"; // TODO <your ~/.oci/config profile>
private static final String CONFIG_FILE_PATH = ConfigFileReader.DEFAULT_FILE_PATH;
private static final String CANONICAL_REGION_NAME = Region.US_PHOENIX_1.getRegionId();
private final String delegationTokenPath;
/**
* Helper function for the Spark Driver to get the token path.
*/
public static String getDelegationTokenPath() {
SparkConf conf = new SparkConf();
try {
return conf.get("spark.hadoop.fs.oci.client.auth.delegationTokenPath");
} catch (NoSuchElementException e) {
return null;
}
}
/**
* Helper function to get the Hadoop configuration for the HDFS BmcFileSystem.
*/
public static Configuration getConfiguration(Configuration config, String delegationTokenPath) {
// https://objectstorage.us-phoenix-1.oraclecloud.com
String domain = "oraclecloud.com";
String overlayEndpoint = String
.format("https://objectstorage.%s.%s", CANONICAL_REGION_NAME, domain);
config.set(BmcProperties.HOST_NAME.getPropertyName(), overlayEndpoint);
// Data Flow
if (delegationTokenPath != null) {
config.set("fs.oci.client.auth.delegationTokenPath", delegationTokenPath);
config.set(BmcProperties.OBJECT_STORE_CLIENT_CLASS.getPropertyName(),
"oracle.dfcs.hdfs.DelegationObjectStorageClient");
} else { // local
try {
ConfigFileAuthenticationDetailsProvider provider =
new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
config.set(BmcProperties.TENANT_ID.getPropertyName(), provider.getTenantId());
config.set(BmcProperties.USER_ID.getPropertyName(), provider.getUserId());
config.set(BmcProperties.FINGERPRINT.getPropertyName(), provider.getFingerprint());
config.set(BmcProperties.PEM_FILE_PATH.getPropertyName(), provider.getPemFilePath());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
return config;
}
/**
* Helper function to get an environment specific authentication provider.
*/
public static BasicAuthenticationDetailsProvider getAuthProvider(String delegationTokenPath) {
if (delegationTokenPath == null) { // local
try {
return new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
// Data Flow
return InstancePrincipalsAuthenticationDetailsProvider.builder().build();
}
/**
* Helper function to get an environment specific <tt>ClientConfigurator</tt>.
*/
public static ClientConfigurator getConfigurator(String delegationTokenPath) {
return (delegationTokenPath == null) ? new DefaultConfigurator() : // local
new OboTokenClientConfigurator(delegationTokenPath); // Data Flow
}
/**
* Helper function to get an environment specific working directory.
*/
public static String getTempDirectory() {
if (System.getenv("HOME").equals("/home/dataflow")) {
return "/opt/spark/work-dir/";
}
return System.getProperty("java.io.tmpdir");
}
public OboTokenClientConfigurator(String delegationTokenPath) {
this.delegationTokenPath = delegationTokenPath;
}
@Override
public void customizeBuilder(ClientBuilder builder) {
}
@Override
public void customizeClient(Client client) {
client.register(new _OboTokenRequestFilter());
}
@Priority(_OboTokenRequestFilter.PRIORITY)
class _OboTokenRequestFilter implements ClientRequestFilter {
public static final int PRIORITY = Priorities.AUTHENTICATION - 1;
@Override
public void filter(final ClientRequestContext requestContext) throws IOException {
String token = new String(Files.readAllBytes(Paths.get(delegationTokenPath)));
requestContext.getHeaders().putSingle(Constants.OPC_OBO_TOKEN, token);
}
}
}
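The class above compiles against the OCI Java SDK and the OCI HDFS connector. The Data Flow template project normally declares these dependencies already; if yours does not, a dependency block along the following lines is needed in pom.xml (the version properties are placeholders to define for your project):
<!-- OCI Java SDK Object Storage client and the HDFS connector that provides BmcFilesystem -->
<dependency>
  <groupId>com.oracle.oci.sdk</groupId>
  <artifactId>oci-java-sdk-objectstorage</artifactId>
  <version>${oci-java-sdk-version}</version>
</dependency>
<dependency>
  <groupId>com.oracle.oci.sdk</groupId>
  <artifactId>oci-hdfs-connector</artifactId>
  <version>${oci-hdfs-connector-version}</version>
</dependency>
Next, create the driver application, Example.java: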
$ touch src/main/java/example/Example.java
Example.java:
package example;
import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcFilesystem;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.objectstorage.ObjectStorageClient;
import com.oracle.bmc.objectstorage.requests.GetNamespaceRequest;
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
import com.oracle.bmc.objectstorage.responses.GetNamespaceResponse;
import com.oracle.bmc.objectstorage.transfer.UploadConfiguration;
import com.oracle.bmc.objectstorage.transfer.UploadManager;
import com.oracle.bmc.objectstorage.transfer.UploadManager.UploadRequest;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
// This demo illustrates:
// 1. The delegation token path must be looked up in the Driver and passed to the Executor.
// 2. If the token path is null, assume you are running locally and load an API key.
// 3. Using the HDFS-Connector to create an HDFS FileSystem to connect to Object Storage.
public class Example {
// TODO: Set these values for your specific OCI environment
private static final String NAMESPACE = "<your tenant namespace>";
private static final String BUCKET_NAME = "output"; // ensure that you create this bucket
private static final String OCI_URI = "oci://" + BUCKET_NAME + "@" + NAMESPACE;
private static final String SAMPLE_JOB_PATH = "/Example";
private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
public static void main(String[] args) throws Exception {
// Get our Spark session.
SparkConf conf = new SparkConf();
String master = conf.get("spark.master", "local[*]");
SparkSession spark = SparkSession.builder().appName("Example").master(master).getOrCreate();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
String delegationTokenPath = OboTokenClientConfigurator.getDelegationTokenPath();
// write a file to Object Storage using an HDFS FileSystem
try (final BmcFilesystem fs = new BmcFilesystem())
{
fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
jsc.hadoopConfiguration(), delegationTokenPath));
fs.delete(new Path(SAMPLE_JOB_PATH), true);
final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
output.close();
}
// Test adding a Spark file, equivalent to spark.files or --files
jsc.addFile(OCI_URI + INPUT_FILE); // Do not use this for data files
// Executor -> read a file
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
rdd.foreach(item -> executorGetFile());
List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
rdd = jsc.parallelize(collection, 4);
rdd.foreach(item -> copyFileSample(item, delegationTokenPath));
// Executor -> write a file to Object Storage using an HDFS FileSystem
rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
rdd.foreach(item -> fileSystemSample(item, delegationTokenPath));
// ---------------------------------------------------------------------
// <COPY CODE SNIPPET EXAMPLE #2 HERE>
// ---------------------------------------------------------------------
// ---------------------------------------------------------------------
// <COPY CODE SNIPPET EXAMPLE #3 HERE>
// ---------------------------------------------------------------------
jsc.stop();
}
public static void executorGetFile() {
String filepath = SparkFiles.get("input.dat");
File file = new File(filepath);
try {
String data = IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
System.out.println("Read a file:\n" + data);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void fileSystemSample(int i, String delegationTokenPath) throws Exception {
// write a file to Object Storage using a FileSystem
try (final BmcFilesystem fs = new BmcFilesystem())
{
fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
new Configuration(), delegationTokenPath));
fs.delete(new Path(SAMPLE_JOB_PATH + "/" + i), true);
final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
output.close();
}
}
// The driver needs to pass the delegation token path to executors.
public static void copyFileSample(int i, String delegationTokenPath)
throws Exception {
// Create a file to upload.
String outputFile = MessageFormat.format("{0}.txt", i);
String outputPath = "/tmp/" + outputFile;
Writer wr = new FileWriter(outputPath);
wr.write(String.valueOf(i));
wr.close();
AbstractAuthenticationDetailsProvider provider =
OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
// Create an object storage client.
ObjectStorageClient client = ObjectStorageClient.builder()
.clientConfigurator(configurator)
.build(provider);
// Look up our namespace.
GetNamespaceResponse namespaceResponse = client.getNamespace(GetNamespaceRequest.builder().build());
String namespaceName = namespaceResponse.getValue();
// Upload the file.
UploadConfiguration uploadConfiguration = UploadConfiguration.builder()
.allowMultipartUploads(true)
.allowParallelUploads(true)
.build();
UploadManager uploadManager = new UploadManager(client, uploadConfiguration);
PutObjectRequest request = PutObjectRequest.builder()
.bucketName(BUCKET_NAME)
.namespaceName(namespaceName)
.objectName(outputFile)
.contentType("text/plain")
.contentLanguage("EN")
.contentEncoding("UTF-8")
.build();
UploadRequest uploadDetails = UploadRequest.builder(new File(outputPath))
.allowOverwrite(true)
.build(request);
uploadManager.upload(uploadDetails);
}
}
Copy the following code into the <COPY CODE SNIPPET EXAMPLE #2 HERE> section of Example.java. This snippet retrieves a password (secret) from the Oracle Cloud Infrastructure Vault service:
<!-- Add the Vault SDK dependency to our example project pom.xml below the Secrets SDK dependency -->
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-vault</artifactId>
<version>${oci-java-sdk-version}</version>
</dependency>
// ---------------------------------------------------------------------
// Add these imports to the top of the file
import com.oracle.bmc.secrets.SecretsClient;
import com.oracle.bmc.secrets.requests.GetSecretBundleRequest;
import com.oracle.bmc.secrets.responses.GetSecretBundleResponse;
import com.oracle.bmc.secrets.model.Base64SecretBundleContentDetails;
import org.apache.commons.codec.binary.Base64;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
// ---------------------------------------------------------------------
// <COPY CODE SNIPPET EXAMPLE #2 HERE>
String passwordOcid = null; // TODO <the vault secret OCID>
BasicAuthenticationDetailsProvider provider =
OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
SecretsClient secretsClient = SecretsClient.builder().clientConfigurator(configurator).build(provider);
// create get secret bundle request
GetSecretBundleRequest getSecretBundleRequest = GetSecretBundleRequest
.builder()
.secretId(passwordOcid)
.stage(GetSecretBundleRequest.Stage.Current)
.build();
// get the secret
GetSecretBundleResponse getSecretBundleResponse = secretsClient.
getSecretBundle(getSecretBundleRequest);
// get the bundle content details
Base64SecretBundleContentDetails base64SecretBundleContentDetails =
(Base64SecretBundleContentDetails) getSecretBundleResponse.
getSecretBundle().getSecretBundleContent();
// decode the encoded secret
byte[] secretValueDecoded = Base64.decodeBase64(base64SecretBundleContentDetails.getContent());
System.out.println("Secret: " + new String(secretValueDecoded, StandardCharsets.UTF_8));
To store data in Oracle Autonomous Data Warehouse (ADW) from the sample application, first download the database wallet:
- Log in to Oracle Cloud Infrastructure.
- Search for Autonomous Database.
- Select your ADW instance.
- Click DB Connection.
- Click Download Wallet.
- Enter a password.
- Save the wallet to an Object Storage bucket.
- Add the following code to the sample project:
<!-- Add the ADW dependencies to our example project pom.xml below the Secrets SDK dependency -->
<!-- Drivers for talking to ADW. Jars need to be deployed using mvn deploy:deploy-file -->
<dependency>
  <groupId>com.oracle.database.jdbc</groupId>
  <artifactId>ojdbc8</artifactId>
  <version>18.3.0.0</version>
</dependency>
<dependency>
  <groupId>com.oracle.database.jdbc</groupId>
  <artifactId>ucp</artifactId>
  <version>18.3.0.0</version>
</dependency>
<dependency>
  <groupId>com.oracle.database.security</groupId>
  <artifactId>oraclepki</artifactId>
  <version>18.3.0.0</version>
</dependency>
<dependency>
  <groupId>com.oracle.database.security</groupId>
  <artifactId>osdt_cert</artifactId>
  <version>18.3.0.0</version>
</dependency>
<dependency>
  <groupId>com.oracle.database.security</groupId>
  <artifactId>osdt_core</artifactId>
  <version>18.3.0.0</version>
</dependency>
// ---------------------------------------------------------------------
// Add these imports to the top of the file
import java.util.ArrayList;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.Dataset;
import java.util.Map;
import java.util.HashMap;
import oracle.jdbc.driver.OracleConnection;
// ---------------------------------------------------------------------
// <COPY CODE SNIPPET EXAMPLE #3 HERE>
// TODO <set these values as appropriate to access your ADW using your ADW wallet>
String walletPath = OCI_URI + "/Wallet_EXAMPLEADW.zip";
String user = "ADMIN";
String tnsName = "exampleadw_high"; // this can be found inside of the wallet.zip (unpack it)
byte[] secretValueDecoded = "example_secret".getBytes();
// Build a 2 row data set to save to ADW.
// Usually you would load data from CSV/Parquet; this is to keep the example simple.
List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "value11", "value21" });
stringAsList.add(new String[] { "value12", "value22" });
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map(RowFactory::create);
StructType schema = DataTypes
    .createStructType(new StructField[] {
        DataTypes.createStructField("col1", DataTypes.StringType, false),
        DataTypes.createStructField("col2", DataTypes.StringType, false) });
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
// Download the wallet from object storage and distribute it.
String tmpPath = DataFlowDeployWallet.deployWallet(new URI(OCI_URI), spark.sparkContext(),
    OboTokenClientConfigurator.getConfiguration(jsc.hadoopConfiguration(), delegationTokenPath),
    walletPath);
// Configure the ADW JDBC URL.
String jdbcUrl = MessageFormat.format("jdbc:oracle:thin:@{0}?TNS_ADMIN={1}", tnsName, tmpPath);
System.out.println("JDBC URL " + jdbcUrl);
String password = new String(secretValueDecoded);
// Save data to ADW.
System.out.println("Saving to ADW");
Map<String, String> options = new HashMap<String, String>();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", jdbcUrl);
options.put(OracleConnection.CONNECTION_PROPERTY_USER_NAME, user);
options.put(OracleConnection.CONNECTION_PROPERTY_PASSWORD, password);
options.put(OracleConnection.CONNECTION_PROPERTY_TNS_ADMIN, tmpPath);
options.put("dbtable", "sample");
df.write().format("jdbc").options(options).mode("Overwrite").save();
System.out.println("Done writing to ADW");
- Use the helper class DataFlowDeployWallet.java to access the wallet file.
- Create the file by running the following command:
$ touch src/main/java/example/DataFlowDeployWallet.java
- Copy the following code into the file:
/*
 * Copyright © 2021, Oracle and/or its affiliates.
 * The Universal Permissive License (UPL), Version 1.0
 */
package example;
import com.oracle.bmc.hdfs.BmcFilesystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/*
 * Helper to deploy a wallet to the Spark cluster.
 *
 * This only needs to be done once and should be done in the Spark driver.
 */
public class DataFlowDeployWallet {
    private static final int BUFFER_SIZE = 4096;
    public static String deployWallet(URI oci_uri, SparkContext sc,
            Configuration configuration, String walletPath) throws IOException, URISyntaxException {
        try (final BmcFilesystem fs = new BmcFilesystem()) {
            fs.initialize(oci_uri, configuration);
            String tmpPath = downloadAndExtract(fs, new Path(walletPath));
            List<String> walletContents = Arrays.asList("cwallet.sso", "ewallet.p12", "keystore.jks",
                    "ojdbc.properties", "sqlnet.ora", "tnsnames.ora", "truststore.jks");
            for (String file : walletContents) {
                sc.addFile(tmpPath + file);
            }
            return tmpPath;
        }
    }
    private static String downloadAndExtract(BmcFilesystem bmc, Path walletRemotePath)
            throws IllegalArgumentException, IOException {
        String tmpPath = OboTokenClientConfigurator.getTempDirectory();
        String walletLocal = tmpPath + "wallet.zip";
        bmc.copyToLocalFile(walletRemotePath, new Path(walletLocal));
        unzip(walletLocal, tmpPath);
        return tmpPath;
    }
    private static void unzip(String zipFilePath, String destDirectory) throws IOException {
        File destDir = new File(destDirectory);
        if (!destDir.exists()) {
            destDir.mkdir();
        }
        ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
        ZipEntry entry = zipIn.getNextEntry();
        // iterates over entries in the zip file
        while (entry != null) {
            String filePath = destDirectory + File.separator + entry.getName();
            if (!entry.isDirectory()) {
                // if the entry is a file, extracts it
                extractFile(zipIn, filePath);
            } else {
                // if the entry is a directory, make the directory
                File dir = new File(filePath);
                dir.mkdir();
            }
            zipIn.closeEntry();
            entry = zipIn.getNextEntry();
        }
        zipIn.close();
    }
    private static void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
        byte[] bytesIn = new byte[BUFFER_SIZE];
        int read = 0;
        while ((read = zipIn.read(bytesIn)) != -1) {
            bos.write(bytesIn, 0, read);
        }
        bos.close();
    }
}
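With the wallet helper in place and snippet #3 pasted into Example.java, the same JDBC options can also be used to read the table back. The short sketch below assumes it is appended to snippet #3 after the write, reusing the spark session and options map defined there:
// Read the "sample" table back from ADW with the same JDBC options and print it.
System.out.println("Reading back from ADW");
Dataset<Row> readBack = spark.read().format("jdbc").options(options).load();
readBack.show();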
2. Using the Delegation Token from a Python Application
# Helper Functions
def get_token_path(spark):
token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
token_path = spark.sparkContext.getConf().get(token_key)
return token_path
def get_authenticated_client(token_path, client):
import oci
import os
if token_path is None:
# You are running locally, so use our API Key.
config = oci.config.from_file()
authenticated_client = client(config)
else:
# You are running in Data Flow, so use our Delegation Token.
with open(token_path) as fd:
delegation_token = fd.read()
signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
delegation_token=delegation_token
)
authenticated_client = client(config={}, signer=signer)
return authenticated_client
import oci
token_path = get_token_path(spark)
# Get an object storage client.
object_storage_client = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
...
# Retrieve a password using the secrets client.
import base64
password_ocid = "my_password_ocid"
secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
response = secrets_client.get_secret_bundle(password_ocid)
base64_secret_content = response.data.secret_bundle_content.content
base64_secret_bytes = base64_secret_content.encode("ascii")
base64_message_bytes = base64.b64decode(base64_secret_bytes)
password = base64_message_bytes.decode("ascii")
...
Note the pattern used to construct authenticated clients: pass get_authenticated_client the token path and the class of the client to create, and it returns an authenticated client. This approach works with any client provided by the Oracle Cloud Infrastructure Python SDK. The token path must be passed into any map, foreach, or foreachPartition operation; it cannot be looked up from inside a Spark executor.