How to abort the execution of a node and its children in tbb flowgraph

I am currently testing the tbb stream stream function. To use it, I must be able to interrupt the execution of some node in the graph, including all the children that depend on it, but leave other children who are not dependent on it, are executed. Throwing an exception from the body or the calling task :: cancel_group_execution () interrupts all nodes.

#include <cstdio>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    void operator()( continue_msg ) const
    {   if (my_name == "B")
            tbb::task::self().group()->cancel_group_execution();
        else
        {   sleep(1);
            printf("%s\n", my_name.c_str());
        }
    }
};

int main()
{
    graph g;

    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( continue_msg() );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
+4
source share
2 answers

bool continue_msg. process_node node , , node.

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    bool operator()( bool avail ) const
    {   if (!avail)
           printf("%s skipped\n", my_name.c_str());
        else
            if (my_name == "B")
            {   printf("%s fail\n", my_name.c_str());
                avail = false;  // fail task
            }
            else
            {   sleep(1);
                printf("%s\n", my_name.c_str());
            }
        return avail;
    }
};

int main()
{
    graph g;

    typedef function_node<bool, bool> process_node;
    typedef std::tuple<bool,bool> bool_pair;
    broadcast_node< bool > start(g);
    process_node a( g, unlimited, body("A"));
    process_node b( g, unlimited, body("B"));
    process_node c( g, unlimited, body("C"));
    join_node<bool_pair> join_c(g);
    function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
        return std::get<0>(in) && std::get<1>(in);
    });
    process_node d( g, unlimited, body("D"));
    process_node e( g, unlimited, body("E"));

    /*
     * start -+-> A -+-> E
     *        |       \
     *        |        \
     *        |         join_c -> and_c -> C -> D
     *        |        /
     *        |       /
     *        +-> B -- 
     */
    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, input_port<0>(join_c) );
    make_edge( b, input_port<1>(join_c) );
    make_edge( join_c, and_c );
    make_edge( and_c, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( true );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
+2

, task_group_contexts. :

#include "tbb/task.h"

:

int main()
{
    tbb::task_group_context tgc1;
    tbb::task_group_context tgc2;
    graph g1(tgc1);
    graph g2(tgc2);
    printf("Constructing graph\n");
    broadcast_node< continue_msg > start(g1);
    continue_node<continue_msg> a( g1, body("A"));
    continue_node<continue_msg> b( g2, body("B"));
    continue_node<continue_msg> c( g2, body("C"));
    continue_node<continue_msg> d( g2, body("D"));
    continue_node<continue_msg> e( g1, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        try
        {   
            printf("broadcasting graph %d\n", i);
            start.try_put( continue_msg() );
            g1.wait_for_all();
            g2.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
        g1.wait_for_all();
        g1.reset();
        g2.reset();
    }
    return 0;
}

task_group_context ( ). g2 g1. , , catch , . , , A E.

graph with multiple task_group_contexts

, . reset() reset continue_nodes '. , , , , g1 catch(...), g1.wait_for_all() try/catch. , .

B a multifunction_node continue_msg continue_msg:

typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;

struct mf_body {
    std::string my_name;
    mf_body(const char *name) : my_name(name) {}
    void operator()(continue_msg, mf_type::output_ports_type &op) {
        if(my_name == "B") {
            printf("%s returning without sending\n", my_name.c_str());
            return;
        }
        sleep(1);
        get<0>(op).try_put(continue_msg());
        return;
    }
};

node B:

mf_type b( g, unlimited, mf_body("B"));

B C :

make_edge( output_port<0>(b), c ); 

. node B , continue_msg . node B , node C , continue_msgs . reset , reset C count.

multifunction_node , . , a multifunction_node continue_msg continue_node. continue_node continue_msgs, ( .) multifunction_node , continue_msg, , . multifunction_nodes.

+4

All Articles