データフロー・グラフでは、ノードはデータメッセージの送受信を行います。一部のノードはメッセージを送るのみ、一部のノードはメッセージを受け取るのみ、一部のノードは受け取るメッセージに応じてメッセージを送ります。
次のデータフロー・グラフで、左端のノードは 1 から 10 までの整数値を生成し、2 つのサクセサーノードへ渡します。上のサクセサーノードは、受け取った値の 2 乗を計算して、結果をダウンストリームへ渡します。下のサクセサーノードは、受け取った値の 3 乗を計算して、結果をダウンストリームへ渡します。右端のノードは、中央の 2 つのノードから値を受け取ります。このノードは、値を受け取ると値の合計に追加します。このアプリケーションを完了するまで実行すると、値の合計は 1 から 10 までの 2 乗と 3 乗の合計になります。

次のコードは、上記の単純なデータフロー・グラフの実装を示しています。
int sum = 0;
graph g;
function_node< int, int > squarer( g, unlimited, [](const int &v) {
return v*v;
} );
function_node< int, int > cuber( g, unlimited, [](const int &v) {
return v*v*v;
} );
function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
return sum += v;
} );
make_edge( squarer, summer );
make_edge( cuber, summer );
for ( int i = 1; i <= 10; ++i ) {
squarer.try_put(i);
cuber.try_put(i);
}
g.wait_for_all();
cout << "Sum is " << sum << "\n";
上記の実装では、次の function_node が作成されます。
squarer と cuber ノードは並列に実行しても問題ないため、同時実行数を unlimited にして作成しています。summer ノードはグローバル変数を参照して合計を更新するため、並列に実行できません。このため、同時実行数を 1 に制限して作成しています。上記の単純なデータフロー・グラフのノード F は、squarer と cuber ノードの両方にメッセージを送るループとして実装されています。
最初の実装を改良するオプションの 1 つは、追加のノードタイプ、broadcast_node を導入することです。broadcast_node は、受け取ったメッセージをすべてのサクセサーにブロードキャストします。
この変更により、ループの 2 つの try_put を 1 つの try_put に置換できます。
broadcast_node<int> b(g);
make_edge( b, squarer );
make_edge( b, cuber );
for ( int i = 1; i <= 10; ++i ) {
b.try_put(i);
}
g.wait_for_all();
上記の単純なデータフロー・グラフのような実装にする、さらに優れた方法は、source_node を導入することです。source_node は、メッセージを送りますがメッセージを受け取りません。コンストラクターには、3 つの引数を指定します。
template< typename Body > source_node( graph &g, Body body, bool is_active=true)
| 引数 | 説明 |
|---|---|
| g |
ノードが属するグラフ |
| body |
ソースノードのボディー |
| is_active |
サクセサーがノードにアタッチされた直後にメッセージの送信を開始するか、アクティブになるまで待つかを設定します。 |
ボディーは、関数オペレーターを含む関数オブジェクトまたはラムダ式です。
bool Body::operator()(OutputType &v );
ランタイム・ライブラリーは、false を返すまで source_node のボディーを繰り返し呼び出します。例のループは source_node に置換することができます。
source_node< int > src( g, src_body(10), false );
make_edge( src, squarer );
make_edge( src, cuber );
src.activate();
g.wait_for_all();
ランタイム・ライブラリーは、false を返すまで src_body の関数オペレーターを繰り返し呼び出します。このため、上記の単純なデータフロー・グラフのループのボディーのように動作するボディーを作成する必要があります。これらの変更をすべて行った後の最終的な実装を次に示します。
class src_body {
const int my_limit;
int my_next_value;
public:
src_body(int l) : my_limit(l), my_next_value(1) {}
bool operator()( int &v ) {
if ( my_next_value <= my_limit ) {
v = my_next_value++;
return true;
} else {
return false;
}
}
};
int main() {
int sum = 0;
graph g;
function_node< int, int > squarer( g, unlimited, [](const int &v) {
return v*v;
} );
function_node< int, int > cuber( g, unlimited, [](const int &v) {
return v*v*v;
} );
function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
return sum += v;
} );
make_edge( squarer, summer );
make_edge( cuber, summer );
source_node< int > src( g, src_body(10), false );
make_edge( src, squarer );
make_edge( src, cuber );
src.activate();
g.wait_for_all();
cout << "Sum is " << sum << "\n";
}
この最終的なコードには、上記の単純なデータフロー・グラフのノードとエッジがすべて含まれます。この単純な例では、明示的なループで source_node を用いる利点はあまりありません。しかし、source_node はダウンストリーム・ノードの動作に反応できるため、より複雑なグラフでのメモリーの使用を制限できます。詳細は、「トークンベース・システムの作成」を参照してください。