/* * Handle ring buffer link between ESP32 and FPGA * This implements the ESP32 (upstream/active/initiator/master) side. */ #define MODULE "esplink" #include "common.h" #include "esplink.h" #include "fpga.h" struct esplink_ringbuf_ptrs { const volatile struct esplink_ptrs_dstr *d; volatile struct esplink_ptrs_ustr *u; }; struct esplink_sem { SemaphoreHandle_t lock; }; /* * Event group indicating which ring buffers are ready for I/O */ EventGroupHandle_t esplink_filled; /* Data requested in "fill" ready */ static struct { volatile unsigned int ready; bool shutdown; SemaphoreHandle_t mutex; /* Configuration mutex */ struct esplink_ringbuf_ptrs rb; struct esplink_ringbuf_desc *desc; struct esplink_ringbuf_head head; struct esplink_sem sem[EL_RB_COUNT][2]; size_t need[EL_RB_COUNT][2]; /* Amount of data requested for wakeup */ } elink; /* Leave at least this much space, to preserve alignment */ #define RINGBUF_UNUSABLE 4 #define ELQUEUE_ALL_MASK ((1UL << (EL_RB_COUNT*2)) - 1) static void esplink_stop(void) { elink.ready = 0; /* No waking up waiters that only want online wakeups */ xEventGroupClearBits(esplink_filled, ELWAIT_ONLINE); /* Wake up all the internal waiters or others with online = false */ xEventGroupSetBits(esplink_filled, ELQUEUE_ALL_MASK); struct esplink_sem *s = &elink.sem[0][0]; for (size_t i = 0; i < EL_RB_COUNT*2; i++) { xSemaphoreTake(s->lock, portMAX_DELAY); s++; } } /* This must be called and completed before any other esplink functions! */ void esplink_init(void) { elink.mutex = null_check(xSemaphoreCreateMutex()); esplink_filled = null_check(xEventGroupCreate()); elink.desc = xmalloc_dma(EL_RB_COUNT * sizeof *elink.desc); elink.rb.u = xmalloc_dma(EL_RB_COUNT * sizeof *elink.rb.u); elink.rb.d = xmalloc_dma(EL_RB_COUNT * sizeof *elink.rb.d); struct esplink_sem *s = &elink.sem[0][0]; for (size_t i = 0; i < EL_RB_COUNT*2; i++) { s->lock = null_check(xSemaphoreCreateBinary()); s++; } xSemaphoreGive(elink.mutex); } /* * This needs to be called from the FPGA service thread once before * any esplink functions can be called from any other thread. After that, * those functions are safe to call (but may fail) regardless of * shutdown and/or reinitialization of the link. * * Call this function with head == NULL to either shut the link down * or to initialize the data structures (see above) */ void esplink_start(const struct esplink_head *head) { struct fpga_iov iov[3]; static unsigned int gen_count; static bool started = false; xSemaphoreTake(elink.mutex, portMAX_DELAY); if (elink.ready) esplink_stop(); if (!head) goto shutdown; /* At this point elink.mutex and all the ->lock mutexes are held */ elink.head = head->rb; elink.head.count = Min(head->rb.count, EL_RB_COUNT); size_t desc_size = sizeof(*elink.desc) * elink.head.count; size_t dptr_size = sizeof(*elink.rb.d) * elink.head.count; /* Set wakeup thresholds */ for (size_t i = 0; i < elink.head.count; i++) { elink.need[i][0] = RINGBUF_UNUSABLE + 1; elink.need[i][1] = 1; } iov[0].cmd = FPGA_CMD_RD; iov[0].addr = elink.head.desc; iov[0].rdata = (void *)&elink.desc; iov[0].len = desc_size; iov[1].cmd = FPGA_CMD_RD | FPGA_CMD_ACK(EL_UIRQ_RINGBUF); iov[1].addr = elink.head.dstr; iov[1].rdata = (void *)elink.rb.d; iov[1].len = dptr_size; /* Write back the same pointer values -> all buffers currently empty */ iov[2].cmd = FPGA_CMD_WR | FPGA_CMD_IRQ(EL_DIRQ_RINGBUF); iov[2].addr = elink.head.ustr; iov[2].wdata = (void *)elink.rb.d; /* rb.d is correct */ iov[2].len = dptr_size; fpga_iov(iov, ARRAY_SIZE(iov)); memcpy((void *)elink.rb.u, (void *)elink.rb.d, dptr_size); elink.ready = ++gen_count; xEventGroupClearBits(esplink_filled, ELQUEUE_ALL_MASK); struct esplink_sem *s = &elink.sem[0][0]; for (size_t i = 0; i < EL_RB_COUNT*2; i++) { xSemaphoreGive(s->lock); s++; } xEventGroupSetBits(esplink_filled, ELWAIT_ONLINE); shutdown: xSemaphoreGive(elink.mutex); } /* * Called from the FPGA service thread when a ring buffer * interrupt is received */ void esplink_poll(void) { if (!elink.ready) return; xSemaphoreTake(elink.mutex, portMAX_DELAY); if (elink.ready) { const size_t count = elink.head.count; const size_t dptr_size = sizeof(*elink.rb.d) * count; struct fpga_iov iov[1]; iov[0].cmd = FPGA_CMD_RD | FPGA_CMD_ACK(EL_UIRQ_RINGBUF); iov[0].addr = elink.head.dstr; iov[0].rdata = (void *)elink.rb.d; iov[0].len = dptr_size; fpga_iov(iov, 1); EventBits_t wakeup = 0; EventBits_t tbit = 1; for (size_t i = 0; i < count; i++) { size_t need; need = atomic(elink.need[i][0]); if (((elink.rb.d[i].tail - atomic(elink.rb.u[i].head) - 1) & (elink.desc[i].dstr.size-1)) >= need) wakeup |= tbit; tbit <<= 1; need = atomic(elink.need[i][1]); if (((elink.rb.d[i].head - atomic(elink.rb.u[i].tail)) & (elink.desc[i].ustr.size-1)) >= need) wakeup |= tbit; tbit <<= 1; } xEventGroupSetBits(esplink_filled, wakeup); } xSemaphoreGive(elink.mutex); } /* ------------------------------------------------------------------------- * * Functions that can be called from a non-service thread. * ------------------------------------------------------------------------- */ /* * Write/read data to/from a ring buffer. Block if necessary until at least * / bytes have been send/received, otherwise return. * * Returns the number of bytes send/received. * * If is set, only advance the head/tail pointer after the * whole transaction has been performed (must be no larger than half * the ring buffer size.) If / < and the return * value is less than , then the pointer will NOT have been advanced * and the transaction was aborted. A new call will restart from the * previous pointer location. * * The wakeup "need" value in esplink_wait_for() is set appropriately * for a transaction the same size, depending on if the flag * was set or not. Thus, for an atomic transaction, if a shorter * atomic or non-atomic transaction is then desired, it may be * necessary to issue it without waiting for esplink_wait_for(). */ size_t esplink_write(enum esplink_ringbuf_user ring, const void *data, size_t len, size_t mintx, bool atomic) { const size_t unusable = RINGBUF_UNUSABLE; size_t tx = 0; if (!len || ring >= EL_RB_COUNT || !elink.ready) return tx; mintx = Min(mintx, len); const char *p = data; struct esplink_sem *sem = &elink.sem[ring][0]; xSemaphoreTake(sem->lock, portMAX_DELAY); const unsigned int ready_gen = elink.ready; if (unlikely(!ready_gen || ring >= elink.head.count)) goto bail; const struct esplink_ringbuf * const desc = &elink.head.desc[ring].dstr; const size_t size = desc->size; if (unlikely(atomic && len > (size >> 1))) goto bail; size_t * const hptr = (size_t *)&elink.rb.u[ring].head; const volatile size_t * const tptr = &elink.rb.d[ring].tail; size_t head = *hptr; const size_t need = (atomic ? len : 1) + unusable; atomic(elink.need[ring][0]) = need; /* Minimum wakeup */ char * const start = desc->start; while (elink.ready == ready_gen) { xEventGroupClearBits(esplink_filled, ELQUEUE_DL(ring)); const size_t tail = *tptr; size_t space = (tail-head) & (size-1); if (!len) { if (space >= need) xEventGroupSetBits(esplink_filled, ELQUEUE_DL(ring)); break; } if (space < need) { if (tx >= mintx) break; esplink_wait_for(ELQUEUE_DL(ring), false); continue; } size_t chunk = Min(space - unusable, len); struct fpga_iov iov[4], *iv = iov; while (chunk) { iv->cmd = FPGA_CMD_WR; iv->addr = start + head; iv->wdata = p; iv->len = Min(chunk, size - head); p += iv->len; tx += iv->len; len -= iv->len; chunk -= iv->len; head = (head+iv->len) & (size-1); iv++; } if (!len || !atomic) { /* Commit the data to the ring buffer */ elink.rb.u[ring].head = head; iv->cmd = FPGA_CMD_WR | FPGA_CMD_IRQ(EL_DIRQ_RINGBUF); iv->addr = &elink.head.dstr[ring].head; iv->wdata = hptr; iv->len = sizeof *hptr; iv++; } /* Optimistically poll for tail pointer advance */ iv->cmd = FPGA_CMD_RD; iv->addr = &elink.head.dstr[ring].tail; iv->rdata = (void *)tptr; iv->len = sizeof *tptr; iv++; fpga_iov(iov, iv - iov); } bail: xSemaphoreGive(sem->lock); return tx; } size_t esplink_read(enum esplink_ringbuf_user ring, void *data, size_t len, size_t minrx, bool atomic) { size_t rx = 0; if (!len || ring >= EL_RB_COUNT || !elink.ready) return rx; minrx = Min(minrx, len); char *p = data; struct esplink_sem *sem = &elink.sem[ring][1]; xSemaphoreTake(sem->lock, portMAX_DELAY); const unsigned int ready_gen = elink.ready; if (unlikely(!ready_gen || ring >= elink.head.count)) goto bail; const struct esplink_ringbuf * const desc = &elink.head.desc[ring].ustr; const size_t size = desc->size; if (unlikely(atomic && len > (size >> 1))) goto bail; size_t * const tptr = (size_t *)&elink.rb.u[ring].tail; const volatile size_t * const hptr = &elink.rb.d[ring].head; size_t tail = *tptr; const size_t need = atomic ? len : 1; atomic(elink.need[ring][1]) = need; /* Minimum wakeup */ char * const start = desc->start; while (elink.ready == ready_gen) { xEventGroupClearBits(esplink_filled, ELQUEUE_UL(ring)); const size_t head = *hptr; size_t avail = (head-tail) & (size-1); if (!len) { if (avail >= need) xEventGroupSetBits(esplink_filled, ELQUEUE_UL(ring)); break; } if (avail < need) { if (rx >= minrx) break; esplink_wait_for(ELQUEUE_UL(ring), false); continue; } size_t chunk = Min(avail, len); struct fpga_iov iov[4], *iv = iov; while (chunk) { iv->cmd = FPGA_CMD_RD; iv->addr = start + tail; iv->rdata = p; iv->len = Min(chunk, size - tail); p += iv->len; rx += iv->len; len -= iv->len; chunk -= iv->len; tail = (tail+iv->len) & (size-1); iv++; } if (!len || !atomic) { /* Consume the data from the ring buffer */ elink.rb.u[ring].tail = tail; iv->cmd = FPGA_CMD_WR | FPGA_CMD_IRQ(EL_DIRQ_RINGBUF); iv->addr = &elink.head.dstr[ring].tail; iv->wdata = tptr; iv->len = sizeof *tptr; iv++; } /* Optimistically poll for head pointer advance */ iv->cmd = FPGA_CMD_RD; iv->addr = &elink.head.dstr[ring].head; iv->rdata = (void *)hptr; iv->len = sizeof *hptr; iv++; fpga_iov(iov, iv - iov); } bail: xSemaphoreGive(sem->lock); return rx; }