あめがえるのITブログ

頑張りすぎない。ほどほどに頑張るブログ。

Amazon EMRでSparkアプリケーションをステップで実行してみた

Amazon EMRはよく聞くが使ったことがなかったのでチュートリアルを実施して軽くイメージをつかもうと思います。

やること

下記公式サイトを参考にEMRのチュートリアルを実施する。
チュートリアルの中身はこんな感じ。
 1.EMRクラスター作成
 2.EMRタスク作成と実行
 3.EMRクラスタSSH接続
 4.後始末
※他VPC、IAMなどの準備が必要なのに載されていないものがいくつかあり初心者だとチュートリアルでつまづきますね。。。
docs.aws.amazon.com

注意点

VPC内のインスタンスからS3にアクセスできるネットワーク設定になっていること。
・セキュリティグループにグローバルから22を含む(443など)へのIngress接続許可がないこと。※EMRクラスター作成時にエラーとなる

環境

AWS EMR(Cluster) ← CloudShell(SSH接続用)

実践!

1.IAMロール作成
1-1.IAM-[ロール]-[ロールを作成]
1-2.下記を入力
 AWSのサービス:✅
 一般的なユースケース
  EC2:✅
1-3.下記を選択 ※いったんフルで。。
 AmazonS3FullAccess
1-4.下記を入力
 ロール名:role-s3-pyspark
1-5.[ロールを作成]

2.S3バケット作成
2-1.S3ー[バケット]-[バケット作成]
2-2.下記を入力
 バケット名:amazon-s3-pyspark
 AWSリージョン:ap-northeast-1
 オブジェクト所有者:ACL無効(推奨)
 このバケットのブロックパブリックアクセス設定:パブリックアクセスを許可
 バケットのバージョニング:無効にする
 暗号化タイプ:Amazon S3 マネージドキーを使用したサーバー側の暗号化(SSE-S3)
 バケットキー:有効にする
 オブジェクトロック:無効にする
2-3.[バケットを作成]
2-4.ログ用フォルダ作成
※自動で作成してくれるかもしれないが先に作成しておいた。。。
 s3://amazon-s3-pyspark/logs

3.サンプルスクリプト作成
3-1.スクリプトファイル作成

# vi health_violations.py
import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations 
          FROM restaurant_violations 
          WHERE violation_type = 'RED' 
          GROUP BY name 
          ORDER BY total_red_violations DESC LIMIT 10""")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)

スクリプトの中身としては、「calculate_red_violations」関数はCSVを読み込んでSQLで「violation_type」がREDのものをグループにしてTOP10を出力してる感じ。
「if name == "main"」で自分自身からの読み込みのみ実行する文を入れてますね。
コマンドラインから引数を取得するためargparseを使用。こんな取り方もあるんですね。だいたいsysを使う感じかと。

3-2.作成したPyファイルを作成したS3バケットの下記フォルダへアップロード
s3://amazon-s3-pyspark

4.サンプル入力データダウンロード
4-1.下記からZipファイルをダウンロード
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/samples/food_establishment_data.zip food_establishment_data.zip
4-2.解凍し作成したS3バケットの下記フォルダへアップロード
s3://amazon-s3-pyspark

5.EMRクラスタ起動
5-1.EMR-[クラスター]-[クラスターを作成]
5-2.下記を入力
 名前:MyFirstCluster
 Amazon EMR リリース:emr-6.12.0
 アプリケーションバンドル:Spark
 Spark テーブルメタデータに使用:□
 オペレーティングシステムのオプション:Amazon Linux リリース
 Amazon Linux の最新の更新を自動的に適用します:✅
 クラスター設定:インスタンスグループ
 プライマリ
  EC2インスタンスタイプを選択:m5.xlarge
  複数のプライマリノードを使用:□
 コア
  EC2インスタンスタイプを選択:m5.xlarge
 1/1のタスク
  名前:タスク - 1
  EC2 インスタンスタイプを選択:m5.xlarge
 EBSルートボリュームサイズ:15GiB
 クラスターのスケーリングとプロビジョニングのオプション:クラスターサイズを手動で設定
 プロビジョニングの設定
  コア:1
  タスク - 1:1
 仮想プライベートクラウド(VPC):vpc-xxxxxxxxxxxx
 サブネット:subnet-xxxxxxxxxxxxx
 EC2 セキュリティグループ(ファイアウォール):
  プライマリノード:
   EMR マネージドセキュリティグループ:作成 ElasticMapReduce-Primary
  コアノードとタスクノード:
   EMR マネージドセキュリティグループ:作成 ElasticMapReduce-Core
 クラスターの終了:
  アイドル時間後にクラスターを終了する(推奨):✅
  アイドル時間:0日 01:00:00
  終了保護の使用:✅
 クラスター固有のログを Amazon S3 に発行する:✅
 Amazon S3 の場所:s3://amazon-s3-pyspark/logs
 クラスター固有のログを暗号化:□
 Security Configuration and EC2 Key Pair
  セキュリティ設定:空白
  クラスターへのSSH用のAmazon EC2 キーペア:xxxxxxx
 Amazon EMR サービスロール:
  サービスロールを作成:✅
 ネットワーキングリソース:
  仮想プライベートクラウド(VPC):vpc-xxxxxxxxxx
  サブネット:subnet-xxxxxxxxxxxx
 Amazon EMR の EC2 インスタンスプロファイル:
  インスタンスプロファイル:role-s3-pyspark
5-3.[クラスターを作成]
5-4.ステータスが[開始中]⇒[待機中]になったことを確認する。
 ※1分程度で終了する場合、失敗している可能性あり。その場合はエラーを確認する。
 ※約4分程度で完了する。

6.ステップ作成
6-1.EMR-[クラスター]-[MyFirstCluster]
6-2.[ステップ]-[ステップの追加]
6-3.下記を入力
 タイプ:Spark アプリケーション
 名前:FirstStep
 デプロイモード:クラスターモード
 アプリケーションの場所:s3://amazon-s3-pyspark/health_violations.py
 引数:--data_source s3://amazon-s3-pyspark/food_establishment_data.csv --output_uri s3://amazon-s3-pyspark/logs
 ステップが失敗した場合のアクション:続行
6-4.[追加]
6-5.ステータスが[Completed]となったことを確認する。

7.タスク実行結果確認
7-1.S3-[amazon-s3-pyspark]-[logs]
7-2.下記のように出力されていることを確認

name, total_red_violations
SUBWAY, 322
T-MOBILE PARK, 315
WHOLE FOODS MARKET, 299
PCC COMMUNITY MARKETS, 251
TACO TIME, 240
MCDONALD'S, 177
THAI GINGER, 153
SAFEWAY INC #1508, 143
TAQUERIA EL RINCONSITO, 134
HIMITSU TERIYAKI, 128


8.クラスターへSSH接続
8-1.セキュリティグループへグローバルから22番への接続許可をアタッチ
8-2.pemファイルをアップロード
8-3.pemファイルのパーミッションを600に変更
8-4.AWS CLI接続

$ aws emr ssh --cluster-id [cluster-id] --key-pair-file [pemファイル]



9.後始末
※EC2インスタンスは結構でかいので早めに消しましょう・・・
9-1.EMR-[クラスター]-[MyFirstCluster]
9-2.[アクション]-[クラスターの終了]
9-3.S3バケット削除
9-4.IAMロール削除



感想

分散ストレージ基盤だと思っていたけど、それはS3が担保するからワークフロー実行基盤という感じですね。
※だから名前がEMR(Elastic MapReduce)なのか。。。( ̄д ̄;)