diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-19 09:28:18 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-19 09:28:18 +0200 |
| commit | 75abacb64468952f2202e38f35d12b8f2011d7c8 (patch) | |
| tree | 1eebc803e4b7348023ae7c45719620b7263d14de | |
| parent | 74a71ddef8a59bb4af177503a66e3403fa5fb7d5 (diff) | |
fix: preserve generator writer-queue backpressure (task 462)
| -rw-r--r-- | ioriot/src/datas/rbuffer.c | 46 | ||||
| -rw-r--r-- | ioriot/src/datas/rbuffer.h | 12 | ||||
| -rw-r--r-- | ioriot/src/generate/generate.c | 3 | ||||
| -rw-r--r-- | ioriot/src/generate/gparser.c | 4 |
4 files changed, 61 insertions, 4 deletions
diff --git a/ioriot/src/datas/rbuffer.c b/ioriot/src/datas/rbuffer.c index c019e6c..f0dfcdf 100644 --- a/ioriot/src/datas/rbuffer.c +++ b/ioriot/src/datas/rbuffer.c @@ -48,6 +48,14 @@ bool rbuffer_insert(rbuffer_s* r, void *data) return true; } +void rbuffer_insert_wait(rbuffer_s* r, void *data, const useconds_t sleep_us) +{ + Error_if(sleep_us == 0, "Expected retry delay to be greater than zero"); + + while (!rbuffer_insert(r, data)) + usleep(sleep_us); +} + bool rbuffer_has_next(rbuffer_s* r) { sig_atomic_t read_pos = (r->read_pos+1) % r->size; @@ -92,6 +100,22 @@ void rbuffer_print(rbuffer_s* r) Out("\n"); } +typedef struct { + rbuffer_s *buffer; + useconds_t sleep_us; + long expected; +} rbuffer_test_drain_s; + +static void *_rbuffer_test_drain_one(void *data) +{ + rbuffer_test_drain_s *d = data; + + usleep(d->sleep_us); + assert(d->expected == (long) rbuffer_get_next(d->buffer)); + + return NULL; +} + void rbuffer_test(void) { rbuffer_s *r = rbuffer_new(5); @@ -144,4 +168,26 @@ void rbuffer_test(void) assert(NULL == rbuffer_get_next(r)); rbuffer_destroy(r); + + r = rbuffer_new(3); + assert(rbuffer_insert(r, (void*)1)); + assert(rbuffer_insert(r, (void*)2)); + assert(!rbuffer_insert(r, (void*)3)); + + rbuffer_test_drain_s drain = { + .buffer = r, + .sleep_us = 1000, + .expected = 1, + }; + pthread_t drain_thread; + start_pthread(&drain_thread, _rbuffer_test_drain_one, &drain); + + rbuffer_insert_wait(r, (void*)3, 100); + pthread_join(drain_thread, NULL); + + assert(2 == (long) rbuffer_get_next(r)); + assert(3 == (long) rbuffer_get_next(r)); + assert(NULL == rbuffer_get_next(r)); + + rbuffer_destroy(r); } diff --git a/ioriot/src/datas/rbuffer.h b/ioriot/src/datas/rbuffer.h index 7aeeac1..c692f75 100644 --- a/ioriot/src/datas/rbuffer.h +++ b/ioriot/src/datas/rbuffer.h @@ -76,6 +76,18 @@ void rbuffer_destroy(rbuffer_s* r); bool rbuffer_insert(rbuffer_s* r, void *data); /** + * @brief Inserts data pointer to the ring buffer, waiting for space + * + * This helper preserves backpressure on correctness-critical queue handoffs by + * retrying until the insert succeeds. + * + * @param r The ring buffer object + * @param data The data pointer + * @param sleep_us The retry delay in microseconds + */ +void rbuffer_insert_wait(rbuffer_s* r, void *data, const useconds_t sleep_us); + +/** * @brief Determines whether there is any data in the ring buffer * * @param r The ring buffer object diff --git a/ioriot/src/generate/generate.c b/ioriot/src/generate/generate.c index 0185c50..65e07cc 100644 --- a/ioriot/src/generate/generate.c +++ b/ioriot/src/generate/generate.c @@ -143,8 +143,7 @@ status_e generate_run(options_s *opts) gtask_init(t, line, g->lineno); // ...pass it to the parser queue - while (!rbuffer_insert(parser->queue, t)) - usleep(100); + rbuffer_insert_wait(parser->queue, t, 100); if (g->lineno % 1000000 == 0) { Out(" %lu (filtered:%.2lf%%)", g->lineno, _Perc_filtered); diff --git a/ioriot/src/generate/gparser.c b/ioriot/src/generate/gparser.c index ff171f4..b9e91de 100644 --- a/ioriot/src/generate/gparser.c +++ b/ioriot/src/generate/gparser.c @@ -29,14 +29,14 @@ void* gparser_pthread_start(void *data) // First extract gparser_extract(p, t); // Second, pass the task to the writer thread - rbuffer_insert(w->queue, t); + rbuffer_insert_wait(w->queue, t, 100); } usleep(100); } while (!p->terminate); while (NULL != (t = rbuffer_get_next(p->queue))) { gparser_extract(p, t); - rbuffer_insert(w->queue, t); + rbuffer_insert_wait(w->queue, t, 100); } return NULL; |
