首页 > 其他分享 >librdkafka的rdk:broker-1线程cpu百分百问题分析

librdkafka的rdk:broker-1线程cpu百分百问题分析

时间:2023-07-13 17:13:46浏览次数:46  
标签:__ rkq tv librdkafka broker tspec 000 线程 timeout

问题调用栈:

(gdb) bt
#0  0x000000000068307c in rd_kafka_q_pop_serve (rkq=0x1ff31a0, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#2  0x000000000066abcf in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#3  0x000000000066ac84 in rd_kafka_broker_serve (rkb=rkb@entry=0x1ff28e0, abs_timeout=abs_timeout@entry=71168163411018) at rdkafka_broker.c:2532
#4  0x000000000066b157 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#5  0x000000000066c796 in rd_kafka_broker_thread_main (arg=arg@entry=0x1ff28e0) at rdkafka_broker.c:3571
#6  0x00000000006b8d87 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#7  0x00007f80a07e1eb5 in start_thread () from /lib64/libpthread.so.0
#8  0x00007f80a05098fd in clone () from /lib64/libc.so.6

相关代码(rd_kafka_q_pop):

rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms,
                               int32_t version) {
	return rd_kafka_q_pop_serve(rkq, timeout_ms, version,
                                    RD_KAFKA_Q_CB_RETURN,
                                    NULL, NULL);
}

通过 gdb 观察到 timeout_ms 值为 1,也就是 1 毫秒,这是导致 cpu 百分百的原因:

(gdb) f 1
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
399     in rdkafka_queue.c
(gdb) info args
rkq = <optimized out>
timeout_ms = <optimized out>
version = 0
(gdb) info reg
rax            0x0      0
rbx            0x0      0
rcx            0x64ab3c08       1688943624
rdx            0x7f7f537b4240   140184838160960
rsi            0x1ff31a0        33501600
rdi            0x1ff31c8        33501640
rbp            0x1ff28e0        0x1ff28e0
rsp            0x7f7f537b4288   0x7f7f537b4288
r8             0x7      7
r9             0x4748105a       1195905114
r10            0x7b     123
r11            0x1da063a5132567 8339124156310887
r12            0x7f7f537b42f0   140184838161136
r13            0x40ba2119744a   71168163411018
r14            0x0      0
r15            0x4      4
rip            0x683130 0x683130 <rd_kafka_q_serve>
eflags         0x246    [ PF ZF IF ]
cs             0x33     51
ss             0x2b     43
ds             0x0      0
es             0x0      0
fs             0x0      0
gs             0x0      0
(gdb) p *(rd_kafka_q_t*)0x1ff31c8
$1 = {rkq_lock = {__data = {__lock = 0, __count = 4289904, __owner = 2144952, __nusers = 0, __kind = 2144952, __spins = 0, __elision = 0, __list = {__prev = 0x20bab8, 
        __next = 0x1ff31a0}}, 
    __size = "\000\000\000\000puA\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\240\061\377\001\000\000\000", 
    __align = 18424997382979584}, rkq_cond = {__data = {__lock = 0, __futex = 0, __total_seq = 0, __wakeup_seq = 0, __woken_seq = 33501696, __mutex = 0x0, __nwaiters = 0, 
      __broadcast_seq = 0}, __size = '\000' <repeats 25 times>, "\062\377\001", '\000' <repeats 19 times>, __align = 0}, rkq_fwdq = 0x300000001, rkq_q = {
    tqh_first = 0x1ff1930, tqh_last = 0x0}, rkq_qlen = 0, rkq_qsize = 0, rkq_refcnt = 9087760, rkq_flags = 0, rkq_rk = 0x0, rkq_qio = 0x261, rkq_serve = 0x23, 
  rkq_opaque = 0x0, rkq_name = 0x1 <Address 0x1 out of bounds>}
(gdb) p *(int*)0x1ff31a0
$2 = 1

继续跟踪,问题发生在函数 cnd_timedwait_abs:

(gdb) b cnd_timedwait_abs
Breakpoint 1 at 0x6b92c0: file tinycthread_extra.c, line 95.
(gdb) c
Continuing.

Breakpoint 1, cnd_timedwait_abs (cnd=cnd@entry=0x1ff31c8, mtx=mtx@entry=0x1ff31a0, tspec=tspec@entry=0x7f7f537b4240) at tinycthread_extra.c:95
95      tinycthread_extra.c: No such file or directory.
(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

函数 cnd_timedwait_abs 源码:

int cnd_timedwait_abs (cnd_t *cnd, mtx_t *mtx, const struct timespec *tspec) {
        if (tspec->tv_sec == RD_POLL_INFINITE)
                return cnd_wait(cnd, mtx);
        else if (tspec->tv_sec == RD_POLL_NOWAIT)
                return thrd_timedout;

        return cnd_timedwait(cnd, mtx, tspec); // 走这里来了
}

底层调用的是 Posix 的 pthread_cond_timedwait 函数:

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
      pthread_mutex_t *restrict mutex,
      const struct timespec *restrict abstime);

函数 pthread_cond_timedwait 的参数 abstime 是个绝对时间,不是相对时间。初始化如下:

rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
    struct timespec timeout_tspec;

    rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
    
    if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) == thrd_timedout) {
        mtx_unlock(&rkq->rkq_lock);
    }
}

static RD_INLINE void rd_timeout_init_timespec (struct timespec *tspec, int timeout_ms) {
    if (timeout_ms == RD_POLL_INFINITE ||
        timeout_ms == RD_POLL_NOWAIT) {
        tspec->tv_sec = timeout_ms;
        tspec->tv_nsec = 0;
    } else {
        timespec_get(tspec, TIME_UTC); // 这里
        tspec->tv_sec  += timeout_ms / 1000;
        tspec->tv_nsec += (timeout_ms % 1000) * 1000000;
        if (tspec->tv_nsec > 1000000000) {
            tspec->tv_nsec -= 1000000000;
            tspec->tv_sec++;
        }
    }
}

函数 timespec_get:

/* If TIME_UTC is missing, provide it and provide a wrapper for
   timespec_get. */
#ifndef TIME_UTC
#define TIME_UTC 1
#define _TTHREAD_EMULATE_TIMESPEC_GET_

int _tthread_timespec_get(struct timespec *ts, int base);
#define timespec_get _tthread_timespec_get
#endif

#if defined(_TTHREAD_EMULATE_TIMESPEC_GET_)
int _tthread_timespec_get(struct timespec *ts, int base)
{
#if defined(_TTHREAD_WIN32_)
  struct _timeb tb;
#elif !defined(CLOCK_REALTIME)
  struct timeval tv;
#endif

  if (base != TIME_UTC) // 约束为 UTC,即世界统一时间
  {
    return 0;
  }

#if defined(_TTHREAD_WIN32_)
  _ftime_s(&tb);
  ts->tv_sec = (time_t)tb.time;
  ts->tv_nsec = 1000000L * (long)tb.millitm;
#elif defined(CLOCK_REALTIME)
  base = (clock_gettime(CLOCK_REALTIME, ts) == 0) ? base : 0;
#else
  gettimeofday(&tv, NULL);
  ts->tv_sec = (time_t)tv.tv_sec;
  ts->tv_nsec = 1000L * (long)tv.tv_usec;
#endif

  return base;
}
#endif /* _TTHREAD_EMULATE_TIMESPEC_GET_ */

回过头看函数 cnd_timedwait_abs 的参数 tspec 的值:

(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

将 tv_sec 转为可读的值:

时间戳(秒)	1688943624
ISO 8601	2023-07-09T23:00:24.000Z
日期时间(UTC)	2023-07-09 23:00:24
日期时间(本地)	2023-07-10 07:00:24

而本地的实际时间为:

# date +'%Y-%m-%d %H:%M:%S'
2023-07-13 16:15:27

# date +'%s'
1689236277

# expr 1689236277 - 1688943624
292653
# expr 292653 / 3600
81

很明显 tspec 不对,。

准备进一步分析时,遇到 gdb 的 bug 了:

(gdb) b gettimeofday
Breakpoint 1 at 0x7fddd8ba9650 (2 locations)
(gdb) b clock_gettime
Breakpoint 2 at gnu-indirect-function resolver at 0x7fddd8c08800 (3 locations)
(gdb) c
Continuing.
../../gdb/elfread.c:1052: internal-error: elf_gnu_ifunc_resolver_return_stop: Assertion `b->loc->next == NULL' failed.
A problem internal to GDB has been detected,
further debugging may prove unreliable.
Quit this debugging session? (y or n)

进一步验证是发生在 clock_gettime,单独断点 gettimeofday 没有问题,而且不会进入 gettimeofday。

$ man clock_gettime

clock_getres(), clock_gettime(), clock_settime():
      _POSIX_C_SOURCE >= 199309L
      
#  ifdef __USE_POSIX199309
/* Identifier for system-wide realtime clock.  */
#   define CLOCK_REALTIME

找了个正常的,查看:

(gdb) p timeout_tspec
$2 = {tv_sec = 1689237800, tv_nsec = 687268217}

# expr 1689237800 - 1688943624
294176
# expr 294176 / 3600
81

暂时怀疑 clock_gettime 调用出问题了,实现在最新的代码中没有变化:https://github.com/confluentinc/librdkafka/blob/master/src/tinycthread.c。

标签:__,rkq,tv,librdkafka,broker,tspec,000,线程,timeout
From: https://www.cnblogs.com/aquester/p/17551484.html

相关文章

  • 现代C++(Modern C++)基本用法实践:八、线程支持
    概述在c++11之前,c++并未对线程编程提供直接的支持。在c++11之后,支持了线程管理、同步、条件变量等支持。在其他的c++库中(例如UE的线程库)还增加了多任务模型的抽象。用法举例参考测试项目的modrenc_auto_decltype.cpp文件主要内容:线程的创建使用future&async进行异步操作......
  • Java面试高频技术线程池,源码笔记答案全纪录
    有一定的java基础(线程),尤其是正要或正准备找工作的童鞋如果想在众多面试者中脱颖而出,你就需要多准备一些知识点,多刷一些面试题。而对于企业而言,有这么多的选择那我们就提高面试门槛,可能我需要的仅仅是CRUD的初中级,但我也希望你能了解JVM、多线程、Spring源码、Sql优化、分布......
  • SpringBoot中使用Netty开发WebSocket服务-netty-websocket-spring-boot-starter开源项
    场景SpringBoot+Vue整合WebSocket实现前后端消息推送:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114392573SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114480731若依前后......
  • Java入门12(多线程)
    多线程线程的实现方式继承Thread类:一旦继承了Thread类,就不能再继承其他类了,可拓展性差实现Runnable接口:仍然可以继承其他类,可拓展性较好使用线程池继承Thread类​ 不能通过线程对象调用run()方法,需要通过t1.start()方法,使线程进入到就绪状态,只要进入到就绪状态......
  • Java之多线程的同步和死锁
    设计模式中的单例模式的懒汉方式会存在多线程的安全问题;通过以下测试代码可以看到两个线程中得到的并不是同一个单例对象;@TestpublicvoidunsafeSingleInstanceTest()throwsInterruptedException{AtomicReference<UnSafeSingleInstance>s1=newAtomicRe......
  • java中虚拟机栈是线程共享的吗?
    在Java中,每个线程都有自己的虚拟机栈。虚拟机栈是用于存储线程执行方法时的局部变量、方法参数、方法调用和返回的数据等信息的内存区域。每个方法在执行时都会在虚拟机栈上创建一个称为"栈帧"的数据结构,栈帧中包含了方法的局部变量表、操作数栈、动态链接等信息。由于每个线程都......
  • 进程与线程
    简而言之,一个程序至少有一个进程,一个进程至少有一个线程. 线程的划分尺度小于进程,使得多线程程序的并发性高。另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运......
  • 线程阻塞案例分析
    线程阻塞案例分析一.Jstack打印快照1.jmeter运行压测脚本2.用jstack打印快照,下载到本地jstack112759>log1.txtjstack112759>log2.txtjstack112759>log3.txt3.在本地搜索是否有blocked关键字二.分析代码1.下载源码文件cn.testfan.perf.beihe.pinter.http.Cas......
  • Zephyr入门教程 2 线程
    RTOS的必要性当你开始增加你的嵌入式应用的功能时,在单一的主循环和一些中断例程中做所有的事情变得越来越难。通常情况下,下一级的复杂性是某种状态机,你的电子设备的输出会根据这个(内部)状态而改变。如果你需要能够同时操作多个复杂的输入和输出呢?一个很好的例子是TCP/IP连接,通过这......
  • 堆垛机西门子PLC程序+输送线程序+触摸屏程序。 物流仓
    堆垛机西门子PLC程序+输送线程序+触摸屏程序。物流仓储。涵盖通信,算法,运动控制,屏幕程序,可电脑仿真测试。实际项目完整程序。西门子S7-1200+G120+劳易测激光测距博途V15.1编程采用SCL高级编程语言。无加密。延申科普:物流仓储是一个涉及到供应链管理和仓库操作的领域。它涵盖了从货......