POSIXスレッドにて複数スレッドのタイミング制御をまとめて行う(セマフォと条件変数を使う場合)

 POSIXスレッドを使用する場合で、複数のスレッドを一斉にブロック解除させる方法についてまとめます。

 今回は、割と課題になりがちな、スレッドの開始制御について扱います。POSIXでは、この開始時におけるタイミング同期の一斉制御は少し面倒になります。pthread_create実行からスレッド内の処理が開始するまでのタイミングが不確定なためです。
 おおよそ以下のステップが必要になります。

  1. すべての制御対象スレッドを生成する
  2. 各制御対象スレッドは、制御元にスレッド開始を通知し、制御元からの指示を待つ
  3. 制御元は、すべての制御対象スレッドからスレッド開始の通知を受けるまで待つ
  4. 制御元は、各制御対象スレッドに処理命令を通知する
  5. 各制御対象スレッドは、制御元からの処理命令に基づいて処理を実行する

 これらの実現手段として、POSIXではセマフォや条件変数が使われます。その場合、この各ステップの同期制御を実現するために、いずれも2つ分のセマフォ・条件変数を要します。以降にそれぞれの実装例を示します。

 なお、組み込みのリアルタイム処理の視点に限ると、POSIXスレッドのAPIはやや力不足です。リアルタイム処理でより複雑な一斉同期制御を行う場合は、各スレッドで独自のスレッド制御状態を管理させ、そのスレッド制御状態を共有しながら同期制御を行う処理を追加実装するアプローチが妥当なことが多いです。

セマフォの場合

 複数スレッドのまとめての同期制御には、セマフォが有望です。名前付きセマフォを使用した場合のサンプルコードを以下に示します。これは、制御元main()関数が3つのスレッドthread_func()の開始タイミングの制御を行うものです。(なお解説用なのでエラー処理は副作用の大きいもの以外省略しています)
 前述したステップ2、3に、スレッド→制御元のセマフォ制御(サンプルのpsync_prestart)、ステップ4、5に制御元→スレッドのセマフォ制御(サンプルのpsync_start)を用いています。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NUM_OF_THREADS 3
#define SEMNAME_PRESTART "hoge"
#define SEMNAME_START "fuga"

static sem_t *psync_prestart, *psync_start;

void *thread_func(void *arg)
{
	int id = (int)pthread_self();
	printf("[%x]start\n", id);
	sleep(1);
	sem_post(psync_prestart);
	printf("[%x]wait for psync_start\n", id);
	sleep(1);
	sem_wait(psync_start);
	printf("[%x]start\n", id);
	return NULL;
}

int main(void)
{
	int i, id;
	pthread_t th[NUM_OF_THREADS];
	psync_prestart = sem_open(SEMNAME_PRESTART, O_CREAT | O_EXCL, 0x777, 0);
	if (psync_prestart == SEM_FAILED) {
		sem_unlink(SEMNAME_PRESTART);
		perror("sem_open error\n");
		exit(EXIT_FAILURE);
	}
	psync_start = sem_open(SEMNAME_START, O_CREAT | O_EXCL, 0x777, 0);
	if (psync_start == SEM_FAILED) {
		sem_unlink(SEMNAME_START);
		perror("sem_open error\n");
		exit(EXIT_FAILURE);
	}
	for (i = 0; i < NUM_OF_THREADS; i++) {
		pthread_create(th + 1, NULL, thread_func, NULL);
	}
	for (i = 0; i < NUM_OF_THREADS; i++) {
		sem_wait(psync_prestart);
	}
	printf("all threads are waiting\n");
	for (i = 0; i < NUM_OF_THREADS; i++) {
		sem_post(psync_start);
	}
	printf("all threads are running\n");
	for (i = 0; i < NUM_OF_THREADS; i++) {
		pthread_join(th[i], NULL);
	}
	sem_close(psync_prestart);
	sem_close(psync_start);
	sem_unlink(SEMNAME_PRESTART);
	sem_unlink(SEMNAME_START);
	exit(EXIT_SUCCESS);
}

出力は以下のようになります。

[67c8000]start
[68ce000]start
[684b000]start
[68ce000]wait for psync_start
[67c8000]wait for psync_start
[684b000]wait for psync_start
all threads are waiting
all threads are running
[68ce000]start
[684b000]start
[67c8000]start

条件変数の場合

 条件変数のbroadcastを利用しても、複数スレッドのまとめての制御が可能です。
 セマフォのサンプルコードで、セマフォを条件変数に置き換えたものを以降に示します。ここではステップ4、5のブロック・ブロック解除処理に条件変数のAPI:pthread_cond_broadcast()を利用しています。なお前述したステップ2については、コードが少し面倒なのでセマフォのままとしています(条件変数のみを使用した実装については http://codezine.jp/article/detail/1893 に解説があります)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NUM_OF_THREADS 3
#define SEMNAME_PRESTART "hoge"

static sem_t *psync_prestart;
pthread_mutex_t mutex_start = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_start = PTHREAD_COND_INITIALIZER;

void *thread_func(void *arg)
{
	unsigned int id = (unsigned int)pthread_self();
	printf("[%x]start\n", id);
	sem_post(psync_prestart);
	printf("[%x]wait for cond\n", id);
	pthread_mutex_lock(&mutex_start);
	pthread_cond_wait(&cond_start, &mutex_start);
	pthread_mutex_unlock(&mutex_start);
	printf("[%x]exit\n", id);
	return NULL;
}

int main(void)
{
	int i, id;
	pthread_t th[NUM_OF_THREADS];
	psync_prestart = sem_open(SEMNAME_PRESTART, O_CREAT | O_EXCL, 0x777, 0);
	if (psync_prestart == SEM_FAILED) {
		sem_unlink(SEMNAME_PRESTART);
		perror("sem_open error\n");
		exit(EXIT_FAILURE);
	}
	for (i = 0; i < NUM_OF_THREADS; i++) {
		pthread_create(th + 1, NULL, thread_func, NULL);
	}
	for (i = 0; i < NUM_OF_THREADS; i++) {
		sem_wait(psync_prestart);
	}
	printf("all threads are waiting\n");
	pthread_cond_broadcast(&cond_start);
	printf("all threads are running\n");
	sleep(1);
	for (i = 0; i < NUM_OF_THREADS; i++) {
		pthread_join(th[i], NULL);
	}
	sem_close(psync_prestart);
	sem_unlink(SEMNAME_PRESTART);
	exit(EXIT_SUCCESS);
}

出力はセマフォを利用したときと同様になります。