revalidator是做什么的?需要知道哪些东西?

有关于revalidator需要弄明白的是以下三个问题:
  1. 通过ovs-vsctl list open_vs可以看到other_config里面有两个变量线程数配置:n-handler-threadsn-revalidator-threads,很明显这是两个线程数量的配置,但这两个线程都是做什么的做什么的?
  2. 除了设置线程数的配置,还有两个:max-idlemax-revalidator,这两个是做什么的?
  3. ovs-appctl upcall/show里显示的dump duration时间和上边的有什么关系?
  4. flow-limit也会说到,这个flow-limit是如何限制的?

答案在文尾。

需要记得的东西

牢记这个图,各种一层层配置,一层层调用
 
tnl_conf_seq:Incremented whenever tnl route, arp, etc changes.
flow_restore_wait:最开始1,然后第一次config就就给变成0了。
 

概括:

handler:
handle在dpdk版本其实是个空线程。内核版才有实现。handler线程只对普通ovs下,从内核upcall的消息进行处理。ovs+dpdk下,handler线程也存在,但是一直处于堵塞状态,实际上什么也没干。ovs+dpdk下的慢速路径的处理直接由收包线程执行,由pmd线程做的。
 
revalidator:(关键)
  • 为什么要重验证呢?实际上真正有效的规则是openflow规则,datapath里面存的megaflow只不过是pmd进行upcall之后翻译的openflow规则的缓存,当openflow规则发生改变时,如果修改megaflow缓存呢,megaflow如果判断缓存是否还是正确的呢,这个就是revalidate做的,那么可以用几个revalidator线程去做这种事。
  • revalidator检查快速路径中的流缓存(pmd->flow_table),检查流及其action是否还继续有效(max-idle就是超时时间,默认10s)。若继续有效则保留它不做修改。若能对流及其action做部分修改后保持其有效则修改后保留它。若流已完全无效则从快速路径的流缓存中删除流。
 
revalidation的2种触发机制:
  • 一是周期性触发,这是通过leader线程每500ms改变一次udpif->dump_seq进行的在revalidate(revalidator)函数中会检查ukey->dump_seq是否等于udpif->dump_seq相等时不验证。因为leader线程会周期性改变udpif->dump_seq,所以流也会被周期性验证每一轮revalidation之后,ukey->dump_seq会被赋值为udpif->dump_seq。
  • 另一种是主动通知。当ofproto层改变了流表规则,会调用udpif_revalidate()主动改变udpif->reval_seq。在revalidate_ukey()中会检查ukey->reval_seq是否等于udpif->reval_seq,如果不相等则必须重验证流。reval_seq不相等的另一层含义是我们不能再使用之前创建的ukey->xcache,而必须新创建一个。一般来说,udpif->reval_seq由某些上层的事件触发,它的变化频率没有udpif->dump_seq那么高在revalidate_ukey()中验证完流以后会把ukey->reval_seq 赋值为udpif->reval_seq。
 
配置生效和多线程的同步:
配置是从bridge调ofproto,ofproto里面设置参数,然后掉udpif,udpif从ofproto里拿参数设置到自己维护的数据结构里。根据配置生成对应个数个线程。里面用到latch和ovs-rcu进行一些同步。配置时发现需要重新调节线程数,就发个通知给latch_exit,然后rcu等待。跑着的线程while循环每次会查看这个latch_exit,决定是否退出。并把megaflow清空。

配置生效过程

set config

在bridge_run->bridge_reconfigure里面,ofproto层会set这两个线程数量,然后在下一层的dpif也设置好,框架:
bridge_run
  bridge_init_ofproto // 这个里面很巧妙,使用了一个static bool变量,只会被init一次
    ofproto_init      // 绑定ofproto_class, ofproto-dpif.c里
  ofproto_set_flow_restore_wait // flow_restore_wait,没见过other_config里设置的,都是默认值0
  bridge_run__
    ofproto_enumerate_types // 绑定dpif_netdev_class
      ofproto_type_run
        ofproto_class->type_run  // 上边绑定的,ofproto-dpif.c里
          //在dpif创建前不会走到这,进来判断没有就返回了
          if (backer->recv_set_enable)
            udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
  bridge_reconfigure
    // 这四个变量,在dpif层就是从这里拿的
    ofproto_set_flow_limit(ofproto_flow_limit)
    ofproto_set_max_idle(ofproto_max_idle)
    ofproto_set_max_revalidator(ofproto_max_revalidator)
    ofproto_set_threads(n_handlers, n_revalidators)
    ofproto_create // 进入dpif层
      ofproto->ofproto_class->construct // 这个ofproto_class就是上边绑定的,ofproto-dpif.c里
        open_dpif_backer
          dpif_create_and_open
          udpif_create // 这里创建之后,上边的udpif_set_threads才会执行
            atomic_init(&udpif->flow_limit, ofproto_flow_limit);       // 顺便提一下,flow-limit是这里设置的
          backer->recv_set_enable = !ofproto_get_flow_restore_wait();  // 默认为1,之后bridge_run__里判断就会命中
          udpif_set_threads(backer->udpif, n_handlers, n_revalidators) // 这里设置了
ofproto和dpif层set config的两个核心函数:
// ofproto层就单纯的根据other_config设置一下
// 如果没有设置revalidator,根据max(cpu核数,2)和handler线程数来确定设为几。
// handle也类似,代码一看就懂 
ofproto_set_threads(int n_handlers_, int n_revalidators_)
{
    int threads = MAX(count_cpu_cores(), 2);

    n_revalidators = MAX(n_revalidators_, 0);
    n_handlers = MAX(n_handlers_, 0);

    if (!n_revalidators) {
        n_revalidators = n_handlers
            ? MAX(threads - (int) n_handlers, 1)
            : threads / 4 + 1;
    }

    if (!n_handlers) {
        n_handlers = MAX(threads - (int) n_revalidators, 1);
    }
}

// dpif这里有三个函数重要,加粗了,分为三部分,暂停thread,设置thread数量,开始运行thread
// 但实际上设置和运行都在一个函数里面
void
udpif_set_threads(struct udpif *udpif, size_t n_handlers_,
                  size_t n_revalidators_)
{
    ovs_assert(udpif);
    ovs_assert(n_handlers_ && n_revalidators_);

    if (udpif->n_handlers != n_handlers_
        || udpif->n_revalidators != n_revalidators_) {
        udpif_stop_threads(udpif, true);
    }

    if (!udpif->handlers && !udpif->revalidators) {
        int error;
        // 不执行,因为dpif-netdev.c里面这个函数为NULL
        error = dpif_handlers_set(udpif->dpif, n_handlers_);
        if (error) {
            VLOG_ERR("failed to configure handlers in dpif %s: %s",
                     dpif_name(udpif->dpif), ovs_strerror(error));
            return;
        }

        udpif_start_threads(udpif, n_handlers_, n_revalidators_);
    }
}

// 在运行线程数的和设置的不同就会执行

udpif_stop_threads:
udpif->n_handlers != 0 || udpif->n_revalidators != 0
    latch_set(&udpif->exit_latch); // 一个通讯机制,set后正在运行的handle和revalidator线程,会判断然后退出
    //先别管什么rcu,ovs_barrier,因为我也不懂,之后再看
    xpthread_join(udpif->handlers[i].thread, NULL);
    xpthread_join(udpif->revalidators[i].thread, NULL);
    dpif_disable_upcall(udpif->dpif); // 在dpif-netdev里面,把upcall的读写锁加写锁,这样就不会执行upcall了
    revalidator_purge(&udpif->revalidators); // 把megaflow清光
    latch_poll(&udpif->exit_latch);   
    // 把参数和指针重置
    free(udpif->revalidators);  free(udpif->handlers);
    udpif->revalidators = NULL; udpif->handlers = NULL;
    udpif->n_revalidators = 0; udpif->n_handlers = 0;

// start当!udpif->handlers && !udpif->revalidators才会执行
udpif_start_threads:
    udpif->n_handlers = n_handlers_;
    udpif->n_revalidators = n_revalidators_;
    dpif_enable_upcall(udpif->dpif); // 把upcall的读锁释放
    // 然后就会ovs_thread_create()对应个数个handle和revalidator
    // 相当于这里就启动了

 

handler线程:dpdk版是个空线程

  • 上边最后一部就生成了对应属两个线程:udpif_upcall_handler and udpif_revalidator
  • 这两个线程通过udpif->exit_latch感知到是否需要退出
 
udpif_upcall_handler(recv_upcalls)。如果是dpif-netdev,那么就什么都不做,内核版的才会做。
static void *
udpif_upcall_handler(void *arg)
{
    struct handler *handler = arg;
    struct udpif *udpif = handler->udpif;
    // 上边如果stop里面set了就会退出
    while (!latch_is_set(&handler->udpif->exit_latch)) {
        //recv_upcalls用于接收upcall消息。只有kernel path提供了recv函数dpif_netlink_recv,所以说handler线程对dpdk path不生效。
        //recv_upcalls返回值大于0,说明已经处理过upcall消息,防止
        //丢失后续的upcall消息,线程不能堵塞,需要再次调用recv_upcalls
        //尝试接收upcall消息,所以需要调用poll_immediate_wake将
        //timeout_when设置为最小值LLONG_MIN,这样poll_block调用poll函数就能立即返回,继续执行recv_upcalls。
        //recv_upcalls返回值小于0,说明没有upcall消息,dpif_recv_wait调用等待upcall消息即可。
        if (recv_upcalls(handler)) {
            poll_immediate_wake();
        } else {
            //等待upcall消息
            dpif_recv_wait(udpif->dpif, handler->handler_id);
            //等待exit消息
            latch_wait(&udpif->exit_latch);
        }
        //堵塞在poll函数上,超时时间timeout_when为最大值,等待事件发生
        poll_block();
    }

    return NULL;
}

recv_upcalls(struct handler *handler){
    n_upcalls = 0;
    // UPCALL_MAX_BATCH = 64
    while (n_upcalls < UPCALL_MAX_BATCH) {
        // 调用udpif->dpif->dpif_class->recv,因为并没有实现recv,这里return的是EAGAIN
        // 相当于没有upcall
        if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
            // free
            ofpbuf_uninit(recv_buf);
            break;
        }
        ...
    }
}

revalidator线程

ovs barrier

这里先说一下ovs_barrier,是用于多线程同步的一个东西,就用到了个seq。barrier有个size表示线程数,每个线程运行ovs_barrier_block,都会先把barrier->count++,然后除了最后一个发现count+1=size,执行block会修改seq,其他的线程只会阻塞住。比如4个线程,前3个线程进去后count变成3,然后会while循环seq,如果不变就一直阻塞。第4个线程进去,发现count+1=4了,就修改seq,这个时候其他三个线程就发现seq变了,就又不阻塞了。
void
ovs_barrier_block(struct ovs_barrier *barrier)
{
    uint64_t seq = seq_read(barrier->seq);
    uint32_t orig;

    orig = atomic_count_inc(&barrier->count);
    if (orig + 1 == barrier->size) {
        atomic_count_set(&barrier->count, 0);
        /* seq_change() serves as a release barrier against the other threads,
         * so the zeroed count is visible to them as they continue. */
        seq_change(barrier->seq);
    } else {
        /* To prevent thread from waking up by other event,
         * keeps waiting for the change of 'barrier->seq'. */
        while (seq == seq_read(barrier->seq)) {
            seq_wait(barrier->seq, seq);
            poll_block();
        }
    }
}

主函数udpif_revalidator

第一个revalidator是leader线程,leader会调dpif_flow_dump_create,dpif-netdev里面的dpif_netdev_flow_dump_create。leader线程每次循环会让udpif->dump_seq++,其他线程在revalidator时候会根据ukey->dump_seq对比udpif->dump_seq,如果相等表示已经被验证了,不相等就验证后让其相等。
udpif_revalidator(void *arg)
{
    /* Used by all revalidators. */
    struct revalidator *revalidator = arg;
    struct udpif *udpif = revalidator->udpif;
    bool leader = revalidator == &udpif->revalidators[0];

    /* Used only by the leader. */
    long long int start_time = 0;
    uint64_t last_reval_seq = 0;
    size_t n_flows = 0;

    revalidator->id = ovsthread_id_self();
    for (;;) {
        if (leader) {
            // recirc_run()回收释放struct recirc_id_node内的struct recirc_state结构
            recirc_run();
            // 在dpif层(datapath)读取流的总数,udpif->n_flows_timestamp记录了上一次读取操作的
            // 时间,udpif->n_flows记录了上一次读取的值,如果当前时间与上一次读取操作的时间间隔
            // 不超过100ms,则继续使用上一次读取的值。否则执行一次新的读取操作
            n_flows = udpif_get_n_flows(udpif);
            
            udpif->pause = latch_is_set(&udpif->pause_latch);
            udpif->reval_exit = latch_is_set(&udpif->exit_latch);
            start_time = time_msec();
            if (!udpif->reval_exit) {
                bool terse_dump;

                terse_dump = udpif_use_ufid(udpif);
                // dpif_flow_dump_create()创建一个dump上下文
                udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump,
                                                    NULL);
            }
        }
        /* Wait for the leader to start the flow dump. */
        ovs_barrier_block(&udpif->reval_barrier);
        if (udpif->pause) {
            revalidator_pause(revalidator);
        }
        // 更改线程数量时候,这里会退出
        if (udpif->reval_exit) {
            break;
        }
        // 最重要的地方,进行验证,决定megaflow是否删除,修改
        revalidate(revalidator);

        /* Wait for all flows to have been dumped before we garbage collect. */
        ovs_barrier_block(&udpif->reval_barrier);
        // 上边的revalidate只能重验证datapath里面有的flow,但是有些被删了,
        // 可是ukey还存在,sweep就是清除残余无用的ukey
        revalidator_sweep(revalidator);
        
        ovs_barrier_block(&udpif->reval_barrier);

        if (leader) {
            unsigned int flow_limit;
            long long int duration;

            atomic_read_relaxed(&udpif->flow_limit, &flow_limit);

            dpif_flow_dump_destroy(udpif->dump);
            // 这里会dump_seq++
            seq_change(udpif->dump_seq);
            // 默认ofproto_flow_reduce_point=2000ms, ofproto_flow_quater_point=1300ms
            // 1. 如果revalidate时间大于2秒,就将flow_limit调小duration/1000倍
            // 2. 如果revalidate时间大于1.3秒,就将flow_limit调成3/4
            // 3. 如果revalidate时间小于1秒并且flow_limit小于实际上1秒可以revalidate的数量,就+1000
            duration = MAX(time_msec() - start_time, 1);
            udpif->dump_duration = duration;
            if (duration > ofproto_flow_reduce_point) { 
                flow_limit /= duration / 1000;
            } else if (duration > ofproto_flow_quater_point) {
                flow_limit = flow_limit * 3 / 4;
            } else if (duration < 1000 &&
                       flow_limit < n_flows * 1000 / duration) {
                flow_limit += 1000;
            }
            flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
            atomic_store_relaxed(&udpif->flow_limit, flow_limit);

            if (duration > 2000) {
                VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
                          duration);
            }
            // 这里就会用上设置的max_revalidator,比如设置了4秒,其中进行revalidate花费了2秒,就会再睡眠2秒
            // 表示4秒进行一次revalidate
            poll_timer_wait_until(start_time + MIN(ofproto_max_idle,
                                                   ofproto_max_revalidator));
            seq_wait(udpif->reval_seq, last_reval_seq);
            latch_wait(&udpif->exit_latch);
            latch_wait(&udpif->pause_latch);
            poll_block();

            if (!latch_is_set(&udpif->pause_latch) &&
                !latch_is_set(&udpif->exit_latch)) {
                long long int now = time_msec();
                /* Block again if we are woken up within 5ms of the last start
                 * time. */
                start_time += 5;
                // 如果上边的revalidate什么的加一起5ms内搞完,就再block着
                if (now < start_time) {
                    poll_timer_wait_until(start_time);
                    latch_wait(&udpif->exit_latch);
                    latch_wait(&udpif->pause_latch);
                    poll_block();
                }
            }
        }
    }

    return NULL;
}

1. revalidate:重验证

需要注意的是并不是一个revalidator线程处理一个pmd内的流,而是所有revalidator线程先处理同一个pmd内的流,这个pmd内的流都处理完毕了,所有revalidator线程再转向处理另一个pmd内的流。
revalidate(struct revalidator *revalidator)
{
    uint64_t odp_actions_stub[1024 / 8];
    struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);

    struct udpif *udpif = revalidator->udpif;
    struct dpif_flow_dump_thread *dump_thread;
    uint64_t dump_seq, reval_seq;
    unsigned int flow_limit;

    dump_seq = seq_read(udpif->dump_seq);
    reval_seq = seq_read(udpif->reval_seq);
    atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
    dump_thread = dpif_flow_dump_thread_create(udpif->dump);
    for (;;) {
        struct ukey_op ops[REVALIDATE_MAX_BATCH];
        int n_ops = 0;

        struct dpif_flow flows[REVALIDATE_MAX_BATCH];
        const struct dpif_flow *f;
        int n_dumped;

        long long int max_idle;
        long long int now;
        size_t n_dp_flows;
        bool kill_them_all;
        //批量dump流
        n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
        if (!n_dumped) {
            break;
        }

        now = time_msec();
        // 1. megaflow>flow_limit*2就全删
        // 2. megaflow大于flow_limit但小于2倍flow_limit,把max-idle改成100ms,加快删除
        n_dp_flows = udpif_get_n_flows(udpif);
        kill_them_all = n_dp_flows > flow_limit * 2;
        max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;

        udpif->dpif->current_ms = time_msec();
        for (f = flows; f < &flows[n_dumped]; f++) {
            // 上一次被命中的时间
            long long int used = f->stats.used;
            struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
            enum reval_result result;
            struct udpif_key *ukey;
            bool already_dumped;
            int error;
            // 对于dump的每一条流,lock它的ukey,如果流还没有ukey就为它分配一个
            if (ukey_acquire(udpif, f, &ukey, &error)) {
                if (error == EBUSY) {
                    /* Another thread is processing this flow, so don't bother
                     * processing it.*/
                    COVERAGE_INC(upcall_ukey_contention);
                } else {
                    log_unexpected_flow(f, error);
                    if (error != ENOENT) {
                        delete_op_init__(udpif, &ops[n_ops++], f);
                    }
                }
                continue;
            }
            
            // 发现ukey和udpif的dump_seq相等,说明被dump过了,别的线程就不会dump了
            already_dumped = ukey->dump_seq == dump_seq;
            if (already_dumped) {
                continue;
            }
            
            ukey_update_meter_stats(ukey, f);
            if (!used) {
                used = ukey->created;
            }
            // 全删或者超过的max-idle就删除
            if (kill_them_all || (used && used < now - max_idle)) {
                result = UKEY_DELETE;
            } else {
                result = revalidate_ukey(udpif, ukey, &f->stats, &odp_actions,
                                         reval_seq, &recircs);
            }
            // 更新ukey的dump_seq
            ukey->dump_seq = dump_seq;

            if (result != UKEY_KEEP) {
                /* Takes ownership of 'recircs'. */
                reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
                              &odp_actions);
            }
            ovs_mutex_unlock(&ukey->mutex);
        }

        if (n_ops) {
            /* Push datapath ops but defer ukey deletion to 'sweep' phase. */
            push_dp_ops(udpif, ops, n_ops);
        }
        ovsrcu_quiesce();
    }
    dpif_flow_dump_thread_destroy(dump_thread);
    ofpbuf_uninit(&odp_actions);
}

revalidate_ukey:真正对ukey进行重验证

should_revalidator会判断下是否值得重新验, 如果不值,会直接删掉udpif_key,那么同时也会删除megaflows。值不值条件有两种:1. 如果dump时间小于max-revalidate/2就值。 2. megaflow的pps>5
static enum reval_result
revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
                const struct dpif_flow_stats *stats,
                struct ofpbuf *odp_actions, uint64_t reval_seq,
                struct recirc_refs *recircs)
    OVS_REQUIRES(ukey->mutex)
{
    // 每一轮dump,udpif->reval_seq会加1,每个ukey也有一个ukey->reval_seq
    // 表示上一次验证时的序号当两者不同时说明需要对流进行重验证了。
    bool need_revalidate = ukey->reval_seq != reval_seq;
    enum reval_result result = UKEY_DELETE;
    struct dpif_flow_stats push;

    ofpbuf_clear(odp_actions);

    push.used = stats->used;
    push.tcp_flags = stats->tcp_flags;
    push.n_packets = (stats->n_packets > ukey->stats.n_packets
                      ? stats->n_packets - ukey->stats.n_packets
                      : 0);
    push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
                    ? stats->n_bytes - ukey->stats.n_bytes
                    : 0);

    if (need_revalidate) {
        // 如果命中这条流的数据包的速率不超过5pps则删除它,如果自上次重验证以来还没有包经过这条流则保留这条流
        // 如果自上次重验证以来ukey->xcache还没有超时删除则保留这条流。
        if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
            if (!ukey->xcache) {
                ukey->xcache = xlate_cache_new();
            } else {
                xlate_cache_clear(ukey->xcache);
            }
            result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
                                       odp_actions, recircs, ukey->xcache);
        } /* else delete; too expensive to revalidate */
    } else if (!push.n_packets || ukey->xcache
               || !populate_xcache(udpif, ukey, push.tcp_flags)) {
        result = UKEY_KEEP;
    }

    /* Stats for deleted flows will be attributed upon flow deletion. Skip. */
    if (result != UKEY_DELETE) {
        if (ukey->xcache)
            xlate_cache_push_meter_stats(ukey->xcache, ukey);
        xlate_push_stats(ukey->xcache, &push);
        ukey->stats = *stats;
        ukey->reval_seq = reval_seq;
    }

    return result;
}

 

push_dp_ops:根据重验证结果修改dp的megaflow

在revalidate最后边,如果发现result != UKEY_KEEP,就把对应操作放到ops里面,这一组flow全部重验证后,最后会执行push_dp_ops->dpif_operate->dpif->dpif_class->operate,掉dpif-netdev.c里面的函数执行dpif_netdev_operate
static void
dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
                    enum dpif_offload_type offload_type OVS_UNUSED)
{
    size_t i;

    for (i = 0; i < n_ops; i++) {
        struct dpif_op *op = ops[i];

        switch (op->type) {
        // 对应的就是revalidate里面的UKEY_MODIFY
        case DPIF_OP_FLOW_PUT:
            op->error = dpif_netdev_flow_put(dpif, &op->flow_put);
            break;
            
        // 删除megaflow,对应UKEY_DELETE
        case DPIF_OP_FLOW_DEL:
            op->error = dpif_netdev_flow_del(dpif, &op->flow_del);
            break;
        
        //这两个都是pmd自己upcall做的
        case DPIF_OP_EXECUTE:
            op->error = dpif_netdev_execute(dpif, &op->execute);
            break;

        case DPIF_OP_FLOW_GET:
            op->error = dpif_netdev_flow_get(dpif, &op->flow_get);
            break;
        }
    }
}

2.revalidator_sweep 清处多余的ukey

leader线程在每一轮revalidation后递增ukey->dump_seq每条流被revalidate后其ukey->dump_seq 赋值为 dump_seq在revaliadtion的第一个步骤dump中,只能重验证那些现在存在于datapath中的流,但是有很多情况下流可能已经从datapath中删除了,但是它的ukey还保存在udpif->ukeys中,这时就需要对这些残余的ukey进行清理了,这就是revaliadtion的第二个步奏seep。掉的是revalidator_sweep__(revalidator, false);
revalidator_sweep__最后会走push_ukey_ops,发现如果datapath里面是删除操作,则把ukey也删掉

push_ukey_ops(struct udpif *udpif, struct umap *umap,
              struct ukey_op *ops, size_t n_ops)
{
    int i;

    push_dp_ops(udpif, ops, n_ops);
    ovs_mutex_lock(&umap->mutex);
    for (i = 0; i < n_ops; i++) {
        // 如果datapath里是删除,则把ukey删除
        if (ops[i].dop.type == DPIF_OP_FLOW_DEL) {
            ukey_delete(umap, ops[i].ukey);
        }
    }
    ovs_mutex_unlock(&umap->mutex);
}

3.更新flow_limit,休眠:leader线程

这部分在主函数里的代码已经添加过了注释,主要调整flow-limit,进行睡眠,睡眠时间为MIN(ofproto_max_idle,
ofproto_max_revalidator) - revalidate耗时。更新dump_seq:seq_change(udpif->dump_seq);
 
 

最开始的问题

  1. 通过ovs-vsctl list open_vs可以看到other_config里面有两个变量线程数配置:n-handler-threadsn-revalidator-threads,很明显这是两个线程数量的配置,但这两个线程都是做什么的做什么的?
    • OVS-DPDK里面handler是个空线程,revalidator才是做重验证的,为的就是维护flow的的相关信息,当openflow规则发生改变,当flow超时等,去更新datapath里面的megaflow缓存。
  2. 除了设置线程数的配置,还有两个:max-idlemax-revalidator,这两个是做什么的?
    • max-idle是flow的超时时间,当megfalow数量超过flow_limit这个值会变成100ms,使其快速删除,默认是10s。
    • max-revalidator有俩作用,一个是进行完revalidate的休眠时间:为max-revalidator减去revalidate耗时。另一个是当revalidate的时间小于超过max-revalidator/2,表示耗时不会太多,就对flow进行revalidate。否则直接就删除了。
  3. ovs-appctl upcall/show里显示的dump duration时间和上边的有什么关系?
    • 就是执行完重验证revalidate耗时。
  4. flow-limit也会说到,这个flow-limit是如何限制的?
    • 默认ofproto_flow_reduce_point=2000ms, ofproto_flow_quater_point=1300ms
    • 初始化时udpif_create设置为ofproto_flow_limit。社区里的是MIN(ofproto_flow_limit, 10000)
    • 1. 如果revalidate时间大于2秒,就将flow_limit调小duration/1000倍
    • 2. 如果revalidate时间大于1.3秒,就将flow_limit调成3/4
    • 3. 如果revalidate时间小于1秒并且flow_limit小于实际上1秒可以revalidate的数量,就+1000
    • 如果当前megaflow > flow_limit*2,则将megaflow全删除

参考: