From d01acfcc75a6c5798a95a8ccca9be18eba65a0bf Mon Sep 17 00:00:00 2001 From: Jacob Erlbeck Date: Mon, 26 Oct 2015 16:22:45 +0100 Subject: stats: Support statsd Multi-Metric Packets If the MTU is given, combine several messages into a single UDP packet until the limit is reached. Flush all reporters after the values have been scanned. New vty commands (node config-stats): mtu <100-65535> Enable multi-metric packets and set the maximum packet size (in byte) no mtu Disable multi-metric packets Note that single messages that are longer than the given MTU (minus 28 octets protocol overhead) will be dropped. Sponsored-by: On-Waves ehf --- include/osmocom/core/stats.h | 5 ++ src/stats.c | 108 ++++++++++++++++++++++++++++++++++++++----- src/vty/stats_vty.c | 19 ++++++++ 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/include/osmocom/core/stats.h b/include/osmocom/core/stats.h index 489e0e41..ed461ddc 100644 --- a/include/osmocom/core/stats.h +++ b/include/osmocom/core/stats.h @@ -22,6 +22,8 @@ #include #include +struct msgb; + enum stats_reporter_type { STATS_REPORTER_STATSD, }; @@ -45,6 +47,8 @@ struct stats_reporter { struct sockaddr bind_addr; int bind_addr_len; int fd; + struct msgb *buffer; + int agg_enabled; struct llist_head list; }; @@ -71,6 +75,7 @@ struct stats_reporter *stats_reporter_find(enum stats_reporter_type type, int stats_reporter_set_remote_addr(struct stats_reporter *srep, const char *addr); int stats_reporter_set_remote_port(struct stats_reporter *srep, int port); int stats_reporter_set_local_addr(struct stats_reporter *srep, const char *addr); +int stats_reporter_set_mtu(struct stats_reporter *srep, int mtu); int stats_reporter_set_name_prefix(struct stats_reporter *srep, const char *prefix); int stats_reporter_enable(struct stats_reporter *srep); int stats_reporter_disable(struct stats_reporter *srep); diff --git a/src/stats.c b/src/stats.c index ef4be828..8faed89a 100644 --- a/src/stats.c +++ b/src/stats.c @@ -38,11 +38,13 @@ #include #include #include +#include /* TODO: register properly */ #define DSTATS DLGLOBAL #define STATS_DEFAULT_INTERVAL 5 /* secs */ +#define STATS_DEFAULT_STATSD_BUFLEN 256 static LLIST_HEAD(stats_reporter_list); static void *stats_ctx = NULL; @@ -60,6 +62,7 @@ static int stats_reporter_statsd_open(struct stats_reporter *srep); static int stats_reporter_statsd_close(struct stats_reporter *srep); static int stats_reporter_send(struct stats_reporter *srep, const char *data, int data_len); +static int stats_reporter_send_buffer(struct stats_reporter *srep); static int update_srep_config(struct stats_reporter *srep) { @@ -213,6 +216,16 @@ int stats_reporter_set_local_addr(struct stats_reporter *srep, const char *addr) return update_srep_config(srep); } +int stats_reporter_set_mtu(struct stats_reporter *srep, int mtu) +{ + if (mtu < 0) + return -EINVAL; + + srep->mtu = mtu; + + return update_srep_config(srep); +} + int stats_set_interval(int interval) { if (interval <= 0) @@ -261,6 +274,21 @@ static int stats_reporter_send(struct stats_reporter *srep, const char *data, return rc; } +static int stats_reporter_send_buffer(struct stats_reporter *srep) +{ + int rc; + + if (!srep->buffer || msgb_length(srep->buffer) == 0) + return 0; + + rc = stats_reporter_send(srep, + (const char *)msgb_data(srep->buffer), msgb_length(srep->buffer)); + + msgb_trim(srep->buffer, 0); + + return rc; +} + /*** statsd reporter ***/ struct stats_reporter *stats_reporter_create_statsd(const char *name) @@ -275,6 +303,7 @@ static int stats_reporter_statsd_open(struct stats_reporter *srep) { int sock; int rc; + int buffer_size = STATS_DEFAULT_STATSD_BUFLEN; if (srep->fd != -1) stats_reporter_statsd_close(srep); @@ -291,6 +320,13 @@ static int stats_reporter_statsd_open(struct stats_reporter *srep) srep->fd = sock; + if (srep->mtu > 0) { + buffer_size = srep->mtu - 20 /* IP */ - 8 /* UDP */; + srep->agg_enabled = 1; + } + + srep->buffer = msgb_alloc(buffer_size, "stats buffer"); + return 0; failed: @@ -306,8 +342,12 @@ static int stats_reporter_statsd_close(struct stats_reporter *srep) if (srep->fd == -1) return -EBADF; + stats_reporter_send_buffer(srep); + rc = close(srep->fd); srep->fd = -1; + msgb_free(srep->buffer); + srep->buffer = NULL; return rc == -1 ? -errno : 0; } @@ -315,30 +355,62 @@ static int stats_reporter_statsd_send(struct stats_reporter *srep, const char *name1, int index1, const char *name2, int value, const char *unit) { - char buf[256]; - int nchars, rc; + char *buf; + int buf_size; + int nchars, rc = 0; char *fmt = NULL; + int old_len = msgb_length(srep->buffer); if (name1) { if (index1 > 0) - fmt = "%1$s.%2$s.%3$d.%4$s:%5$d|%6$s"; + fmt = "%1$s.%2$s.%6$d.%3$s:%4$d|%5$s"; else - fmt = "%1$s.%2$s.%4$s:%5$d|%6$s"; + fmt = "%1$s.%2$s.%3$s:%4$d|%5$s"; } else { - fmt = "%1$s.%4$s:%5$d|%6$s"; + fmt = "%1$s.%2$0.0s%3$s:%4$d|%5$s"; } if (!srep->name_prefix) fmt += 5; /* skip prefix part */ - nchars = snprintf(buf, sizeof(buf), fmt, - srep->name_prefix, name1, index1, name2, - value, unit); + if (srep->agg_enabled) { + if (msgb_length(srep->buffer) > 0 && + msgb_tailroom(srep->buffer) > 0) + { + msgb_put_u8(srep->buffer, '\n'); + } + } + + buf = (char *)msgb_put(srep->buffer, 0); + buf_size = msgb_tailroom(srep->buffer); - if (nchars >= sizeof(buf)) + nchars = snprintf(buf, buf_size, fmt, + srep->name_prefix, name1, name2, + value, unit, index1); + + if (nchars >= buf_size) { /* Truncated */ - return -EMSGSIZE; + /* Restore original buffer (without trailing LF) */ + msgb_trim(srep->buffer, old_len); + /* Send it */ + rc = stats_reporter_send_buffer(srep); + + /* Try again */ + buf = (char *)msgb_put(srep->buffer, 0); + buf_size = msgb_tailroom(srep->buffer); - rc = stats_reporter_send(srep, buf, nchars); + nchars = snprintf(buf, buf_size, fmt, + srep->name_prefix, name1, name2, + value, unit, index1); + + if (nchars >= buf_size) + return -EMSGSIZE; + } + + if (nchars > 0) + msgb_trim(srep->buffer, msgb_length(srep->buffer) + nchars); + + if (!srep->agg_enabled) + rc = stats_reporter_send_buffer(srep); return rc; } @@ -498,11 +570,25 @@ static int handle_counter(struct osmo_counter *counter, void *sctx_) /*** main reporting function ***/ +static void flush_all_reporters() +{ + struct stats_reporter *srep; + + llist_for_each_entry(srep, &stats_reporter_list, list) { + if (!srep->running) + continue; + + stats_reporter_send_buffer(srep); + } +} + int stats_report() { osmo_counters_for_each(handle_counter, NULL); rate_ctr_for_each_group(rate_ctr_group_handler, NULL); stat_item_for_each_group(stat_item_group_handler, NULL); + flush_all_reporters(); + return 0; } diff --git a/src/vty/stats_vty.c b/src/vty/stats_vty.c index a4fd7b05..775184c9 100644 --- a/src/vty/stats_vty.c +++ b/src/vty/stats_vty.c @@ -128,6 +128,23 @@ DEFUN(cfg_stats_reporter_remote_port, cfg_stats_reporter_remote_port_cmd, argv[0], "remote port"); } +DEFUN(cfg_stats_reporter_mtu, cfg_stats_reporter_mtu_cmd, + "mtu <100-65535>", + "Set the maximum packet size\n" + "Size in byte\n") +{ + return set_srep_parameter_int(vty, stats_reporter_set_mtu, + argv[0], "mtu"); +} + +DEFUN(cfg_no_stats_reporter_mtu, cfg_no_stats_reporter_mtu_cmd, + "no mtu", + NO_STR "Set the maximum packet size\n") +{ + return set_srep_parameter_int(vty, stats_reporter_set_mtu, + 0, "mtu"); +} + DEFUN(cfg_stats_reporter_prefix, cfg_stats_reporter_prefix_cmd, "prefix PREFIX", "Set the item name prefix\n" @@ -312,6 +329,8 @@ void stats_vty_add_cmds() install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_local_ip_cmd); install_element(CFG_STATS_NODE, &cfg_stats_reporter_remote_ip_cmd); install_element(CFG_STATS_NODE, &cfg_stats_reporter_remote_port_cmd); + install_element(CFG_STATS_NODE, &cfg_stats_reporter_mtu_cmd); + install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_mtu_cmd); install_element(CFG_STATS_NODE, &cfg_stats_reporter_prefix_cmd); install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_prefix_cmd); install_element(CFG_STATS_NODE, &cfg_stats_reporter_enable_cmd); -- cgit v1.2.3