スレッドについて

前頁までOSのスレッドの概念やJVMの概要をご理解頂けたかと思います。

スレッドとは、OSのJavaコマンドなどのプロセスをCPUのコアに要求する最小の処理要求です。
Javaのスレッドは、OSのマルチスレッド(複数のスレッドによる並列処理)を実現するために、
JDK1.0からRunnableインターフェイス、ThreadクラスがJavaAPIが用意されています。

Java5からjava.util.concurrentパッケージとして、並列処理を管理及び実装し易いAPIがリリースされました。
まずは、Runnableインターフェイス、Threadクラスを学び、最終的にはconcurrentパッケージを利用した並列処理を学習します。

スレッドのライフサイクル

スレッドの状態遷移について説明します。

Threadクラスの状態遷移は、java5よりThread.State列挙型が導入され、
同時にThraedクラスのメンバーメソッドにgetState()もリリースされました。

thread_states
状態 和名 説明 遷移元 遷移先
NEW 初期状態 スレッドオブジェクトのインスタンスが生成された状態です。
スレッドはまだ起動していません。
startメソッドでスレッドが起動します。
- RUNNABLE
RUNNABLE 実行中 Java仮想マシンで実行されているスレッドの状態です。
runメソッドの実行が終了した場合には、TERMINATEDに遷移します。
ほかのスレッドが特定のアクションを実行するのを無期限に待機しているときにWAITに遷移します。
sleep( long )メソッドを実行するとTIME_WAITIINGに遷移します。
スレッドがI/O待ち又は、同期処理(synchronized)で、
前のスレッドの終了待ち状態にBLOCKEDに遷移します。
runメソッドの処理が全て完了した場合にTERMINATEDに遷移します。
NEW WAITING
TIMED_WAITING
TERMINATED
BLOCKED
WAITING 待機状態 ほかのスレッドが特定のアクションを実行するのを無期限に待機している状態です。
他のスレッドのnotify,notifyAllメソッドが実行されることにより、RUNNABLEに遷移します。
RUNNABLE RUNNABLE
TIMED_WAITING 一定時間待機状態 指定された待機時間の待機状態です。
指定された待機時間の経過によりRUNNABLEに戻ります。
自スレッドへのnotifyによる再開,
又はnotifyAllによる全てのスレッドへの再開時RUNNABLEに戻ります。
RUNNABLE RUNNABLE
BLOCKED ブロック状態 スレッドがI/O待ち又は、同期処理(synchronized)で他のアクション待ちの状態です。
I/O待ちの解消又は、同期処理で自スレッドがロックできた場合にRUNNABLEに戻ります。
RUNNABLE RUNNABLE
TEMINATED 終了状態 runメソッドの実行処理が全て終了した状態です。 RUNNABLE -

マルチスレッド

1つのjavaプロセス内でmainスレッド以外の1つのスレッド(シングルスレッド)を操作する場合や、
複数のスレッド(マルチスレッド)でも操作するインスタンスが別々であった場合には、
他のスレッドの状態を開発者が意識する必要はありません。

マルチスレッドで同じインスタンスを操作する場合や、因果関係のあるオブジェクトの操作を行う場合には、
同期処理(synchronized)による排他処理を行う必要があります。

Threadクラスによる実装

java.lang.Threadクラスを利用した非同期処理の実装方法について説明します。
処理の実装はとても簡単で、以下の手順で実行できます。

  1. Threadクラスを継承したクラスを作成する。
  2. run()メソッドをオーバライドして、機能を実装する。
  3. 1.で作成したクラスのインスタンスをnewで生成する。
  4. 2.で作成したクラスのrun()メソッドを実行する。

java.lang.Threadクラスは、java.langパッケージのため明示してimport節に定義する必要はありません。

Threadクラスによる実装例[スレッドセーフでない]
package jp.co.yourcompany.education.thread;

/**
 * Thread クラスを学習するためのサンプル
 * @raita.kuwabara
 */
public class SampleThread extends Thread {

    /**
     * スレッドサンプルクラスの共通カウンター
     * this is not thread-safe.
     */
    private static int summaryCounter = 0;

    /**
     * スレッドの停止時間(1000ミリ秒 = 1秒)
     */
    private static final long SLEEP_MILLIONS= 1000;

    /**
     * スレッド別カウンター
     */
    private int counter = 0;

    /**
     * スレッドのコンストラクタ
     * @param threadName スレッド名
     */
    public SampleThread( String threadName ){
        super( threadName );
    }

    @Override
    public void run(){
        try {
            for( int i = 0 ; i < 5 ; i++ ){
                sleep( SLEEP_MILLIONS );
                increment();
                outputCounter();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * それぞれのカウントをインクリメント
     */
    private void increment(){
        counter++;
        summaryCounter++;
    }

    /**
     * 出力
     */
    private void outputCounter(){
        System.out.println( getName() + " counter:" + counter);
        //this is not thread-safe
        System.out.println( getName() + " summaryCounter:" + summaryCounter);
    }

    /**
     * javaコマンドから起動されるmainメソッド
     */
    public static final void main( String[] args ){

        SampleThread thread1 = new SampleThread("Thread01");
        SampleThread thread2 = new SampleThread("Thread02");
        thread1.start();
        thread2.start();
    }
}
					

この処理は、runメソッドでループを5回実行します。
ループ内で1秒待機し、メンバー変数とクラス変数をインクリメントし値を出力しています。

実行結果
Thread02 counter:1
Thread01 counter:1
Thread02 summaryCounter:2
Thread01 summaryCounter:2
Thread02 counter:2
Thread01 counter:2
Thread01 summaryCounter:3
Thread02 summaryCounter:3
Thread02 counter:3
Thread01 counter:3
Thread01 summaryCounter:4
Thread02 summaryCounter:4
Thread02 counter:4
Thread01 counter:4
Thread01 summaryCounter:5
Thread02 summaryCounter:5
Thread01 counter:5
Thread02 counter:5
Thread02 summaryCounter:6
Thread01 summaryCounter:6
				

出力結果は、必ずしもこのように出力されるとは限りません。

非同期処理は、各Threadがそれぞれ別々のタイミングで実行されるので、
CPUのコアの利用状況により、ThreadクラスがCPUへ処理要求を実行するタイミングが異なります。

また、このサンプルに敢えて非同期処理によるスレッドセーフでないクラスメソッドを利用しています。
JVMの概要から、クラスのクラスメソッド(staticメソッド)は、メソッド領域に格納され、
全てのスレッドからアクセス可能な状態になっています。

上記例でsummaryCounterが示したい要件は、SampleThread全インスタンスのループ数(待機回数)です。
出力結果を見ると残念ながら、スレッドのタイミングがずれて10になるときもあれば、
タイミングがあって、10より小さい値にもなります。

このような当初予定していた要件が非同期処理によって、データの一意性が確保できていない事をスレッドセーフではないと呼びます。

この現象が起きた原因は、2つあります。

  • クラスメソッド(static)を用いている
  • 同期処理(synchronzied)による排他処理を実行していない
複数のスレッドで操作するフィールドは、クラスフィールド(static field)を避ける。
構文上スレッドで操作できるが、実装の間違いが起きやすい。

synchronizedブロック

synchronizedブロック構文を利用して、前例のプログラムのsummaryCounter部分を改修します。

改善用カウンタークラス
package jp.co.yourcompany.education.thread;

/**
 * 非同期処理用学習クラス
 */
public class SummaryCounter {
    private int counter = 0;

    public void inclrement(){
        counter++;
    }
    public int getCounter(){
        return counter;
    }
}
					
synchronized実装例
package jp.co.yourcompany.education.thread;

/**
 * Thread クラスを学習するためのサンプル
 * @raita.kuwabara
 */
public class SampleThread extends Thread {

    /**
     * スレッドの停止時間(1000ミリ秒 = 1秒)
     * ここで、スレッドのsleep時間を調整できます。
     */
    private static final long SLEEP_MILLIONS= 1;

    /**
     * スレッド別カウンター
     */
    private int counter = 0;

    /**
     * カウンターオブジェクト
     */
    private SummaryCounter summaryCounter = null;

    /**
     * スレッドのコンストラクタ
     * @param threadName スレッド名
     * @param 総計用カウンターオブジェクト
     */
    public SampleThread( String threadName  , SummaryCounter summaryCounter){
        super( threadName );
        this.summaryCounter = summaryCounter;
    }

    @Override
    public void run(){
        try {
            for( int i = 0 ; i < 5 ; i++ ){
                sleep( SLEEP_MILLIONS );
                increment();
                summaryIncrement();
                outputCounter();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * それぞれのカウントをインクリメント
     */
    private void increment(){
        counter++;
    }

    /**
     * SampleThreadクラスの全インスタンスのループ総数をインクリメントする。
     * インクリメントした数を出力する。
     */
    private  void summaryIncrement(){
        synchronized( summaryCounter ){
            summaryCounter.inclrement();
            System.out.println( getName() + " summaryCounter:" + summaryCounter.getCounter());
        }
    }

    /**
     * 出力
     */
    private void outputCounter(){
        System.out.println( getName() + " counter:" + counter);
    }


    /**
     * javaコマンドから起動されるmainメソッド
     */
    public static final void main( String[] args ){
        SummaryCounter summaryCounter = new SummaryCounter();
        int threadCount = 100;
        SampleThread[] threadList = new SampleThread[threadCount];
        for( int i = 0 ; i < threadCount ; i ++){
            threadList[ i ] = new SampleThread("Thread" + i , summaryCounter );
            threadList[ i ].start();
        }
    }
}
					

メソッドの競合が発生しやすいように変更した点は、以下になります。

  • スレッド数を100に増やす
  • メソッドの競合がが発生しやすいようにsleepを1秒から1ミリ秒に変更

そして、全スレッドのループ数を正しくカウントするために,以下の変更を実施します。

  • カウント用オブジェクトを作成(SummaryCounter)
  • SampleThreadのコンストラクタにSummaryCounterクラスを追加
  • SampleThreadのメンバーフィールドにSummaryCounterクラスを追加
  • SummaryCounterインスタンスを1つ生成し、SampleThreadクラス生成時に引き渡す。
  • SummaryCounterの操作がインクリメントと出力で1つのメソッドに寄せられるので寄せる。
  • 寄せられたメソッド内でsynchronizedブロック構文を利用してSummaryCounterインスタンスを同期化する。

上記サンプルは、CPUのコア数/スレッド数やThreadクラス数、待機時間に関係なく、
summaryCounterオブジェクトインスタンス1つに対して同期処理を実施しているため、
summaryCounterの最終出力値は、必ずThreadクラス数×run()内のループ数になります。

JVMで起きている事

上記サンプルでJVMで起きている事象の概要図を以下に示します。

thread_states
  • メソッド領域(Method Area)に2つのクラス情報がロードされます。
  • SampleThreadのインスタンス生成により、ヒープ領域に100個のSampleThreadオブジェクトの領域が確保されます。
  • SummaryCounterのインスタンス生成によりヒープ領域に1つのSummaryCounterオブジェクトの領域が確保されます。
  • ループ中の各SampleThreadインスタンスのstartメソッドにより、JVMStackに100個のスタックエントリーが作成されます。
  • SampleThreadインスタンスのrunメソッドにより、スタックエントリーに各種メソッド用のframeが生成されます。
  • 1つのスレッドがSummaryCounterにアクセスします。
  • このときSummaryCounterインスタンスにsynchronizedされているので、オブジェクトがロックされます。
  • ロックしているスレッド以外のスレッドがWAIT状態になります。
  • summaryIncrementメソッドが終了するとそのメソッドがSummaryCounterインスタンスに対するロック状態を解放します。
  • 解放した瞬間他のWAIT状態のスレッドの1つがSummaryCounterインスタンスに対してロックします。
  • このロックの取得、解放によりSummaryCounterオブジェクトのデータの一意性が保証されます。

synchronizedメソッド

synchronizedメソッドは、オブジェクトに対して排他処理を実行するのではなくメソッドに対して排他処理を行います。

カウンタークラス(synchronizedメソッド)
package jp.co.yourcompany.education.thread;

public class SummaryCounter {
    private int counter = 0;

    public synchronized void inclrement(){
        counter++;
    }

    public int getCounter(){
        return counter;
    }
}
					
SampleThread改修箇所
    private  void summaryIncrement(){
        summaryCounter.inclrement();
        System.out.println( getName() + " summaryCounter:" + summaryCounter.getCounter());
    }
						

この改修により、synchronizedブロックと同じ挙動になるように思えます。
summaryCounterのメンバーフィールドの値countは同期処理により、値の整合性は維持されます。
残念ながら、出力情報(スレッド名 summaryCounter: + summaryCounter.getCounter())の出力値は、幾つか同じ値が出力されます。
getCounter()もsynchronizedにしても同じ状況が起こります。

以下のような事象がスレッドのタイミングにより発生しているからです。

メソッド同期の図

synchronized纏め

synchronizedブロックによるオブジェクトの同期処理は、オブジェクトに対する全ての操作を排他しているため、
1つのスレッドが操作対象のオブジェクトに対して1連の操作の整合性を保つのには優れています。
しかしながら、オブジェクトに対する全てのメソッドを排他するので、他スレッドの待ち状態が発生し易くなります。

synchronizedメソッドによるメソッドの同期処理は、メソッドに対する操作を排他しているため、
1つのスレッドがオブジェクトのデータの整合性のみを保証するのには、synchronizedブロックより他のスレッドの待ち状態が少なくなります。
しかしながら、1つのオブジェクトに対して、複数のメソッドを操作して処理を実行する場合には、要件の整合性を保ち難いデメリットがあります。