データフロー・グラフでは、ノードはデータメッセージの送受信を行います。一部のノードはメッセージを送るのみ、一部のノードはメッセージを受け取るのみ、一部のノードは受け取るメッセージに応じてメッセージを送ります。
次のデータフロー・グラフで、左端のノードは 1 から 10 までの整数値を生成し、2 つのサクセサーノードへ渡します。上のサクセサーノードは、受け取った値の 2 乗を計算して、結果をダウンストリームへ渡します。下のサクセサーノードは、受け取った値の 3 乗を計算して、結果をダウンストリームへ渡します。右端のノードは、中央の 2 つのノードから値を受け取ります。このノードは、値を受け取ると値の合計に追加します。このアプリケーションを完了するまで実行すると、値の合計は 1 から 10 までの 2 乗と 3 乗の合計になります。
次のコードは、上記の単純なデータフロー・グラフの実装を示しています。
int sum = 0; graph g; function_node< int, int > squarer( g, unlimited, [](const int &v) { return v*v; } ); function_node< int, int > cuber( g, unlimited, [](const int &v) { return v*v*v; } ); function_node< int, int > summer( g, 1, [&](const int &v ) -> int { return sum += v; } ); make_edge( squarer, summer ); make_edge( cuber, summer ); for ( int i = 1; i <= 10; ++i ) { squarer.try_put(i); cuber.try_put(i); } g.wait_for_all(); cout << "Sum is " << sum << "\n";
上記の実装では、次の function_node が作成されます。
squarer と cuber ノードは並列に実行しても問題ないため、同時実行数を unlimited にして作成しています。summer ノードはグローバル変数を参照して合計を更新するため、並列に実行できません。このため、同時実行数を 1 に制限して作成しています。上記の単純なデータフロー・グラフのノード F は、squarer と cuber ノードの両方にメッセージを送るループとして実装されています。
最初の実装を改良するオプションの 1 つは、追加のノードタイプ、broadcast_node を導入することです。broadcast_node は、受け取ったメッセージをすべてのサクセサーにブロードキャストします。
この変更により、ループの 2 つの try_put を 1 つの try_put に置換できます。
broadcast_node<int> b(g); make_edge( b, squarer ); make_edge( b, cuber ); for ( int i = 1; i <= 10; ++i ) { b.try_put(i); } g.wait_for_all();
上記の単純なデータフロー・グラフのような実装にする、さらに優れた方法は、source_node を導入することです。source_node は、メッセージを送りますがメッセージを受け取りません。コンストラクターには、3 つの引数を指定します。
template< typename Body > source_node( graph &g, Body body, bool is_active=true)
引数 | 説明 |
---|---|
g |
ノードが属するグラフ |
body |
ソースノードのボディー |
is_active |
サクセサーがノードにアタッチされた直後にメッセージの送信を開始するか、アクティブになるまで待つかを設定します。 |
ボディーは、関数オペレーターを含む関数オブジェクトまたはラムダ式です。
bool Body::operator()(OutputType &v );
ランタイム・ライブラリーは、false を返すまで source_node のボディーを繰り返し呼び出します。例のループは source_node に置換することができます。
source_node< int > src( g, src_body(10), false ); make_edge( src, squarer ); make_edge( src, cuber ); src.activate(); g.wait_for_all();
ランタイム・ライブラリーは、false を返すまで src_body の関数オペレーターを繰り返し呼び出します。このため、上記の単純なデータフロー・グラフのループのボディーのように動作するボディーを作成する必要があります。これらの変更をすべて行った後の最終的な実装を次に示します。
class src_body { const int my_limit; int my_next_value; public: src_body(int l) : my_limit(l), my_next_value(1) {} bool operator()( int &v ) { if ( my_next_value <= my_limit ) { v = my_next_value++; return true; } else { return false; } } }; int main() { int sum = 0; graph g; function_node< int, int > squarer( g, unlimited, [](const int &v) { return v*v; } ); function_node< int, int > cuber( g, unlimited, [](const int &v) { return v*v*v; } ); function_node< int, int > summer( g, 1, [&](const int &v ) -> int { return sum += v; } ); make_edge( squarer, summer ); make_edge( cuber, summer ); source_node< int > src( g, src_body(10), false ); make_edge( src, squarer ); make_edge( src, cuber ); src.activate(); g.wait_for_all(); cout << "Sum is " << sum << "\n"; }
この最終的なコードには、上記の単純なデータフロー・グラフのノードとエッジがすべて含まれます。この単純な例では、明示的なループで source_node を用いる利点はあまりありません。しかし、source_node はダウンストリーム・ノードの動作に反応できるため、より複雑なグラフでのメモリーの使用を制限できます。詳細は、「トークンベース・システムの作成」を参照してください。