データフロー・グラフ

データフロー・グラフでは、ノードはデータメッセージの送受信を行います。一部のノードはメッセージを送るのみ、一部のノードはメッセージを受け取るのみ、一部のノードは受け取るメッセージに応じてメッセージを送ります。

次のデータフロー・グラフで、左端のノードは 1 から 10 までの整数値を生成し、2 つのサクセサーノードへ渡します。上のサクセサーノードは、受け取った値の 2 乗を計算して、結果をダウンストリームへ渡します。下のサクセサーノードは、受け取った値の 3 乗を計算して、結果をダウンストリームへ渡します。右端のノードは、中央の 2 つのノードから値を受け取ります。このノードは、値を受け取ると値の合計に追加します。このアプリケーションを完了するまで実行すると、値の合計は 1 から 10 までの 2 乗と 3 乗の合計になります。

単純なデータフロー・グラフ
単純なデータフロー・グラフ

次のコードは、上記の単純なデータフロー・グラフの実装を示しています。

    int sum = 0;
    graph g;
    function_node< int, int > squarer( g, unlimited, [](const int &v) { 
        return v*v; 
    } );
    function_node< int, int > cuber( g, unlimited, [](const int &v) { 
        return v*v*v; 
    } );
    function_node< int, int > summer( g, 1, [&](const int &v ) -> int { 
        return sum += v; 
    } );
    make_edge( squarer, summer );
    make_edge( cuber, summer );

    for ( int i = 1; i <= 10; ++i ) {
      squarer.try_put(i);
      cuber.try_put(i);
    }
    g.wait_for_all();

    cout << "Sum is " << sum << "\n";

上記の実装では、次の function_node が作成されます。

squarer と cuber ノードは並列に実行しても問題ないため、同時実行数を unlimited にして作成しています。summer ノードはグローバル変数を参照して合計を更新するため、並列に実行できません。このため、同時実行数を 1 に制限して作成しています。上記の単純なデータフロー・グラフのノード F は、squarer と cuber ノードの両方にメッセージを送るループとして実装されています。

最初の実装を改良するオプションの 1 つは、追加のノードタイプ、broadcast_node を導入することです。broadcast_node は、受け取ったメッセージをすべてのサクセサーにブロードキャストします。

この変更により、ループの 2 つの try_put を 1 つの try_put に置換できます。

    broadcast_node<int> b(g);
    make_edge( b, squarer );
    make_edge( b, cuber );
    for ( int i = 1; i <= 10; ++i ) {
      b.try_put(i);
    }
    g.wait_for_all();

上記の単純なデータフロー・グラフのような実装にする、さらに優れた方法は、source_node を導入することです。source_node は、メッセージを送りますがメッセージを受け取りません。コンストラクターには、3 つの引数を指定します。

template< typename Body > source_node( graph &g, Body body, bool is_active=true)
引数 説明
g

ノードが属するグラフ

body

ソースノードのボディー

is_active

サクセサーがノードにアタッチされた直後にメッセージの送信を開始するか、アクティブになるまで待つかを設定します。

ボディーは、関数オペレーターを含む関数オブジェクトまたはラムダ式です。

bool Body::operator()(OutputType &v );

ランタイム・ライブラリーは、false を返すまで source_node のボディーを繰り返し呼び出します。例のループは source_node に置換することができます。

    source_node< int > src( g, src_body(10), false );
    make_edge( src, squarer );
    make_edge( src, cuber );
    src.activate();
    g.wait_for_all();

ランタイム・ライブラリーは、false を返すまで src_body の関数オペレーターを繰り返し呼び出します。このため、上記の単純なデータフロー・グラフのループのボディーのように動作するボディーを作成する必要があります。これらの変更をすべて行った後の最終的な実装を次に示します。

    class src_body {
        const int my_limit;
        int my_next_value;
    public:
        src_body(int l) : my_limit(l), my_next_value(1) {}
        bool operator()( int &v ) {
            if ( my_next_value <= my_limit ) {
                v = my_next_value++;
                return true;
            } else {
                return false;
            }
        }
    };

    int main() {
      int sum = 0;
      graph g;
      function_node< int, int > squarer( g, unlimited, [](const int &v) { 
          return v*v; 
      } );
      function_node< int, int > cuber( g, unlimited, [](const int &v) { 
          return v*v*v; 
      } );
      function_node< int, int > summer( g, 1, [&](const int &v ) -> int { 
          return sum += v; 
      } );
      make_edge( squarer, summer );
      make_edge( cuber, summer );
      source_node< int > src( g, src_body(10), false );
      make_edge( src, squarer );
      make_edge( src, cuber );
      src.activate();
      g.wait_for_all();
      cout << "Sum is " << sum << "\n";
    }

この最終的なコードには、上記の単純なデータフロー・グラフのノードとエッジがすべて含まれます。この単純な例では、明示的なループで source_node を用いる利点はあまりありません。しかし、source_node はダウンストリーム・ノードの動作に反応できるため、より複雑なグラフでのメモリーの使用を制限できます。詳細は、「トークンベース・システムの作成」を参照してください。

関連情報