
Data Transfer from Amazon S3 to Teradata Vantage Using Apache Airflow

Overview

This document provides instructions and guidance for transferring data in CSV, JSON and Parquet formats from Amazon S3 to Teradata Vantage using the Airflow Teradata Provider and the S3 Cloud Transfer Operator. It outlines the setup, configuration and execution steps required to establish a seamless data transfer pipeline between these platforms.

On Windows, use the Windows Subsystem for Linux (WSL) to try this quickstart example.

Prerequisites

  • Access to a Teradata Vantage instance, version 17.10 or higher.

    If you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com.
  • Python 3.8, 3.9, 3.10 or 3.11 with python3-venv and python3-pip installed.

    • Linux

      sudo apt install -y python3-venv python3-pip

    • WSL

      sudo apt install -y python3-venv python3-pip

    • macOS

      brew install python

    Refer to the Installation Guide if you face any issues.

Install Apache Airflow

  1. Create a new Python environment to manage Airflow and its dependencies, activate it, and install Apache Airflow using the constraints file that matches your Python version.

    python3 -m venv airflow_env
    source airflow_env/bin/activate
    AIRFLOW_VERSION=2.9.3
    PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
    CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
    pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
  2. Install the Apache Airflow Teradata provider package and the Amazon provider package.

    pip install "apache-airflow-providers-teradata[amazon]"
  3. Set the AIRFLOW_HOME environment variable.

    export AIRFLOW_HOME=~/airflow
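
The constraint URL in step 1 is assembled from the Airflow version and the major.minor version of the active Python interpreter. The sketch below reproduces that assembly in Python, which can help when checking which constraints file pip will use. The function name constraint_url is an illustrative choice and is not part of Airflow.

```python
import sys
from typing import Optional


def constraint_url(airflow_version: str, python_version: Optional[str] = None) -> str:
    """Build the pip constraints URL used when installing Apache Airflow."""
    if python_version is None:
        # Same value the shell pipeline extracts: major.minor of the interpreter.
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Example with the Airflow version used in this guide and Python 3.11.
print(constraint_url("2.9.3", "3.11"))
```

Calling constraint_url("2.9.3") without a Python version uses the interpreter that runs the script, which mirrors what the shell commands do inside the activated virtual environment.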

Configure Apache Airflow

  1. Switch to the virtual environment created in the Install Apache Airflow section.

    source airflow_env/bin/activate
  2. Set the following environment variables to enable the Test Connection button and to prevent Airflow from loading the sample DAGs and default connections in the Airflow UI.

      export AIRFLOW__CORE__TEST_CONNECTION=Enabled
      export AIRFLOW__CORE__LOAD_EXAMPLES=false
      export AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=false
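
The three variables above follow Airflow's AIRFLOW__{SECTION}__{KEY} naming convention for overriding configuration options, with double underscores on both sides of the section name. As a small sketch, the helper below derives the variable name from a section and a key; the name airflow_env_name is hypothetical and exists only for this illustration.

```python
def airflow_env_name(section: str, key: str) -> str:
    """Return the environment variable that overrides [section] key in Airflow."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The settings used in this guide, expressed as (section, key) -> value.
settings = {
    ("core", "test_connection"): "Enabled",
    ("core", "load_examples"): "false",
    ("core", "load_default_connections"): "false",
}

for (section, key), value in settings.items():
    print(f"export {airflow_env_name(section, key)}={value}")
```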

Start the Apache Airflow web server

  1. Start Airflow in standalone mode, which initializes the metadata database, creates an admin user and starts the web server.

    airflow standalone
  2. Access the Airflow UI. Visit http://localhost:8080 in the browser and log in with the admin account details shown in the terminal.


Define the Apache Airflow connection to Vantage

  1. Click Admin > Connections.

  2. Click on + to define a new connection to a Teradata Vantage instance.

  3. Set the connection id to teradata_default and fill in the Teradata Vantage instance details.

    • Connection Id: teradata_default

    • Connection Type: Teradata

    • Database Server URL (required): Teradata Vantage instance hostname to connect to.

    • Database: database name

    • Login (required): database user

    • Password (required): database user password

Refer to Teradata Connection for more details.
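
As an alternative to the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_{CONN_ID}, whose value is a connection URI. The sketch below builds such a URI for the fields listed above, assuming the scheme teradata corresponds to the Teradata connection type and that the Database field maps to the path part of the URI. The helper name teradata_conn_uri and all sample values are placeholders.

```python
from urllib.parse import quote


def teradata_conn_uri(host: str, login: str, password: str, database: str = "") -> str:
    """Build an Airflow connection URI for a Teradata connection (assumed layout)."""
    # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
    uri = f"teradata://{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if database:
        uri += f"/{quote(database, safe='')}"
    return uri


# Placeholder values; replace them with your own instance details.
uri = teradata_conn_uri("myhost.example.com", "demo_user", "p@ss/word", "demo_db")
print(f"export AIRFLOW_CONN_TERADATA_DEFAULT='{uri}'")
```

Connections defined through environment variables are resolved at run time and are not listed under Admin > Connections, so the UI steps above remain the simplest way to use the Test Connection button.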

Define DAG in Apache Airflow

DAGs in Airflow are defined as Python files. The DAG below transfers data from Teradata-supplied public buckets to a Teradata Vantage instance. Copy the Python code below and save it as airflow-aws-to-teradata-transfer-operator-demo.py under the directory $AIRFLOW_HOME/dags.

This DAG is a very simple example that covers:

  • Dropping the destination table if it exists

  • Transferring the data stored in object storage

  • Getting the number of transferred records

  • Writing the number of transferred records to the log

Refer to S3 To Teradata Operator for more information on the Amazon S3 to Teradata Transfer Operator.

from __future__ import annotations

import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

DAG_ID = "example_aws_s3_to_teradata_transfer_operator"
CONN_ID = "teradata_default"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
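    # Drop the destination table. If the table does not exist yet (for example
    # on the first run), this statement raises an error and the task fails;
    # the transfer task below still runs because of its trigger_rule.
    drop_table_if_exists = TeradataOperator(
        task_id="drop_table_if_exists",
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # Transfer data from the public S3 bucket to the Teradata Vantage instance.
    # trigger_rule="always" runs this task regardless of the drop task's outcome.
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        teradata_conn_id=CONN_ID,
        trigger_rule="always",
    )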
    # Get the number of records transferred from S3 to teradata table
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_s3_teradata_csv;",
    )
    # Print the number of records in the table
    print_number_of_records = BashOperator(
        task_id="print_number_of_records",
        bash_command="echo {{ ti.xcom_pull(task_ids='read_data_table_csv') }}",
    )
    (
        drop_table_if_exists
        >> transfer_data_csv
        >> read_data_table_csv
        >> print_number_of_records
    )
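
The s3_source_key value in the DAG above has the shape /s3/<bucket>.s3.amazonaws.com/<prefix>/. As an illustration only, the hypothetical helper below assembles a location of that shape from a bucket name and a key prefix; it is not part of the Teradata provider, and other location layouts may also be valid.

```python
def s3_source_key(bucket: str, prefix: str = "") -> str:
    """Return a location string shaped like the s3_source_key used in the DAG."""
    # Normalize slashes so the result has one leading and one trailing slash.
    prefix = prefix.strip("/")
    location = f"/s3/{bucket}.s3.amazonaws.com/"
    if prefix:
        location += f"{prefix}/"
    return location


# Rebuilds the public-bucket location used by transfer_data_csv.
print(s3_source_key("td-usgs-public", "CSVDATA/09394500/2018/06"))
```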

Load DAG

After the DAG file is copied to $AIRFLOW_HOME/dags, Apache Airflow displays the DAG in the UI under the DAGs section. It can take 2 to 3 minutes for the DAG to appear in the Apache Airflow UI.

Run DAG

In the Airflow UI, open the DAGs section, locate example_aws_s3_to_teradata_transfer_operator and start a run with the Trigger DAG button.

Transfer data from Private Amazon S3 bucket to Teradata instance

To transfer data from a private Amazon S3 bucket to a Teradata instance, complete the following prerequisites.

  • Get access to an AWS account

  • Create an S3 bucket

  • Upload CSV, JSON or Parquet files to the private S3 bucket

  • Obtain access keys for the AWS account

  • Create a Teradata Authorization object with the AWS access key and the secret access key

    CREATE AUTHORIZATION aws_authorization USER 'AWSAccessKey' PASSWORD 'AWSSecretAccessKey';

    Replace AWSAccessKey and AWSSecretAccessKey with your AWS access key and secret access key.
  • In the transfer_data_csv task, replace the s3_source_key value with the URI of your private object store (shown as YOUR-OBJECT-STORE-URI) and add the teradata_authorization_name field with the name of the Teradata Authorization object

    # public_bucket is omitted here because the bucket is private; access is
    # granted through the Teradata Authorization object instead.
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="YOUR-OBJECT-STORE-URI",
        teradata_table="example_s3_teradata_csv",
        teradata_conn_id="teradata_default",
        teradata_authorization_name="aws_authorization",
        trigger_rule="always",
    )
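
The CREATE AUTHORIZATION statement above embeds the AWS keys as SQL string literals. A minimal sketch of generating that statement without hard-coding the keys is shown below. It assumes the keys are available in the standard AWS environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; the helper name create_authorization_sql is hypothetical.

```python
import os


def create_authorization_sql(name: str, access_key: str, secret_key: str) -> str:
    """Render a CREATE AUTHORIZATION statement for the given AWS key pair."""

    def literal(value: str) -> str:
        # Double any single quote so the value stays a valid SQL string literal.
        return "'" + value.replace("'", "''") + "'"

    return (
        f"CREATE AUTHORIZATION {name} "
        f"USER {literal(access_key)} PASSWORD {literal(secret_key)};"
    )


# Fall back to the placeholder names from this guide when the variables are unset.
access_key = os.environ.get("AWS_ACCESS_KEY_ID", "AWSAccessKey")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "AWSSecretAccessKey")
statement = create_authorization_sql("aws_authorization", access_key, secret_key)
```

The resulting statement contains the secret key in clear text, so it should be passed directly to a SQL client rather than printed or written to logs.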

Summary

This guide showed how to use the Airflow Teradata Provider's S3 Cloud Transfer Operator to transfer CSV, JSON and Parquet data from Amazon S3 to Teradata Vantage, from both public and private buckets.
