前回 の記事では、Fair Schedulerによる複数ジョブの同時実行についてご紹介しました。今回は、Capacity Schedulerを使った場合について、複数ジョブの同時実行時の挙動を見てみましょう。   
HadoopのSchedulerとは複数のジョブを実行する際に、slot数の割り当てを制御し、均等にリソースを割り振ってすべてのジョブが同時に動くようにしたり、優先度の高いジョブから実行させたりといったことを可能にするものです。Scheduler自体の意義については、具体例とともに前回の記事に記載しているので併せて参考にしてください。    
Capacity Schedulerとは? 
Fair Schedulerは、あらかじめ「pool」を定義して、poolごとにslot数の割り当てなどを決め、ジョブの実行時にどのpoolに属するジョブかを指定しました。Capacity Schedulerでは、「 queue」を定義して、それぞれのqueueに対してslot数の割り当てなどを決めます。       
Fair Schedulerの場合、同じpoolに属するジョブ同士は、公平にリソースを分け合うのが基本(設定で、FIFOの順に実行させることもできます)でしたが、Capacity Schedulerの場合は、同じqueueに属するジョブは基本的にはFIFOの順に実行されます。このqueueの中で、ジョブの優先度によって順番を入れ替えたり、実行するユーザごとにslot数の割り振りを変えたり、メモリ使用量に応じてタスクの同時実行数を制限したりできることが、Capacity Schedulerの特徴となっています。           
Capacity Schedulerの利用 
Capacity Schedulerを使うためには以下の作業が必要です。
Capacity SchedulerのJARファイルをHadoop のcontrib/ capacity-schedulerディレクトリからlib ディレクトリにコピーし、Hadoopのクラスパスに配置する。 
 
mapred-site.xmlのmapred.jobtracker.taskSchedulerプロパティを以下の値に設定する。   
org.apache.hadoop.mapred.CapacityTaskScheduler     
 
 
queue名を定義する。
mapred-site.xmlのmapred.queue.namesプロパティにqueue名をカンマ区切りで指定する(デフォルトは「default」という名前のqueueのみが定義されている) 。      
例: 
<property>
  <name>mapred.queue.names</name>
  <value>default,queue1,queue2</value>
</property>   
 
各queueの設定をconf/capacity-scheduler.xmlに記載する。  
 
設定ファイル変更後、JobTrackerとTaskTrackerを再起動する。 
 
ジョブの実行時に、mapred.job.queue.name プロパティでqueue名を指定する。    
capacity-scheduler.xmlには、 
mapred.capacity-scheduler.queue.<queue-name>.<property-name>   
 
という形で、queue名とプロパティの名前を指定して、必要な設定を行います。たとえば、queue1に割り当てるキャパシティ(%)を75%にするには次のように記載します。     
<property>
  <name>mapred.capacity-scheduler.queue.queue1.capacity</name>
  <value>75</value>
</property>     
 
queueごとに設定可能なプロパティはいくつかありますが、まず設定すべきはこのcapacityになります。mapred.queue.namesプロパティで定義したすべてのqueueに対して、それぞれのqueueがクラスタ全体の何%のslotを使用するのかをここで定義します。ここで指定する、各queueのcapacityの合計がちょうど100%になっていないと、以下のようなエラーメッセージを表示してJobTrackerが起動に失敗するので、注意しましょう。       
2013-05-06 21:45:38,471 FATAL org.apache.hadoop.mapred.JobTracker: java.lang.IllegalArgumentException: Sum of queue capacities not 100% at 75.0
     at org.apache.hadoop.mapred.CapacityTaskScheduler.parseQueues(CapacityTaskScheduler.java:921)              
 
 
さて、それでは、Capacity Schedulerを設定するとMapReduceジョブ実行時にどのように動作するか、halookで見てみましょう。   
前回の記事と同様、クラスタ全体でMapのslot数が8の環境(CDH3u5)で、以下の2つのジョブを実行します。  
ジョブA:Mapタスク数40、Reduceタスク数0  
ジョブB:Mapタスク数10、Reduceタスク数0  
 
1Mapタスクの実行にかかる時間は1分とします。ジョブAだけをこのクラスタ上で動かすと、8タスクの同時実行×5回で5分かかります。ジョブBだけを動かすと、8タスク、2タスクを順に動かし、2分かかります。図1 の(1)がジョブAを単体で動かしたもの、図1の(2)がジョブBを単体で動かしたものです。     
図1  ジョブA、ジョブBの個別実行と同時実行  
 
 
図2 が、ジョブAを、図3 がジョブBを単体実行した結果です。どちらも、8タスクずつ同時に実行されていることが、halookのArrow Chart 画面の矢印の数およびConcurrent task num のグラフで確認できます。また、(3)と(4)は、ジョブAを開始した直後にジョブBを、Schedulerの設定なしに実行した結果です。(4)は(2)と同じジョブBですが、リソースを(3)のジョブAに占有されているため、待たされていることがわかります(図4 ) 。         
図2  (1)ジョブAの単体実行 
 
 
図3  (2)ジョブBの単体実行 
 
 
図4  (4)ジョブAの終了を待機するジョブB 
 
 
次に、Capacity Schedulerを設定し、capacityが50%のqueueを2つ作った状態で実行します。その結果が、(5)と(6)です。(5)がジョブA、(6)がジョブBで、(5)の実行開始直後に(6)を実行しています。使用するslot数が半分になっているため、実行に時間がかかりますが、(6)のジョブBが待たされることはなくなっています。       
図5  (5)capacityが50%の際のジョブA実行 
 
 
図6  (6) capacityが50%の際のジョブB実行 
 
 
queueの設定を変えて、queue1のcapacityが75%、queue2のcapacityが25%として、ジョブAをqueue1で、ジョブBをqueue2で実行したのが、図1の(7)と(8)です。     
図7  (7)capacityが75%の際のジョブA実行 
 
 
図8  (8)capacityが25%の際のジョブA実行 
 
 
同一queue内での実行 
同じqueueの中で2つのジョブを実行する場合、基本的にはFIFOの順で実行されますが、ジョブの優先度を設定することで、優先度の高いものから順に実行することもできるようになります。そのためには、まずcapacity-scheduler.xml のqueueの設定で、優先度に応じた実行順序の入れ替えを有効にする必要があります。たとえば「queue1」に対して優先度を有効にするには以下のように記載します。        
<property>
  <name>mapred.capacity-scheduler.queue.queue1.supports-priority</name>
  <value>true</value>
</property> 
 
実行時に、mapred.job.priorityプロパティでジョブの優先度を指定することができます。優先度は、VERY_LOW、LOW、NORMAL、HIGH、VERY_HIGHの中から選べます。          
図9 は、(9)のジョブAの優先度をNORMAL(デフォルト)として、(10)のジョブBの優先度をVERY_HIGH にして実行した結果です。ジョブAのほうが先に開始していますが、ジョブBが先に終了しています。図10 と図11 を見ると、優先度の高いジョブBを実行するために、ジョブAの新規タスク投入が一時的に止まっていることが確認できます。        
図9  優先度を設定したジョブ実行 
 
 
図10  (9)優先度の低いジョブAの実行 
 
 
図11  (10)優先度の高いジョブBの実行 
 
 
まとめ 
Capacity Schedulerの利用による複数ジョブの同時実行について、halookを用いて実際の動きを確認しました。 
今回ご紹介した、Capacity Schedulerの基本的な機能である、複数のキューの設定とそれぞれのキューの中でのFIFOスケジューリングについては、設定次第でFair Scheduler でも同じことができます。しかし、まだまだ紹介し切れていない機能はたくさんあり、その中には、Capacity Scheduler にしかできないTaskのメモリ使用量に応じた割り振りもあります。メモリ使用量の異なるジョブを動かすことを想定する場合は、Fair SchedulerではなくCapacity Schedulerを使うことになるでしょう。Schedulerの選択および、Schedulerの設定は、クラスタの規模や、実行する予定のジョブの種類に大きく依存して変わるので、今回の記事と前回のFair Schedulerの記事とを合わせて、まずは基本的な概念を理解する手助けとしていただければと思います。