Data Transfer from Amazon S3 to Teradata Vantage Using Apache Airflow
Overview
This document provides instructions and guidance for transferring data in CSV, JSON and Parquet formats from Amazon S3 to Teradata Vantage using the Airflow Teradata Provider and the S3 Cloud Transfer Operator. It outlines the setup, configuration and execution steps required to establish a seamless data transfer pipeline between these platforms.
Use the Windows Subsystem for Linux (WSL) on Windows to try this quickstart example.
Prerequisites
- Access to a Teradata Vantage instance, version 17.10 or higher.
  If you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com.
- Python 3.8, 3.9, 3.10 or 3.11, with python3-venv and python3-pip installed.
  On Linux or WSL:
  sudo apt install -y python3-venv python3-pip
  On macOS:
  brew install python
  Refer to the Installation Guide if you face any issues.
Install Apache Airflow
- Create a new Python environment to manage Airflow and its dependencies, and activate the environment.
  python3 -m venv airflow_env
  source airflow_env/bin/activate
  AIRFLOW_VERSION=2.9.3
  PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
  CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
  pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
- Install the Apache Airflow Teradata provider package and the Amazon provider package.
  pip install "apache-airflow-providers-teradata[amazon]"
- Set the AIRFLOW_HOME environment variable.
  export AIRFLOW_HOME=~/airflow
Configure Apache Airflow
- Switch to the virtual environment where Apache Airflow was installed in the Install Apache Airflow step.
  source airflow_env/bin/activate
- Configure the environment variables listed below to enable the test connection button in the Airflow UI and to prevent loading the example DAGs and default connections.
  export AIRFLOW__CORE__TEST_CONNECTION=Enabled
  export AIRFLOW__CORE__LOAD_EXAMPLES=false
  export AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=false
Start the Apache Airflow web server
- Run Airflow's web server.
  airflow standalone
- Access the Airflow UI. Visit http://localhost:8080 in the browser and log in with the admin account details shown in the terminal.
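Optionally, you can confirm the web server is reachable from a script before continuing. The snippet below is only a minimal sketch: it polls Airflow's /health endpoint and assumes the default standalone setup listening on port 8080 (the requests library is typically installed along with Airflow).
  # Minimal sketch: check the Airflow web server health endpoint.
  # Assumes the default standalone setup on http://localhost:8080.
  import requests

  response = requests.get("http://localhost:8080/health", timeout=10)
  response.raise_for_status()
  print(response.json())  # reports metadatabase and scheduler status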
Define the Apache Airflow connection to Vantage
- Click on Admin > Connections.
- Click on + to define a new connection to a Teradata Vantage instance.
- Assign the new connection the id teradata_default and fill in the Teradata Vantage instance details:
  - Connection Id: teradata_default
  - Connection Type: Teradata
  - Database Server URL (required): Teradata Vantage instance hostname to connect to.
  - Database: database name
  - Login (required): database user
  - Password (required): database user password
- Refer to the Teradata Connection documentation for more details.
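If you prefer not to use the UI, the same connection can be supplied through an environment variable. The snippet below is only a sketch: it uses Airflow's Connection model to build the URI for AIRFLOW_CONN_TERADATA_DEFAULT, and the host, login, password and database values are placeholders you must replace with your own instance details.
  # Minimal sketch: build the connection URI programmatically instead of using the UI.
  # All values below are placeholders for your own Vantage instance details.
  from airflow.models.connection import Connection

  conn = Connection(
      conn_id="teradata_default",
      conn_type="teradata",
      host="your-vantage-hostname",       # Database Server URL
      login="your-database-user",         # Login
      password="your-database-password",  # Password
      schema="your-database",             # Database
  )

  # Export this value as AIRFLOW_CONN_TERADATA_DEFAULT before starting Airflow.
  print(f"AIRFLOW_CONN_{conn.conn_id.upper()}={conn.get_uri()}")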
Define DAG in Apache Airflow
DAGs in Airflow are defined as Python files. The DAG below transfers data from a Teradata-supplied public bucket to a Teradata Vantage instance. Copy the Python code below and save it as airflow-aws-to-teradata-transfer-operator-demo.py under the directory $AIRFLOW_HOME/dags.
This DAG is a very simple example that covers:
- Dropping the destination table if it exists
- Transferring the data stored in object storage
- Getting the number of transferred records
- Writing the number of transferred records to the log
Refer to the S3 To Teradata Operator documentation for more information on the Amazon S3 to Teradata Transfer Operator.
from __future__ import annotations
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator
DAG_ID = "example_aws_s3_to_teradata_transfer_operator"
CONN_ID = "teradata_default"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # Drop the destination table if it exists
    drop_table_if_exists = TeradataOperator(
        task_id="drop_table_if_exists",
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # Transfer CSV data from the public S3 bucket to the Teradata Vantage instance
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        teradata_conn_id="teradata_default",
        trigger_rule="always",
    )
    # Get the number of records transferred from S3 to the Teradata table
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) FROM example_s3_teradata_csv;",
    )
    # Print the number of records in the table
    print_number_of_records = BashOperator(
        task_id="print_number_of_records",
        bash_command="echo {{ ti.xcom_pull(task_ids='read_data_table_csv') }}",
    )

    (
        drop_table_if_exists
        >> transfer_data_csv
        >> read_data_table_csv
        >> print_number_of_records
    )
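The same operator also handles JSON and Parquet data. The tasks below are only a sketch of how the DAG could be extended; the JSONDATA and PARQUETDATA prefixes and the table names are assumptions modeled on the CSVDATA example above, not part of the original DAG. They would be added inside the same with DAG(...) block and chained into the task flow.
    # Sketch only: extra transfer tasks for JSON and Parquet data.
    # The JSONDATA/PARQUETDATA prefixes are assumed to mirror the CSVDATA layout above.
    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        teradata_conn_id="teradata_default",
        trigger_rule="always",
    )
    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        teradata_conn_id="teradata_default",
        trigger_rule="always",
    )
    # Example chaining: print_number_of_records >> transfer_data_json >> transfer_data_parquet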
Load DAG
When the DAG file is copied to $AIRFLOW_HOME/dags, Apache Airflow displays the DAG in the UI under the DAGs section. It may take 2 to 3 minutes for the DAG to appear in the Apache Airflow UI.
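If the DAG does not show up, you can check for import errors from the same virtual environment. The snippet below is a minimal sketch that parses the DAGs folder with Airflow's DagBag and prints any import errors; it assumes the default $AIRFLOW_HOME/dags location.
  # Minimal sketch: verify the DAG file parses without import errors.
  from airflow.models import DagBag

  dag_bag = DagBag()  # reads $AIRFLOW_HOME/dags by default
  print("Loaded DAGs:", list(dag_bag.dags))
  print("Import errors:", dag_bag.import_errors)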
Transfer data from Private Amazon S3 bucket to Teradata instance
To successfully transfer data from a Private Amazon S3 bucket to a Teradata instance, the following prerequisites are necessary.
- Access to an AWS account
- Create an S3 bucket
- Upload CSV/JSON/Parquet format files to the private S3 bucket
- Access keys for the AWS account
- Create a Teradata Authorization object with the AWS Access Key and the AWS Secret Access Key
  CREATE AUTHORIZATION aws_authorization USER 'AWSAccessKey' PASSWORD 'AWSSecretAccessKey'
  Replace AWSAccessKey and AWSSecretAccessKey with your AWS account access key and secret access key.
- Modify s3_source_key with your private object store URI in the transfer_data_csv task, and add the teradata_authorization_name field with the Teradata Authorization object name.
  transfer_data_csv = S3ToTeradataOperator(
      task_id="transfer_data_blob_to_teradata_csv",
      s3_source_key="YOUR-OBJECT-STORE-URI",
      teradata_table="example_blob_teradata_csv",
      teradata_conn_id="teradata_default",
      teradata_authorization_name="aws_authorization",
      trigger_rule="always",
  )
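Alternatively, the authorization object could be created from within the DAG itself instead of running the SQL by hand. The task below is only a sketch using the TeradataOperator already imported in the DAG; the credential values are placeholders and would ideally come from a secrets backend rather than being hard-coded.
    # Sketch only: create the authorization object as a DAG task.
    # Replace the placeholder keys with your AWS credentials (preferably injected
    # from a secrets backend, not hard-coded in the DAG file).
    create_aws_authorization = TeradataOperator(
        task_id="create_aws_authorization",
        sql="CREATE AUTHORIZATION aws_authorization USER 'AWSAccessKey' PASSWORD 'AWSSecretAccessKey';",
    )
    # Run it before the transfer task: create_aws_authorization >> transfer_data_csv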
Summary
This guide showed how to use the Airflow Teradata Provider's S3 Cloud Transfer Operator to transfer CSV, JSON, and Parquet data from Amazon S3 to Teradata Vantage, enabling streamlined data operations between these platforms.