From 05b6a9ad0bc86e2fddb250a7280fcf4658d66b82 Mon Sep 17 00:00:00 2001 From: Piotr Smolinski Date: Mon, 26 Aug 2019 22:07:23 +0200 Subject: Kafka: fixed OffsetForLeaderEpoch dissection Bug: 16023 Change-Id: I78e1354ac5509707c818d7968c7067583fb469ba Reviewed-on: https://code.wireshark.org/review/34379 Petri-Dish: Michael Mann Tested-by: Petri Dish Buildbot Reviewed-by: Michael Mann --- epan/dissectors/packet-kafka.c | 287 ++++++++++++++++++++++------------------- 1 file changed, 155 insertions(+), 132 deletions(-) diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 3d414c7d5b..478db6f4fe 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -66,7 +66,6 @@ static int hf_kafka_log_start_offset = -1; static int hf_kafka_first_offset = -1; static int hf_kafka_producer_id = -1; static int hf_kafka_producer_epoch = -1; -static int hf_kafka_message_set_size = -1; static int hf_kafka_message_size = -1; static int hf_kafka_message_crc = -1; static int hf_kafka_message_magic = -1; @@ -145,8 +144,6 @@ static int hf_kafka_fetch_session_id = -1; static int hf_kafka_fetch_session_epoch = -1; static int hf_kafka_record_header_key = -1; static int hf_kafka_record_header_value = -1; -static int hf_kafka_record_headers_count = -1; -static int hf_kafka_record_length = -1; static int hf_kafka_record_attributes = -1; static int hf_kafka_allow_auto_topic_creation = -1; static int hf_kafka_validate_only = -1; @@ -254,6 +251,10 @@ static expert_field ei_kafka_unknown_api_key = EI_INIT; static expert_field ei_kafka_unsupported_api_version = EI_INIT; static expert_field ei_kafka_bad_string_length = EI_INIT; static expert_field ei_kafka_bad_bytes_length = EI_INIT; +static expert_field ei_kafka_bad_array_length = EI_INIT; +static expert_field ei_kafka_bad_record_length = EI_INIT; +static expert_field ei_kafka_bad_varint = EI_INIT; +static expert_field ei_kafka_bad_message_set_length = EI_INIT; static expert_field ei_kafka_unknown_message_magic = EI_INIT; typedef gint16 kafka_api_key_t; @@ -692,7 +693,7 @@ typedef struct kafka_packet_values_t { /* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */ static int -dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec); +dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec); /* HELPERS */ @@ -897,8 +898,13 @@ dissect_kafka_array_ref(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int count = (gint32) tvb_get_ntohl(tvb, offset); offset += 4; - for (i=0; i>1) ^ ((v & 1) ? -1 : 0); - } - if (p_len != NULL) { - *p_len = i; + if (p_overflow != NULL) { + *p_overflow = ((p&0x80) != 0) || (i >= 5 && (p&0x70) != 0); } - return offset+i; + return (v>>1) ^ ((v & 1) ? -1 : 0); } /* @@ -1061,18 +1062,17 @@ tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len) * * tvb: actual data buffer * offset: offset in the buffer where the string length is to be found - * p_val: pointer to a variable to store the returned value * p_len: pointer to a variable to store the length of the variable + * p_overflow: pointer to a variable to store information that the value exceeds gint64 capacity * - * returns: pointer to the next field in the message or -1 + * returns: decoded value of 64-bit signed integer */ - static gint64 -tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len) +tvb_read_kafka_varint64(tvbuff_t *tvb, gint offset, guint *p_len, gboolean *p_overflow) { gint64 v = 0; guint8 p = 0; - int i = 0; + guint i = 0; do { p = tvb_get_guint8(tvb, offset+i); @@ -1080,48 +1080,39 @@ tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len) i += 1; } while ((p&0x80)!=0 && i<10); + if (p_len != NULL) { + *p_len = i; + } // 64-bit integer in zig-zag can take up to 10 octets // the last octet can take at most 1 bit - // either continuation bit is set or there are more than 64 bits - DISSECTOR_ASSERT((p&0x80) == 0); - DISSECTOR_ASSERT(i < 10 || (p&0x7e) == 0); - - if (p_val != NULL) { - *p_val = (v>>1) ^ ((v & 1) ? -1 : 0); + if (p_overflow != NULL) { + *p_overflow = ((p&0x80) != 0) || (i >= 10 && (p&0x7e) != 0); } - if (p_len != NULL) { - *p_len = i; - } - - return offset+i; -} -static int -dissect_kafka_varint(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset) -{ - gint64 val; - int len; - tvb_read_kafka_varint64(tvb, offset, &val, &len); - proto_tree_add_int64(tree, hf_item, tvb, offset, len, val); - return offset+len; + return (v>>1) ^ ((v & 1) ? -1 : 0); } static int dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 first_timestamp) { - nstime_t nstime; - guint64 milliseconds; - guint64 val; - int len; + nstime_t nstime; + guint64 milliseconds; + guint64 val; + int len; + gboolean overflow; + proto_item *pi; - tvb_read_kafka_varint64(tvb, offset, &val, &len); + val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow); milliseconds = first_timestamp + val; nstime.secs = (time_t) (milliseconds / 1000); nstime.nsecs = (int) ((milliseconds % 1000) * 1000000); - proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime); + pi = proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime); + if (overflow) { + expert_add_info(pinfo, pi, &ei_kafka_bad_varint); + } return offset+len; } @@ -1129,10 +1120,11 @@ dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree static int dissect_kafka_offset_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 base_offset) { - gint64 val; - int len; + gint64 val; + int len; + gboolean overflow; - tvb_read_kafka_varint64(tvb, offset, &val, &len); + val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow); proto_tree_add_int64(tree, hf_item, tvb, offset, len, base_offset+val); @@ -1162,12 +1154,30 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree { gint val; gint len; + gboolean overflow; + proto_item *pi; - tvb_read_kafka_varint32(tvb, offset, &val, &len); - - DISSECTOR_ASSERT(val >= 0); + val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow); - proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_UTF_8|ENC_NA); + if (overflow) { + pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + expert_add_info(pinfo, pi, &ei_kafka_bad_varint); + val = 0; + } else if (val > 0) { + // there is payload available, possibly with 0 octets + pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA | ENC_UTF_8); + } else if (val == 0) { + // there is empty payload (0 octets) + pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + } else if (val == -1) { + // there is no payload (null) + pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + val = 0; + } else { + pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + expert_add_info(pinfo, pi, &ei_kafka_bad_string_length); + val = 0; + } if (p_string_offset != NULL) { *p_string_offset = offset+len; @@ -1200,24 +1210,33 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree static int dissect_kafka_bytes_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, int *p_bytes_offset, int *p_bytes_length) { - gint val; - gint len; - - tvb_read_kafka_varint32(tvb, offset, &val, &len); + gint val; + gint len; + gboolean overflow; + proto_item *pi; - DISSECTOR_ASSERT(val >= -1); + val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow); - if (val > 0) { + if (overflow) { + pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + expert_add_info(pinfo, pi, &ei_kafka_bad_varint); + val = 0; + } else if (val > 0) { // there is payload available, possibly with 0 octets - proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA); + pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA); } else if (val == 0) { // there is empty payload (0 octets) - proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); - } else { + pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + } else if (val == -1) { // there is no payload (null) - proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + val = 0; + } else { + pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, ""); + expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length); val = 0; } + if (p_bytes_offset != NULL) { *p_bytes_offset = offset+len; } @@ -1265,17 +1284,21 @@ dissect_kafka_record_headers(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree * { proto_item *record_headers_ti; proto_tree *subtree; - - gint32 count; - int i; + gint32 count; + gint len; + gboolean overflow; + int i; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_record_headers, &record_headers_ti, "Headers"); - tvb_read_kafka_varint32(tvb, offset, &count, NULL); - offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_headers_count, offset); + count = tvb_read_kafka_varint32(tvb, offset, &len, &overflow); + if (overflow) { + expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_varint); + } else if (count < -1) { // -1 means null array + expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_array_length); + } - // null array is marked by -1 as opposed to empty array marked by 0 - DISSECTOR_ASSERT(count >= -1); + offset += len; for (i=0;i= 0); - end_offset = offset + len + val; + size = tvb_read_kafka_varint32(tvb, offset, &len, &overflow); + if (overflow) { + expert_add_info(pinfo, record_ti, &ei_kafka_bad_varint); + return offset + len; + } else if (size < 6) { + expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length); + return offset + len; + } - offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_length, offset); + end_offset = offset + len + size; + offset += len; proto_tree_add_item(subtree, hf_kafka_record_attributes, tvb, offset, 1, ENC_BIG_ENDIAN); offset += 1; @@ -1317,9 +1347,9 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in offset = dissect_kafka_record_headers(tvb, pinfo, subtree, offset); - DISSECTOR_ASSERT(offset == end_offset); - - proto_item_set_end(record_ti, tvb, offset); + if (offset != end_offset) { + expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length); + } return end_offset; } @@ -1660,13 +1690,15 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i if (decompress(tvb, pinfo, offset, length, codec, &decompressed_tvb, &decompressed_offset)==1) { add_new_data_source(pinfo, decompressed_tvb, "Decompressed content"); show_compression_reduction(tvb, subtree, length, tvb_captured_length(decompressed_tvb)); - dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset, FALSE, codec); + dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset, + tvb_reported_length_remaining(decompressed_tvb, decompressed_offset), codec); } else { proto_item_append_text(subtree, " [Cannot decompress records]"); } + offset += length; } - proto_item_set_len(message_ti, offset - start_offset); + proto_item_set_end(message_ti, tvb, offset); return offset; } @@ -1787,27 +1819,12 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int o } static int -dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec) +dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec) { proto_item *ti; proto_tree *subtree; - gint len, message_size; - int offset = start_offset; - int messages = 0; - - if (has_length_field) { - proto_tree_add_item(tree, hf_kafka_message_set_size, tvb, offset, 4, ENC_BIG_ENDIAN); - len = (gint)tvb_get_ntohl(tvb, offset); - offset += 4; - start_offset += 4; - } - else { - len = tvb_reported_length_remaining(tvb, offset); - } - - if (len <= 0) { - return offset; - } + gint end_offset = offset + len; + guint messages = 0; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message_set, &ti, "Message Set"); /* If set came from a compressed message, make it obvious in tree root */ @@ -1815,19 +1832,17 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i proto_item_append_text(subtree, " [from compressed %s message]", val_to_str_const(codec, kafka_message_codecs, "Unknown")); } - while (offset - start_offset < len) { - - message_size = tvb_get_guint32(tvb, offset+8, ENC_BIG_ENDIAN); - - dissect_kafka_message(tvb, pinfo, subtree, offset); - offset += 12 + message_size; - + while (offset < end_offset) { + offset = dissect_kafka_message(tvb, pinfo, subtree, offset); messages += 1; + } + if (offset != end_offset) { + expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length); } proto_item_append_text(ti, " (%d Messages)", messages); - proto_item_set_len(ti, offset - start_offset); + proto_item_set_end(ti, tvb, offset); return offset; } @@ -2892,7 +2907,8 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_ { proto_item *ti; proto_tree *subtree; - int offset = start_offset; + int offset = start_offset; + guint len; kafka_packet_values_t packet_values; memset(&packet_values, 0, sizeof(packet_values)); @@ -2918,7 +2934,12 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_ offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version); } - offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE); + len = tvb_get_ntohl(tvb, offset); + offset += 4; + + if (len > 0) { + offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE); + } proto_item_set_len(ti, offset - start_offset); @@ -2983,6 +3004,7 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto { proto_item *ti; proto_tree *subtree; + guint len; kafka_packet_values_t packet_values; memset(&packet_values, 0, sizeof(packet_values)); @@ -2990,7 +3012,12 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto offset = dissect_kafka_partition_id_ret(tvb, pinfo, subtree, offset, &packet_values.partition_id); - offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE); + len = tvb_get_ntohl(tvb, offset); + offset += 4; + + if (len > 0) { + offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE); + } proto_item_append_text(ti, " (ID=%u)", packet_values.partition_id); proto_item_set_end(ti, tvb, offset); @@ -5099,12 +5126,15 @@ dissect_kafka_offset_for_leader_epoch_request_topic_partition(tvbuff_t *tvb, pac partition_id = tvb_get_ntohl(tvb, offset); proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; if (api_version >= 2) { proto_tree_add_item(subtree, hf_kafka_current_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; } proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; proto_item_set_end(subti, tvb, offset); @@ -5263,7 +5293,7 @@ dissect_kafka_add_partitions_to_txn_request_topic_partition(tvbuff_t *tvb, packe { proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - return offset; + return offset+4; } static int @@ -8354,11 +8384,6 @@ proto_register_kafka(void) FT_INT32, BASE_DEC, 0, 0, NULL, HFILL } }, - { &hf_kafka_message_set_size, - { "Message Set Size", "kafka.message_set_size", - FT_INT32, BASE_DEC, 0, 0, - NULL, HFILL } - }, { &hf_kafka_message_size, { "Message Size", "kafka.message_size", FT_INT32, BASE_DEC, 0, 0, @@ -8734,16 +8759,6 @@ proto_register_kafka(void) FT_BYTES, BASE_SHOW_ASCII_PRINTABLE, 0, 0, NULL, HFILL } }, - { &hf_kafka_record_headers_count, - { "Headers Count", "kafka.headers_count", - FT_INT64, BASE_DEC, 0, 0, - NULL, HFILL } - }, - { &hf_kafka_record_length, - { "Record Length", "kafka.record_length", - FT_INT64, BASE_DEC, 0, 0, - NULL, HFILL } - }, { &hf_kafka_record_attributes, { "Record Attributes (reserved)", "kafka.record_attributes", FT_INT8, BASE_DEC, 0, 0, @@ -9027,6 +9042,14 @@ proto_register_kafka(void) { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }}, { &ei_kafka_bad_bytes_length, { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }}, + { &ei_kafka_bad_array_length, + { "kafka.bad_array_length", PI_MALFORMED, PI_WARN, "Invalid array length field", EXPFILL }}, + { &ei_kafka_bad_record_length, + { "kafka.bad_record_length", PI_MALFORMED, PI_WARN, "Invalid record length field", EXPFILL }}, + { &ei_kafka_bad_varint, + { "kafka.bad_varint", PI_MALFORMED, PI_WARN, "Invalid varint bytes", EXPFILL }}, + { &ei_kafka_bad_message_set_length, + { "kafka.ei_kafka_bad_message_set_length", PI_MALFORMED, PI_WARN, "Message set size does not match content", EXPFILL }}, { &ei_kafka_unknown_message_magic, { "kafka.unknown_message_magic", PI_MALFORMED, PI_WARN, "Invalid message magic field", EXPFILL }}, }; -- cgit v1.2.3