2013/11/29

ExecutorServiceでマルチスレッドで実行可能なタスクキューを使う

マルチスレッドシリーズその2。
その1はこちら→[Java]マルチスレッドでの排他処理を行うsynchronizedを理解する
前回と同じく理解するためにサンプルを Android で作ってみました。

参考
ExecutorService の使い方 - Java 入門(図や概念がわかりやすい!)
Java - ExecutorService の復習 - Qiita [キータ](動作の違いがわかりやすい!)
死刑執行中 ExecutorService 終了中 - 倭マン's BLOG
[Java]ExecutorServiceの正しい終了(shutdown)の仕方 | 大発見 | 大発見

サンプルコードは以下にあります。
ExecutorActivity.java | wada811/AndroidLibrary@wada811
AndroidLibrary ってなってるけどやっぱりコレは Android 関係ないです。

実行するタスク

名前と実行時間(sleep時間)を貰ってログを出力するだけのクラス。以後、共通で使います。
public class ExecutorRunnable implements Runnable {
    private String mName;
    private int mSeconds;

    public ExecutorRunnable(String name, int seconds) {
        mName = name;
        mSeconds = seconds;
    }

    @Override
    public void run(){
        LogUtils.d(mName + ": ThreadId: " + Thread.currentThread().getId());
        LogUtils.d(mName + ": start");
        try{
            TimeUnit.SECONDS.sleep(mSeconds);
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        LogUtils.d(mName + ": end");
    }
}

Executors.newSingleThreadExecutor();

public void newSingleThreadExecutorTest(){
    LogUtils.d();
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.submit(new ExecutorRunnable("A", 1));
    executorService.submit(new ExecutorRunnable("B", 1));
    executorService.submit(new ExecutorRunnable("C", 1));
    executorService.submit(new ExecutorRunnable("D", 1));
}
SingleThread なのでタスクをいくつ追加しても一つのスレッドで実行します。追加した順に実行されます。
実行中だったら待たされます。実行ログからもそのことがわかります。
11-27 23:50:49.697: [ExecutorActivity#newSingleThreadExecutorTest:108]
11-27 23:50:49.707: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27824
11-27 23:50:49.707: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-27 23:50:50.708: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-27 23:50:50.708: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27824
11-27 23:50:50.718: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-27 23:50:51.719: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-27 23:50:51.719: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27824
11-27 23:50:51.719: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-27 23:50:52.720: [ExecutorActivity$ExecutorRunnable#run:306]C: end
11-27 23:50:52.720: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27824
11-27 23:50:52.730: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-27 23:50:53.731: [ExecutorActivity$ExecutorRunnable#run:306]D: end

Executors.newFixedThreadPool(int);

public void newFixedThreadPoolTest(){
    LogUtils.d();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new ExecutorRunnable("A", 1));
    executorService.submit(new ExecutorRunnable("B", 1));
    executorService.submit(new ExecutorRunnable("C", 1));
    executorService.submit(new ExecutorRunnable("D", 1));
}
引数のスレッド数で実行します。今回は2スレッドで実行します。追加した順に実行されます。
実行ログからも A, B と実行され、 A が終わったら C、B が終わったら D と実行されていることがわかります。
11-27 23:56:04.974: [ExecutorActivity#newFixedThreadPoolTest:124]
11-27 23:56:04.984: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27825
11-27 23:56:04.994: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-27 23:56:05.024: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27826
11-27 23:56:05.034: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-27 23:56:05.995: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-27 23:56:05.995: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27825
11-27 23:56:05.995: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-27 23:56:06.035: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-27 23:56:06.035: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27826
11-27 23:56:06.045: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-27 23:56:07.006: [ExecutorActivity$ExecutorRunnable#run:306]C: end
11-27 23:56:07.046: [ExecutorActivity$ExecutorRunnable#run:306]D: end

Executors.newCachedThreadPool();

public void newCachedThreadPoolTest(){
    LogUtils.d();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new ExecutorRunnable("A", 1));
    executorService.submit(new ExecutorRunnable("B", 1));
    executorService.submit(new ExecutorRunnable("C", 1));
    executorService.submit(new ExecutorRunnable("D", 1));
}
必要な分だけ(※)スレッドを生成して実行します。
(※ 内部の処理を見ると Integer.MAX_VALUE とあるので限界は int の最大値?)
タスクが何個 submit されようと必要に応じてスレッドを生成して同時に実行していることがわかります。
11-28 00:00:09.593: [ExecutorActivity#newCachedThreadPoolTest:140]
11-28 00:00:09.603: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27829
11-28 00:00:09.623: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:00:09.623: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27830
11-28 00:00:09.633: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-28 00:00:09.643: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27831
11-28 00:00:09.643: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-28 00:00:09.653: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27832
11-28 00:00:09.653: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-28 00:00:10.614: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:00:10.634: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-28 00:00:10.634: [ExecutorActivity$ExecutorRunnable#run:306]C: end
11-28 00:00:10.654: [ExecutorActivity$ExecutorRunnable#run:306]D: end

Executors.newScheduledThreadPool(int);

public void newScheduledThreadPoolTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
    executorService.schedule(new ExecutorRunnable("A", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("B", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("C", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("D", 1), 1, TimeUnit.SECONDS);
}
引数のスレッド数で実行します。今回は2スレッドで実行します。
実行タイミングを設定できますが同じタイミングを設定した場合は実行順が前後することがあります。
というか実行タイミング依存なので実行順は追加順に依存しないということだと思います。
設定のスレッド数に達したら待たされていることがわかります。
11-28 00:04:23.701: [ExecutorActivity#newScheduledThreadPoolTest:156]
11-28 00:04:24.712: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27833
11-28 00:04:24.722: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27834
11-28 00:04:24.722: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:04:24.722: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-28 00:04:25.733: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-28 00:04:25.733: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:04:25.743: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27834
11-28 00:04:25.753: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27833
11-28 00:04:25.753: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-28 00:04:25.753: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-28 00:04:26.754: [ExecutorActivity$ExecutorRunnable#run:306]D: end
11-28 00:04:26.754: [ExecutorActivity$ExecutorRunnable#run:306]C: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#schedule(Runnable, long, TimeUnit) の実行タイミングが被らないパターン

public void newSingleThreadScheduledExecutorTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.schedule(new ExecutorRunnable("A", 1), 8, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("B", 1), 4, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("C", 1), 0, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("D", 1), 12, TimeUnit.SECONDS);
}
SingleThread なのでタスクをいくつ追加しても一つのスレッドで実行します。
設定した実行タイミングの順で実行されていることがわかるようにしました。
11-28 00:10:40.248: [ExecutorActivity#newSingleThreadScheduledExecutorTest:172]
11-28 00:10:40.278: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27847
11-28 00:10:40.278: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-28 00:10:41.289: [ExecutorActivity$ExecutorRunnable#run:306]C: end
11-28 00:10:44.262: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27847
11-28 00:10:44.262: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-28 00:10:45.273: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-28 00:10:48.256: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27847
11-28 00:10:48.266: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:10:49.267: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:10:52.260: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27847
11-28 00:10:52.270: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-28 00:10:53.271: [ExecutorActivity$ExecutorRunnable#run:306]D: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#schedule(Runnable, long, TimeUnit) の実行タイミングが被るパターン

public void newSingleThreadScheduledExecutorDuringExecutionTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.schedule(new ExecutorRunnable("A", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("B", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("C", 1), 1, TimeUnit.SECONDS);
    executorService.schedule(new ExecutorRunnable("D", 1), 1, TimeUnit.SECONDS);
}
SingleThread なのでタスクをいくつ追加しても一つのスレッドで実行します。
実行タイミングがかぶったら待たされます。
11-28 00:14:52.637: [ExecutorActivity#newSingleThreadScheduledExecutorDuringExecutionTest:188]
11-28 00:14:53.648: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27848
11-28 00:14:53.648: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:14:54.659: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:14:54.669: [ExecutorActivity$ExecutorRunnable#run:299]B: ThreadId: 27848
11-28 00:14:54.669: [ExecutorActivity$ExecutorRunnable#run:300]B: start
11-28 00:14:55.680: [ExecutorActivity$ExecutorRunnable#run:306]B: end
11-28 00:14:55.690: [ExecutorActivity$ExecutorRunnable#run:299]C: ThreadId: 27848
11-28 00:14:55.690: [ExecutorActivity$ExecutorRunnable#run:300]C: start
11-28 00:14:56.701: [ExecutorActivity$ExecutorRunnable#run:306]C: end
11-28 00:14:56.701: [ExecutorActivity$ExecutorRunnable#run:299]D: ThreadId: 27848
11-28 00:14:56.711: [ExecutorActivity$ExecutorRunnable#run:300]D: start
11-28 00:14:57.712: [ExecutorActivity$ExecutorRunnable#run:306]D: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit) の実行タイミングを過ぎないパターン

public void newSingleThreadScheduledExecutorAtFixedRateTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleAtFixedRate(new ExecutorRunnable("A", 1), 1, 2, TimeUnit.SECONDS);
    shutdown(executorService); // 8秒後に終了します
}
一つのスレッドで実行します。実行タイミングを設定できます。繰り返し実行でき、実行間隔を設定できます。
実行間隔は実行開始から次の実行開始までの時間です。
実行間隔 が 2 秒で、実行 1 秒なので終了してから 1 秒で実行されます。
11-28 00:19:44.952: [ExecutorActivity#newSingleThreadScheduledExecutorAtFixedRateTest:205]
11-28 00:19:45.963: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27850
11-28 00:19:45.963: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:19:46.974: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:19:47.965: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27850
11-28 00:19:47.965: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:19:48.966: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:19:49.967: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27850
11-28 00:19:49.967: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:19:50.968: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:19:51.969: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27850
11-28 00:19:51.969: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:19:52.970: [ExecutorActivity$1#run:273]shutdown
11-28 00:19:52.980: [ExecutorActivity$ExecutorRunnable#run:306]A: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit) の実行タイミングを過ぎるパターン

public void newSingleThreadScheduledExecutorAtFixedRateDuringExecutionTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleAtFixedRate(new ExecutorRunnable("A", 3), 1, 2, TimeUnit.SECONDS);
    shutdown(executorService); // 8秒後に終了します
}
一つのスレッドで実行します。実行タイミングを設定できます。繰り返し実行でき、実行間隔を設定できます。
実行間隔は実行開始から次の実行開始までの時間です。
実行に 3 秒かかるのに実行間隔を 2 秒にした場合、次の実行開始タイミングは過ぎているのですぐに実行します。
11-28 00:23:59.431: [ExecutorActivity#newSingleThreadScheduledExecutorAtFixedRateDuringExecutionTest:221]
11-28 00:24:00.442: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27851
11-28 00:24:00.442: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:24:03.444: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:24:03.454: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27851
11-28 00:24:03.454: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:24:06.467: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:24:06.477: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27851
11-28 00:24:06.477: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:24:07.438: [ExecutorActivity$1#run:273]shutdown
11-28 00:24:08.449: [ExecutorActivity$1#run:276]shutdownNow
11-28 00:24:08.499: [ExecutorActivity$ExecutorRunnable#run:306]A: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit) の実行タイミングを被らないパターン

public void newSingleThreadScheduledExecutorWithFixedDelayTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleWithFixedDelay(new ExecutorRunnable("A", 1), 1, 2, TimeUnit.SECONDS);
    shutdown(executorService); // 8秒後に終了します
}
一つのスレッドで実行します。実行タイミングを設定できます。繰り返し実行でき、実行間隔を設定できます。
実行間隔は実行終了から次の実行開始までの時間です。
1 秒のタスクを実行終了してから 2 秒後に次の実行が開始されていることがわかります。
11-28 00:31:25.436: [ExecutorActivity#newSingleThreadScheduledExecutorWithFixedDelayTest:236]
11-28 00:31:26.447: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27852
11-28 00:31:26.447: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:31:27.458: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:31:29.470: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27852
11-28 00:31:29.470: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:31:30.481: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:31:32.483: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27852
11-28 00:31:32.483: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:31:33.443: [ExecutorActivity$1#run:273]shutdown
11-28 00:31:33.494: [ExecutorActivity$ExecutorRunnable#run:306]A: end

Executors.newSingleThreadScheduledExecutor(); で ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit) の実行タイミングが被るパターン

public void newSingleThreadScheduledExecutorWithFixedDelayDuringExecutionTest(){
    LogUtils.d();
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleWithFixedDelay(new ExecutorRunnable("A", 3), 1, 2, TimeUnit.SECONDS);
    shutdown(executorService);
}
一つのスレッドで実行します。実行タイミングを設定できます。繰り返し実行でき、実行間隔を設定できます。
実行間隔は実行終了から次の実行開始までの時間なので実行タイミングが被ることがないです。
3 秒のタスクを実行終了してから 2 秒後に次の実行が開始されていることがわかります。
11-28 00:34:05.822: [ExecutorActivity#newSingleThreadScheduledExecutorWithFixedDelayDuringExecutionTest:252]
11-28 00:34:06.833: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27853
11-28 00:34:06.833: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:34:09.846: [ExecutorActivity$ExecutorRunnable#run:306]A: end
11-28 00:34:11.848: [ExecutorActivity$ExecutorRunnable#run:299]A: ThreadId: 27853
11-28 00:34:11.858: [ExecutorActivity$ExecutorRunnable#run:300]A: start
11-28 00:34:13.840: [ExecutorActivity$1#run:273]shutdown
11-28 00:34:14.851: [ExecutorActivity$1#run:276]shutdownNow
11-28 00:34:14.901: [ExecutorActivity$ExecutorRunnable#run:306]A: end

おまけ

上の ScheduledExecutorService#scheduleAtFixedRate と
ScheduledExecutorService#scheduleWithFixedDelay ではタスクを一つしか与えていないが、
これをそれぞれもう一つ initialDelay を1秒ずらして追加すると
どちらも実行終了を待たされて次の実行タイミングが来てすぐに実行するようになってしまいます。

Executors.newScheduledThreadPool(int); ではどうなる?

上の ScheduledExecutorService#scheduleAtFixedRate と
ScheduledExecutorService#scheduleWithFixedDelay で ExecutorService の生成を
Executors.newScheduledThreadPool(int); で行った場合は、
繰り返される同一のタスクは一つのスレッドで固定で、タスクの追加ごとにスレッドが分かれるようになります。
実行タイミングが過ぎたり、被ったりすると実行タイミングが来てすぐに実行するようになってしまいます。
ちゃんと等間隔で実行したかったら一つの ExecutorService に一つのタスクが良いかと思います。

ExecutorService の終了の仕方

try{
    LogUtils.d("shutdown");
    executorService.shutdown();
    if(!executorService.awaitTermination(1, TimeUnit.SECONDS)){
        LogUtils.d("shutdownNow");
        executorService.shutdownNow();
    }
}catch(InterruptedException e){
    e.printStackTrace();
    LogUtils.d("shutdownNow: " + e.getMessage());
    executorService.shutdownNow();
}
ExecutorService#shutdown で終了命令を出して新規タスクの追加の禁止して、
実行中のタスクが終了せずに ExecutorService#awaitTermination でタイムアウト時間を過ぎたら
ExecutorService#shutdownNow で実行中のタスクをキャンセルするといった流れです。
shutdownNow が呼ばれると ExecutorRunnable の sleep で InterruptedException が投げられますが
ExecutorService#shutdownNow が直接的にタスクの実行を強制終了するわけではないようです。

まとめ

長くなりましたが ExecutorService の使い方をまとめました。
ExecutorService はかなり強力なマルチスレッドの実行キューの便利クラスなのでぜひ使いこなしたいですね!

タグ(RSS)