対話型プログラムを考えます。並列性と応答性を最大限にするために、ユーザーによって要求された操作をタスクとして実装することができます。操作の順序は重要になることがあります。例えば、プログラムがユーザーに編集可能なテキストを提示すると仮定します。ユーザーはテキストを選択する操作や選択したテキストを削除する操作を行います。このとき、同じバッファーで「選択」操作と「削除」操作を逆の順序で行うと問題になります。しかし、それぞれ異なるバッファーで操作を行えば問題はありません。したがって、目標は、異なるオブジェクト間のタスクの順序を制限することなく、オブジェクトと関連するタスクのシリアルな順序を確立することです。
特定のオブジェクトに関連する操作はシリアルに実行する必要があります。
ほかでワークを行うことが可能であるときにスレッドがロックで待機してしまうため、ロックを含む操作のシリアル化は効率的ではありません。
FIFO (先入れ先出し構造) を使用して、ワークアイテムを順番にします。可能であれば、常にアイテムが処理されるようにします。ワークアイテムが現れたときに処理しているアイテムがない場合、そのアイテムを処理します。処理しているアイテムがある場合は、現れたアイテムを FIFO にプッシュします。処理している現在のアイテムが完了したら、FIFO から別のアイテムをポップして処理します。
このロジックは、FIFO に concurrent_queue を使用し、atomic<int> で待機および処理しているアイテム数をカウントすることにより、mutex なしで実装できます。この後のサンプルで、処理を詳細に説明します。
次のサンプルは、ノンプリエンプティブな優先度のサンプルにローカル・シリアライザーを加えたもので、3 つの優先度とローカル・シリアライザーを実装します。ユーザー・インターフェイスが続きます。
enum Priority { P_High, P_Medium, P_Low }; template<typename Func> void EnqueueWork( Priority p, Func f, Serializer* s=NULL );
次の表の 3 つの条件が満たされると、テンプレート関数 EnqueueWork はファンクター f を実行します。
条件 |
解決するクラス... |
---|---|
Serializer の事前に行うワークがすべて完了している。 |
Serializer |
スレッドが利用可能。 |
RunWorkItem |
優先度の高いワークは実行する準備ができていない。 |
ReadyPileType |
ファンクターの条件は表の上から下に解決されます。s が NULL の場合、最初の条件は存在しません。EnqueueWork は SerializedWorkItem のファンクターをパッケージして、ワーク間で最初の条件を解決するクラスに送ります。
template<typename Func> void EnqueueWork( Priority p, Func f, Serializer* s=NULL ) { WorkItem* item = new SerializedWorkItem<Func>( p, f, s ); if( s ) s->add(item); else ReadyPile.add(item); }
SerializedWorkItem は WorkItem の派生クラスで、ワークの詳細を知ることなくワークの優先度が付けられた部分をパスする方法として提供されます。
// 優先度を付けるワークの抽象基本クラス class WorkItem { public: WorkItem( Priority p ) : priority(p) {} // 派生クラスは実際のワークを定義 virtual void run() = 0; const Priority priority; }; template<typename Func> class SerializedWorkItem: public WorkItem { Serializer* serializer; Func f; /* オーバーライド */ void run() { f(); Serializer* s = serializer; // Serializer の次のファンクターを実行する前に f を破棄 delete this; if( s ) s->noteCompletion(); } public: SerializedWorkItem( Priority p, const Func& f_, Serializer* s ) : WorkItem(p), serializer(s), f(f_) {} };
WorkItem 基本クラスは、ノンプリエンプティブな優先度のサンプルの WorkItem クラスと同じです。シリアル条件の概念は基本クラスから完全に隠されるため、フレームワークはほかの種類の条件や不足している条件を拡張することができます。SerializedWorkItem クラスは本質的には「ノンプリエンプティブな優先度」のサンプルの ConcreteWorkItem で、Serializer の観点から拡張したものです。
ファンクターを実行する時間になると、run() 仮想メソッドが起動され、次の 3 つのステップを行います。
ファンクターを実行します。
ファンクターを破棄します。
ファンクターが完了し、次の待機ファンクターの条件がなくなったことを Serializer に通知します。
ステップ 3 は、ConcreteWorkItem::run の操作とは異なります。ステップ 3 の後に実行することで並列性を高めることができる場合は、ステップ 2 をステップ 3 の後に実行することもできます。ただし、ステップ 2 にかかる時間がわずかでない場合、次のファンクターを実行する前に完了する必要があるため、示された順に実行することを推奨します。
Serializer クラスはローカル・シリアライザーのコアを実装します。
class Serializer { tbb::concurrent_queue<WorkItem*> queue; tbb::atomic<int> count; // キューに入れられたアイテムと実行中のアイテムのカウント void moveOneItemToReadyPile() { // キューから ReadyPile にアイテムを転送 WorkItem* item; queue.try_pop(item); ReadyPile.add(item); } public: void add( WorkItem* item ) { queue.push(item); if( ++count==1 ) moveOneItemToReadyPile(); } void noteCompletion() { // WorkItem が完了すると呼び出される if( ‐‐count!=0 ) moveOneItemToReadyPile(); } };
クラスは 2 つのメンバーを保持します。
前のワークが完了するのを待つ WorkItem のキュー。
キューに入れられたワークまたは実行中のワークのカウント。
操作の順序に注意しながら concurrent_queue<WorkItem*> と atomic<int> を使用することで、mutex の使用を回避しています。カウントの変化は Serializer クラスの動作を理解する鍵です。
add メソッドが count を 0 から 1 にインクリメントした場合、ほかのワークが実行中でなく、ワークを ReadyPile に移動すべきことを示しています。
noteCompletion メソッドがカウントをデクリメントする際に、1 から 0 のデクリメントでない 場合、キューが空でなく、キューの別のアイテムを ReadyPile に移動すべきことを示しています。
ReadyPile クラスは、ノンプリエンプティブな優先度のサンプルで説明しています。
優先度が必要ない場合、moveOneItemToReadyPile メソッドの 2 つのバリエーションがあります。バリエーションによって影響は異なります。
moveOneItemToReadyPile メソッドが item->run() を直接起動します。このアプローチでは、Serializer のオーバーヘッドが相対的に低くなり、スレッドの局所性が高くなります。しかし、これは公平 (フェア) ではありません。タスクの連続したストリームを Serializer が処理する場合、スレッド操作はそのタスクのサービスを続けてほかを除外します。
moveOneItemToReadyPile メソッドが task::enqueue を起動して、item->run() を起動するタスクをキューに入れます。最初のアプローチよりもオーバーヘッドが高くなり、スレッドの局所性が低くなりますが、飢餓状態 (starvation) を回避できます。
公平性と局所性の高さは相反するものであり、最適な選択肢は状況に依存します。
パターンは、Serializer クラスによって維持されているものよりも一般的なワークアイテムの条件に一般化します。一般化された Serializer::add は、ワークアイテムに条件があるかどうかを判断して、条件がない場合は直ちに実行します。一般化された Serializer::noteCompletion は、現在のワークアイテムの完了によって条件がなくなった (以前は条件があった) アイテムをすべて実行します。「実行」という用語は、ワークを直ちに実行することを意味します。さらに条件がある場合は、次の条件を解決するクラスにワークを転送します。