linux怎么查看本机内存大小
384
2022-11-01
Kafka学习笔记之Kakfa异常分析-Magic v0 does not support record headers
Kakfa异常 Magic v0 v1
0x00 概述
最近测试跟我说,某个应用消费不到交易的消息。登录到Kafka Broker看下了下日志,发现一直在报错:
java.lang.IllegalArgumentException: Magic v0 does not support record headers at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385) at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568) at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117) at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98) at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521) at scala.Option.map(Option.scala:146) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511) at scala.Option.flatMap(Option.scala:171) at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579) at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012) at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598) at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188) at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597) at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606) at kafka.server.KafkaApis.handle(KafkaApis.scala:98) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) at java.lang.Thread.run(Thread.java:748)
问了下相关开发人员,发现最近有个版本需要在Kafka信息的Headers中增加LogId来做交易跟踪,结合错误信息中提示消费者Api版本太低,不支持header信息,导致出错,让开发人员去掉header后,消费者可以正常消费消息
0x01 模拟重现
1.1 Kafka版本:0.11.0
生产者代码:
写了个拦截器,为每条消息的header中添加LOG_ID
public class KafkaProducerInterceptor
消费者代码
使用高版本Api的客户端:
public class KafkaConsumerAsync { public static void main(String[] args) throws InterruptedException { // 1、准备配置文件 String kafkas = "127.0.0.1:9092"; Properties props = new Properties(); //kafka连接信息 props.put("bootstrap.servers",kafkas); //消费者组id props.put("group.id", "testTopic-002"); //是否自动提交offset props.put("enable.auto.commit", "false"); //在没有offset的情况下采取的拉取策略 props.put("auto.offset.reset", "earliest"); //自动提交时间间隔 props.put("auto.commit.interval.ms", "1000"); //设置一次fetch请求取得的数据最大为1k props.put("fetch.max.bytes", "1024"); //key反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "testTopic"; // 2、创建KafkaConsumer KafkaConsumer
执行结果:
[28 10:20:08,923 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Discovered group coordinator xxxxx.com:9092 (id: 2147483647 rack: null)[28 10:20:08,931 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Revoking previously assigned partitions [][28 10:20:08,931 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] (Re-)joining group[28 10:20:08,958 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Successfully joined group with generation 1[28 10:20:08,960 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Setting newly assigned partitions [testTopic-0][28 10:20:08,978 INFO ] [main] internals.Fetcher - [Consumer clientId=consumer-1, groupId=testTopic-002] Resetting offset for partition testTopic-0 to offset 0.topic = testTopic,partition = 0,offset = 0, key = H10, value = {"accessId":"123456","accessName":"源码婆媳10","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 1, key = H11, value = {"accessId":"123456","accessName":"源码婆媳11","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 2, key = H12, value = {"accessId":"123456","accessName":"源码婆媳12","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 3, key = H13, value = {"accessId":"123456","accessName":"源码婆媳13","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 4, key = H14, value = {"accessId":"123456","accessName":"源码婆媳14","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 5, key = H15, value = {"accessId":"123456","accessName":"源码婆媳15","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 6, key = H16, value = {"accessId":"123456","accessName":"源码婆媳16","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 7, key = H17, value = {"accessId":"123456","accessName":"源码婆媳17","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 8, key = H18, value = {"accessId":"123456","accessName":"源码婆媳18","busScope":"01","iconUrl":"= testTopic,partition = 0,offset = 9, key = H19, value = {"accessId":"123456","accessName":"源码婆媳19","busScope":"01","iconUrl":"class SimpleConsumerExample { private static kafka.javaapi.consumer.ConsumerConnector consumer; public static void consume() { Properties props = new Properties(); // zookeeper 配置 props.put("zookeeper.connect", "127.0.0.1:2181"); // group 代表一个消费组 props.put("group.id", "jd-group"); // zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); Map
消费者运行后,一直消费不到消息
[28 09:51:41,590 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing[28 09:51:41,591 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092[28 09:51:41,592 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 65 for 1 topic(s) Set(testTopic)[28 09:51:41,799 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing[28 09:51:41,824 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092[28 09:51:41,825 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000[28 09:51:42,033 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 66 for 1 topic(s) Set(testTopic)[28 09:51:42,035 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )[28 09:51:42,251 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000[28 09:51:42,253 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 67 for 1 topic(s) Set(testTopic)[28 09:51:42,254 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
Kafka Broker的server.log一直在刷错误日志:
[2019-08-28 09:51:42,045] ERROR [KafkaApi-0] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=testTopic,partitions=[{partition=0,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)java.lang.IllegalArgumentException: Magic v0 does not support record headers at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385) at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568) at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117) at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98) at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521) at scala.Option.map(Option.scala:146) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511) at scala.Option.flatMap(Option.scala:171) at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579) at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012) at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598) at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188) at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597) at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606) at kafka.server.KafkaApis.handle(KafkaApis.scala:98) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) at java.lang.Thread.run(Thread.java:748)
通过错误日志信息结合源码,我们发现,在Broker拉取到Kakfa消息后,调用fetchResponseCallback回调方法,创建返回信息时,会校验消费者Api版本,如果低于当前Broker版本与向下转换消息
def fetchResponseCallback(bandwidthThrottleTimeMs: Int) { def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = { val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData] fetchedPartitionData.asScala.foreach { case (tp, partitionData) => convertedData.put(tp, convertedPartitionData(tp, partitionData)) } val response = new FetchResponse(convertedData, 0) val responseStruct = response.toStruct(versionId) trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.") response.responseData.asScala.foreach { case (topicPartition, data) => // record the bytes out metrics only when the response is being sent brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) } val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header) RequestChannel.Response(request, responseSend) } if (fetchRequest.isFromFollower) sendResponseExemptThrottle(createResponse(0)) else sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs => requestChannel.sendResponse(createResponse(requestThrottleMs))) } def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = { replicaManager.getMagic(tp).flatMap { magic => val downConvertMagic = { if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0)) Some(RecordBatch.MAGIC_VALUE_V0) else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) Some(RecordBatch.MAGIC_VALUE_V1) else None } downConvertMagic.map { magic => trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") //在这里会对消息进行向下转换 val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset) new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, data.logStartOffset, data.abortedTransactions, converted) } }.getOrElse(data) }
对消息转换,最后会调用MemoryRecordsBuilder的appendWithOffset,在此方法中做一些调用,如果调用不通过就会抛出异常,Magic v0 does not support record headers就是在此方法中抛出的。因为Magic v0 和Magic v1版本的消息格式中,不支持header
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { try { if (isControlRecord != isControlBatch) throw new IllegalArgumentException("Control records can only be appended to control batches"); if (lastOffset != null && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " + "(Offsets must increase monotonically).", offset, lastOffset)); if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) throw new IllegalArgumentException("Invalid negative timestamp " + timestamp); if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); if (baseTimestamp == null) baseTimestamp = timestamp; if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); return null; } else { return appendLegacyRecord(offset, timestamp, key, value); } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); } }
1.2 Broker的Kafka版本1.0
运行低版本消费者:
[28 14:26:23,068 INFO ] [jd-group_xx-1566973572731-a5b3105a-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566973572960] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset -1 to broker id:0,host:xx.xx.com,port:9092] ){"accessId":"123456","accessName":"源码婆媳10","busScope":"01","iconUrl":"static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) { RecordBatch batch = recordBatchAndRecords.batch; final TimestampType timestampType = batch.timestampType(); long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP; MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); for (Record record : recordBatchAndRecords.records) { // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported if (magic > RecordBatch.MAGIC_VALUE_V1) builder.append(record); else builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value()); } builder.close(); return builder; }
0x02 总结
通过之前的分析,解决上面的错误有三种方法
1)升级消费端Api版本,升级到新版本后,支持header
2)升级Broker版本,1.0以上版本,向下转换时,会忽略到header信息
3)最后一种方式,也很简单,那就是生产者不增加header信息。因为我们项目中LOG-ID暂时不是必须的,我们选择了此种方式,等消费者端版本全部升级之后,再添加header信息
https://jianshu.com/p/80ca3ade8fb2
https://zhuanlan.zhihu.com/p/205676507?utm_source=wechat_session
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~