Amazon Elastic MapReduceの使い方─Hadoopより手軽にはじめる大規模計算

第4回 Java SDKでEMRを起動する

この記事を読むのに必要な時間:およそ 5 分

Jobを実行させるリクエストを定義する

最後に,EMRに対してJobを実行させるリクエストを定義します。

リスト6 EMRManager.java(リクエスト定義部分 抜粋)

private static final String INSTANCE = "m1.large";
private static final String AVAILABILITY_ZONE = "us-east-1a";
private static final String HADOOP_VERSION = "0.20";
private static final String KEY_PAIR_NAME = "cluster-key";
private static final int INSTANCE_NUMBER = 10;
private static final String MAPREDUCE_JAR = "s3://example/mr.jar";
private static final String EMR_LOG_URI = "s3://emr-log/";

RunJobFlowRequest runJobFlowRequest =
    new RunJobFlowRequest()
            .withName(MAP_REDUCE_NAME)
            .withSteps(enableDebugging, stepConfig)
            .withLogUri(EMR_LOG_URI)
            .withInstances(new JobFlowInstancesConfig()
                                    .withEc2KeyName(KEY_PAIR_NAME)
                                    .withHadoopVersion(HADOOP_VERSION)
                                    .withInstanceCount(INSTANCE_NUMBER)
                                    .withKeepJobFlowAliveWhenNoSteps(true)
                                    .withMasterInstanceType(INSTANCE)
                                    .withSlaveInstanceType(INSTANCE)
                                    .withPlacement(new PlacementType()
                                                          .withAvailabilityZone(AVAILABILITY_ZONE)));

リスト6の3行目以降の意味を,runJobFlowRequestの部分とJobFlowInstancesConfigの部分に分けて見ていきましょう。まずはrunJobFlowRequestの部分です。

runJobFlowRequest#withName
→このJob全体の名前を定義します。
runJobFlowRequest#withSteps
→先ほど定義した実際のJobとデバッグ用のステップを定義します。
runJobFlowRequest#withLogUri
→今回はデバッグの指定をしているので出力先を定義します。
runJobFlowRequest#withInstances
→JobFlowInstancesConfigで細かいインスタンスの情報を定義します。

続いて,JobFlowInstancesConfigの部分の定義です。

JobFlowInstancesConfig#withEc2KeyName
→EC2で定義してあるSSHのキーを指定します。
JobFlowInstancesConfig#withHadoopVersion
→Hadoopのバージョンを定義します。現在は0.19と0.20を指定することができます。
JobFlowInstancesConfig#withInstanceCount
→インスタンスの起動数を定義します。このインスタンス数は全体でのインスタンス数となります。今回は10インスタンスとしました。そのため,マスタノード1台,スレーブノード9台という構成になります。
JobFlowInstancesConfig#withKeepJobFlowAliveWhenNoSteps
→ステップが終了した時の動作です。trueとしてあるので1つのJobが終了してもEMR自体は終了せずに待機状態になります。
JobFlowInstancesConfig#withMasterInstanceType
→マスタのインスタンスタイプを指定します。
JobFlowInstancesConfig#withSlaveInstanceType
→スレーブのインスタンスタイプを指定します。
JobFlowInstancesConfig#withPlacement
→PlacementTypeを定義し,インスタンスをどのEC2のゾーンに起動するかを定義します。

なお,今回は上記のように定義しましたが,ゾーンなどは任意です。特に指定しなくても大丈夫です。

Jobを実行させる

それでは最後にEMRを起動し,Jobを実行させてみましょう。

リスト7 EMRManager.java(job実行部分 抜粋)

RunJobFlowResult result = emr.runJobFlow(runJobFlowRequest);
String jobFlowId = result.getJobFlowId();

まずはAmazonElasticMapReduce#runJobFlowを渡します。実行するとRunJobFlowResultに結果が返ってきますが,⁠結果が返ってくる」と言ってもすぐに終了するものではありません。前回同様,JobフローIDを取得し,これ以降はこのJobフローIDを使って操作します。

また,今回は1度の起動で複数のJobを実行させることが前提なので,2回目以降のJobはリスト8のようにして実行させます。

リスト8 EMRManager.java(複数job実行部分 抜粋)

AddJobFlowStepsRequest addJobFlowStepsRequest =
    new AddJobFlowStepsRequest().withJobFlowId(jobFlowId)
                                .withSteps(stepConfig);
emr.addJobFlowSteps(addJobFlowStepsRequest);

それぞれの行の意味は以下のとおりです。

  1. AddJobFlowStepsRequestに追加のJobリクエストを定義
  2. AddJobFlowStepsRequest#withJobFlowIdにRunJobFlowResultで取得したJobフローIDを指定
  3. AddJobFlowStepsRequest#withStepsで新しいステップを定義
  4. AmazonElasticMapReduce#addJobFlowStepsに追加のリクエストを渡して完了

Jobが完了したか確認する

ここまでで起動させる方法を見てきましたが,実際にはJobが完了したのがエラーなのかを判断しなければなりません。Jobの状態を確認するコードはリスト9のようになります。

リスト9 EMRManager.java(Job状態確認部分 抜粋)

private static final String STEP_DETAIL_STATUS_PENDING = "PENDING";
private static final String STEP_DETAIL_STATUS_RUNNING = "RUNNING";
private static final String STEP_DETAIL_STATUS_COMPLETED = "COMPLETED";

String stepStatus = STEP_DETAIL_STATUS_PENDING;
while (stepStatus.equals(STEP_DETAIL_STATUS_PENDING) ||
       stepStatus.equals(STEP_DETAIL_STATUS_RUNNING)) {
  try {
    Thread.sleep(1 * (1000 * 60));
  } catch (Exception e) {
  }

  DescribeJobFlowsRequest describeJobFlowsRequest =
    new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId);
  DescribeJobFlowsResult describeJobFlowsResult =
    emr.describeJobFlows(describeJobFlowsRequest);

  boolean found = false;
  for (JobFlowDetail jobFlowDetail : describeJobFlowsResult.getJobFlows()) {
    for (StepDetail stepDetail : jobFlowDetail.getSteps()) {
      if (stepDetail.getStepConfig().getName().equals("job1")) {
        stepStatus = stepDetail.getExecutionStatusDetail().getState();
        found = true;
        break;
      }
    }

    if (found) {
      break;
    }
  }
}

ここではステップの実行が待ち,もしくは実行している間,1分間隔でステップの状態を監視するようループの条件を設定しています。

まずは,Job状態を取得するリクエストを定義します。DescribeJobFlowsRequest#withJobFlowIdsにJobフローIDを指定します。そしてAmazonElasticMapReduce#describeJobFlowsでリクエストを送信します。結果は,DescribeJobFlowsResultとして戻されます。

DescribeJobFlowsResultからステップのステータスを取得し,待ちでも実行中でもなければメインのループを抜けます。Jobの実行に成功していれば,ステータスはCOMPLETEDが返されます。

以上で,Jobの実行から監視までが完了です。

EMRを終了させる

最後に忘れてはいけないのはEMR自体を終了させることです。これを忘れてしまうといつまでもEMR(正確にはインスタンス)が起動したままになり,ずっと課金されてしまうことになるためです。

終了のコードはリスト10のようになります。

リスト10 EMRManager.java(EMR終了部分 抜粋)

TerminateJobFlowsRequest request = new TerminateJobFlowsRequest();
request.setJobFlowIds(Arrays.asList(jobFlowId));
emr.terminateJobFlows(request);

以下の3つを行うのがポイントとなります。

  • TerminateJobFlowsRequestを定義する
  • 終了させるJobフローIDを指定する
  • AmazonElasticMapReduce#terminateJobFlowsを送信して完了

今回は,Java SDKを使ってEMRを起動する方法を説明してきました。Java Docにはここでは説明しきれなかったその他の機能についても書かれていますので,確認してみてください。

次回は,ここまで何度か出てきたデバッグモードの詳細を説明します。

  • 今回の記事内で紹介したサンプルコードはこちらからダウンロードしてください:samples.zip

著者プロフィール

小林隆(こばやしりゅう)

クラウド,Hadoopを使ったMapReduce,NoSQL(主にCassandra)を使った開発を行っている。CassndraのGUIをオープンソースで公開。

bloghttp://beter-max.blogspot.com/

URLhttp://twitter.com/ryu_kobayashi/

コメント

コメントの記入