Amazon EMRはよく聞くが使ったことがなかったのでチュートリアルを実施して軽くイメージをつかもうと思います。
やること
下記公式サイトを参考にEMRのチュートリアルを実施する。
チュートリアルの中身はこんな感じ。
1.EMRクラスター作成
2.EMRタスク作成と実行
3.EMRクラスターSSH接続
4.後始末
※他VPC、IAMなどの準備が必要なのに載されていないものがいくつかあり初心者だとチュートリアルでつまづきますね。。。
docs.aws.amazon.com
注意点
・VPC内のインスタンスからS3にアクセスできるネットワーク設定になっていること。
・セキュリティグループにグローバルから22を含む(443など)へのIngress接続許可がないこと。※EMRクラスター作成時にエラーとなる
環境
AWS EMR(Cluster) ← CloudShell(SSH接続用)
実践!
1.IAMロール作成
1-1.IAM-[ロール]-[ロールを作成]
1-2.下記を入力
AWSのサービス:✅
一般的なユースケース:
EC2:✅
1-3.下記を選択 ※いったんフルで。。
AmazonS3FullAccess
1-4.下記を入力
ロール名:role-s3-pyspark
1-5.[ロールを作成]
2.S3バケット作成
2-1.S3ー[バケット]-[バケット作成]
2-2.下記を入力
バケット名:amazon-s3-pyspark
AWSリージョン:ap-northeast-1
オブジェクト所有者:ACL無効(推奨)
このバケットのブロックパブリックアクセス設定:パブリックアクセスを許可
バケットのバージョニング:無効にする
暗号化タイプ:Amazon S3 マネージドキーを使用したサーバー側の暗号化(SSE-S3)
バケットキー:有効にする
オブジェクトロック:無効にする
2-3.[バケットを作成]
2-4.ログ用フォルダ作成
※自動で作成してくれるかもしれないが先に作成しておいた。。。
s3://amazon-s3-pyspark/logs
3.サンプルスクリプト作成
3-1.スクリプトファイル作成
# vi health_violations.py import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri)
スクリプトの中身としては、「calculate_red_violations」関数はCSVを読み込んでSQLで「violation_type」がREDのものをグループにしてTOP10を出力してる感じ。
「if name == "main"」で自分自身からの読み込みのみ実行する文を入れてますね。
コマンドラインから引数を取得するためargparseを使用。こんな取り方もあるんですね。だいたいsysを使う感じかと。
3-2.作成したPyファイルを作成したS3バケットの下記フォルダへアップロード
s3://amazon-s3-pyspark
4.サンプル入力データダウンロード
4-1.下記からZipファイルをダウンロード
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/samples/food_establishment_data.zip
food_establishment_data.zip
4-2.解凍し作成したS3バケットの下記フォルダへアップロード
s3://amazon-s3-pyspark
5.EMRクラスタ起動
5-1.EMR-[クラスター]-[クラスターを作成]
5-2.下記を入力
名前:MyFirstCluster
Amazon EMR リリース:emr-6.12.0
アプリケーションバンドル:Spark
Spark テーブルメタデータに使用:□
オペレーティングシステムのオプション:Amazon Linux リリース
Amazon Linux の最新の更新を自動的に適用します:✅
クラスター設定:インスタンスグループ
プライマリ
EC2インスタンスタイプを選択:m5.xlarge
複数のプライマリノードを使用:□
コア
EC2インスタンスタイプを選択:m5.xlarge
1/1のタスク
名前:タスク - 1
EC2 インスタンスタイプを選択:m5.xlarge
EBSルートボリュームサイズ:15GiB
クラスターのスケーリングとプロビジョニングのオプション:クラスターサイズを手動で設定
プロビジョニングの設定
コア:1
タスク - 1:1
仮想プライベートクラウド(VPC):vpc-xxxxxxxxxxxx
サブネット:subnet-xxxxxxxxxxxxx
EC2 セキュリティグループ(ファイアウォール):
プライマリノード:
EMR マネージドセキュリティグループ:作成 ElasticMapReduce-Primary
コアノードとタスクノード:
EMR マネージドセキュリティグループ:作成 ElasticMapReduce-Core
クラスターの終了:
アイドル時間後にクラスターを終了する(推奨):✅
アイドル時間:0日 01:00:00
終了保護の使用:✅
クラスター固有のログを Amazon S3 に発行する:✅
Amazon S3 の場所:s3://amazon-s3-pyspark/logs
クラスター固有のログを暗号化:□
Security Configuration and EC2 Key Pair
セキュリティ設定:空白
クラスターへのSSH用のAmazon EC2 キーペア:xxxxxxx
Amazon EMR サービスロール:
サービスロールを作成:✅
ネットワーキングリソース:
仮想プライベートクラウド(VPC):vpc-xxxxxxxxxx
サブネット:subnet-xxxxxxxxxxxx
Amazon EMR の EC2 インスタンスプロファイル:
インスタンスプロファイル:role-s3-pyspark
5-3.[クラスターを作成]
5-4.ステータスが[開始中]⇒[待機中]になったことを確認する。
※1分程度で終了する場合、失敗している可能性あり。その場合はエラーを確認する。
※約4分程度で完了する。
6.ステップ作成
6-1.EMR-[クラスター]-[MyFirstCluster]
6-2.[ステップ]-[ステップの追加]
6-3.下記を入力
タイプ:Spark アプリケーション
名前:FirstStep
デプロイモード:クラスターモード
アプリケーションの場所:s3://amazon-s3-pyspark/health_violations.py
引数:--data_source s3://amazon-s3-pyspark/food_establishment_data.csv --output_uri s3://amazon-s3-pyspark/logs
ステップが失敗した場合のアクション:続行
6-4.[追加]
6-5.ステータスが[Completed]となったことを確認する。
7.タスク実行結果確認
7-1.S3-[amazon-s3-pyspark]-[logs]
7-2.下記のように出力されていることを確認
name, total_red_violations SUBWAY, 322 T-MOBILE PARK, 315 WHOLE FOODS MARKET, 299 PCC COMMUNITY MARKETS, 251 TACO TIME, 240 MCDONALD'S, 177 THAI GINGER, 153 SAFEWAY INC #1508, 143 TAQUERIA EL RINCONSITO, 134 HIMITSU TERIYAKI, 128
8.クラスターへSSH接続
8-1.セキュリティグループへグローバルから22番への接続許可をアタッチ
8-2.pemファイルをアップロード
8-3.pemファイルのパーミッションを600に変更
8-4.AWS CLI接続
$ aws emr ssh --cluster-id [cluster-id] --key-pair-file [pemファイル]
9.後始末
※EC2インスタンスは結構でかいので早めに消しましょう・・・
9-1.EMR-[クラスター]-[MyFirstCluster]
9-2.[アクション]-[クラスターの終了]
9-3.S3バケット削除
9-4.IAMロール削除
感想
分散ストレージ基盤だと思っていたけど、それはS3が担保するからワークフロー実行基盤という感じですね。
※だから名前がEMR(Elastic MapReduce)なのか。。。( ̄д ̄;)