チュートリアル(Tutorial)





スレッドをつくる


スレッドをつくるには CreateThread() を実行する。 スレッドで実行する関数は次のシグネーチャをもつ
void func( void *ptr )
# include<stdio.h> # include<windows.h> void job( void *ptr ) { while( true ) { Sleep( 1000 ); printf( "thread working\n" ); } } int main() { HANDLE trd[2]; // Thread を 2個つくる for( int i=0; i<2; i++ ){ trd[i] = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)job, NULL, 0, 0 ); } return 0; }



スレッドの終了をまつ


スレッドのハンドルを保持しておき、 WaitForSingleObject() でまつ。
int main() { HANDLE trd[2]; // Thread を 2個つくる for( int i=0; i<2; i++ ){ trd[i] = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)job, NULL, 0, 0 ); } for( i=0; i<2; i++ ){ WaitForSingleObject( trd[i], INFINITE ); } // 最大でまつ時間も指定できる。 WaitForSingleObject( trd[i], 3000 ); return 0; }



スレッドを停止させる


SAMPLE http://ooo.iiyudana.net/bin/thread/thread_stop.exe( スレッドを停止させる ) https://www.dropbox.com/s/bwqpc28zncsj1lr/thread_gui.exe( スレッドの動きを制御する ) スレッドは指定した関数が最後まで到達すると終了するため 制御用の変数で制御する。 メインスレッド側でユーザーの入力をまち、スレッドの停止をすることができる。
class Job { public: int flag; Job() : flag(0){} }; void jobtest( void *ptr ) { Job *j = ( Job *)ptr; while( true ) { Sleep( 1000 ); printf( "thread working\n" ); // 終了条件 if ( j->flag ) { printf( "stop\n" ); break; } } } int main() { HANDLE trd[2]; Job j; int i; for( i=0; i<2; i++ ){ trd[i] = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)jobtest, &j, 0, 0 ); } // メッセージループ { while ( true ) { printf( "スレッドを止めるためには 1 を入力をしてください\n" ); char buf[256]; gets( buf ); puts( buf ); // スレッドを止めるため変数を変更する if ( strcmp(buf, "1") == 0 ) { j.flag = 1; break; } } } return 0; }



スレッドの処理の完了をチェックする


制御用の変数をそのままチェック用の変数に使う。


スレッド間の動作を連携させる


SAMPLE http://ooo.iiyudana.net/bin/thread/thread_signal.exe( グローバル変数をつかってスレッド間で通信する ) スレッド側で値を更新して、更新後にメインスレッド側でその値を表示する。 スレッド間で通信するには、一番単純にはグローバル変数で値を共有すればいい。
// スレッド間で共有する変数 Mutex gReadMtx; Mutex gWriteMtx; volatile bool gRead; volatile bool gWrite; int gX; class TestJob : public Job { public: void main( void *ptr ) { while ( true ) { // メインスレッドが読むまで待つ while ( !gRead ) { ; } gX += 2; gReadMtx.lock(); gRead = false; gReadMtx.unlock(); // 書いたことを true にしてメインスレッドに教える Sleep(1000); printf ( "thread write gX\n" ); gWriteMtx.lock(); gWrite = true; gWriteMtx.unlock(); } printf ( "thread work finish\n" ); } };
メインスレッドは gWrite = true になれば、画面に出力して gRead = true にして読んだことを知らせる
int main() { Worker w; TestJob job; w.addJob( &job ); // 最初に書き込みをはじめる gRead = true; gWrite = false; w.start(); while ( true ){ // スレッドが書くまでまつ while ( !gWrite ) { ; } // 元にもどす gWrite = false; printf( "main tread Read %d\n", gX ); // 読んだ gRead = true;; } w.join(); return 0; }



volatile.をつかう


WARNING 上のコードでは コンパイラが最適化をかけることで無限ループになる可能性がある。
while ( gWrite ) { // ループ内で gWrite を変更する処理がないため、コンパイラが gWrite の評価をなくしてしまう。 // コンパイラからすると別スレッドが操作するとは、わからない。 ; }
そこで volatile 宣言をすると、 while の条件式が変化すると教えることができるため 無限ループになることはない。 コンパイラの余計なおせっかいを防ぐ。
volatile bool gWrite;
コンパイラオプションの最適化を最大にすると再現できる。( 可能性がある )
cl /Ox main.cpp



スレッドの動作を連携させる


メインスレッドで入力をうけて, 変更があった場合にスレッド側で表示する。 SAMPLE http://ooo.iiyudana.net/bin/thread/thread_stop.exe( スレッドの処理の終了まちをチェックする )
int main() { HANDLE trd[2]; Job j; int i; for( i=0; i<2; i++ ){ trd[i] = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)jobtest, &j, 0, 0 ); } // メッセージループ while ( true ) { printf( "スレッドがデータをロード中 %d\n", j.flag ); Sleep( 1000 ); if ( j.flag == 10 ) { printf( "スレッドの処理が終わりました\n" ); break; } } return 0; }



スレッドをクラスにしてみる


SAMPLE http://ooo.iiyudana.net/bin/thread/thread_worker.exe( スレッドクラス ) スレッドを実行する処理が定型句なのでクラス化してみる。 スレッドに渡すスタティック関数をラップしてみる。
class Worker { public: // 停止状態でスレッドを生成 Worker() { DWORD id; trd = CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)( Worker::workMain ), this, CREATE_SUSPENDED, &id ); } // スレッドを実行 void start() { ResumeThread( trd ); } // スレッドの終了をまつ void Worker::join() { WaitForSingleObject( trd, INFINITE ); } // 仕事を追加する void addJob() { } static void workMain( void *ptr ) { Worker *p = ( Worker *)ptr; p->job->main( p->job ); } private: HANDLE trd; };
スレッドに与える仕事。 Java でいえば Runnable インターフェイスにあたる。 このインターフェイスを実装したクラスは Worker クラスを使ってスレッドから実行できる。
class Job { public: /// Worker が実行する仕事 virtual void main( void *ptr ) = 0; virtual ~Job(){} };
適当に実装してコールする。
class TestJob : public Job { public: int flag; TestJob() : flag(1){} void main( void *ptr ) { while ( flag ) { Sleep(1000); printf ( "thread working ...\n" ); } } }; int main() { Worker w; TestJob job; w.addJob( &job ); w.start(); char buf[256]; gets(buf); job.flag = 0; w.join(); return 0; }



スレッドプール(ThreadPool)


SAMPLE http://ooo.iiyudana.net/bin/thread/thread_pool.exe( スレッドプール ) https://www.dropbox.com/s/k073grb4rntsi7u/thread_pool_gui.exe( スレッドプールGUI ) POINT スレッド(ワーカー)をマスタと通信する箇所をひとつのモジュールにまとめておく。 ここにはワーカーに依頼する仕事をリスト(キュー)につめておく。 ひとつのキューを複数のスレッドで操作をすることは危険な処理。 たとえば ワーカーがキューから仕事をとる最中に、マスタが仕事をキューに入れようとすると キューを破壊する可能性がある。 そこで共有物であるキューを Mutex で保護をしてあげる。
[ マスタ ( メインスレッド) ] | | (仕事をつめる) | [ 仕事依頼リスト ] [ 仕事1 ][ 仕事2 ][ 仕事3 ][ 仕事4 ] | | (仕事をする) | [ ワーカー ][ ワーカー ][ ワーカー ]
template< class T > class JobQ { volatile u32 cnt; public: // キューにつめる void enQ( const T &ref ){ // 空き数カウンタが0より大きくなるのをまつ。 // いっぱいならば Worker が仕事を取るまで待つ WaitForSingleObject( smp_empty, INFINITE ); EnterCriticalSection( &(s) ); cnt ++; data[wp] = ref; wp ++; if ( wp == NRQ ) wp = 0; LeaveCriticalSection( &(s) ); // 残り数を1つ増やして待機中のスレッドを動かす。 // [○][○][][][] ReleaseSemaphore( smp_count, 1, 0 ); } // キューからとりだす void deQ( T &ref ){ // 仕事がはいるのをまつ。 WaitForSingleObject( smp_count, INFINITE ); EnterCriticalSection( &(s) ); ref = data[rp]; cnt --; rp ++; if ( rp == NRQ ) rp = 0; LeaveCriticalSection( &(s) ); // 空き数カウンタを増やす。 // [][][][○][○] ReleaseSemaphore( smp_empty, 1, 0 ); } // コンストラクタ JobQ() : cnt(0){ rp = 0; wp = 0; // 現在のつめた個数 // 最初は 空なので "0" // Worker は カウンタ があれば Queue から取り出す smp_count = CreateSemaphore( 0, 0, NRQ, 0 ); // こちらは空きカウンタ なので いっぱいにする -> ( == 満タン ) // Master は 空きカウンタ があれば Queue に 仕事( Request ) をつめる smp_empty = CreateSemaphore( 0, NRQ, NRQ, 0 ); InitializeCriticalSection( &(s) ); } u32 getCount() { EnterCriticalSection( &(s) ); u32 ret = cnt; LeaveCriticalSection( &(s) ); return ret; } private: T data[NRQ]; int rp, wp; // 読み込み位置, 書き込み位置 CRITICAL_SECTION s; // Q read/write の排他制御. HANDLE smp_count; // Productor Semaphore ( 1個追加したら ++ ) HANDLE smp_empty; // Consumer Semaphore ( 1個とったら -- ) };
メインスレッド側でリクエストをキューにつめて、 ワーカースレッド側でキューから仕事を取って実行するようにする。
int main() { JobQ< string > q; // ワーカースレッドを作成して Worker *w = new Worker(); Worker *w1 = new Worker(); // 仕事をあたえる w->addJob( new JobTest( &q ) ); w1->addJob( new JobTest( &q ) ); w->work(); w1->work(); // メインスレッドはリクエストだけを受け付けてキューにつめることを繰り返す。 while (1) { string s; cin >> s; cout << "マスタ: リクエスト( " << s << " )いただきました" << endl; // キューにつめる。 q.enQ( s ); } return 0; }
ワーカースレッドはリクエストキューから取り出した仕事を実際に実行することを繰り返す。 スレッドを使い回すため、 main() メソッドは無限ループにする。
class JobTest : public Job { public: JobTest( JobQ< string > *_q ) : q( _q ){} void main( void *ptr ) { while ( true ) { string s; cout << Worker::currentWorker()->getName(); cout << ": リクエストまってます" << endl; q->deQ( s ); cout << Worker::currentWorker()->getName(); cout << "仕事をもらったので実行します " << s << endl; } } private: JobQ< string > *q; };



Mutexクラスをつくる


SAMPLE https://www.dropbox.com/s/a7aylj7ryzhgc5q/thread_mutex_gui.exe( スレッドの動作を排他的にする )
// Class にしてみる class Mutex { public: void lock() { EnterCriticalSection( &cs ); } void unlock() { LeaveCriticalSection( &cs ); } Mutex() { InitializeCriticalSection( &cs ); } ~Mutex() { DeleteCriticalSection( &cs ); } private: CRITICAL_SECTION cs; };
// 使う static Mutex m; // 排他処理をする効果を見るため, Sleep() する void threadFunc( void *) { m.lock(); m.unlock(); }



InitializeCriticalSection


SYNTAX InitializeCriticalSection( LPCRITICAL_SECTION ) DESC CriticalSection を初期化する


EnterCriticalSection


SYNTAX EnterCriticalSection( LPCRITICAL_SECTION ) DESC 排他制御変数 を使って排他制御を開始する


LeaveCriticalSection


SYNTAX LeaveCriticalSection( LPCRITICAL_SECTION ) DESC 排他制御を終了する


DeleteCriticalSection


SYNTAX void DeleteCriticalSection( LPCRITICAL_SECTION ) DESC CriticalSection を破棄する


同期処理


class Event { public: // ON にする void signal() // Thread を停止させる void block() { WaitForSingleObject( evt, INFINITE ); } // 作成時は信号を ON にしておく Event() { evt = CreateEvent( 0, false, true, NULL ); } ~Event() { CloseHandle( evt ); } private: HANDLE evt; }



CreateEvent


SYNTAX bool CreateEvent ( lpEventAttributes : DWORD bManualReset : DWORD // False: 自動 Reset bInitialState : DWORD // 初期状態. True: Signal False: NoSignal lpName : DWORD ); RET hdl : 0 : Fail TIP Event の状態を監視したい( 信号まちをしたい ) Thread は WaitForSingleObject を呼ぶことで待機状態にする


SetEvent


SYNTAX BOOL SetEvent( HANDLE hEvent // EventObject Handle ); RET !0 : Success 0 : Fail DESC Thread 同士と同期をとるために利用する信号 Thread 間で処理の Timing の通信をする際に利用する EventObject はふたつの状態をもつ. ON : Signal 状態. OFF: NoSignal 状態. POINT NoSignal 状態ならば, イベントオブジェクトを待ち受けるスレッドが休止状態である。 スレッドが待ち状態であるとき、ほとんどCPUに負荷はかからない。 class UpdateTask : public Task { public: void main() { // 描画処理はまかせて更新処理をくりかえす. // 問題は, vector< Object > をつついてしまうこと. // Buffer ( 要は push , pop する側に分ける必要がある ) while ( 1 ) { App::update(); // 終了したら信号を青にする evt.signal( 1 ); } } }; void main() { // 更新処理. UpdateTask task; Thread A; A.addTask( &task ); // MessageLoop while ( 1 ) { App::display(); // update まち evt.block(); } }


ResetEvent


SYNTAX BOOL ResetEvent( HANDLE hEvent // EventObject Handle ); DESC Manual の場合は, Reset されるまでは, Signal 状態を維持する. 自動の場合 Thread 解放時に System が NoSignal にする. Thread がひとつも待機していないときは, Signal 状態を維持する. Ret !0 : Ok 0 : Fail SyncObject を有効にする. ( 信号を青にすると考えておく. ) Event MainLoopSignalt; 待つ側: // 1 になるのをまつ. 待たせる側: // 信号を 1 にする. ( 同期をとる timing で 青にかえてあげる. ) MainLoopSignal.signal(1); 待つ側 // 1 になるのをまつ. :: WaitForSingleObject( hMainLoopSync, INFINITE ); 待たせる側 // 信号を 1 にする. ( 同期をとる timing で 青にかえてあげる. ) :: SetEvent( hMainLoopSync );
// Mutex でも実装できる 待つ側: mutex.lock(); 待たせる側: mutex.unlock();



CreateSemaphore


SYNTAX HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, // セキュリティ記述子 LONG lInitialCount, // 初期カウント LONG lMaximumCount, // 最大カウント数 LPCTSTR lpName // オブジェクトの名前 ); DESC セマフォオブジェクトを作成または開く。
// 資源数5、初期カウント0のセマフォをつくる smp = CreateSemaphore( 0, 0, 5, 0 ); // セマフォを取得するまでスレッドはまつ。 // シグナル状態になったセマフォを取得するとカウント数はひとつ減る。 WaitForSingleObject( smp, INFINITE ); // 待機中のスレッドは動かすには別のスレッドから資源数を増やす。 ReleaseSemaphore( smp, 1, 0 );



ReleaseSemaphore


SYNTAX BOOL ReleaseSemaphore( HANDLE hSemaphore, // セマフォのハンドル LONG lReleaseCount, // 増やすカウント数 LPLONG lpPreviousCount // 変更前のカウント( 知る必要がないなら NULL ) ); DESC 指定されたセマフォオブジェクトのカウントを、指定した数だけ増やす。