単一ノードのインスタンス数を制御するには、ノードで同時実行数の制限を使用します。同時実行数の制限に達した後にメッセージを拒否するには、"rejecting" ノードとして作成します。
関数ノードは 1 つ以上のテンプレート引数を指定して構築されます。第 3 引数は、ノードで使用するバッファーポリシーを制御します。デフォルトは queueing です。queueing ポリシーでは、function_node は同時実行数の制限に達した後も着信メッセージを受け付けて、内部にバッファーします。ポリシーを rejecting に設定すると、ノードは着信メッセージを拒否します。
template < typename Input,
typename Output = continue_msg,
graph_buffer_policy = queueing,
typename A = cache_aligned_allocator<Input> >
class function_node;
例えば、下記のように、function_node を source_node のダウンストリームに配置することで、グラフで処理するビッグ・オブジェクトの数を制御することができます。
graph g;
int src_count = 0;
int number_of_objects = 0;
int max_objects = 3;
source_node< big_object * > s( g, [&]( big_object* &v ) -> bool {
if ( src_count < M ) {
v = new big_object();
++src_count;
return true;
} else {
return false;
}
} );
function_node< big_object *, continue_msg, rejecting > f( g, 3,
[]( big_object *v ) -> continue_msg {
spin_for(1);
delete v;
return continue_msg();
} );
make_edge( s, f );
g.wait_for_all();
function_node は最大で 3 つのビッグ・オブジェクトを同時に処理します。ノードの同時実行数のしきい値により、同時に実行できる数は 3 つに制限されます。function_node が 3 つのインスタンスを同時に実行している場合、source_node からの着信メッセージは拒否されます。source_node は最後に作成したオブジェクトをバッファーに入れて、ボディー・オブジェクトの呼び出しを一時的に停止します。function_node で実行しているインスタンスの数が同時実行数の制限よりも低くなると、source_node から新しいメッセージを受け取ります。最大で 4 つのビッグ・オブジェクトが同時に存在します (function_node に 3 つオブジェクトと source_node に 1 つのバッファーされたオブジェクト)。