An incident: Kafka connection failure caused by misconfiguring request.timeout.ms and session.timeout.ms

Problem description

Our program consumes a Kafka cluster owned by another department. A few days after the release went live, we noticed that no data was being read from Kafka, yet the logs showed no errors or exceptions at all.

After debugging in the test environment, we finally saw the following exception on the console:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
at com.myhexin.kiv.service.impl.SyncTemplateServiceImpl.receiver(SyncTemplateServiceImpl.java:60)
at com.myhexin.kiv.job.SchedulerAllJob.registerJobs(SchedulerAllJob.java:95)
at com.myhexin.kiv.job.SchedulerAllJob.scheduleJobs(SchedulerAllJob.java:74)
at com.myhexin.kiv.job.SchedulerListener.schedule(SchedulerListener.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:667)
... 20 more

The key line is:

request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms

It turned out the code had set request.timeout.ms to 10000ms, while session.timeout.ms was set to 30000ms.

After swapping the two values, the program worked again.
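For reference, a minimal sketch of consumer properties with the timeouts the right way round. The broker address and group id are placeholders of my own, not the real ones from this incident:

```java
import java.util.Properties;

public class ConsumerTimeoutConfig {

    // Builds consumer properties where request.timeout.ms is strictly greater
    // than both session.timeout.ms and fetch.max.wait.ms, as the client requires.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("session.timeout.ms", "10000");
        props.put("fetch.max.wait.ms", "500");
        props.put("request.timeout.ms", "30000"); // must exceed both values above
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("request.timeout.ms")); // prints 30000
    }
}
```

These properties can then be passed straight to the KafkaConsumer constructor; the point is only that the request timeout sits above the other two.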

Root cause analysis

The Kafka source contains the following check: if the request timeout is not greater than the session timeout, or not greater than the fetch max wait time, the requestTimeoutMs configuration is considered invalid and an exception is thrown immediately:

if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
    throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
            + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
            + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);

this.time = new SystemTime();

MetricConfig metricConfig = new MetricConfig()
        .samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
        .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                TimeUnit.MILLISECONDS);

What exactly do request.timeout.ms and session.timeout.ms mean?

request.timeout.ms: the maximum time to wait for an acknowledgement after sending a request (the request timeout); default 40000ms.

session.timeout.ms: the timeout of the current consumer's session, i.e. how long the client may go without sending a heartbeat to the server before it is considered dead; default 30000ms.

Which time-related settings among Kafka's connection parameters need care?

A StackOverflow answer summarizes it:

These conditions need to be kept in mind when changing session.timeout.ms:

  1. group.max.session.timeout.ms in the server.properties > session.timeout.ms in the consumer.properties.
  2. group.min.session.timeout.ms in the server.properties < session.timeout.ms in the consumer.properties.
  3. request.timeout.ms > session.timeout.ms and fetch.max.wait.ms
  4. (session.timeout.ms)/3 > heartbeat.interval.ms
  5. session.timeout.ms > Worst case processing time of Consumer Records per consumer poll(ms).
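The client-side constraints above (conditions 3 and 4) can be checked up front, before constructing the consumer. A small hypothetical helper of my own, not part of Kafka's API:

```java
public class TimeoutValidator {

    // Returns true when the client-side timing constraints hold:
    //   request.timeout.ms > session.timeout.ms and > fetch.max.wait.ms
    //   heartbeat.interval.ms < session.timeout.ms / 3
    public static boolean isValid(long requestTimeoutMs,
                                  long sessionTimeoutMs,
                                  long fetchMaxWaitMs,
                                  long heartbeatIntervalMs) {
        return requestTimeoutMs > sessionTimeoutMs
                && requestTimeoutMs > fetchMaxWaitMs
                && heartbeatIntervalMs < sessionTimeoutMs / 3;
    }

    public static void main(String[] args) {
        // The incident's values: request=10000, session=30000 -> invalid
        System.out.println(isValid(10000, 30000, 500, 3000)); // prints false
        // After swapping: request=30000, session=10000 -> valid
        System.out.println(isValid(30000, 10000, 500, 3000)); // prints true
    }
}
```

Failing fast like this at startup surfaces the misconfiguration immediately instead of leaving a silently dead consumer.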

Why did this exception never appear in the logs?

The console kept printing Received successful Heartbeat response.

Once a Kafka Consumer client object has been constructed, it keeps its connection to the Kafka broker alive via heartbeats.

You can call the object's close() method to shut the connection down explicitly; otherwise the connection is only closed automatically once the timeout is reached.

Until the connection is closed, heartbeats keep being sent and received.
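Since KafkaConsumer implements Closeable, try-with-resources guarantees that close() runs (and the heartbeat stops) even if the poll loop throws. A minimal stand-in with no Kafka dependency, only illustrating the lifecycle pattern:

```java
public class ConsumerLifecycle {

    // Stand-in for a consumer: "heartbeats" run from construction until close().
    static class FakeConsumer implements AutoCloseable {
        private boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true; // a real consumer also stops its heartbeat thread here
        }
    }

    // try-with-resources calls close() automatically, even on exceptions.
    public static boolean pollOnceAndClose() {
        FakeConsumer consumer;
        try (FakeConsumer c = new FakeConsumer()) {
            consumer = c;
            // the poll loop would go here
        }
        return consumer.isClosed();
    }

    public static void main(String[] args) {
        System.out.println(pollOnceAndClose()); // prints true
    }
}
```

The same shape applies to a real KafkaConsumer: scope it with try-with-resources (or close it in a finally block) so the connection never lingers until the timeout.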

Redeploying the program threw another exception: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1

Reference: http://www.mamicode.com/info-detail-1889211.html

Suggested fix: leave client.id unset, and Kafka will generate ids for multiple threads by default; if you set it manually, only a single instance can exist.

That doesn't match our case, though: we never configured this parameter at all, and everything works locally; the problem only appears once the app runs in the production Tomcat. My guess is that locally we restart Tomcat every time, whereas on the server Tomcat is merely reloaded, not restarted.
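The exception itself comes from JMX: the Kafka client registers an MBean named after the client id, and registering the same ObjectName twice on one JVM's MBean server raises InstanceAlreadyExistsException, which is consistent with a reload that creates a second consumer without the first being unregistered. A minimal reproduction using only the JDK (the MBean below is a stand-in of mine, not Kafka's):

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class JmxCollisionDemo {

    // Trivial standard MBean; JMX requires an interface named <Class>MBean.
    public interface DemoMBean { }
    public static class Demo implements DemoMBean { }

    // Registers the same ObjectName twice and reports whether the second
    // registration failed with InstanceAlreadyExistsException.
    public static boolean secondRegistrationFails() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=consumer-1");
        server.registerMBean(new Demo(), name);
        try {
            server.registerMBean(new Demo(), name);
            return false;
        } catch (InstanceAlreadyExistsException expected) {
            return true;
        } finally {
            server.unregisterMBean(name); // clean up the first registration
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secondRegistrationFails()); // prints true
    }
}
```

This supports the reload hypothesis: a full Tomcat restart gives a fresh JVM (and a fresh MBean server), while a reload reuses the old JVM where the first consumer's MBean may still be registered.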

That said, this error does not seem to affect the program; the updated code was still loaded successfully.

This remains to be confirmed.