concurrentパッケージは、マルチスレッド処理において、各スレッドの状態の監視や管理をし易くした大変便利なパッケージです。
concurrentパッケージは、Java5より導入されたパッケージのため、Java5以降で利用可能です。
2017時点Javaプラットフォームを利用した多くのシステムでは、マルチスレッドと言えばconcurrentパッケージと言えます。
concurrentパッケージでは、大きく分けて次の概念で構成されています。
何が強力かといえば、今までスレッド数の実行制御や監視を管理するサービスの実装は、
ハードウェア、OS,JVM、JavaAPI等の全ての知識と技術に長けた小数のエンジニアが安全な実装をできていました。
または、できるエンジニアがおらず、並列処理を行うクラスでThreadのインスタンス実装し、
全体のスレッド数に関わらず即実行させていました。するとThreadの並列数によっては、
サーバーのCPUを100%に張り付けるからOOME(OutOfMemoryException)を発生させるかどちからでした。
それが、JavaAPIとして利用方法と実装方法の理解でほぼタスクの理解とサービスの設定値の調整だけで、
マルチスレッドを制御及び監視するサービスが利用できるようになりました。
concurrentパッケージは、多数のクラスがありますが、1つ1つ丁寧に確認していけばJavaエンジニアにとって大変強力な技術になります。
まずは、Executorsクラスを用いて、Thread,Runnable,タスクの概念,Executor及びExecutorServiceを理解することが全体の理解に繋がるでしょう。
Executorsクラスは、利用者に簡易にスレッドプール(スレッド)のインスタンスを作成するためのクラスです。 車の運転に例えるならばExecutorsクラスは、オートマに該当します。対して、Executorインターフェイス、ExecutorServiceインターフェイスクラス、 その配下の実体クラスは、マニュアル運転に該当します。前者は、簡易にExecutorServiceを利用したい場合に大変便利です。 後者は、独自のスレッドプールなどを構築する際にきめ細やかに動作をエンジニアが、決める事ができます。
まずは、Java8APIから戻り値がExecutorServiceインターフェイス(ScheduledExecutorService含む)であるクラスメソッドについて説明します。
導入 | 戻り値 | クラスメソッド | 説明 |
---|---|---|---|
Java5 | Executor | newSingleThreadExecutor() | 生成されたExecutorが単一のワーカースレッドで taskに対してFIFO(First in First Out)順で順次実行されます。 Executorに対して、taskの委譲数に上限はありません。(unbounded) newFixedThreadPool(1)と同じ動きですが、 Executorの再構築に関わらずthreadが常に使いまわされる点が異なります。 |
Java5 | Executor | newSingleThreadExecutor(ThreadFactory threadFactory) | newSingleThreadExecutorと同様に単一のワーカースレッドで、 taskに対してFIFOで順次実行されます。 違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。 (Threadのrunメソッド以外をオーバライドして独自に実装する必要がある場合) |
Java5 | Executor | newFixedThreadPool( int nThreads) | 生成されたExecutorが引数で指定されたThread数をスレッドプールとします。 taskに対してFIFO順でスレッドープール分並列処理が実行されます。 |
Java5 | Executor | newFixedThreadPool ( int nThreads , ThreadFactory threadFactory) |
newFixedThreadPool( int nThreads)と同様です。 違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。 (Threadのrunメソッド以外をオーバライドして独自に実装する必要がある場合) |
Java5 | Scheduled ExecutorService |
newScheduledThreadPool( int corePoolSize ) | 指定された遅延時間後、または周期的にコマンドの実行を スケジュール可能なスレッドプールを作成します。 生成されたExecutorが引数で指定されたThread数をスレッドプールとします。 schedule,scheduleAtFixedRate,scheduleWithFixedDelayメソッドを利用して、 taskの実行前後の時間を調整します。 |
Java5 | Scheduled ExecutorService |
newScheduledThreadPool ( int corePoolSize , ThreadFactory threadFactory ) |
newScheduledThreadPool( int corePoolSize )と同様です。 違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。 |
Java5 | ExecutorService | newCachedThreadPool() | 新規スレッドプールを1つ作成し、executeで渡されたtask数に応じて、 スレッドプール内のスレッド数とします。 JVMとCPUの資産があればtask数=thread数です。 taskの消化が終わり、次のタスクが60秒未満に発生した場合には、 先に作成されたキャッシュされているスレッドを再利用します。 60秒以上経過した場合には、前のスレッドは消去され、 task数に応じて、新しくスレッドを生成します。 |
Java5 | ExecutorService | newCachedThreadPool (ThreadFactory threadFactory) |
newCachedThreadPool()と同様の動きです。 スレッドプールのスレッドの動きを引数で指定したThreadの動きにします。 |
Java5 | ExecutorService | unconfigurableExecutorService (ExecutorService executor) |
全てのExecutorServiceのインスタンスを指定した ExecutorServiceに委譲または、制限する機能です。 前行までだけでも多くのExecutorServiceがExecutorsで生成できます。 更にマニュアルで、独自のExecutorServiceも作成できます。 アプリケーション内で多種のExecutorServiceが実装されるとThreadの管理が、 煩雑になります。それを抑制させるために、 このメソッドを指定した以後のExecutorServiceのインスタンスは、 同じか同機能のみを利用することになります。 |
APIの日本語訳で「共有アンバウンド形式のキューなしで」と記載されている箇所があります。
英語の本家サイトを見ると誤った翻訳です。実際には、キューを利用しています。
Java8APIのOracle(米国サイト)をGoogleの翻訳機能を使った方がかなり精度の高い日本語訳がでます。
concurrentパッケージのAPIを閲覧する場合には、Oracle社英語版のAPIを必ず確認して下さい。
アンバウンドとは、無制限のという意味を示しThreadQueueに追加できるtaskの数に制限がない事を意味します。
各ExecutorServiceの概要図を上記凡例図を用いて示します。
newSingleThreadExecutorのメソッドの前に全てのExecutorServiceで利用するクラスが以下になります。
カウンタークラス
package jp.co.yourcompany.education.concurrent;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 非同期用カウンタークラス
*/
public class SummaryCounter {
private AtomicInteger counter = new AtomicInteger( 0 );
/**
* カウンターをインクリメントし、
* その値を呼び出し元に戻す。
* @return
*/
public synchronized int inclrement( ){
return counter.getAndIncrement();
}
}
Runnable実装例
package jp.co.yourcompany.education.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* concurrent用学習用taskに与える処理
* @raita.kuwabara
*/
public class RunnableSample implements Runnable {
/**
* private
*/
private int id = 0;
// 0.5秒周期
private static final long SLEEP_MILLIONS = TimeUnit.MILLISECONDS.convert( 1000, TimeUnit.MILLISECONDS);
/**
* スレッド別カウンター
*/
private AtomicInteger counter = new AtomicInteger(0);
/**
* カウンターオブジェクト
*/
private SummaryCounter summaryCounter = null;
/**
* コンストラクタ
*/
public RunnableSample(SummaryCounter summaryCounter, int id) {
this.summaryCounter = summaryCounter;
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep( SLEEP_MILLIONS );
} catch (InterruptedException e) {
e.printStackTrace();
}
increment();
int summaryCount = summaryIncrement();
outputCounter(summaryCount);
}
}
/**
* それぞれのカウントをインクリメント
*/
private void increment() {
counter.incrementAndGet();
}
/**
* SampleThreadクラスの全インスタンスのループ総数をインクリメントする。 インクリメントした数を出力する。
*/
private int summaryIncrement() {
return summaryCounter.inclrement();
}
/**
* 出力
*/
private void outputCounter(int summaryCount) {
System.out.println(Thread.currentThread().getName() + ":id[ " + id + " ] counter:" + counter);
System.out.println(Thread.currentThread().getName() + ":id[ " + id + " ] summaryCounter:" + summaryCount);
}
}
newSingleThreadExecutor実装例
/**
* newSingleThreadExecutorを確認するサンプル
* @param factory ThreadFactory(ワーカースレッドを変更したい場合引き渡す。)
* null 指定時には、デフォルトのワーカースレッドが利用されます。
*/
public void execSingleThreadExecutor( ThreadFactory factory ) {
ExecutorService executorService = null;
log.info("newSingleThreadExecutor START");
//サイズ制限のない1つのThreadExecutor();
if( factory == null){
executorService = Executors.newSingleThreadExecutor();
} else {
executorService = Executors.newSingleThreadExecutor( factory );
}
SummaryCounter summaryCounter = new SummaryCounter();
for ( int i = 0 ; i < 100 ; i++ ){
executorService.execute( new RunnableSample( summaryCounter , i ) );
}
log.info("newSingleThreadExecutor runnable");
//taskが残存していない事を確認してサービスの終了
if( executorService.isTerminated() ){
executorService.shutdown();
log.info("newSingleThreadExecutor ShutDown");
}
log.info("newSingleThreadExecutor END");
}
上記メソッドの実行方法
public static final void main ( String[] args){
ExecutorsSample sample = new ExecutorsSample();
sample.execSingleThreadExecutor ( null );
}
実行結果
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:1
pool-1-thread-1:id[ 0 ] counter:3
pool-1-thread-1:id[ 0 ] summaryCounter:2
pool-1-thread-1:id[ 0 ] counter:4
pool-1-thread-1:id[ 0 ] summaryCounter:3
pool-1-thread-1:id[ 0 ] counter:5
pool-1-thread-1:id[ 0 ] summaryCounter:4
:
スレッドプールが1つあり、スレッド1つで順番に実行されることがご理解いただけると思います。
newFixedThreadPool実装例
/**
* newFixedThreadPool確認用メソッド
* スレッド数は、threadCountで変更して下さい。
* @param factory ThreadFactory(ワーカースレッドを変更したい場合引き渡す。)
* null 指定時には、デフォルトのワーカースレッドが利用されます。
*/
public void execNewFixedThreadPool( ThreadFactory threadFactory) {
ExecutorService executorService = null;
log.info("execnewFixedThreadPool START");
final int threadCount = 5;
if( threadFactory == null ){
executorService = Executors.newFixedThreadPool( threadCount );
} else {
executorService = Executors.newFixedThreadPool( threadCount, threadFactory );
}
SummaryCounter summaryCounter = new SummaryCounter();
for ( int i = 0 ; i < 100 ; i++ ){
executorService.execute( new RunnableSample( summaryCounter , i ) );
}
log.info("newSingleThreadExecutor runnable");
//taskが残存していない事を確認してサービスの終了
if( executorService.isTerminated() ){
executorService.shutdown();
log.info("newSingleThreadExecutor ShutDown");
}
log.info("newSingleThreadExecutor END");
}
上記メソッドの実行方法
public static final void main ( String[] args){
ExecutorsSample sample = new ExecutorsSample();
sample.execNewFixedThreadPool ( null );
}
実行結果
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-3:id[ 2 ] counter:1
pool-1-thread-3:id[ 2 ] summaryCounter:1
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-2:id[ 1 ] summaryCounter:4
pool-1-thread-5:id[ 4 ] counter:1
pool-1-thread-4:id[ 3 ] counter:1
pool-1-thread-5:id[ 4 ] summaryCounter:3
pool-1-thread-4:id[ 3 ] summaryCounter:2
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:5
pool-1-thread-3:id[ 2 ] counter:2
pool-1-thread-2:id[ 1 ] counter:2
pool-1-thread-3:id[ 2 ] summaryCounter:6
pool-1-thread-2:id[ 1 ] summaryCounter:7
pool-1-thread-4:id[ 3 ] counter:2
pool-1-thread-5:id[ 4 ] counter:2
pool-1-thread-5:id[ 4 ] summaryCounter:9
:
スレッドプールが1つあり、スレッド数が指定された数5つで処理されています。
newSingleThreadExecutorより並列数が5倍のため、全体のスループットが早くなっているがご理解頂けるかと思います。
※.マルチスレッド対応のCPUの性質、コア数、スレッド数に依存します。
newScheduledThreadPool概要図です。
ScheduledExecutorService.serviceの引数で指定した待機時間後にスレッドプールのスレッドに実行されます。
スレッドプールのスレッド数は、newScheduledThreadPoolで調整します。
ScheduledExecutorService.scheduleAtFixedRateの概念図です。
他のコマンドの並列状況に関わらず待機時間を経過し次第実行されます。
定周期で何かを実行したい場合に有効なメソッドです。
ScheduledExecutorService.scheduleWithFixedDelayの概念図です。
他のコマンドと次のコマンドの遅延時間を指定しています。
スレッドプールのスレッド数を1つにすることでコマンド(タスク)の重なりが無くなります。
newScheduledThreadPool実装例
/**
* newScheduledThreadPool確認用メソッド
* @param threadFactory
*/
public void execScheduledExecutorService( ThreadFactory threadFactory) {
ScheduledExecutorService scheduledExecutorService = null;
log.info("execScheduledExecutorService START");
final int threadCount = 2;
if( threadFactory == null ){
scheduledExecutorService = Executors.newScheduledThreadPool( threadCount );
} else {
scheduledExecutorService = Executors.newScheduledThreadPool( threadCount, threadFactory );
}
SummaryCounter summaryCounter = new SummaryCounter();
//3,6、9、12、15・・・30秒の待機時間のタスク
for ( int i = 0 ; i < 10 ; i++ ) {
int waitSeconds = 3 * ( 1 + i );
scheduledExecutorService.schedule( new RunnableSample( summaryCounter , i ), waitSeconds , TimeUnit.SECONDS );
}
log.info("execScheduledExecutorService runnable");
if( scheduledExecutorService.isTerminated() ){
scheduledExecutorService.shutdown();
log.info("execScheduledExecutorService ShutDown");
}
log.info("execScheduledExecutorService END");
}
実行結果
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:1
pool-1-thread-1:id[ 0 ] counter:3
pool-1-thread-1:id[ 0 ] summaryCounter:2
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-2:id[ 1 ] summaryCounter:3
pool-1-thread-1:id[ 0 ] counter:4
pool-1-thread-1:id[ 0 ] summaryCounter:4
:
:
3秒間隔でタスクを実行しています。
newCachedThreadPool実装例
/**
* cachedTreadPool
* @param threadFactory
*/
private void execCachedThreadPool( ThreadFactory threadFactory ){
ExecutorService cachedThreadPool = null;
log.info("execCachedThreadPool START");
if( threadFactory == null ){
cachedThreadPool = Executors.newCachedThreadPool( );
} else {
cachedThreadPool = Executors.newCachedThreadPool( threadFactory );
}
SummaryCounter summaryCounter = new SummaryCounter();
for ( int i = 0 ; i < 3 ; i++ ) {
cachedThreadPool.execute( new RunnableSample( summaryCounter , i ) );
}
try {
//ここの間隔が、60秒以上だとThreadは常に作成される。
Thread.sleep( 61000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
for ( int i = 0 ; i < 3 ; i++ ) {
cachedThreadPool.execute( new RunnableSample( summaryCounter , i ) );
}
log.info("execCachedThreadPool runnable");
if( cachedThreadPool.isTerminated() ){
cachedThreadPool.shutdown();
log.info("execCachedThreadPool ShutDown");
}
log.info("execCachedThreadPool END");
}
実行結果
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-3:id[ 2 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-2:id[ 1 ] summaryCounter:2
pool-1-thread-3:id[ 2 ] summaryCounter:1
:
:61秒後経過
pool-1-thread-5:id[ 1 ] counter:1
pool-1-thread-4:id[ 0 ] counter:1
pool-1-thread-6:id[ 2 ] counter:1
pool-1-thread-4:id[ 0 ] summaryCounter:0
pool-1-thread-5:id[ 1 ] summaryCounter:2
pool-1-thread-6:id[ 2 ] summaryCounter:1
スレッドの状況とタイミングによっては、スレッド1,2,3が出力されます。
開発現場で利用されそうな主なExecutorを列記しました。Java8には、ForkJoinPoolという並列処理というよりは分散処理用の
機能も追加されています。
concurrentパッケージのExecutorsからExecutorServiceのインスタンスを利用して各種メソッドを確認して下さい。
マニュアルで独自のExecutorServiceやExecutorsではなく直接各種ExecutorServiceを実装したクラスを実装する場合には、
Java上級者の力が必要です。それ故に俗人化しやすい傾向があります。
concurrentパッケージを利用して、大規模システムの並列処理サービスを構築する際には、
上級者から開発者への指導とマニュアル化を必ず対応して下さい。
concurrentパッケージの各種クラスや他の利用方法については、本技術サイトの開発状況により充当していく予定です。