Java で簡単マルチスレッドプログラミング

仕事でマルチスレッドを扱うようになったので備忘録として Executors 関連のメモを残しておく。
あるタスクを別スレッドで実行したい時、Executors クラスを使えば自前でスレッドの管理をすること無く簡単に並列処理を行えるようになる。
もちろん完全に簡単になったわけではなく、マルチスレッドプログラミングの困難さは健在だが、自前でスレッドの管理をしない分バグを仕込む可能性も減るだろう。


はじめに

Executors はタスクと呼ばれる処理の最小単位を別スレッドで実行する仕組みである。マルチスレッドなのでシングルスレッドに比べると複雑性は若干上がるものの、通常のマルチスレッドプログラミングに比べれば少ない複雑性、高い安全性を備える。そして何より、パフォーマンスの向上が期待できる。

この仕組みはジョブキューモデルそのものである。ジョブはタスク、キューは後述する Executor に相当する。

マルチスレッドとは言っても、ひとつのインスタンス複数のスレッドから参照しないよう設計することは容易であり、レースコンディションやデッドロックを避けやすい実装が行える。


タスクの定義

タスクを定義する。これはメインスレッドとは別で実行する処理で、このタスク自体はスレッドを扱う事は無い。
Callable インターフェースを実装するのみ。Callable の型パラメーターTが call されたときの返り値になる。この call は Runnable で言う run メソッドに相当する。

このタスクはシンプルなタスクで、3秒間自分のスレッドを止めたあとに "task executed!" を表示し、"success" を返す。

public class Task implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(3000);
        System.out.println("task executed!");
        return "success";
    }
}

タスクの実行

では定義したタスクを実行する。実行するには ExecutorService を使う。ExecutorService は、処理を依頼(submit)されたら新規スレッドを立ち上げて別スレッドで実行する機能を持ったサービスだ。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newCachedThreadPool();

    System.out.println("task start");

    // タスクの依頼
    service.submit(new Task());

    System.out.println("task end");
    service.shutdown();
}

// 実行結果
task start
task end
(3秒経過)
task executed!

見ての通り、submit タスクが別スレッドで実行されている。もし同じスレッドで実行されたなら、実行結果は下記のようになるはずだ。すなわち、"task executed!" が表示されたあとに "task end" が表示されることになる。

task start
(3秒経過)
task executed!
task end

しかしそうはなっていない。つまり ExecutorService は別スレッドでタスクを実行したのだ。


タスクの戻り値を受け取る

さてここから、過去によく見られた Thread + Runnable の組み合わせでは実現できなかったことを行う。タスク実行後、タスクの戻り値を取得したい。この service.submit メソッドは Future というクラスのインスタンスを返し、そのインスタンスの get メソッドを呼ぶと戻り値が得られる。なぜ直接 submit の返り値がタスクの返り値を表さないのか?それはタスクが別スレッドであるため、一度別オブジェクトを介さなければ同期的に取得できないからだ。

ExecutorService と Future の組み合わせは恐ろしく便利で、Future オブジェクト取得後に get メソッドを呼ぶとタスクの処理が終わるまでメインスレッドをブロックし、タスクが終了してから結果が返される。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newCachedThreadPool();

    System.out.println("task start");
    Future<String> future = service.submit(new Task());
    System.out.println(future.get());
    System.out.println("task end");

    service.shutdown();
}

// 実行結果
task start
(3秒経過)
task executed!
success
task end

以上、一見同期的にタスクが実行できたことがわかる。これは極めて便利だ。スレッドの停止や同期等を自前で処理することが無く、Thread という単語がひとつも現れなくともプログラマはタスクを非同期的に実行できたり、同期するまで待つことができたりするのだ。
さらに、コードを注意深く見るとわかるように Future の get メソッドの返り値はパラメタライズされている。この型は、Callable に指定した型と同一である。つまり、型安全なのだ!


タスクの複数実行

ではタスクを複数実行してみよう。ここでは ExecutorService を newFixedThreadPool というメソッドから取得している。これは、タスクを指定した数だけのスレッドを立ち上げて並列実行してくれるサービスだ。もしスレッドの数を指定できなかった場合、システムの安定性が損なわれるほどのスレッドが立ち上がってしまう可能性がある。そうでなくとも、コンテキストスイッチのコストが無視できなくレベルまでスレッドが立ち上がってしまうかもしれない。FixedThreadPool はそのような事態を避けることが大変簡単にできる。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newFixedThreadPool(3);

    System.out.println("task start");
    service.submit(new Task());
    service.submit(new Task());
    service.submit(new Task());
    service.submit(new Task());
    System.out.println("task end");

    service.shutdown();
}

// 実行結果
task start
task end
(3秒経過)
task executed!
task executed!
task executed!
(3秒経過)
task executed!

当然、いずれかのタスクの戻り値 Future を取得し、get メソッドを呼べばそのタスクが終了するまでメインスレッドがブロックされる。
例えば、最後のタスクの Future を取得して実験してみよう。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newFixedThreadPool(3);

    System.out.println("task start");
    service.submit(new Task());
    service.submit(new Task());
    service.submit(new Task());
    Future<String> future = service.submit(new Task()); // このタスクの結果を得る
    System.out.println(future.get()); // ここでメインスレッドがブロックする
    System.out.println("task end");

    service.shutdown();
}

// 実行結果
task start
(3秒経過)
task executed!
task executed!
task executed!
(3秒経過)
task executed!
success
task end

期待通り、"task end" が最後に表示されている。これは future.get メソッドでブロックされたことを表す。


タスクに引数を渡す

今回の Task クラスに少々手を加えれば、外部から引数を渡すことができる。そのような修正は簡単だ。

public class Task implements Callable<String> {

    int param; // パラメーターを保持
    public Task(int param) {
        this.param = param;
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(3000);
        System.out.println("task executed!"+this.param);
        return "success";
    }
}

ここで注意点がひとつ。上記のような int 型なら問題ないのだが、オブジェクトをパラメーターとして渡した場合はメインスレッドとタスクスレッドから参照される。この時、メインスレッドでパラメーターとして渡したあと、タスク処理中にパラメーターの状態を変更してはならない。これをやってしまうとバグが発生する。また、パラメーターとして渡したオブジェクト内でスレッドの操作を行ってはならない。デッドロックが発生する可能性がある。


まとめ

レースコンディションやデッドロックの可能性を完全に取り除く事はできない。また、マルチスレッドは不可解なバグを生み出しやすい。それを理解した上で、注意しながらこのライブラリを使えばマルチスレッドプログラミングの強力な味方になる。


増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編