こんにちは。業務委託の@morix1500と申します。
この度、スタディプラス様からデータ分析基盤の構築の業務委託を受け、AWSのマネージドサービスを用いて構築を行いました。
その際に得られた知見を共有したいと思います。
データ分析基盤について
今回スタディプラス様から受けたデータ分析基盤の要件は以下のようなものでした。
- S3にあるログをAWS Athenaから閲覧できるようにしてほしい
- S3にあるJSON形式のログを列指向型のフォーマット(Parquet)に変換してほしい
- ログは順次取り込み(毎朝、昨日分のログが見れるようにする)
すでにログはS3にあったのでログ収集は終わっています。
データ分析基盤の構成
今回作成したデータ分析基盤はAWSのマネージドサービスで完結してます。
今回構築したのはGlueの部分です。
Glueの構成や初期構築の手順は以下のドキュメント通りです。 https://aws.amazon.com/jp/blogs/news/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/
構築時のTIPS
S3のパーティション分け
Amazon Athena のパフォーマンスチューニング にも記載されていますが、
データをパーティション分けするとパフォーマンスが上がります。
今回取り込み対象のS3のログのパスは以下のようにうまいことパーティション分けされていました。
s3://{BucketName}/2018/07/30/12/hoge.log.gz
この場合は
- 年
- 月
- 日
- 時
でパーティション分けされています。
Glueでこの形式のログをロードすると「partition_0」という名前でパーティションをAthenaなどで参照できるようになります。
Glueジョブの対象データの絞り込み
今回行いたいJSONからParquetに変換作業は、Glueの「ETLジョブ」を使用します。
コンソールでポチポチやるといい感じの変換用Pythonスクリプトを出力してくれますが、
デフォルトだと対象のS3パスのデータを毎回全件取り込んでしまいます。
「ログは順次取り込み」という要件があるので、なんとかジョブ対象のデータを絞り込みたいです。
その方法は、上記で作成したパーティションを利用します。
作成したパーティションはGlueのETLジョブでも参照できますので、ジョブ実行時パーティションで絞り込みを行います。
この機能のことを「Pre-Filtering」というそうです。
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
# ソースを一部抜粋 # 1時間前の時間を指定 one_hour_ago = datetime.today() - timedelta(hours = 1) year = one_hour_ago.year month = '{0:02d}'.format(one_hour_ago.month) day = '{0:02d}'.format(one_hour_ago.day) hour = '{0:02d}'.format(one_hour_ago.hour) pushDownPredicateString = "(partition_0='{0}' and partition_1='{1}' and partition_2='{2}' and partition_3='{3}')" pushDownPredicateString = pushDownPredicateString.format(year, month, day, hour) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "Glue DB名", table_name = "テーブル名", push_down_predicate = pushDownPredicateString, transformation_ctx = "datasource0")
変換後のデータをS3に出力する際パーティション作成
GlueのETLジョブでは、変換後のデータをS3に保存できます。
その際にS3のパーティションを分けないと、Athenaでの参照時にパフォーマンスが上がりません。
こちらの設定もPythonで設定できます。
"year"や"month"と指定していますが、その前のマッピングで
partition_0 と year を関連付けないと使えませんので注意してください。
# ソースを一部抜粋 datasink4 = glueContext.write_dynamic_frame.from_options( frame = dropnullfields3, connection_type = "s3", connection_options = { "path": "s3://{Bucket Name}/", "partitionKeys": ['year','month','day','hour'] }, format = "parquet", transformation_ctx = "datasink4" )
この設定を行うと以下のようなログが出力されます。
s3://{BucketName}/year=2018/month=07/day=30/hour=12/xxxxxxxxxxxxx.parquet
ログのカラムの増減時の対応
Glueはよく出来ていて、カラムが増えても動作に影響がありません。
ログのカラムが変更されると、CrawlersのTables Updatedが「1」になります。
カラムが増えたら、ETLジョブのマッピングのところに追加されたカラムの情報を追加。
カラムが減ったら特になにもする必要はないですが、ETLジョブのマッピングにはその情報は不要なので削除してもよいでしょう。
最後に
このようにログをデータ分析で使える形式にするためにやらなきゃいけないことが
AWSのマネージドサービスですべて済んでしまいました。
そして導入は非常に簡単です。
同じ課題をお持ちの方はこの記事を参考にデータ分析基盤を構築してみてはいかがでしょうか!
また宣伝になりますが、私@morix1500はこのような基盤構築やその他クラウド系のインフラの仕事を副業としてやらせていただいています。
なにかございましたらTwitterのDMなどでぜひ!