aboutsummaryrefslogtreecommitdiffstats
path: root/epan/dissectors/packet-kafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--epan/dissectors/packet-kafka.c287
1 files changed, 155 insertions, 132 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 3d414c7d5b..478db6f4fe 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -66,7 +66,6 @@ static int hf_kafka_log_start_offset = -1;
static int hf_kafka_first_offset = -1;
static int hf_kafka_producer_id = -1;
static int hf_kafka_producer_epoch = -1;
-static int hf_kafka_message_set_size = -1;
static int hf_kafka_message_size = -1;
static int hf_kafka_message_crc = -1;
static int hf_kafka_message_magic = -1;
@@ -145,8 +144,6 @@ static int hf_kafka_fetch_session_id = -1;
static int hf_kafka_fetch_session_epoch = -1;
static int hf_kafka_record_header_key = -1;
static int hf_kafka_record_header_value = -1;
-static int hf_kafka_record_headers_count = -1;
-static int hf_kafka_record_length = -1;
static int hf_kafka_record_attributes = -1;
static int hf_kafka_allow_auto_topic_creation = -1;
static int hf_kafka_validate_only = -1;
@@ -254,6 +251,10 @@ static expert_field ei_kafka_unknown_api_key = EI_INIT;
static expert_field ei_kafka_unsupported_api_version = EI_INIT;
static expert_field ei_kafka_bad_string_length = EI_INIT;
static expert_field ei_kafka_bad_bytes_length = EI_INIT;
+static expert_field ei_kafka_bad_array_length = EI_INIT;
+static expert_field ei_kafka_bad_record_length = EI_INIT;
+static expert_field ei_kafka_bad_varint = EI_INIT;
+static expert_field ei_kafka_bad_message_set_length = EI_INIT;
static expert_field ei_kafka_unknown_message_magic = EI_INIT;
typedef gint16 kafka_api_key_t;
@@ -692,7 +693,7 @@ typedef struct kafka_packet_values_t {
/* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec);
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec);
/* HELPERS */
@@ -897,8 +898,13 @@ dissect_kafka_array_ref(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int
count = (gint32) tvb_get_ntohl(tvb, offset);
offset += 4;
- for (i=0; i<count; i++) {
- offset = func(tvb, pinfo, tree, offset, api_version);
+ if (count < -1) { // -1 means null array
+ expert_add_info(pinfo, proto_tree_get_parent(tree), &ei_kafka_bad_array_length);
+ }
+ else {
+ for (i=0; i<count; i++) {
+ offset = func(tvb, pinfo, tree, offset, api_version);
+ }
}
if (p_count != NULL) {
@@ -916,7 +922,6 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off
return dissect_kafka_array_ref(tree, tvb, pinfo, offset, api_version, func, NULL);
}
-
static int
dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset,
int *p_string_offset, int *p_string_len)
@@ -1018,17 +1023,17 @@ dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
*
* tvb: actual data buffer
* offset: offset in the buffer where the string length is to be found
- * p_val: pointer to a variable to store the returned value
* p_len: pointer to a variable to store the length of the variable
+ * p_overflow: pointer to a variable to store information that the value exceeds gint32 capacity
*
- * returns: pointer to the next field in the message or -1
+ * returns: decoded value of 32-bit signed integer
*/
static gint32
-tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
+tvb_read_kafka_varint32(tvbuff_t *tvb, gint offset, guint *p_len, gboolean *p_overflow)
{
gint32 v = 0;
guint8 p = 0;
- int i = 0;
+ guint i = 0;
do {
p = tvb_get_guint8(tvb, offset+i);
@@ -1036,21 +1041,17 @@ tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
i += 1;
} while ((p&0x80)!=0 && i<5);
+ if (p_len != NULL) {
+ *p_len = i;
+ }
// 32-bit integer in zig-zag can take up to 5 octets
// the last octet can take at most 4 bits
-
// either continuation bit is set or there are more than 32 bits
- DISSECTOR_ASSERT((p&0x80) == 0);
- DISSECTOR_ASSERT(i < 5 || (p&0x70) == 0);
-
- if (p_val != NULL) {
- *p_val = (v>>1) ^ ((v & 1) ? -1 : 0);
- }
- if (p_len != NULL) {
- *p_len = i;
+ if (p_overflow != NULL) {
+ *p_overflow = ((p&0x80) != 0) || (i >= 5 && (p&0x70) != 0);
}
- return offset+i;
+ return (v>>1) ^ ((v & 1) ? -1 : 0);
}
/*
@@ -1061,18 +1062,17 @@ tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
*
* tvb: actual data buffer
* offset: offset in the buffer where the string length is to be found
- * p_val: pointer to a variable to store the returned value
* p_len: pointer to a variable to store the length of the variable
+ * p_overflow: pointer to a variable to store information that the value exceeds gint64 capacity
*
- * returns: pointer to the next field in the message or -1
+ * returns: decoded value of 64-bit signed integer
*/
-
static gint64
-tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len)
+tvb_read_kafka_varint64(tvbuff_t *tvb, gint offset, guint *p_len, gboolean *p_overflow)
{
gint64 v = 0;
guint8 p = 0;
- int i = 0;
+ guint i = 0;
do {
p = tvb_get_guint8(tvb, offset+i);
@@ -1080,48 +1080,39 @@ tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len)
i += 1;
} while ((p&0x80)!=0 && i<10);
+ if (p_len != NULL) {
+ *p_len = i;
+ }
// 64-bit integer in zig-zag can take up to 10 octets
// the last octet can take at most 1 bit
-
// either continuation bit is set or there are more than 64 bits
- DISSECTOR_ASSERT((p&0x80) == 0);
- DISSECTOR_ASSERT(i < 10 || (p&0x7e) == 0);
-
- if (p_val != NULL) {
- *p_val = (v>>1) ^ ((v & 1) ? -1 : 0);
+ if (p_overflow != NULL) {
+ *p_overflow = ((p&0x80) != 0) || (i >= 10 && (p&0x7e) != 0);
}
- if (p_len != NULL) {
- *p_len = i;
- }
-
- return offset+i;
-}
-static int
-dissect_kafka_varint(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset)
-{
- gint64 val;
- int len;
- tvb_read_kafka_varint64(tvb, offset, &val, &len);
- proto_tree_add_int64(tree, hf_item, tvb, offset, len, val);
- return offset+len;
+ return (v>>1) ^ ((v & 1) ? -1 : 0);
}
static int
dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 first_timestamp)
{
- nstime_t nstime;
- guint64 milliseconds;
- guint64 val;
- int len;
+ nstime_t nstime;
+ guint64 milliseconds;
+ guint64 val;
+ int len;
+ gboolean overflow;
+ proto_item *pi;
- tvb_read_kafka_varint64(tvb, offset, &val, &len);
+ val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow);
milliseconds = first_timestamp + val;
nstime.secs = (time_t) (milliseconds / 1000);
nstime.nsecs = (int) ((milliseconds % 1000) * 1000000);
- proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime);
+ pi = proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime);
+ if (overflow) {
+ expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+ }
return offset+len;
}
@@ -1129,10 +1120,11 @@ dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree
static int
dissect_kafka_offset_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 base_offset)
{
- gint64 val;
- int len;
+ gint64 val;
+ int len;
+ gboolean overflow;
- tvb_read_kafka_varint64(tvb, offset, &val, &len);
+ val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow);
proto_tree_add_int64(tree, hf_item, tvb, offset, len, base_offset+val);
@@ -1162,12 +1154,30 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree
{
gint val;
gint len;
+ gboolean overflow;
+ proto_item *pi;
- tvb_read_kafka_varint32(tvb, offset, &val, &len);
-
- DISSECTOR_ASSERT(val >= 0);
+ val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
- proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_UTF_8|ENC_NA);
+ if (overflow) {
+ pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+ expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+ val = 0;
+ } else if (val > 0) {
+ // there is payload available, possibly with 0 octets
+ pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA | ENC_UTF_8);
+ } else if (val == 0) {
+ // there is empty payload (0 octets)
+ pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
+ } else if (val == -1) {
+ // there is no payload (null)
+ pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+ val = 0;
+ } else {
+ pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+ expert_add_info(pinfo, pi, &ei_kafka_bad_string_length);
+ val = 0;
+ }
if (p_string_offset != NULL) {
*p_string_offset = offset+len;
@@ -1200,24 +1210,33 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree
static int
dissect_kafka_bytes_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, int *p_bytes_offset, int *p_bytes_length)
{
- gint val;
- gint len;
-
- tvb_read_kafka_varint32(tvb, offset, &val, &len);
+ gint val;
+ gint len;
+ gboolean overflow;
+ proto_item *pi;
- DISSECTOR_ASSERT(val >= -1);
+ val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
- if (val > 0) {
+ if (overflow) {
+ pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+ expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+ val = 0;
+ } else if (val > 0) {
// there is payload available, possibly with 0 octets
- proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA);
+ pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA);
} else if (val == 0) {
// there is empty payload (0 octets)
- proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
- } else {
+ pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
+ } else if (val == -1) {
// there is no payload (null)
- proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+ pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+ val = 0;
+ } else {
+ pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+ expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length);
val = 0;
}
+
if (p_bytes_offset != NULL) {
*p_bytes_offset = offset+len;
}
@@ -1265,17 +1284,21 @@ dissect_kafka_record_headers(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *
{
proto_item *record_headers_ti;
proto_tree *subtree;
-
- gint32 count;
- int i;
+ gint32 count;
+ gint len;
+ gboolean overflow;
+ int i;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_record_headers, &record_headers_ti, "Headers");
- tvb_read_kafka_varint32(tvb, offset, &count, NULL);
- offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_headers_count, offset);
+ count = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
+ if (overflow) {
+ expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_varint);
+ } else if (count < -1) { // -1 means null array
+ expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_array_length);
+ }
- // null array is marked by -1 as opposed to empty array marked by 0
- DISSECTOR_ASSERT(count >= -1);
+ offset += len;
for (i=0;i<count;i++) {
offset = dissect_kafka_record_headers_header(tvb, pinfo, subtree, offset);
}
@@ -1291,8 +1314,9 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
proto_item *record_ti;
proto_tree *subtree;
- gint32 val;
- int len;
+ gint32 size;
+ gint len;
+ gboolean overflow;
int offset, end_offset;
@@ -1300,11 +1324,17 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_record, &record_ti, "Record");
- tvb_read_kafka_varint32(tvb, offset, &val, &len);
- DISSECTOR_ASSERT(val >= 0);
- end_offset = offset + len + val;
+ size = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
+ if (overflow) {
+ expert_add_info(pinfo, record_ti, &ei_kafka_bad_varint);
+ return offset + len;
+ } else if (size < 6) {
+ expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length);
+ return offset + len;
+ }
- offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_length, offset);
+ end_offset = offset + len + size;
+ offset += len;
proto_tree_add_item(subtree, hf_kafka_record_attributes, tvb, offset, 1, ENC_BIG_ENDIAN);
offset += 1;
@@ -1317,9 +1347,9 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
offset = dissect_kafka_record_headers(tvb, pinfo, subtree, offset);
- DISSECTOR_ASSERT(offset == end_offset);
-
- proto_item_set_end(record_ti, tvb, offset);
+ if (offset != end_offset) {
+ expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length);
+ }
return end_offset;
}
@@ -1660,13 +1690,15 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
if (decompress(tvb, pinfo, offset, length, codec, &decompressed_tvb, &decompressed_offset)==1) {
add_new_data_source(pinfo, decompressed_tvb, "Decompressed content");
show_compression_reduction(tvb, subtree, length, tvb_captured_length(decompressed_tvb));
- dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset, FALSE, codec);
+ dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset,
+ tvb_reported_length_remaining(decompressed_tvb, decompressed_offset), codec);
} else {
proto_item_append_text(subtree, " [Cannot decompress records]");
}
+ offset += length;
}
- proto_item_set_len(message_ti, offset - start_offset);
+ proto_item_set_end(message_ti, tvb, offset);
return offset;
}
@@ -1787,27 +1819,12 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int o
}
static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec)
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec)
{
proto_item *ti;
proto_tree *subtree;
- gint len, message_size;
- int offset = start_offset;
- int messages = 0;
-
- if (has_length_field) {
- proto_tree_add_item(tree, hf_kafka_message_set_size, tvb, offset, 4, ENC_BIG_ENDIAN);
- len = (gint)tvb_get_ntohl(tvb, offset);
- offset += 4;
- start_offset += 4;
- }
- else {
- len = tvb_reported_length_remaining(tvb, offset);
- }
-
- if (len <= 0) {
- return offset;
- }
+ gint end_offset = offset + len;
+ guint messages = 0;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message_set, &ti, "Message Set");
/* If set came from a compressed message, make it obvious in tree root */
@@ -1815,19 +1832,17 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
proto_item_append_text(subtree, " [from compressed %s message]", val_to_str_const(codec, kafka_message_codecs, "Unknown"));
}
- while (offset - start_offset < len) {
-
- message_size = tvb_get_guint32(tvb, offset+8, ENC_BIG_ENDIAN);
-
- dissect_kafka_message(tvb, pinfo, subtree, offset);
- offset += 12 + message_size;
-
+ while (offset < end_offset) {
+ offset = dissect_kafka_message(tvb, pinfo, subtree, offset);
messages += 1;
+ }
+ if (offset != end_offset) {
+ expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length);
}
proto_item_append_text(ti, " (%d Messages)", messages);
- proto_item_set_len(ti, offset - start_offset);
+ proto_item_set_end(ti, tvb, offset);
return offset;
}
@@ -2892,7 +2907,8 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
{
proto_item *ti;
proto_tree *subtree;
- int offset = start_offset;
+ int offset = start_offset;
+ guint len;
kafka_packet_values_t packet_values;
memset(&packet_values, 0, sizeof(packet_values));
@@ -2918,7 +2934,12 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version);
}
- offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
+ len = tvb_get_ntohl(tvb, offset);
+ offset += 4;
+
+ if (len > 0) {
+ offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE);
+ }
proto_item_set_len(ti, offset - start_offset);
@@ -2983,6 +3004,7 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
{
proto_item *ti;
proto_tree *subtree;
+ guint len;
kafka_packet_values_t packet_values;
memset(&packet_values, 0, sizeof(packet_values));
@@ -2990,7 +3012,12 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
offset = dissect_kafka_partition_id_ret(tvb, pinfo, subtree, offset, &packet_values.partition_id);
- offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
+ len = tvb_get_ntohl(tvb, offset);
+ offset += 4;
+
+ if (len > 0) {
+ offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE);
+ }
proto_item_append_text(ti, " (ID=%u)", packet_values.partition_id);
proto_item_set_end(ti, tvb, offset);
@@ -5099,12 +5126,15 @@ dissect_kafka_offset_for_leader_epoch_request_topic_partition(tvbuff_t *tvb, pac
partition_id = tvb_get_ntohl(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
if (api_version >= 2) {
proto_tree_add_item(subtree, hf_kafka_current_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
}
proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
proto_item_set_end(subti, tvb, offset);
@@ -5263,7 +5293,7 @@ dissect_kafka_add_partitions_to_txn_request_topic_partition(tvbuff_t *tvb, packe
{
proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- return offset;
+ return offset+4;
}
static int
@@ -8354,11 +8384,6 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
- { &hf_kafka_message_set_size,
- { "Message Set Size", "kafka.message_set_size",
- FT_INT32, BASE_DEC, 0, 0,
- NULL, HFILL }
- },
{ &hf_kafka_message_size,
{ "Message Size", "kafka.message_size",
FT_INT32, BASE_DEC, 0, 0,
@@ -8734,16 +8759,6 @@ proto_register_kafka(void)
FT_BYTES, BASE_SHOW_ASCII_PRINTABLE, 0, 0,
NULL, HFILL }
},
- { &hf_kafka_record_headers_count,
- { "Headers Count", "kafka.headers_count",
- FT_INT64, BASE_DEC, 0, 0,
- NULL, HFILL }
- },
- { &hf_kafka_record_length,
- { "Record Length", "kafka.record_length",
- FT_INT64, BASE_DEC, 0, 0,
- NULL, HFILL }
- },
{ &hf_kafka_record_attributes,
{ "Record Attributes (reserved)", "kafka.record_attributes",
FT_INT8, BASE_DEC, 0, 0,
@@ -9027,6 +9042,14 @@ proto_register_kafka(void)
{ "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }},
{ &ei_kafka_bad_bytes_length,
{ "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }},
+ { &ei_kafka_bad_array_length,
+ { "kafka.bad_array_length", PI_MALFORMED, PI_WARN, "Invalid array length field", EXPFILL }},
+ { &ei_kafka_bad_record_length,
+ { "kafka.bad_record_length", PI_MALFORMED, PI_WARN, "Invalid record length field", EXPFILL }},
+ { &ei_kafka_bad_varint,
+ { "kafka.bad_varint", PI_MALFORMED, PI_WARN, "Invalid varint bytes", EXPFILL }},
+ { &ei_kafka_bad_message_set_length,
+ { "kafka.ei_kafka_bad_message_set_length", PI_MALFORMED, PI_WARN, "Message set size does not match content", EXPFILL }},
{ &ei_kafka_unknown_message_magic,
{ "kafka.unknown_message_magic", PI_MALFORMED, PI_WARN, "Invalid message magic field", EXPFILL }},
};