インテル® スレッディング・ビルディング・ブロック (インテル® TBB) フローグラフの join_node には 4 つのポリシーがあります: queueing、reserving、key_matching および tag_matching。join_node は、出力メッセージを作成する前に、各入力ポートにメッセージを必要とします。reserving join_node には内部バッファーがなく、各入力ポートにメッセージが揃うまで入力ポートからメッセージを pull しません。各入力ポートで一時的にメッセージを予約し、すべての入力ポートがメッセージを受け取ってから出力メッセージを作成します。いずれかの入力ポートがメッセージの予約に失敗すると、join_node はメッセージを pull しません。
reserving join_node をサポートするため、一部のノードは出力の予約をサポートしています。予約の仕組みは次のとおりです。
以下は、reserving join_node の動作を示すサンプルです。buffer_node は、出力エッジを push から pull に変更できるように、出力をバッファーします。broadcast_node は、メッセージをバッファーしません。また、try_get() と try_reserve() をサポートしません。
void run_example2() { // Flow_Graph_Reservation.xml 用のサンプル graph g; broadcast_node<int> bn(g); buffer_node<int> buf1(g); buffer_node<int> buf2(g); typedef join_node<tuple<int,int> reserving> join_type; join_type jn(g); buffer_node<join_type::output_type> buf_out(g); join_type::output_type tuple_out; int icnt; // join_node プレデセッサーは両方とも予約可能な buffer_node make_edge(buf1,input_port<0>jn)); make_edge(bn,input_port<0>jn)); // broadcast_node をアタッチ make_edge(buf2,input_port<1>jn)); make_edge(jn, buf_out); bn.try_put(2); buf1.try_put(3); buf2.try_put(4); buf2.try_put(7); g.wait_for_all(); while (buf_out.try_get(tuple_out)) { printf("join_node output == (%d,%d)\n",get<0>tuple_out), get<1>tuple_out) ); } if(buf1.try_get(icnt)) printf("buf1 had %d\n", icnt); else printf("buf1 was empty\n"); if(buf2.try_get(icnt)) printf("buf2 had %d\n", icnt); else printf("buf2 was empty\n"); }
このサンプルで、reserving join_node jn の port 0 のプレデセッサーは 2 つです: buffer_node buf1 と broadcast_node bn。join_node の port 1 のプレデセッサーは 1 つです: buffer_node buf2。
ここでは、実行パスの 1 つについて考えてみます (タスクのスケジュールはやや異なるかもしれませんが、結果は同じになります)。
bn.try_put(2);
bn は jn に 2 の転送を試みます。jn はこれを受け付けず、bn から jn への矢印が逆方向になります。bn と jn はどちらもメッセージをバッファーしないため、メッセージは破棄されます。jn の入力に利用できないプレデセッサーがあるため、jn は何もしません。
予約をサポートしないノードは、reserving join_node にアタッチすると正しく動作しません。上記のサンプルは、この問題が発生する原因を示しています。非 reserving ノードは予約のサポートが必要なノードに接続しないでください。
buf1.try_put(3);
buf1 は jn に 3 の転送を試みます。jn はこれを受け付けず、buf1 から jn への矢印が逆方向になります。jn の入力に利用できないプレデセッサーがあるため、jn は何もしません。
buf2.try_put(4);
buf2 は jn に 4 の転送を試みます。jn はこれを受け付けず、buf2 から jn への矢印が逆方向になります。jn のすべての入力のプレデセッサーが利用可能になったため、メッセージを構築し、jn から転送するためのタスクがスポーンされます。この時点では、まだタスクが実行されていないと仮定します。
buf2.try_put(7);
buf2 のサクセサーがないため (jn への矢印が逆方向になったため)、値 7 は格納されます。
ここで、jn を実行するためにスポーンされたタスクが実行されます。
そして、jn はメッセージを buf_out に push し、メッセージが受け付けられます。push に成功したため、jn は buf1 と buf2 に予約された値が使用されたことを通知し、バッファーは値を破棄します。jn は再度予約を試みます。
グラフにおけるすべての処理が終わり、wait_for_all() が完了します。以下は、このサンプルコードの出力です。
join_node output == (3,4) buf1 was empty buf2 had 7