■ スレッドをつくる
スレッドをつくるには CreateThread() を実行する。
スレッドで実行する関数は次のシグネーチャをもつ
#include< stdio.h>
#include< windows.h>
void job( void *ptr )
{
while( true ) {
Sleep( 1000 );
printf( "thread working\n" );
}
}
int main()
{
HANDLE trd[2];
// Thread を 2個つくる
for( int i=0; i< 2; i++ ){
trd[i] = CreateThread( 0, 0,
(LPTHREAD_START_ROUTINE)job,
NULL, 0, 0 );
}
return 0;
}
■ スレッドの終了をまつ
スレッドのハンドルを保持しておき、 WaitForSingleObject() でまつ。
int main()
{
HANDLE trd[2];
// Thread を 2個つくる
for( int i=0; i< 2; i++ ){
trd[i] = CreateThread( 0, 0,
(LPTHREAD_START_ROUTINE)job,
NULL, 0, 0 );
}
for( i=0; i< 2; i++ ){
WaitForSingleObject( trd[i], INFINITE );
}
// 最大でまつ時間も指定できる。
WaitForSingleObject( trd[i], 3000 );
return 0;
}
■ スレッドを停止させる
SAMPLE
スレッドを停止させる
スレッドの動きを制御する
スレッドは指定した関数が最後まで到達すると終了するため
制御用の変数で制御する。
メインスレッド側でユーザーの入力をまち、スレッドの停止をすることができる。
class Job {
public:
int flag;
Job() : flag(0){}
};
void jobtest( void *ptr )
{
Job *j = ( Job *)ptr;
while( true ) {
Sleep( 1000 );
printf( "thread working\n" );
// 終了条件
if ( j->flag ) {
printf( "stop\n" );
break;
}
}
}
int main()
{
HANDLE trd[2];
Job j;
int i;
for( i=0; i< 2; i++ ){
trd[i] = CreateThread( 0, 0,
(LPTHREAD_START_ROUTINE)jobtest,
&j, 0, 0 );
}
// メッセージループ
{
while ( true ) {
printf( "スレッドを止めるためには 1 を入力をしてください\n" );
char buf[256];
gets( buf );
puts( buf );
// スレッドを止めるため変数を変更する
if ( strcmp(buf, "1") == 0 ) {
j.flag = 1;
break;
}
}
}
return 0;
}
■ スレッドの処理の完了をチェックする
制御用の変数をそのままチェック用の変数に使う。
■ スレッド間の動作を連携させる
SAMPLE
グローバル変数をつかってスレッド間で通信する
スレッド側で値を更新して、更新後にメインスレッド側でその値を表示する。
スレッド間で通信するには、一番単純にはグローバル変数で値を共有すればいい。
// スレッド間で共有する変数
Mutex gReadMtx;
Mutex gWriteMtx;
volatile bool gRead;
volatile bool gWrite;
int gX;
class TestJob : public Job
{
public:
void main( void *ptr ) {
while ( true ) {
// メインスレッドが読むまで待つ
while ( !gRead ) {
;
}
gX += 2;
gReadMtx.lock();
gRead = false;
gReadMtx.unlock();
// 書いたことを true にしてメインスレッドに教える
Sleep(1000);
printf ( "thread write gX\n" );
gWriteMtx.lock();
gWrite = true;
gWriteMtx.unlock();
}
printf ( "thread work finish\n" );
}
};
メインスレッドは gWrite = true になれば、画面に出力して
gRead = true にして読んだことを知らせる
int main()
{
Worker w;
TestJob job;
w.addJob( &job );
// 最初に書き込みをはじめる
gRead = true;
gWrite = false;
w.start();
while ( true ){
// スレッドが書くまでまつ
while ( !gWrite ) {
;
}
// 元にもどす
gWrite = false;
printf( "main tread Read %d\n", gX );
// 読んだ
gRead = true;;
}
w.join();
return 0;
}
■ volatile.をつかう
WARNING
上のコードでは コンパイラが最適化をかけることで無限ループになる可能性がある。
while ( gWrite ) {
// ループ内で gWrite を変更する処理がないため、コンパイラが gWrite の評価をなくしてしまう。
// コンパイラからすると別スレッドが操作するとは、わからない。
;
}
そこで volatile 宣言をすると、 while の条件式が変化すると教えることができるため
無限ループになることはない。
コンパイラの余計なおせっかいを防ぐ。
コンパイラオプションの最適化を最大にすると再現できる。( 可能性がある )
■ スレッドの動作を連携させる
メインスレッドで入力をうけて, 変更があった場合にスレッド側で表示する。
SAMPLE
スレッドの処理の終了まちをチェックする
int main()
{
HANDLE trd[2];
Job j;
int i;
for( i=0; i< 2; i++ ){
trd[i] = CreateThread( 0, 0,
(LPTHREAD_START_ROUTINE)jobtest,
&j, 0, 0 );
}
// メッセージループ
while ( true ) {
printf( "スレッドがデータをロード中 %d\n", j.flag );
Sleep( 1000 );
if ( j.flag == 10 ) {
printf( "スレッドの処理が終わりました\n" );
break;
}
}
return 0;
}
■ スレッドをクラスにしてみる
SAMPLE
スレッドクラス
スレッドを実行する処理が定型句なのでクラス化してみる。
スレッドに渡すスタティック関数をラップしてみる。
class Worker
{
public:
// 停止状態でスレッドを生成
Worker() {
DWORD id;
trd = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)( Worker::workMain ), this,
CREATE_SUSPENDED, &id );
}
// スレッドを実行
void start() {
ResumeThread( trd );
}
// スレッドの終了をまつ
void Worker::join()
{
WaitForSingleObject( trd, INFINITE );
}
// 仕事を追加する
void addJob() {
}
static void workMain( void *ptr ) {
Worker *p = ( Worker *)ptr;
p->job->main( p->job );
}
private:
HANDLE trd;
};
スレッドに与える仕事。
Java でいえば Runnable インターフェイスにあたる。
このインターフェイスを実装したクラスは Worker クラスを使ってスレッドから実行できる。
class Job
{
public:
/// Worker が実行する仕事
virtual void main( void *ptr ) = 0;
virtual ~Job(){}
};
適当に実装してコールする。
class TestJob : public Job
{
public:
int flag;
TestJob() : flag(1){}
void main( void *ptr ) {
while ( flag ) {
Sleep(1000);
printf ( "thread working ...\n" );
}
}
};
int main()
{
Worker w;
TestJob job;
w.addJob( &job );
w.start();
char buf[256];
gets(buf);
job.flag = 0;
w.join();
return 0;
}
■ スレッドプール(ThreadPool)
SAMPLE
スレッドプール
スレッドプールGUI
POINT
スレッド(ワーカー)をマスタと通信する箇所をひとつのモジュールにまとめておく。
ここにはワーカーに依頼する仕事をリスト(キュー)につめておく。
ひとつのキューを複数のスレッドで操作をすることは危険な処理。
たとえば ワーカーがキューから仕事をとる最中に、マスタが仕事をキューに入れようとすると
キューを破壊する可能性がある。
そこで共有物であるキューを Mutex で保護をしてあげる。
[ マスタ ( メインスレッド) ]
|
| (仕事をつめる)
|
[ 仕事依頼リスト ]
[ 仕事1 ][ 仕事2 ][ 仕事3 ][ 仕事4 ]
|
| (仕事をする)
|
[ ワーカー ][ ワーカー ][ ワーカー ]
template< class T >
class JobQ
{
volatile unsigned int cnt;
public:
// キューにつめる
void enQ( const T &ref ){
// 空き数カウンタが0より大きくなるのをまつ。
// いっぱいならば Worker が仕事を取るまで待つ
WaitForSingleObject( smp_empty, INFINITE );
EnterCriticalSection( &(s) );
cnt ++;
data[wp] = ref;
wp ++;
if ( wp == NRQ ) wp = 0;
LeaveCriticalSection( &(s) );
// 残り数を1つ増やして待機中のスレッドを動かす。
// [○][○][][][]
ReleaseSemaphore( smp_count, 1, 0 );
}
// キューからとりだす
void deQ( T &ref ){
// 仕事がはいるのをまつ。
WaitForSingleObject( smp_count, INFINITE );
EnterCriticalSection( &(s) );
ref = data[rp];
cnt --;
rp ++;
if ( rp == NRQ ) rp = 0;
LeaveCriticalSection( &(s) );
// 空き数カウンタを増やす。
// [][][][○][○]
ReleaseSemaphore( smp_empty, 1, 0 );
}
// コンストラクタ
JobQ() : cnt(0){
rp = 0;
wp = 0;
// 現在のつめた個数
// 最初は 空なので "0"
// Worker は カウンタ があれば Queue から取り出す
smp_count = CreateSemaphore( 0, 0, NRQ, 0 );
// こちらは空きカウンタ なので いっぱいにする -> ( == 満タン )
// Master は 空きカウンタ があれば Queue に 仕事( Request ) をつめる
smp_empty = CreateSemaphore( 0, NRQ, NRQ, 0 );
InitializeCriticalSection( &(s) );
}
unsigned int getCount() {
EnterCriticalSection( &(s) );
unsigned int ret = cnt;
LeaveCriticalSection( &(s) );
return ret;
}
private:
T data[NRQ];
int rp, wp; // 読み込み位置, 書き込み位置
CRITICAL_SECTION s; // Q read/write の排他制御.
HANDLE smp_count; // Productor Semaphore ( 1個追加したら ++ )
HANDLE smp_empty; // Consumer Semaphore ( 1個とったら -- )
};
メインスレッド側でリクエストをキューにつめて、
ワーカースレッド側でキューから仕事を取って実行するようにする。
int main()
{
JobQ< string > q;
// ワーカースレッドを作成して
Worker *w = new Worker();
Worker *w1 = new Worker();
// 仕事をあたえる
w->addJob( new JobTest( &q ) );
w1->addJob( new JobTest( &q ) );
w->work();
w1->work();
// メインスレッドはリクエストだけを受け付けてキューにつめることを繰り返す。
while (1) {
string s;
cin >> s;
cout < < "マスタ: リクエスト( " < < s < < " )いただきました" < < endl;
// キューにつめる。
q.enQ( s );
}
return 0;
}
ワーカースレッドはリクエストキューから取り出した仕事を実際に実行することを繰り返す。
スレッドを使い回すため、 main() メソッドは無限ループにする。
class JobTest : public Job
{
public:
JobTest( JobQ< string > *_q ) : q( _q ){}
void main( void *ptr ) {
while ( true ) {
string s;
cout < < Worker::currentWorker()->getName();
cout < < ": リクエストまってます" < < endl;
q->deQ( s );
cout < < Worker::currentWorker()->getName();
cout < < "仕事をもらったので実行します " < < s < < endl;
}
}
private:
JobQ< string > *q;
};
■ Mutexクラスをつくる
SAMPLE
スレッドの動作を排他的にする
// Class にしてみる
class Mutex {
public:
void lock() {
EnterCriticalSection( &cs );
}
void unlock() {
LeaveCriticalSection( &cs );
}
Mutex() {
InitializeCriticalSection( &cs );
}
~Mutex() {
DeleteCriticalSection( &cs );
}
private:
CRITICAL_SECTION cs;
};
// 使う
static Mutex m;
// 排他処理をする効果を見るため, Sleep() する
void threadFunc( void *) {
m.lock();
m.unlock();
}
■ InitializeCriticalSection
SYNTAX
InitializeCriticalSection( LPCRITICAL_SECTION )
DESC
CriticalSection を初期化する
■ EnterCriticalSection
SYNTAX
EnterCriticalSection( LPCRITICAL_SECTION )
DESC
排他制御変数 を使って排他制御を開始する
■ LeaveCriticalSection
SYNTAX
LeaveCriticalSection( LPCRITICAL_SECTION )
DESC
排他制御を終了する
■ DeleteCriticalSection
SYNTAX
void DeleteCriticalSection( LPCRITICAL_SECTION )
DESC
CriticalSection を破棄する
■ 同期処理
class Event {
public:
// ON にする
void signal()
// Thread を停止させる
void block() {
WaitForSingleObject( evt, INFINITE );
}
// 作成時は信号を ON にしておく
Event() {
evt = CreateEvent( 0, false, true, NULL );
}
~Event() {
CloseHandle( evt );
}
private:
HANDLE evt;
}
■ CreateEvent
SYNTAX
bool CreateEvent (
lpEventAttributes : DWORD
bManualReset : DWORD // False: 自動 Reset
bInitialState : DWORD // 初期状態. True: Signal False: NoSignal
lpName : DWORD
);
RET
hdl :
0 : Fail
TIP
Event の状態を監視したい( 信号まちをしたい ) Thread は
WaitForSingleObject を呼ぶことで待機状態にする
■ SetEvent
SYNTAX
BOOL SetEvent(
HANDLE hEvent // EventObject Handle
);
RET
!0 : Success
0 : Fail
DESC
Thread 同士と同期をとるために利用する信号
Thread 間で処理の Timing の通信をする際に利用する
EventObject はふたつの状態をもつ.
ON : Signal 状態.
OFF: NoSignal 状態.
POINT
NoSignal 状態ならば, イベントオブジェクトを待ち受けるスレッドが休止状態である。
スレッドが待ち状態であるとき、ほとんどCPUに負荷はかからない。
class UpdateTask : public Task {
public:
void main() {
// 描画処理はまかせて更新処理をくりかえす.
// 問題は, vector< Object > をつついてしまうこと.
// Buffer ( 要は push , pop する側に分ける必要がある )
while ( 1 ) {
App::update();
// 終了したら信号を青にする
evt.signal( 1 );
}
}
};
void main() {
// 更新処理.
UpdateTask task;
Thread A;
A.addTask( &task );
// MessageLoop
while ( 1 ) {
App::display();
// update まち
evt.block();
}
}
■ ResetEvent
SYNTAX
BOOL ResetEvent(
HANDLE hEvent // EventObject Handle
);
DESC
Manual の場合は, Reset されるまでは, Signal 状態を維持する.
自動の場合
Thread 解放時に System が NoSignal にする.
Thread がひとつも待機していないときは, Signal 状態を維持する.
Ret
!0 : Ok
0 : Fail
SyncObject を有効にする. ( 信号を青にすると考えておく. )
Event MainLoopSignalt;
待つ側:
// 1 になるのをまつ.
待たせる側:
// 信号を 1 にする. ( 同期をとる timing で 青にかえてあげる. )
MainLoopSignal.signal(1);
待つ側
// 1 になるのをまつ.
::WaitForSingleObject( hMainLoopSync, INFINITE );
待たせる側
// 信号を 1 にする. ( 同期をとる timing で 青にかえてあげる. )
::SetEvent( hMainLoopSync );
// Mutex でも実装できる
待つ側:
mutex.lock();
待たせる側:
mutex.unlock();
■ CreateSemaphore
SYNTAX
HANDLE CreateSemaphore(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, // セキュリティ記述子
LONG lInitialCount, // 初期カウント
LONG lMaximumCount, // 最大カウント数
LPCTSTR lpName // オブジェクトの名前
);
DESC
セマフォオブジェクトを作成または開く。
// 資源数5、初期カウント0のセマフォをつくる
smp = CreateSemaphore( 0, 0, 5, 0 );
// セマフォを取得するまでスレッドはまつ。
// シグナル状態になったセマフォを取得するとカウント数はひとつ減る。
WaitForSingleObject( smp, INFINITE );
// 待機中のスレッドは動かすには別のスレッドから資源数を増やす。
ReleaseSemaphore( smp, 1, 0 );
■ ReleaseSemaphore
SYNTAX
BOOL ReleaseSemaphore(
HANDLE hSemaphore, // セマフォのハンドル
LONG lReleaseCount, // 増やすカウント数
LPLONG lpPreviousCount // 変更前のカウント( 知る必要がないなら NULL )
);
DESC
指定されたセマフォオブジェクトのカウントを、指定した数だけ増やす。