summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-19 09:28:18 +0200
committerPaul Buetow <paul@buetow.org>2026-03-19 09:28:18 +0200
commit75abacb64468952f2202e38f35d12b8f2011d7c8 (patch)
tree1eebc803e4b7348023ae7c45719620b7263d14de
parent74a71ddef8a59bb4af177503a66e3403fa5fb7d5 (diff)
fix: preserve generator writer-queue backpressure (task 462)
-rw-r--r--ioriot/src/datas/rbuffer.c46
-rw-r--r--ioriot/src/datas/rbuffer.h12
-rw-r--r--ioriot/src/generate/generate.c3
-rw-r--r--ioriot/src/generate/gparser.c4
4 files changed, 61 insertions, 4 deletions
diff --git a/ioriot/src/datas/rbuffer.c b/ioriot/src/datas/rbuffer.c
index c019e6c..f0dfcdf 100644
--- a/ioriot/src/datas/rbuffer.c
+++ b/ioriot/src/datas/rbuffer.c
@@ -48,6 +48,14 @@ bool rbuffer_insert(rbuffer_s* r, void *data)
return true;
}
+void rbuffer_insert_wait(rbuffer_s* r, void *data, const useconds_t sleep_us)
+{
+ Error_if(sleep_us == 0, "Expected retry delay to be greater than zero");
+
+ while (!rbuffer_insert(r, data))
+ usleep(sleep_us);
+}
+
bool rbuffer_has_next(rbuffer_s* r)
{
sig_atomic_t read_pos = (r->read_pos+1) % r->size;
@@ -92,6 +100,22 @@ void rbuffer_print(rbuffer_s* r)
Out("\n");
}
+typedef struct {
+ rbuffer_s *buffer;
+ useconds_t sleep_us;
+ long expected;
+} rbuffer_test_drain_s;
+
+static void *_rbuffer_test_drain_one(void *data)
+{
+ rbuffer_test_drain_s *d = data;
+
+ usleep(d->sleep_us);
+ assert(d->expected == (long) rbuffer_get_next(d->buffer));
+
+ return NULL;
+}
+
void rbuffer_test(void)
{
rbuffer_s *r = rbuffer_new(5);
@@ -144,4 +168,26 @@ void rbuffer_test(void)
assert(NULL == rbuffer_get_next(r));
rbuffer_destroy(r);
+
+ r = rbuffer_new(3);
+ assert(rbuffer_insert(r, (void*)1));
+ assert(rbuffer_insert(r, (void*)2));
+ assert(!rbuffer_insert(r, (void*)3));
+
+ rbuffer_test_drain_s drain = {
+ .buffer = r,
+ .sleep_us = 1000,
+ .expected = 1,
+ };
+ pthread_t drain_thread;
+ start_pthread(&drain_thread, _rbuffer_test_drain_one, &drain);
+
+ rbuffer_insert_wait(r, (void*)3, 100);
+ pthread_join(drain_thread, NULL);
+
+ assert(2 == (long) rbuffer_get_next(r));
+ assert(3 == (long) rbuffer_get_next(r));
+ assert(NULL == rbuffer_get_next(r));
+
+ rbuffer_destroy(r);
}
diff --git a/ioriot/src/datas/rbuffer.h b/ioriot/src/datas/rbuffer.h
index 7aeeac1..c692f75 100644
--- a/ioriot/src/datas/rbuffer.h
+++ b/ioriot/src/datas/rbuffer.h
@@ -76,6 +76,18 @@ void rbuffer_destroy(rbuffer_s* r);
bool rbuffer_insert(rbuffer_s* r, void *data);
/**
+ * @brief Inserts data pointer to the ring buffer, waiting for space
+ *
+ * This helper preserves backpressure on correctness-critical queue handoffs by
+ * retrying until the insert succeeds.
+ *
+ * @param r The ring buffer object
+ * @param data The data pointer
+ * @param sleep_us The retry delay in microseconds
+ */
+void rbuffer_insert_wait(rbuffer_s* r, void *data, const useconds_t sleep_us);
+
+/**
* @brief Determines whether there is any data in the ring buffer
*
* @param r The ring buffer object
diff --git a/ioriot/src/generate/generate.c b/ioriot/src/generate/generate.c
index 0185c50..65e07cc 100644
--- a/ioriot/src/generate/generate.c
+++ b/ioriot/src/generate/generate.c
@@ -143,8 +143,7 @@ status_e generate_run(options_s *opts)
gtask_init(t, line, g->lineno);
// ...pass it to the parser queue
- while (!rbuffer_insert(parser->queue, t))
- usleep(100);
+ rbuffer_insert_wait(parser->queue, t, 100);
if (g->lineno % 1000000 == 0) {
Out(" %lu (filtered:%.2lf%%)", g->lineno, _Perc_filtered);
diff --git a/ioriot/src/generate/gparser.c b/ioriot/src/generate/gparser.c
index ff171f4..b9e91de 100644
--- a/ioriot/src/generate/gparser.c
+++ b/ioriot/src/generate/gparser.c
@@ -29,14 +29,14 @@ void* gparser_pthread_start(void *data)
// First extract
gparser_extract(p, t);
// Second, pass the task to the writer thread
- rbuffer_insert(w->queue, t);
+ rbuffer_insert_wait(w->queue, t, 100);
}
usleep(100);
} while (!p->terminate);
while (NULL != (t = rbuffer_get_next(p->queue))) {
gparser_extract(p, t);
- rbuffer_insert(w->queue, t);
+ rbuffer_insert_wait(w->queue, t, 100);
}
return NULL;