Easy ETL processing with AWS Data Wrangler

Written By notebooktabletphone

In September 2019, AWS Data Wrangler (hereafter, Data Wrangler) was released on GitHub. Data Wrangler is a Python module that retrieves data from various AWS services and cuts down the boilerplate code you need to write to do so.

Today, when you use Python to retrieve data from Amazon Athena (hereafter Athena) or Amazon Redshift (hereafter Redshift) and run ETL processing on it, there are many options such as PyAthena, boto3, and Pandas. With those, you have to write connection settings and other setup code before you can get to the ETL logic you actually want to implement. With Data Wrangler, a few lines are enough to load CSV data on Amazon S3 (hereafter S3) or Athena query results into Pandas, or to connect to Redshift from PySpark, so you can concentrate on the code you actually want to write. In addition to installing the module on an instance with pip, you can also use it as a Lambda Layer or upload it as an egg file to AWS Glue.
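To give a sense of how short this becomes, the Athena-to-Pandas path used later in this tutorial boils down to roughly the following (a minimal sketch of the pre-1.0 Session API; the table and database names are placeholders):

import awswrangler

# Create a Data Wrangler session (it picks up the default boto3 credentials and region)
session = awswrangler.Session()

# Run an Athena query and receive the result as a Pandas DataFrame
df = session.pandas.read_sql_athena(
    sql="select * from [YOUR TABLE NAME]",
    database="[YOUR DATABASE NAME]"
)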

In this blog, we introduce a tutorial that uses an Amazon SageMaker (hereafter SageMaker) notebook to query Athena, preprocess the results, and place the output in S3.

Tutorial

The scenario for this tutorial is as follows.

As a use case, this lets even people with little experience building AWS services do machine learning preprocessing from a notebook. For example, you may want to transform data in a database into the shape you need, or fill in missing values with the mean, before using XGBoost, one of SageMaker's built-in algorithms.
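For instance, filling missing values with each column's mean in Pandas could look like the following minimal sketch (the DataFrame here is a made-up example, not the taxi data used below):

import pandas as pd

# Hypothetical DataFrame with missing values
df = pd.DataFrame({"trip_distance": [1.2, None, 3.4],
                   "fare_amount": [6.0, 7.5, None]})

# Fill each numeric column's missing values with that column's mean
df_filled = df.fillna(df.mean(numeric_only=True))
print(df_filled)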

This tutorial walks through removing out-of-scope data from Athena query results and then replacing one column's values with other values. The environment is built in the Tokyo region (ap-northeast-1).

0. Download the sample data. (Use the June 2019 “Green Taxi Trip Records (CSV)” file from the sample data URL.)

1-1. Log in to the AWS management console and select “S3” from the service list.

1-2. Click the [Create bucket] button, enter an arbitrary name (it must be globally unique) in “Bucket name”, confirm that the region is “Asia Pacific (Tokyo)”, and click the [Create] button.

1-3. After the bucket is created, upload the CSV file downloaded in step “0” to the new bucket.

1-4. Select “Athena” from the service list.

1-5. Execute the following queries to create the Athena database and table.

CREATE DATABASE [YOUR DATABASE NAME];

CREATE EXTERNAL TABLE green_tripdata(
  VendorID string,
  lpep_pickup_datetime string,
  lpep_dropoff_datetime string,
  store_and_fwd_flag string,
  RatecodeID string,
  PULocationID string,
  DOLocationID string,
  passenger_count int,
  trip_distance double,
  fare_amount double,
  extra double,
  mta_tax double,
  tip_amount double,
  tolls_amount double,
  ehail_fee string,
  improvement_surcharge double,
  total_amount double,
  payment_type string,
  trip_type string,
  congestion_surcharge double)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://[YOUR S3 BUCKET NAME]/[PREFIX]';

1-6. Execute the following query and confirm that the data can be retrieved.

select count(*) from green_tripdata;

*In this bucket configuration, the entire file is scanned each time. For Athena query optimization, see "Top 10 Performance Tuning Tips for Amazon Athena".

*The procedure from starting the SageMaker notebook to executing the code is abbreviated here. For details, please check Step 2 and Step 3 at the URL below. https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/gs-setup-working-env.html

2-1. Open the SageMaker console, select “Notebook instances” from the left menu, and choose [Create notebook instance].

2-2. Enter a notebook instance name and create the notebook. *Here, create a new IAM role.

2-3. Select “IAM” from the service list.

2-4. Attach the “AmazonS3FullAccess” and “AmazonAthenaFullAccess” policies to the IAM role created in step “2-2”.
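If you prefer to attach these policies from code rather than the IAM console, a boto3 sketch like the following should work (the role name is a placeholder for the execution role created in step “2-2”):

import boto3

iam = boto3.client("iam")
role_name = "[YOUR SAGEMAKER EXECUTION ROLE NAME]"  # placeholder

# Attach the two AWS managed policies used in this tutorial
for policy_arn in [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AmazonAthenaFullAccess",
]:
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)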

2-5. Select [Open Jupyter] from the created notebook instance and start the notebook.

3-1. Select [conda_python3] from the [New] tab to create a new notebook.

3-2. Execute the following command to install Data Wrangler.

!pip install awswrangler

Verify that the installation was successful.
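A simple check is to import the module and print its version (assuming the package exposes __version__, which recent releases do):

import awswrangler

# If the import succeeds, the installation worked; the version string is a quick sanity check
print(awswrangler.__version__)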

3-3. In the SageMaker notebook, run the following code, which executes the query against Athena.

import pandas
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="[YOUR DATABASE NAME]"
)
print(df)

Confirm that the output matches the data you retrieved in step “1-6”.

4-1. Rows whose “trip_distance” is 0 are considered out of scope for the analysis, so delete them.

## Extract the rows where trip_distance is 0 and check their count
rows_drop = df.index[df["trip_distance"] == 0.00]
print(df.loc[rows_drop].count())

## Drop the rows where trip_distance is 0 and check the remaining count
df_drop = df.drop(rows_drop)
print(df_drop)
df_lens = df_drop.count()
print(df_lens)

4-2. Check the processing result and make sure the total count has decreased by the number of rows removed.
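A quick way to verify this is to compare the row counts directly (a small sketch using the variables from step 4-1):

# The drop in row count should equal the number of trip_distance == 0 rows found above
print(len(df), len(df_drop), len(rows_drop))
assert len(df) - len(df_drop) == len(rows_drop)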

4-3. Next, replace values in the cleaned data. Here we replace the values of the “payment_type” column.

df_replace = df_drop.replace({
    'payment_type': {
        '1': 'Credit card',
        '2': 'Cash',
        '3': 'No charge',
        '4': 'Dispute',
        '5': 'Unknown',
        '6': 'Voided trip'
    }
})
print(df_replace)

Check that payment_type has been replaced with the values you defined.
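For example, listing the distinct values of the column is a quick way to confirm the mapping was applied (a small sketch):

# All payment_type values should now be the human-readable labels defined above
print(df_replace["payment_type"].value_counts())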

4-4. Output the CSV file to S3.

session.pandas.to_csv(
    dataframe=df_replace,
    path="s3://[YOUR S3 BUCKET NAME]/[PREFIX]",  ## output bucket name
    sep=",",
    database=None,
    table=None,
    partition_cols=None,
    preserve_index=True,
    mode='append',
    procs_cpu_bound=None,
    procs_io_bound=None
)

4-5. Confirm that the CSV file has been output to S3.
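You can check this in the S3 console, or with a small boto3 sketch like the following (the bucket name and prefix are the same placeholders used in step 4-4):

import boto3

s3 = boto3.client("s3")

# List the objects written under the output prefix
resp = s3.list_objects_v2(Bucket="[YOUR S3 BUCKET NAME]", Prefix="[PREFIX]")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])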

That's it.

Summary

Because it has just been released, what you can do with it is still limited, but that will grow over time. By all means, give it a try.

About the author

Satoshi Kuramitsu is a solution architect at AWS. His favorite AWS services are AWS Glue, Amazon Kinesis and Amazon S3.