首页 > 其他分享 >面试官问我:线程锁导致的kafka客户端超时,如何解决?

面试官问我:线程锁导致的kafka客户端超时,如何解决?

时间:2023-12-18 16:58:18浏览次数:36  
标签:面试官 responses kafka 线程 new 超时 response

本文分享自华为云社区《线程锁导致的kafka客户端超时问题》,作者: 张俭 。

问题背景

有一个环境的kafka client发送数据有部分超时,拓扑图也非常简单

cke_114.png

定位历程

我们先对客户端的环境及JVM情况进行了排查,从JVM所在的虚拟机到kafka server的网络正常,垃圾回收(GC)时间也在预期范围内,没有出现异常。

紧接着,我们把目光转向了kafka 服务器,进行了一些基础的检查,同时也查看了kafka处理请求的超时日志,其中我们关心的metadata和produce请求都没有超时。

问题就此陷入了僵局,虽然也搜到了一些kafka server会对连上来的client反解导致超时的问题( https://github.com/apache/kafka/pull/10059),但通过一些简单的分析,我们确定这并非是问题所在。

同时,我们在环境上也发现一些异常情况,当时觉得不是核心问题/解释不通,没有深入去看

  • 问题JVM线程数较高,已经超过10000,这个线程数量虽然确实较高,但并不会对1个4U的容器产生什么实质性的影响。
  • 负责指标上报的线程CPU较高,大约占用了1/4 ~ 1/2 的CPU核,这个对于4U的容器来看问题也不大

当排查陷入僵局,我们开始考虑其他可能的调查手段。我们尝试抓包来找线索,这里的抓包是SASL鉴权+SSL加密的,非常难读,只能靠长度和响应时间勉强来推断报文的内容。

在这个过程中,我们发现了一个非常重要的线索,客户端竟然发起了超时断链,并且超时的那条消息,实际服务端是有响应回复的。

随后我们将kafka client的trace级别日志打开,这里不禁感叹kafka client日志打的相对较少,发现的确有log.debug(“Disconnecting from node {} due to request timeout.”, nodeId);的日志打印。

与网络相关的流程:

try {

// 这里发出了请求

client.send(request, time.milliseconds());

while (client.active()) {

List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());

for (ClientResponse response : responses) {

if (response.requestHeader().correlationId() == request.correlationId()) {

if (response.wasDisconnected()) {

throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");

}

if (response.versionMismatch() != null) {

throw response.versionMismatch();

}

return response;

}

}

}

throw new IOException("Client was shutdown before response was read");

} catch (DisconnectException e) {

if (client.active())

throw e;

else

throw new IOException("Client was shutdown before response was read");

}

这个poll方法,不是简单的poll方法,而在poll方法中会进行超时判断,查看poll方法中调用的handleTimedOutRequests方法

@Override

public List<ClientResponse> poll(long timeout, long now) {

ensureActive();

if (!abortedSends.isEmpty()) {

// If there are aborted sends because of unsupported version exceptions or disconnects,

// handle them immediately without waiting for Selector#poll.

List<ClientResponse> responses = new ArrayList<>();

handleAbortedSends(responses);

completeResponses(responses);

return responses;

}

long metadataTimeout = metadataUpdater.maybeUpdate(now);

try {

this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

} catch (IOException e) {

log.error("Unexpected error during I/O", e);

}

// process completed actions

long updatedNow = this.time.milliseconds();

List<ClientResponse> responses = new ArrayList<>();

handleCompletedSends(responses, updatedNow);

handleCompletedReceives(responses, updatedNow);

handleDisconnections(responses, updatedNow);

handleConnections();

handleInitiateApiVersionRequests(updatedNow);

// 关键的超时判断

handleTimedOutRequests(responses, updatedNow);

completeResponses(responses);

return responses;

}

由此我们推断,问题可能在于客户端hang住了一段时间,从而导致超时断链。我们通过工具Arthas深入跟踪了Kafka的相关代码,甚至发现一些简单的操作(如A.field)也需要数秒的时间。这进一步确认了我们的猜想:问题可能出在JVM。JVM可能在某个时刻出现问题,导致系统hang住,但这并非由GC引起。

cke_115.png

为了解决这个问题,我们又检查了监控线程CPU较高的问题。我们发现线程的执行热点是从"sun.management.ThreadImpl"中的"getThreadInfo"方法。

"metrics-1@746" prio=5 tid=0xf nid=NA runnable

java.lang.Thread.State: RUNNABLE

at sun.management.ThreadImpl.getThreadInfo(Native Method)

at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:185)

at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:149)

进一步发现,在某些版本的JDK8中,读取线程信息是需要加锁的。

至此,问题的根源已经清晰明了:过高的线程数以及线程监控时JVM全局锁的存在导致了这个问题。您可以使用如下的demo来复现这个问题

import java.lang.management.ManagementFactory;

import java.lang.management.ThreadInfo;

import java.lang.management.ThreadMXBean;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

public class ThreadLockSimple {

public static void main(String[] args) {

for (int i = 0; i < 15_000; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(200_000);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

}).start();

}

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

executorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

System.out.println("take " + " " + System.currentTimeMillis());

}

}, 1, 1, TimeUnit.SECONDS);

ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

ScheduledExecutorService metricsService = Executors.newSingleThreadScheduledExecutor();

metricsService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

long start = System.currentTimeMillis();

ThreadInfo[] threadInfoList = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds());

System.out.println("threads count " + threadInfoList.length + " cost :" + (System.currentTimeMillis() - start));

}

}, 1, 1, TimeUnit.SECONDS);

}

}

为了解决这个问题,我们有以下几个可能的方案:

  • 将不合理的线程数往下降,可能存在线程泄露的场景
  • 升级jdk到jdk11或者jdk17(推荐)
  • 将Thread相关的监控临时关闭

这个问题的解决方案应根据实际情况进行选择,希望对你有所帮助。

 

点击关注,第一时间了解华为云新鲜技术~

 

标签:面试官,responses,kafka,线程,new,超时,response
From: https://www.cnblogs.com/huaweiyun/p/17911613.html

相关文章

  • Python多线程应用于自动化测试操作示例
    本文实例讲述了Python多线程应用于自动化测试操作。分享给大家供大家参考,具体如下:多线程执行测试用例实例:importthreadingfromtimeimportsleep,ctimefromseleniumimportwebdriver#测试用例1deftest_baidu(browser,search):print("开始,现在时间是%s"%ctime())print("......
  • linux 使用 mwget 实现多线程下载
    mwget和curl/multicurl一样,m就是multi多线程的意思。mwget是wget的升级版,支持多线程下载【使用方法】gitclonehttps://github.com/rayylee/mwget.gitcdmwgetyum-yinstallpkg-configyum-yinstalllibssl-devyum-yinstallintltool./configuremake&&makeins......
  • Java | 多线程并发编程CountDownLatch实践
    关注:CodingTechWork引言  在一次数据割接需求中,数据需要通过编程的方式进行转移割接到新平台,此时若串行化方式,无疑会拉锯此次战斗,所以首当其冲要使用并发编程来降低割接时长。  本次主要考虑使用CountDownLatch工具类进行并发编程的控制。CountDownLatch概述  在并发编程过程......
  • 关于python http.server 开启多线程并发的问题
    问题描述thon中的http.server模块是单线程的,这意味着它一次只能处理一个请求,而其他请求必须等待。为了解决这个问题,您可以考虑使用多线程或异步处理来处理并发请求。您可以使用Python的ThreadingMixIn来创建一个支持多线程的HTTP服务器,或者考虑使用异步框架如asyncio来处理请求......
  • 多线程+信号量同步线程
    实现场景:多线程+信号量实现线程同步执行线程在创建的时候并不能保证优先顺序,是异步的,如果想按照自己指定的顺序先后执行的话,可以使用一些互斥或者同步的方式;以下我是通过信号量来实现同步:信号量的类型是sem_t,需要的头文件是 #include<semaphore.h>,主要是方法是sem_init......
  • kafka入门(四):kafka生产者发送消息
    创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘、同步、异步。发后即忘:就是直接调用生产者的send方法发送。发后即完,只管往kafka中发送消息,而不关心消息是否正确到达。这种发送方式的性能最高,可靠性也最差。producer.send(record);......
  • 变量的线程安全分析(一)
    成员变量和静态变量是否线程安全?如果它们没有共享,则线程安全如果它们被共享了,根据它们的状态是否能够改变,又分两种情况如果只有读操作,则线程安全如果有读写操作,则这段代码是临界区,需要考虑线程安全局部变量是否线程安全?局部变量是线程安全的但局部变量引用的对象则未必如果......
  • C#:多线程篇
    文章目录基础概念进程线程句柄多线程同步/异步C#中的多线程Thread如何开启新线程线程的停止等待后台线程,前台线程跨线程操作主线程UI线程的优先级扩展封装数据槽内存栅栏资源竞争与线程锁ThreadPoolThreadPool好处线程池如何分配一个线程线程等待线程池如......
  • 面试官:这些大学生都会
    大家好,我是JavaPub。最近有些同学在后台问我,面试总是会遇到被问Linux命令的问题,自己就面试个后端开发岗位,怎么这么难呢?image-20231216131924099其实Linux命令,对于一个后端开发来说,并不是很难,只是我们平时很少使用而已。但是,我们平时开发,用到的Linux命令,其实也就那么几......
  • helm部署Kafka集群
    1.准备文件1.1.创建命令空间kubectlcreatenskafka1.1.helm包拉取本地#添加bitnami仓库helmrepoaddbitnamihttps://charts.bitnami.com/bitnami#查询charthelmsearchrepobitnami#拉取zookeeperhelmpullbitnami/zookeeper#解压tarzxvfzookeeper-12.0.......