百度brpc 压测工具rpc_press解析

哈哈 阅读:949 2020-10-19 15:34:36 评论:0

1. 背景

昨天看到一段brpc中的压测代码rpc_press, 看着不错。整理一下。
发压工具的难点不是发送请求,而是要注意下面的2点:

  • 保证能发出足够的qps,比如上万qps
  • 控制发送合理的qps,比如控制为5qps,不可以大量发压

2. brpc 中的是关键实现

2.1 如何确保发送足够qps

rpc_press 采用多线程发送。
对于上万qps,多线程来分摊,每个线程发送qps控制在一定范围内。

2.2 如何控制合理qps

这是难点,线程如何控制发送qps。
看下面公式,1000000us(1s)需要发送200请求, 那50请求需要多少时间了?这就是brpc控制压力的核心原理。

核心原理: rpc_press中根据目前发送请求的速度与应该的速度进行比较,来判断是继续发压,还是usleep

2.3 rpc_press 代码

这里贴一点rpc_press工作线程git代码(引用自brpc)

void RpcPress::sync_client() { 
    double req_rate = _options.test_req_rate / _options.test_thread_num; 
    //max make up time is 5 s 
    if (_msgs.empty()) { 
        LOG(ERROR) << "nothing to send!"; 
        return; 
    } 
    const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed); 
    int msg_index = thread_index; 
    std::deque<int64_t> timeq; 
    size_t MAX_QUEUE_SIZE = (size_t)req_rate; 
    if (MAX_QUEUE_SIZE < 100) { 
        MAX_QUEUE_SIZE = 100; 
    } else if (MAX_QUEUE_SIZE > 2000) { 
        MAX_QUEUE_SIZE = 2000; 
    } 
    timeq.push_back(butil::gettimeofday_us()); 
    while (!_stop) { 
        brpc::Controller* cntl = new brpc::Controller; 
        msg_index = (msg_index + _options.test_thread_num) % _msgs.size(); 
        Message* request = _msgs[msg_index]; 
        Message* response = _pbrpc_client->get_output_message(); 
        const int64_t start_time = butil::gettimeofday_us(); 
        google::protobuf::Closure* done = brpc::NewCallback< 
            RpcPress,  
            RpcPress*,  
            brpc::Controller*,  
            Message*,  
            Message*, int64_t> 
            (this, &RpcPress::handle_response, cntl, request, response, start_time); 
        const brpc::CallId cid1 = cntl->call_id(); 
        _pbrpc_client->call_method(cntl, request, response, done); 
        _sent_count << 1; 
 
        if (_options.test_req_rate <= 0) {  
            brpc::Join(cid1); 
        } else { 
            int64_t end_time = butil::gettimeofday_us(); 
            int64_t expected_elp = 0; 
            int64_t actual_elp = 0; 
            timeq.push_back(end_time); 
            if (timeq.size() > MAX_QUEUE_SIZE) { 
                actual_elp = end_time - timeq.front(); 
                timeq.pop_front(); 
                expected_elp = (int64_t)(1000000 * timeq.size() / req_rate); 
            } else { 
                actual_elp = end_time - timeq.front(); 
                expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate); 
            } 
            if (actual_elp < expected_elp) { 
                usleep(expected_elp - actual_elp); 
            } 
        } 
    } 
} 
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号