Use a Pipeline

A pipeline lets you connect a set of tasks in sequence or in parallel to orchestrate data processing.

By creating a pipeline, you can build a complex task dependency graph and automate an entire workload. Tasks in a pipeline must be published; you can add published tasks from any application in the current workspace or in another workspace.

In this tutorial, you:

  • Create two data loader tasks to be run in parallel in a pipeline.
  • Create a REST task that uses the Notifications service to send email notifications.
  • Create a pipeline and add operators for the data loader tasks, a merge, an integration task, and a REST task.
  • Create a pipeline task to configure a runtime context for a pipeline.
  • Publish a pipeline task and run a pipeline.
  • Monitor a pipeline run.

1. Creating a Data Loader Task for Revenue Data

Duplicate the Load Revenue Data into Data Warehouse task to create a new task that loads and overwrites revenue data.

  1. On the DI_Lab project details page, click Tasks in the submenu.
  2. On the Tasks list, find Load Revenue Data into Data Warehouse.
  3. Click the Actions menu, and then select Duplicate.
  4. In the Duplicate task dialog, enter Revenue Data Load for the new name, and click Duplicate.

    The Identifier value is automatically generated based on the name you provide. You can change the generated value, but after you save the new task, you aren't allowed to update the identifier.

  5. On the Tasks list, click Revenue Data Load.

    The duplicated data loader task page opens in a new tab.

  6. Click the Target step icon.
  7. Under Target data entities load settings, click Use existing data entities.
  8. From the Integration strategy menu, select Overwrite.
  9. Under Available data entities, select the checkbox for REVENUE_TARGET and then click Set as target.

    The name REVENUE_TARGET displays next to Selected data entity.

  10. Click Save to save the task and continue editing.
  11. Click Next to navigate to the Transformation step.

    Don't remove the Null fill up transformation that was previously applied to SRC_ORDER_NUMBER.

  12. Click Next to navigate to the Attribute mapping step.

    All source and target attributes are automatically mapped.

  13. Click Next to navigate to the Review and validate step.

    Validation of the task begins automatically.

    A summary of the configuration details for each step is presented in a block. If you change a step's configuration, navigate to the Review and validate step to validate the task again.

    The result of the task validation is shown in the last block, Validation.

  14. When validation is successful, click Save and close.

2. Creating a Data Loader Task for Customer Data

Create a data loader task to load customer data into Data Warehouse by creating a new target data entity.

  1. On the DI_Lab project details page, click Tasks in the submenu.
  2. Click Create task, and then select Data loader.

    The Create Data Loader Task page opens in a new tab. Numbered and named steps at the top guide you through the configuration. A check mark displays on a step icon after you configure the step. To move between steps, click Next or Previous. You can also navigate directly to a configured step by selecting the icon.

  3. On the Create data loader task page, Basic information step, select the following:
    For this item    Select
    Source type      File storage
    Target type      Database
    Load type        Single data entity
  4. For the task Name, enter Customer Data Load. Then click Next to navigate to the next step.

    A check mark displays on the Basic information step icon after you have configured the step.

  5. On the Source step, select the following:
    For this item    Select
    Data asset       Data_Lake
    Connection       Default Connection
    Compartment      The compartment that contains the bucket in which you uploaded the sample data file, CUSTOMERS.JSON
    Bucket           The Object Storage bucket that contains the sample JSON file
  6. Under File settings, select the following:
    For this item       Select
    File type           JSON
    Compression type    Auto (Default)
    Encoding            UTF-8

    You can leave the default settings as-is in the remaining fields.

  7. Under Available data entities, select the checkbox for CUSTOMERS.JSON and then click Set as source.

    The name CUSTOMERS.JSON displays next to Selected data entity.

  8. Click Create to save the task and continue editing.
  9. Click Next to advance to the Target step, and then select the following:
    For this item    Select
    Data asset       Data_Warehouse
    Connection       Default Connection
    Schema           BETA
  10. Under Staging location, you can use the default staging location that was set up when you created the target data asset.

    Or, you can clear the checkbox to select another Object Storage bucket.

  11. Under Target data entities load settings, click Create new data entities.
  12. Under Target data entity name options, select Specify entity name. Then in the Entity name field, enter CUSTOMER_JSON_TARGET.
  13. Click Save to save the task and continue editing.
  14. Click the Review and validate step, skipping the optional Transformation step.

    Validation of the task begins automatically.

    A summary of the configuration details for each step is presented in a block. If you change a step's configuration, navigate to the Review and validate step to validate the task again.

    The result of the task validation is shown in the last block, Validation.

  15. When validation is successful, click Save and close.

3. Creating a REST Task for Sending Notifications

You can use a REST task to invoke a REST API endpoint from a pipeline. In this tutorial, you use the Notifications service API in a Data Integration REST task to publish an email from within a pipeline.

To create a REST task in this step, you must already have the following:
  • A topic and email subscription created in the Notifications service.

  • The OCID of the topic you created. The OCID is available on the Topic Information section of the topic details page in the Notifications service.

  • The following policy statement that lets you run Data Integration tasks that invoke the Notifications REST API:

    allow any-user to use notification-family in tenancy where ALL {request.principal.type='disworkspace'}
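
If you still need to set up the topic and email subscription, the following is a minimal sketch using the OCI Python SDK; the topic name, compartment OCID, and email address are placeholder assumptions to substitute with your own values:

    import oci

    config = oci.config.from_file()  # reads ~/.oci/config

    # Create the topic (Notifications control plane).
    ctrl = oci.ons.NotificationControlPlaneClient(config)
    topic = ctrl.create_topic(
        oci.ons.models.CreateTopicDetails(
            name="di_lab_topic",                    # assumed topic name
            compartment_id="ocid1.compartment.oc1..example",
        )
    ).data
    print("Topic OCID:", topic.topic_id)            # use this OCID in the REST task

    # Subscribe an email address to the topic (Notifications data plane).
    # The subscription stays pending until you confirm it from the email you receive.
    dp = oci.ons.NotificationDataPlaneClient(config)
    dp.create_subscription(
        oci.ons.models.CreateSubscriptionDetails(
            compartment_id="ocid1.compartment.oc1..example",
            topic_id=topic.topic_id,
            protocol="EMAIL",
            endpoint="you@example.com",
        )
    )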

Then in Data Integration, create a REST task that uses the Notifications service API to publish an email.

  1. On the DI_Lab project details page, click Tasks in the submenu.
  2. Click Create task, and then select REST.

    The Create REST task page opens in a new tab.

  3. For Name, enter Notify by Email.

    The Identifier value is automatically generated based on the name you provide. You can change the generated value, but after you save the new task, you aren't allowed to update the identifier.

  4. In the REST API details section, click Configure.

    The Configure REST API details page displays. Numbered and named steps at the top guide you through the configuration. A check mark displays on a step icon after you configure the step. To move between steps, click Next or Previous. You can also navigate directly to a configured step by selecting the icon.

  5. For HTTP method, select POST.
  6. In the URL field, enter the following and press Enter.
    https://notification.us-ashburn-1.oci.oraclecloud.com/20181201/topics/${TOPICID}/messages
    Note

    Ensure that you use the appropriate region identifier for the Notifications service.

    When you press Enter after entering the URL, Data Integration converts the ${} parameter syntax into a String URL parameter.

  7. In the table row for the newly added URL parameter TOPICID, select Edit from the Actions menu.
  8. In the Value field, enter the OCID of the Notifications topic you created and click Save.
  9. Next, add a header by following these steps:
    1. Click Header.
    2. Click Add header.
    3. In the Key field, enter con to filter the list, and then select Content-Type.
    4. In the Value field, enter app to filter the list, and then select application/json.
    5. Click Add.
  10. Add a request body by following these steps:
    1. Click Request.
    2. In the editor, enter the following.
      {"title": "Put your title here", "body": "Put your email body here."}
    3. Click Add.
  11. Click Next and then click Configure.
  12. To provide authentication, do the following:
    1. In the Authentication section, click Edit to display the Configure authentication panel.
    2. From the Authentication menu, select OCI resource principal.
    3. Under Authentication source, select Workspace.
    4. Click Configure.
  13. In the optional Validate task section, click Validate.
  14. When validation is successful, click Create and close.
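
The REST task you configured is equivalent to calling the Notifications PublishMessage operation directly. For reference, here's a minimal sketch of that call using the OCI Python SDK, which signs the request for you; the topic OCID is a placeholder:

    import oci

    config = oci.config.from_file()
    client = oci.ons.NotificationDataPlaneClient(config)

    # Publish the same message the REST task sends at run time.
    response = client.publish_message(
        topic_id="ocid1.onstopic.oc1..example",  # your Notifications topic OCID
        message_details=oci.ons.models.MessageDetails(
            title="Put your title here",
            body="Put your email body here.",
        ),
    )
    print(response.data.message_id)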

4. Publishing the Data Loader and REST Tasks

  1. On the DI_Lab project details page, click Tasks in the submenu.
  2. From the list of tasks, select the checkboxes next to Revenue Data Load, Customer Data Load, and Notify by Email.
  3. Click Publish to application.
  4. In the Publish to application dialog, select Lab Application, and click Publish.

    A notification message appears, with a link to the Application to view the published tasks.

  5. Select View application in the notification. Then select X to close the notification.

    The Patches list on the Application details page displays. One patch entry is created for the tasks you're publishing.

  6. On the Patches list, you can monitor the patch status. Click Refresh to get the latest status updates.

    When the status of a patch changes to Success, three published task entries are created on the Tasks list of the Application details page.

  7. On the Lab Application details page, click Tasks.

    Published tasks for Revenue Data Load, Customer Data Load, and Notify by Email are shown in the tasks list.

5. Creating a Pipeline

  1. In the tab bar, click Open tab (plus icon), and then select Projects.
  2. On the Projects page, click DI_Lab.
  3. On the DI_Lab project details page, click Pipelines in the submenu on the left side, and then click Create pipeline.

    The pipeline designer opens in a new tab. A start operator and an end operator are placed on the canvas for you.

  4. In the Properties panel for the pipeline, enter Analyze Revenue as the Name.

    The Identifier value is automatically generated based on the value that you enter for the pipeline name. You can change the generated value, but after you save the pipeline, you aren't allowed to update the identifier.

  5. Click Create.

    The designer remains open for you to continue editing.

6. Adding Pipeline Operators

You add task operators to specify the published tasks to orchestrate in the pipeline.

Learn more about pipeline operators.
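
When you finish this section, the pipeline has the following shape: the two data loader tasks run in parallel, a merge waits for both to succeed, and the integration and REST tasks then run in sequence:

           ┌──> Revenue Data Load ───┐
    START ─┤                         ├──> MERGE (All success) ──> Load Customers Lab ──> Notify by Email ──> END
           └──> Customer Data Load ──┘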

  1. From the Operators panel, drop a Data loader operator onto the canvas, placing it between the start and end operators.

    The Properties panel now displays the details for the data loader operator, which isn't yet bound to a task.

  2. In the Details tab of the Properties panel, click Select.

    The Select a data loader task panel displays for you to select a published data loader task.

  3. From Lab Application, select Revenue Data Load (the task that loads revenue data into a data warehouse), and click Select.

    The name on the operator icon changes to the name of the selected task.

  4. Connect the start operator to the revenue data loader task.
  5. To save the pipeline and continue editing, click Save.
  6. Repeat the steps to add a second Data loader operator. This time, select Customer Data Load (the task that loads customer data). Then connect the start operator to the customer data loader task.
  7. Next, drop the Merge operator onto the canvas, placing it after the two data loader tasks.
  8. Connect each data loader task to the Merge operator.
  9. In the Details tab of the Properties panel for the merge operator, select All success from the Merge condition menu.

    This specifies that all parallel operations linked upstream must complete successfully before the next downstream operation can proceed. (A conceptual sketch of this condition follows these steps.)

  10. From the Operators panel, drop the Integration operator onto the canvas, placing it after the merge operator.
  11. In the Details tab of the Properties panel, click Select.
  12. In the Select an integration task panel, select the Load Customers Lab task, and click Select.
  13. Connect the merge operator to the integration task operator.
  14. Next, drop the REST operator onto the canvas, placing it after the integration task.
  15. In the Details tab of the Properties panel, click Select.
  16. In the Select a REST task panel, select the Notify by Email task, and click Select.
  17. In the Details tab of the Properties panel for the REST task operator, select Run on success of previous operator from the Incoming link condition menu.
  18. Connect the REST task to the end operator.
  19. Click Validate on the canvas toolbar.

    The Global validation panel displays for you to review any warnings or errors.

  20. To save the pipeline, click Save and close.
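
Conceptually, the All success merge condition behaves like the following check. This is a plain-Python illustration of the semantics only, not Data Integration code:

    # Illustration: an "All success" merge lets the downstream operator run
    # only when every upstream operator finished successfully.
    def all_success(upstream_statuses):
        return all(status == "SUCCESS" for status in upstream_statuses)

    all_success(["SUCCESS", "SUCCESS"])  # True: the integration task runs
    all_success(["SUCCESS", "ERROR"])    # False: the downstream task doesn't run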

7. Creating a Pipeline Task

  1. In the tab bar, click Open tab (plus icon), and then select Projects.
  2. On the Projects page, click DI_Lab.
  3. On the DI_Lab project details page, click Tasks in the submenu on the left side.
  4. Click Create task, and then select Pipeline.

    The Create pipeline task page opens in a new tab.

  5. On the Create pipeline task page, change the Name to Analyze Revenue Lab.

    Entering a Description is optional. The value in the Identifier field is automatically generated based on the value you enter for Name. You can change the generated value, but after you save the task, you aren't allowed to update the identifier.

  6. In the Pipeline section, click Select.
  7. In the Select a pipeline panel, select Analyze Revenue, and click Select.

    Validation of the pipeline begins automatically.

  8. Click Create and close.

8. Publishing and Running a Pipeline Task

  1. On the DI_Lab project details page, click Tasks in the submenu.
  2. In the Tasks list, click the Actions menu for Analyze Revenue Lab and select Publish to application.
  3. In the Publish to application dialog, select Lab Application, and click Publish.

    A notification message appears, with a link to the Application to view published tasks.

  4. Go to the Lab Application details page, and click Patches in the submenu on the left side to view details of the task patch.

    A patch contains updates to a published task in an Application. When you publish a task, a publish patch is created. Learn more about Patches.

  5. On the Patches list, you can monitor the patch status. Click Refresh to get the latest status updates.

    When the status of a patch changes to Success, a published task entry is created on the Tasks list of the Application details page.

  6. On the Lab Application details page, click Tasks.

    The pipeline published task Analyze Revenue Lab is shown in the tasks list.

  7. Click the Actions menu for the pipeline task and select Run.

    A success message appears. Running a task creates a task run. You're automatically brought to the Runs page, where you can view all task runs and their status. The initial status of a pipeline run is Not started.

  8. On the Runs list of the Lab Application details page, click Refresh to get the latest task run status updates.

    Note that running a pipeline includes steps for preprocessing, acceptance, and validation before the run engine starts the actual pipeline run.

    Click Refresh a few times until you see the status Running.

  9. When the pipeline task is running, click the task run name.

    The Run details page displays, where you can monitor the progress of the pipeline run on the Pipeline graph. The status of each node is indicated by an icon and a label. For example, a green check mark for a completed node, the label Running for tasks that are running, and the label Waiting for a downstream task that's waiting to be run.

    Click Refresh a few times until you see Success for the overall pipeline run status.

    You can also click Overview to see more details about the pipeline run.

  10. When the pipeline run is successful, go to the Runs list of the Lab Application details page, and expand the main run entry for the pipeline task run.

    You can view run details of four individual tasks in the pipeline.

    You should also receive an email from the Notifications service.
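
If you'd rather start the published pipeline task programmatically than from the Console, the following is a minimal sketch using the OCI Python SDK. The workspace OCID, application key, and published task key are placeholders that you look up in your tenancy:

    import oci

    config = oci.config.from_file()
    di = oci.data_integration.DataIntegrationClient(config)

    # Placeholders: your workspace OCID, application key, and the key
    # of the published Analyze Revenue Lab task.
    run = di.create_task_run(
        workspace_id="ocid1.disworkspace.oc1..example",
        application_key="<application-key>",
        create_task_run_details=oci.data_integration.models.CreateTaskRunDetails(
            registry_metadata=oci.data_integration.models.RegistryMetadata(
                aggregator_key="<published-task-key>",
            ),
        ),
    ).data
    print(run.status)  # initially NOT_STARTED; poll get_task_run for updates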