aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNeels Janosch Hofmeyr <nhofmeyr@sysmocom.de>2023-04-06 04:53:30 +0200
committerNeels Janosch Hofmeyr <nhofmeyr@sysmocom.de>2023-09-13 23:03:08 +0200
commite57ed39fbc55994471b6e3815fa6f4dfc928d9a6 (patch)
tree23391482c83659a587bcf3b8fc9e9683cbb358b0
parent4c8ab89921f5bea32bd8740eec73dcb97984ad4a (diff)
wip: write queueneels/wq
-rw-r--r--src/libosmo-pfcp/pfcp_endpoint.c157
1 files changed, 97 insertions, 60 deletions
diff --git a/src/libosmo-pfcp/pfcp_endpoint.c b/src/libosmo-pfcp/pfcp_endpoint.c
index 7e08d8e..ba4d999 100644
--- a/src/libosmo-pfcp/pfcp_endpoint.c
+++ b/src/libosmo-pfcp/pfcp_endpoint.c
@@ -28,6 +28,7 @@
#include <osmocom/core/talloc.h>
#include <osmocom/core/timer.h>
#include <osmocom/core/tdef.h>
+#include <osmocom/core/write_queue.h>
#include <osmocom/pfcp/pfcp_endpoint.h>
#include <osmocom/pfcp/pfcp_msg.h>
@@ -37,7 +38,7 @@ struct osmo_pfcp_endpoint {
struct osmo_pfcp_endpoint_cfg cfg;
/* PFCP socket */
- struct osmo_fd pfcp_fd;
+ struct osmo_wqueue wq;
/* The time at which this endpoint last restarted, as seconds since unix epoch. */
uint32_t recovery_time_stamp;
@@ -145,7 +146,9 @@ struct osmo_pfcp_endpoint *osmo_pfcp_endpoint_create(void *ctx, const struct osm
INIT_LLIST_HEAD(&ep->sent_requests);
INIT_LLIST_HEAD(&ep->sent_responses);
- ep->pfcp_fd.fd = -1;
+ /* proper init happens in osmo_pfcp_endpoint_bind() */
+ osmo_wqueue_init(&ep->wq, 0);
+ ep->wq.bfd.fd = -1;
/* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format, which is
* seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0.
@@ -241,25 +244,42 @@ static void pfcp_queue_sent_resp_timer_cb(void *data)
static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
{
int rc;
-
- if (!m->encoded) {
- /* Allocate msgb as child of the message m, so that when m gets deallocated at the end of
- * retransmission queueing, the msgb gets deallocated with it. */
- m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
- OSMO_ASSERT(m->encoded);
- rc = osmo_pfcp_msg_encode(m->encoded, m);
+ struct msgb *msg;
+
+ if (m->encoded) {
+ /* Snatch the already encoded message from osmo_pfcp_msg.
+ * This is a tradeoff decision:
+ * The osmo_pfcp_msg may be queued and retransmitted, and we'd still need the encoded msgb then.
+ * If we steal the msgb from the osmo_pfcp_msg, it has to call the encoding again on retransmission.
+ * If we don't steal the msgb, we need have to make a copy of it for every transmission.
+ * Let's not optimize the retransmission case, but rather the common successful case.
+ */
+ msg = m->encoded;
+ m->encoded = NULL;
+ /* Make sure the msg in the write queue doesn't get deallocated along with the osmo_pfcp_msg */
+ talloc_steal(ep, msg);
+ } else {
+ /* Allocate new msgb for the write queue. */
+ msg = msgb_alloc_c(ep, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
+ OSMO_ASSERT(msg);
+ rc = osmo_pfcp_msg_encode(msg, m);
if (rc) {
- msgb_free(m->encoded);
- m->encoded = NULL;
+ msgb_free(msg);
return rc;
}
}
- rc = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), msgb_length(m->encoded), 0,
- (struct sockaddr *)&m->remote_addr, sizeof(m->remote_addr));
- if (rc != msgb_length(m->encoded)) {
- OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n",
- rc, msgb_length(m->encoded));
+ /* pass on the destination address; hope dearly that the PFCP retransmission queue keeps the pointer valid long
+ * enough (TEMPORARy hACK)*/
+ *(void**)msg->cb = (void*)&m->remote_addr;
+
+ rc = osmo_wqueue_enqueue(&ep->wq, msg);
+ if (rc) {
+ OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "failed to add PFCP message to write queue (rc=%d, wq.len=%u)\n",
+ rc, ep->wq.current_length);
+ /* Free msgb: instead, just tag it back to the osmo_pfcp_msg */
+ m->encoded = msg;
+ talloc_steal(m, msg);
return -EIO;
}
return 0;
@@ -420,50 +440,64 @@ static void osmo_pfcp_endpoint_handle_rx(struct osmo_pfcp_endpoint *ep, struct o
}
/* call-back for PFCP socket file descriptor */
-static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static int osmo_pfcp_fd_read_cb(struct osmo_fd *ofd)
{
int rc;
struct osmo_pfcp_endpoint *ep = ofd->data;
-
- if (what & OSMO_FD_READ) {
- struct osmo_sockaddr remote;
- socklen_t remote_len = sizeof(remote);
- struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
- if (!msg)
- return -ENOMEM;
-
- msg->l3h = msg->tail;
- rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
- if (rc <= 0)
- return -EIO;
- msgb_put(msg, rc);
-
- OSMO_ASSERT(ep->cfg.rx_msg_cb);
-
- /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
- * through the message bundle. */
- msg->l4h = msg->l3h;
- while (msgb_l4len(msg)) {
- struct osmo_gtlv_load tlv;
- struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
- m->encoded = msg;
-
- rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
- if (rc < 0)
- break;
- msg->l4h += rc;
-
- rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
- /* If errors occurred, they have already been logged on DLPFCP. */
- if (rc == 0)
- osmo_pfcp_endpoint_handle_rx(ep, m);
- osmo_pfcp_msg_free(m);
- }
- msgb_free(msg);
+ struct osmo_sockaddr remote;
+ socklen_t remote_len = sizeof(remote);
+ struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
+ if (!msg)
+ return -ENOMEM;
+
+ msg->l3h = msg->tail;
+ rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
+ if (rc <= 0)
+ return -EIO;
+ msgb_put(msg, rc);
+
+ OSMO_ASSERT(ep->cfg.rx_msg_cb);
+
+ /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
+ * through the message bundle. */
+ msg->l4h = msg->l3h;
+ while (msgb_l4len(msg)) {
+ struct osmo_gtlv_load tlv;
+ struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
+ m->encoded = msg;
+
+ rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
+ if (rc < 0)
+ break;
+ msg->l4h += rc;
+
+ rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
+ /* If errors occurred, they have already been logged on DLPFCP. */
+ if (rc == 0)
+ osmo_pfcp_endpoint_handle_rx(ep, m);
+ osmo_pfcp_msg_free(m);
}
+ msgb_free(msg);
return 0;
}
+static int osmo_pfcp_fd_write_cb(struct osmo_fd *bfd, struct msgb *msg)
+{
+ int rc;
+ const int limit = 42;
+ struct sockaddr *dest_addr = *(void**)msg->cb;
+
+ rc = sendto(bfd->fd, msgb_data(msg), msgb_length(msg), 0,
+ dest_addr, sizeof(*dest_addr));
+ if (rc != msg->len)
+ LOGP(DLPFCP, LOGL_ERROR, "Failed to write to PFCP fd: %s: %d='%s'; msg: len=%u %s%s\n",
+ osmo_sock_get_name2(bfd->fd), errno, strerror(errno),
+ msg->len,
+ osmo_hexdump_nospc(msg->data, OSMO_MIN(limit, msg->len)),
+ limit < msg->len ? "..." : "");
+ return rc;
+}
+
/*! bind a PFCP endpoint to its configured address (ep->cfg.local_addr).
* \return 0 on success, negative on error. */
int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
@@ -478,9 +512,12 @@ int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
}
/* create the new socket, binding to configured local address */
- ep->pfcp_fd.cb = osmo_pfcp_fd_cb;
- ep->pfcp_fd.data = ep;
- rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
+ osmo_wqueue_init(&ep->wq, 10000);
+ ep->wq.read_cb = osmo_pfcp_fd_read_cb;
+ ep->wq.write_cb = osmo_pfcp_fd_write_cb;
+ osmo_fd_setup(&ep->wq.bfd, -1, OSMO_FD_READ, osmo_wqueue_bfd_cb, ep, 0);
+
+ rc = osmo_sock_init_osa_ofd(&ep->wq.bfd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
if (rc < 0)
return rc;
return 0;
@@ -494,10 +531,10 @@ void osmo_pfcp_endpoint_close(struct osmo_pfcp_endpoint *ep)
while ((qe = llist_first_entry_or_null(&ep->sent_responses, struct osmo_pfcp_queue_entry, entry)))
osmo_pfcp_queue_del(qe);
- if (ep->pfcp_fd.fd != -1) {
- osmo_fd_unregister(&ep->pfcp_fd);
- close(ep->pfcp_fd.fd);
- ep->pfcp_fd.fd = -1;
+ if (ep->wq.bfd.fd != -1) {
+ osmo_fd_unregister(&ep->wq.bfd);
+ close(ep->wq.bfd.fd);
+ ep->wq.bfd.fd = -1;
}
}