concurrentパッケージ

concurrentパッケージは、マルチスレッド処理において、各スレッドの状態の監視や管理をし易くした大変便利なパッケージです。

concurrentパッケージは、Java5より導入されたパッケージのため、Java5以降で利用可能です。
2017時点Javaプラットフォームを利用した多くのシステムでは、マルチスレッドと言えばconcurrentパッケージと言えます。

concurrentパッケージでは、大きく分けて次の概念で構成されています。

  • Thread及びrunnableのrunで実装していた主処理をタスクと呼びます。
  • タスクは、Executorインターフェイスを頂点とした各種サービス(ThreadPoolExecutor,ScheduledThreadPoolExecutorなど)に処理を並列処理を委譲します。
  • 各種サービスは、サービスに依頼されたタスクの並列数の制御や監視、実行を行います。

何が強力かといえば、今までスレッド数の実行制御や監視を管理するサービスの実装は、
ハードウェア、OS,JVM、JavaAPI等の全ての知識と技術に長けた小数のエンジニアが安全な実装をできていました。
または、できるエンジニアがおらず、並列処理を行うクラスでThreadのインスタンス実装し、
全体のスレッド数に関わらず即実行させていました。するとThreadの並列数によっては、
サーバーのCPUを100%に張り付けるからOOME(OutOfMemoryException)を発生させるかどちからでした。
それが、JavaAPIとして利用方法と実装方法の理解でほぼタスクの理解とサービスの設定値の調整だけで、
マルチスレッドを制御及び監視するサービスが利用できるようになりました。

JavaのプラットフォームがJava5以降であれば、非同期処理のロジック数に関わらずconcurrentパッケージを利用するべきです。

Executors

concurrentパッケージは、多数のクラスがありますが、1つ1つ丁寧に確認していけばJavaエンジニアにとって大変強力な技術になります。
まずは、Executorsクラスを用いて、Thread,Runnable,タスクの概念,Executor及びExecutorServiceを理解することが全体の理解に繋がるでしょう。

Executor及びExecutorService

Executorsクラスは、利用者に簡易にスレッドプール(スレッド)のインスタンスを作成するためのクラスです。 車の運転に例えるならばExecutorsクラスは、オートマに該当します。対して、Executorインターフェイス、ExecutorServiceインターフェイスクラス、 その配下の実体クラスは、マニュアル運転に該当します。前者は、簡易にExecutorServiceを利用したい場合に大変便利です。 後者は、独自のスレッドプールなどを構築する際にきめ細やかに動作をエンジニアが、決める事ができます。

まずは、Java8APIから戻り値がExecutorServiceインターフェイス(ScheduledExecutorService含む)であるクラスメソッドについて説明します。

導入 戻り値 クラスメソッド 説明
Java5 Executor newSingleThreadExecutor() 生成されたExecutorが単一のワーカースレッドで
taskに対してFIFO(First in First Out)順で順次実行されます。
Executorに対して、taskの委譲数に上限はありません。(unbounded)
newFixedThreadPool(1)と同じ動きですが、
Executorの再構築に関わらずthreadが常に使いまわされる点が異なります。
Java5 Executor newSingleThreadExecutor(ThreadFactory threadFactory) newSingleThreadExecutorと同様に単一のワーカースレッドで、
taskに対してFIFOで順次実行されます。
違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。
(Threadのrunメソッド以外をオーバライドして独自に実装する必要がある場合)
Java5 Executor newFixedThreadPool( int nThreads) 生成されたExecutorが引数で指定されたThread数をスレッドプールとします。
taskに対してFIFO順でスレッドープール分並列処理が実行されます。
Java5 Executor newFixedThreadPool
( int nThreads , ThreadFactory threadFactory)
newFixedThreadPool( int nThreads)と同様です。
違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。
(Threadのrunメソッド以外をオーバライドして独自に実装する必要がある場合)
Java5 Scheduled
ExecutorService
newScheduledThreadPool( int corePoolSize ) 指定された遅延時間後、または周期的にコマンドの実行を
スケジュール可能なスレッドプールを作成します。
生成されたExecutorが引数で指定されたThread数をスレッドプールとします。
schedule,scheduleAtFixedRate,scheduleWithFixedDelayメソッドを利用して、
taskの実行前後の時間を調整します。
Java5 Scheduled
ExecutorService
newScheduledThreadPool
( int corePoolSize , ThreadFactory threadFactory )
newScheduledThreadPool( int corePoolSize )と同様です。
違いは、引数のThreadFactoryで独自のThreadをワーカースレッドとして利用できます。
Java5 ExecutorService newCachedThreadPool() 新規スレッドプールを1つ作成し、executeで渡されたtask数に応じて、
スレッドプール内のスレッド数とします。
JVMとCPUの資産があればtask数=thread数です。
taskの消化が終わり、次のタスクが60秒未満に発生した場合には、
先に作成されたキャッシュされているスレッドを再利用します。
60秒以上経過した場合には、前のスレッドは消去され、
task数に応じて、新しくスレッドを生成します。
Java5 ExecutorService newCachedThreadPool
(ThreadFactory threadFactory)
newCachedThreadPool()と同様の動きです。
スレッドプールのスレッドの動きを引数で指定したThreadの動きにします。
Java5 ExecutorService unconfigurableExecutorService
(ExecutorService executor)
全てのExecutorServiceのインスタンスを指定した
ExecutorServiceに委譲または、制限する機能です。 前行までだけでも多くのExecutorServiceがExecutorsで生成できます。
更にマニュアルで、独自のExecutorServiceも作成できます。
アプリケーション内で多種のExecutorServiceが実装されるとThreadの管理が、 煩雑になります。それを抑制させるために、
このメソッドを指定した以後のExecutorServiceのインスタンスは、 同じか同機能のみを利用することになります。

JavaAPIの注意事項

APIの日本語訳で「共有アンバウンド形式のキューなしで」と記載されている箇所があります。
英語の本家サイトを見ると誤った翻訳です。実際には、キューを利用しています。
Java8APIのOracle(米国サイト)をGoogleの翻訳機能を使った方がかなり精度の高い日本語訳がでます。
concurrentパッケージのAPIを閲覧する場合には、Oracle社英語版のAPIを必ず確認して下さい。
アンバウンドとは、無制限のという意味を示しThreadQueueに追加できるtaskの数に制限がない事を意味します。

newSingleThreadExecutor

凡例

各ExecutorServiceの概要図を上記凡例図を用いて示します。

凡例
  • タスクは、executeメソッドが実行された順にTaskQueueに管理されます。
  • 実行順番はFIFOになり、最初にexecuteが呼び出された順から処理されていきます。
  • ワーカースレッドは常に1つなので、1つのtaskが終わるまで次のtaskは待機状態になります。
  • ある意味、mainスレッドとは別の順次実行用のスレッドが1つあると捉える事ができます。
  • 簡易なアプリケーションでスレッドを処理させる場合には有効なメソッドでしょう。

サンプル共通のクラス

newSingleThreadExecutorのメソッドの前に全てのExecutorServiceで利用するクラスが以下になります。

カウンタークラス
package jp.co.yourcompany.education.concurrent;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 非同期用カウンタークラス
 */
public class SummaryCounter {

    private AtomicInteger counter = new AtomicInteger( 0 );

    /**
     * カウンターをインクリメントし、
     * その値を呼び出し元に戻す。
     * @return
     */
    public synchronized int inclrement( ){
        return counter.getAndIncrement();
    }
}
					
Runnable実装例
package jp.co.yourcompany.education.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * concurrent用学習用taskに与える処理
 * @raita.kuwabara
 */
public class RunnableSample implements Runnable {

    /**
     * private
     */
    private int id = 0;

    // 0.5秒周期
    private static final long SLEEP_MILLIONS = TimeUnit.MILLISECONDS.convert( 1000, TimeUnit.MILLISECONDS);

    /**
     * スレッド別カウンター
     */
    private AtomicInteger counter = new AtomicInteger(0);

    /**
     * カウンターオブジェクト
     */
    private SummaryCounter summaryCounter = null;

    /**
     * コンストラクタ
     */
    public RunnableSample(SummaryCounter summaryCounter, int id) {
        this.summaryCounter = summaryCounter;
        this.id = id;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep( SLEEP_MILLIONS );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            increment();
            int summaryCount = summaryIncrement();
            outputCounter(summaryCount);
        }
    }

    /**
     * それぞれのカウントをインクリメント
     */
    private void increment() {
        counter.incrementAndGet();
    }

    /**
     * SampleThreadクラスの全インスタンスのループ総数をインクリメントする。 インクリメントした数を出力する。
     */
    private int summaryIncrement() {
        return summaryCounter.inclrement();
    }

    /**
     * 出力
     */
    private void outputCounter(int summaryCount) {
        System.out.println(Thread.currentThread().getName() + ":id[ " + id + " ] counter:" + counter);
        System.out.println(Thread.currentThread().getName() + ":id[ " + id + " ] summaryCounter:" + summaryCount);
    }

}
					

newSingleThreadExecutorのサンプル

newSingleThreadExecutor実装例
    /**
     * newSingleThreadExecutorを確認するサンプル
     * @param factory ThreadFactory(ワーカースレッドを変更したい場合引き渡す。) 
* null 指定時には、デフォルトのワーカースレッドが利用されます。 */ public void execSingleThreadExecutor( ThreadFactory factory ) { ExecutorService executorService = null; log.info("newSingleThreadExecutor START"); //サイズ制限のない1つのThreadExecutor(); if( factory == null){ executorService = Executors.newSingleThreadExecutor(); } else { executorService = Executors.newSingleThreadExecutor( factory ); } SummaryCounter summaryCounter = new SummaryCounter(); for ( int i = 0 ; i < 100 ; i++ ){ executorService.execute( new RunnableSample( summaryCounter , i ) ); } log.info("newSingleThreadExecutor runnable"); //taskが残存していない事を確認してサービスの終了 if( executorService.isTerminated() ){ executorService.shutdown(); log.info("newSingleThreadExecutor ShutDown"); } log.info("newSingleThreadExecutor END"); }
上記メソッドの実行方法
    public static final void main ( String[] args){
        ExecutorsSample sample = new ExecutorsSample();
        sample.execSingleThreadExecutor ( null );
    }
					
実行結果
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:1
pool-1-thread-1:id[ 0 ] counter:3
pool-1-thread-1:id[ 0 ] summaryCounter:2
pool-1-thread-1:id[ 0 ] counter:4
pool-1-thread-1:id[ 0 ] summaryCounter:3
pool-1-thread-1:id[ 0 ] counter:5
pool-1-thread-1:id[ 0 ] summaryCounter:4
	:
				

スレッドプールが1つあり、スレッド1つで順番に実行されることがご理解いただけると思います。

newFixedThreadPool

newFixedThreadPool
  • タスクは、executeメソッドが実行された順にTaskQueueに管理されます。
  • 実行順番はFIFOになり、最初にexecuteが呼び出された順から処理されていきます。
  • スレッドプールのスレッド数は、引数で指定したスレッド数になります。
  • 例えばサーバのスペックに限りがあり、並列処理のリクエスト数も限られている場合に、
    全体のスループットより、CPU、メモリ、JVMの安定性を求める場合に有効な手段でしょう。
  • スループットを上げたい場合には、スレッド数を加減し、CPUがピーク時に100%に張り付かないように調整します。

newFixedThreadPoolのサンプル

newFixedThreadPool実装例
    /**
     * newFixedThreadPool確認用メソッド
     * スレッド数は、threadCountで変更して下さい。
     * @param factory ThreadFactory(ワーカースレッドを変更したい場合引き渡す。) 
* null 指定時には、デフォルトのワーカースレッドが利用されます。 */ public void execNewFixedThreadPool( ThreadFactory threadFactory) { ExecutorService executorService = null; log.info("execnewFixedThreadPool START"); final int threadCount = 5; if( threadFactory == null ){ executorService = Executors.newFixedThreadPool( threadCount ); } else { executorService = Executors.newFixedThreadPool( threadCount, threadFactory ); } SummaryCounter summaryCounter = new SummaryCounter(); for ( int i = 0 ; i < 100 ; i++ ){ executorService.execute( new RunnableSample( summaryCounter , i ) ); } log.info("newSingleThreadExecutor runnable"); //taskが残存していない事を確認してサービスの終了 if( executorService.isTerminated() ){ executorService.shutdown(); log.info("newSingleThreadExecutor ShutDown"); } log.info("newSingleThreadExecutor END"); }
上記メソッドの実行方法
    public static final void main ( String[] args){
        ExecutorsSample sample = new ExecutorsSample();
        sample.execNewFixedThreadPool ( null );
    }
					
実行結果
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-3:id[ 2 ] counter:1
pool-1-thread-3:id[ 2 ] summaryCounter:1
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-2:id[ 1 ] summaryCounter:4
pool-1-thread-5:id[ 4 ] counter:1
pool-1-thread-4:id[ 3 ] counter:1
pool-1-thread-5:id[ 4 ] summaryCounter:3
pool-1-thread-4:id[ 3 ] summaryCounter:2
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:5
pool-1-thread-3:id[ 2 ] counter:2
pool-1-thread-2:id[ 1 ] counter:2
pool-1-thread-3:id[ 2 ] summaryCounter:6
pool-1-thread-2:id[ 1 ] summaryCounter:7
pool-1-thread-4:id[ 3 ] counter:2
pool-1-thread-5:id[ 4 ] counter:2
pool-1-thread-5:id[ 4 ] summaryCounter:9
	:
				

スレッドプールが1つあり、スレッド数が指定された数5つで処理されています。
newSingleThreadExecutorより並列数が5倍のため、全体のスループットが早くなっているがご理解頂けるかと思います。
※.マルチスレッド対応のCPUの性質、コア数、スレッド数に依存します。

newScheduledThreadPool

ScheduledExecutorService1

newScheduledThreadPool概要図です。
ScheduledExecutorService.serviceの引数で指定した待機時間後にスレッドプールのスレッドに実行されます。

スレッドプールのスレッド数は、newScheduledThreadPoolで調整します。

ScheduledExecutorService1

ScheduledExecutorService.scheduleAtFixedRateの概念図です。
他のコマンドの並列状況に関わらず待機時間を経過し次第実行されます。
定周期で何かを実行したい場合に有効なメソッドです。

ScheduledExecutorService1

ScheduledExecutorService.scheduleWithFixedDelayの概念図です。
他のコマンドと次のコマンドの遅延時間を指定しています。
スレッドプールのスレッド数を1つにすることでコマンド(タスク)の重なりが無くなります。

  • 定周期でなにかを実行したい場合に有効な手法です。
  • Javaのmainスレッドを常駐しておき、スケジューラ(日次、週次バッチみたいに)のような機能もできます。

newScheduledThreadPoolのサンプル

newScheduledThreadPool実装例
    /**
     * newScheduledThreadPool確認用メソッド
     * @param threadFactory
     */
    public void execScheduledExecutorService( ThreadFactory threadFactory) {
        ScheduledExecutorService scheduledExecutorService = null;

        log.info("execScheduledExecutorService START");

        final int  threadCount = 2;

        if( threadFactory == null ){
            scheduledExecutorService = Executors.newScheduledThreadPool( threadCount );
        } else {
            scheduledExecutorService = Executors.newScheduledThreadPool( threadCount, threadFactory );
        }

        SummaryCounter summaryCounter = new SummaryCounter();

        //3,6、9、12、15・・・30秒の待機時間のタスク
        for ( int i = 0 ; i < 10 ; i++  ) {
            int waitSeconds = 3 * ( 1 + i );
            scheduledExecutorService.schedule( new RunnableSample( summaryCounter , i ), waitSeconds ,  TimeUnit.SECONDS );
        }
        log.info("execScheduledExecutorService runnable");

        if( scheduledExecutorService.isTerminated() ){
            scheduledExecutorService.shutdown();
            log.info("execScheduledExecutorService ShutDown");
        }
        log.info("execScheduledExecutorService END");

    }
					
実行結果
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-1:id[ 0 ] counter:2
pool-1-thread-1:id[ 0 ] summaryCounter:1
pool-1-thread-1:id[ 0 ] counter:3
pool-1-thread-1:id[ 0 ] summaryCounter:2
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-2:id[ 1 ] summaryCounter:3
pool-1-thread-1:id[ 0 ] counter:4
pool-1-thread-1:id[ 0 ] summaryCounter:4
	:
	:
				

3秒間隔でタスクを実行しています。

newCachedThreadPool

newCachedThreadPool
  • 最初にタスクの要求数分スレッドを全て作ります。
  • タスクの要求数が無くなり、スレッドが60秒以上経過した場合には、スレッドのインスタンスは無くなります。
  • スレッドが次のタスクの要求を受け付ける時間が60秒未満の場合には、キャッシュされたスレッドを再利用されます。
  • スレッド数がタスク数分作られる点が、流量制御の観点ではnewFixedThreadPoolの方が制御し易いかと思います。

newCachedThreadPoolのサンプル

newCachedThreadPool実装例
    /**
     * cachedTreadPool
     * @param threadFactory
     */
    private void execCachedThreadPool( ThreadFactory threadFactory ){
        ExecutorService cachedThreadPool = null;
        log.info("execCachedThreadPool START");

        if( threadFactory == null ){
            cachedThreadPool = Executors.newCachedThreadPool( );
        } else {
            cachedThreadPool = Executors.newCachedThreadPool( threadFactory );
        }

        SummaryCounter summaryCounter = new SummaryCounter();

        for ( int i = 0 ; i < 3 ; i++  ) {
            cachedThreadPool.execute( new RunnableSample( summaryCounter , i ) );
        }

        try {
            //ここの間隔が、60秒以上だとThreadは常に作成される。
            Thread.sleep( 61000 );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for ( int i = 0 ; i < 3 ; i++  ) {
            cachedThreadPool.execute( new RunnableSample( summaryCounter , i ) );
        }
        log.info("execCachedThreadPool runnable");

        if( cachedThreadPool.isTerminated() ){
            cachedThreadPool.shutdown();
            log.info("execCachedThreadPool ShutDown");
        }
        log.info("execCachedThreadPool END");
    }
					
実行結果
pool-1-thread-2:id[ 1 ] counter:1
pool-1-thread-1:id[ 0 ] counter:1
pool-1-thread-3:id[ 2 ] counter:1
pool-1-thread-1:id[ 0 ] summaryCounter:0
pool-1-thread-2:id[ 1 ] summaryCounter:2
pool-1-thread-3:id[ 2 ] summaryCounter:1
	:
	:61秒後経過
pool-1-thread-5:id[ 1 ] counter:1
pool-1-thread-4:id[ 0 ] counter:1
pool-1-thread-6:id[ 2 ] counter:1
pool-1-thread-4:id[ 0 ] summaryCounter:0
pool-1-thread-5:id[ 1 ] summaryCounter:2
pool-1-thread-6:id[ 2 ] summaryCounter:1
					

スレッドの状況とタイミングによっては、スレッド1,2,3が出力されます。

まとめ

開発現場で利用されそうな主なExecutorを列記しました。Java8には、ForkJoinPoolという並列処理というよりは分散処理用の
機能も追加されています。
concurrentパッケージのExecutorsからExecutorServiceのインスタンスを利用して各種メソッドを確認して下さい。
マニュアルで独自のExecutorServiceやExecutorsではなく直接各種ExecutorServiceを実装したクラスを実装する場合には、
Java上級者の力が必要です。それ故に俗人化しやすい傾向があります。
concurrentパッケージを利用して、大規模システムの並列処理サービスを構築する際には、
上級者から開発者への指導とマニュアル化を必ず対応して下さい。

concurrentパッケージの各種クラスや他の利用方法については、本技術サイトの開発状況により充当していく予定です。