#include <sys/queue.h>

#define RTE_TAILQ_RING_NAME "RTE_RING"

enum rte_ring_queue_behavior {
        RTE_RING_QUEUE_FIXED = 0, /**< Enq/Deq a fixed number of items from a ring */
        RTE_RING_QUEUE_VARIABLE   /**< Enq/Deq as many items as possible from a ring */
};
#ifdef RTE_LIBRTE_RING_DEBUG
/** A structure that stores the per-lcore ring statistics. */
struct rte_ring_debug_stats {
        uint64_t enq_success_bulk; /**< Successful enqueues number. */
        uint64_t enq_success_objs; /**< Objects successfully enqueued. */
        uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
        uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
        uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
        uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
        uint64_t deq_success_bulk; /**< Successful dequeues number. */
        uint64_t deq_success_objs; /**< Objects successfully dequeued. */
        uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
        uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
} __rte_cache_aligned;
#endif
#define RTE_RING_NAMESIZE 32     /**< The maximum length of a ring name. */
#define RTE_RING_MZ_PREFIX "RG_" /**< Prefix of a ring's memzone name. */

#ifndef RTE_RING_PAUSE_REP_COUNT
#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after this many pauses while
                                        waiting on a ring; 0 = never yield. */
#endif

/* Structure of an rte_ring; fields elided from this extract are marked "...". */
struct rte_ring {
        char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
        /* ... */

        /** Ring producer status. */
        struct prod {
                uint32_t watermark;      /**< Maximum items before EDQUOT. */
                uint32_t sp_enqueue;     /**< True, if single producer. */
                uint32_t size;           /**< Size of ring. */
                uint32_t mask;           /**< Mask (size - 1) of ring. */
                volatile uint32_t head;  /**< Producer head. */
                volatile uint32_t tail;  /**< Producer tail. */
        } prod __rte_cache_aligned;

        /** Ring consumer status. */
        struct cons {
                uint32_t sc_dequeue;     /**< True, if single consumer. */
                uint32_t size;           /**< Size of the ring. */
                uint32_t mask;           /**< Mask (size - 1) of ring. */
                volatile uint32_t head;  /**< Consumer head. */
                volatile uint32_t tail;  /**< Consumer tail. */
#ifdef RTE_RING_SPLIT_PROD_CONS
        } cons __rte_cache_aligned;      /* consumer status on its own cache line */
#else
        } cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
        struct rte_ring_debug_stats stats[RTE_MAX_LCORE]; /**< Per-lcore statistics. */
#endif

        void *ring[0] __rte_cache_aligned; /**< Memory space of ring starts here. */
};
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
#define RTE_RING_QUOT_EXCEED (1 << 31)           /**< Quota exceeded for burst ops. */
#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff)  /**< Ring size mask. */

#ifdef RTE_LIBRTE_RING_DEBUG
#define __RING_STAT_ADD(r, name, n) do {                        \
                unsigned __lcore_id = rte_lcore_id();           \
                if (__lcore_id < RTE_MAX_LCORE) {               \
                        r->stats[__lcore_id].name##_objs += n;  \
                        r->stats[__lcore_id].name##_bulk += 1;  \
                }                                               \
        } while (0)
#else
#define __RING_STAT_ADD(r, name, n) do {} while (0)
#endif

/* Create a new ring named *name* in memory, on the given NUMA socket. */
struct rte_ring *rte_ring_create(const char *name, unsigned count,
                                 int socket_id, unsigned flags);
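/*
 * Illustrative usage sketch (not part of the original header): create a ring
 * and move one pointer through it with the default enqueue/dequeue calls
 * declared in this file. The ring name, size and my_obj are made-up for the
 * example; SOCKET_ID_ANY simply means "any NUMA socket".
 */
#include <rte_ring.h>

static int example_ring_roundtrip(void)
{
        /* count must be a power of two; flags == 0 keeps the MP/MC defaults */
        struct rte_ring *r = rte_ring_create("example_ring", 1024,
                                             SOCKET_ID_ANY, 0);
        int my_obj = 42;
        void *p = NULL;

        if (r == NULL)
                return -1;
        if (rte_ring_enqueue(r, &my_obj) < 0)   /* -ENOBUFS when the ring is full */
                return -1;
        if (rte_ring_dequeue(r, &p) < 0)        /* -ENOENT when the ring is empty */
                return -1;
        return (*(int *)p == 42) ? 0 : -1;
}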
/* The actual enqueue of pointers on the ring. Placed here since identical
 * code is needed in both the single- and multi-producer enqueue functions. */
#define ENQUEUE_PTRS() do { \
        const uint32_t size = r->prod.size; \
        uint32_t idx = prod_head & mask; \
        if (likely(idx + n < size)) { \
                for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
                        r->ring[idx] = obj_table[i]; \
                        r->ring[idx+1] = obj_table[i+1]; \
                        r->ring[idx+2] = obj_table[i+2]; \
                        r->ring[idx+3] = obj_table[i+3]; \
                } \
                switch (n & 0x3) { \
                        case 3: r->ring[idx++] = obj_table[i++]; \
                        case 2: r->ring[idx++] = obj_table[i++]; \
                        case 1: r->ring[idx++] = obj_table[i++]; \
                } \
        } else { \
                for (i = 0; idx < size; i++, idx++) \
                        r->ring[idx] = obj_table[i]; \
                for (idx = 0; i < n; i++, idx++) \
                        r->ring[idx] = obj_table[i]; \
        } \
} while (0)

/* The actual copy of pointers from the ring to obj_table. Placed here since
 * identical code is needed in both the single- and multi-consumer dequeue
 * functions. */
#define DEQUEUE_PTRS() do { \
        uint32_t idx = cons_head & mask; \
        const uint32_t size = r->cons.size; \
        if (likely(idx + n < size)) { \
                for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) { \
                        obj_table[i] = r->ring[idx]; \
                        obj_table[i+1] = r->ring[idx+1]; \
                        obj_table[i+2] = r->ring[idx+2]; \
                        obj_table[i+3] = r->ring[idx+3]; \
                } \
                switch (n & 0x3) { \
                        case 3: obj_table[i++] = r->ring[idx++]; \
                        case 2: obj_table[i++] = r->ring[idx++]; \
                        case 1: obj_table[i++] = r->ring[idx++]; \
                } \
        } else { \
                for (i = 0; idx < size; i++, idx++) \
                        obj_table[i] = r->ring[idx]; \
                for (idx = 0; i < n; i++, idx++) \
                        obj_table[i] = r->ring[idx]; \
        } \
} while (0)
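/*
 * Standalone sketch (not in the original header) of the index arithmetic the
 * copy macros above rely on: head/tail are free-running 32-bit counters and
 * the ring size is a power of two, so "index & mask" (mask == size - 1)
 * selects the slot, and unsigned subtraction yields entry counts even across
 * the 32-bit wrap-around.
 */
#include <stdint.h>
#include <assert.h>

static void index_arithmetic_demo(void)
{
        const uint32_t size = 8;              /* power of two, as rte_ring requires */
        const uint32_t mask = size - 1;
        uint32_t cons_tail = UINT32_MAX - 5;  /* consumer is 4 entries behind ... */
        uint32_t prod_head = UINT32_MAX - 1;  /* ... the producer, which is about to wrap */

        uint32_t used = (prod_head - cons_tail) & mask;       /* entries in the ring */
        uint32_t free_entries = mask + cons_tail - prod_head; /* slots still free */

        assert(used == 4);
        assert(free_entries == 3);             /* usable capacity is size - 1 == 7 */
        assert(((prod_head + 3) & mask) == 1); /* indexes wrap cleanly */
}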
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t prod_head, prod_next;
        uint32_t cons_tail, free_entries;
        const unsigned max = n;
        int success;
        unsigned i;
        uint32_t mask = r->prod.mask;
        int ret;

        /* move prod.head atomically */
        do {
                /* reset n to the initial burst count */
                n = max;

                prod_head = r->prod.head;
                cons_tail = r->cons.tail;
                /* The subtraction is done between two unsigned 32-bit values
                 * (the result is modulo 2^32 even if prod_head > cons_tail),
                 * so free_entries is always between 0 and size(ring) - 1. */
                free_entries = (mask + cons_tail - prod_head);

                /* check that we have enough room in the ring */
                if (unlikely(n > free_entries)) {
                        if (behavior == RTE_RING_QUEUE_FIXED) {
                                __RING_STAT_ADD(r, enq_fail, n);
                                return -ENOBUFS;
                        }
                        /* variable behavior: take what fits, fail only if nothing fits */
                        if (unlikely(free_entries == 0)) {
                                __RING_STAT_ADD(r, enq_fail, n);
                                return 0;
                        }
                        n = free_entries;
                }

                prod_next = prod_head + n;
                success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next);
        } while (unlikely(success == 0));

        /* write entries in the ring */
        ENQUEUE_PTRS();
        rte_smp_wmb();

        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
                                (int)(n | RTE_RING_QUOT_EXCEED);
                __RING_STAT_ADD(r, enq_quota, n);
        }
        else {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
                __RING_STAT_ADD(r, enq_success, n);
        }

        /* If there are other enqueues in progress that preceded us, wait for
         * them to complete before publishing our tail update (pausing, and
         * optionally yielding after RTE_RING_PAUSE_REP_COUNT repetitions). */
        while (unlikely(r->prod.tail != prod_head))
                rte_pause();

        r->prod.tail = prod_next;
        return ret;
}
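/*
 * Minimal sketch (not from the original header) of the reservation step used
 * above, in isolation: competing producers race with rte_atomic32_cmpset() to
 * advance prod.head; only the winner owns slots [head, next) and later
 * publishes them by advancing prod.tail in the same order. The helper name is
 * hypothetical and the free-space check of the real function is omitted.
 */
static inline uint32_t
reserve_enqueue_slots(struct rte_ring *r, uint32_t want)
{
        uint32_t head, next;

        do {
                head = r->prod.head;
                next = head + want;
                /* retry when another producer moved prod.head first */
        } while (rte_atomic32_cmpset(&r->prod.head, head, next) == 0);

        return head; /* caller fills r->ring[(head + k) & r->prod.mask], k < want */
}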
static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t prod_head, cons_tail;
        uint32_t prod_next, free_entries;
        unsigned i;
        uint32_t mask = r->prod.mask;
        int ret;

        prod_head = r->prod.head;
        cons_tail = r->cons.tail;
        /* unsigned 32-bit subtraction: always between 0 and size(ring) - 1 */
        free_entries = mask + cons_tail - prod_head;

        /* check that we have enough room in the ring */
        if (unlikely(n > free_entries)) {
                if (behavior == RTE_RING_QUEUE_FIXED) {
                        __RING_STAT_ADD(r, enq_fail, n);
                        return -ENOBUFS;
                }
                if (unlikely(free_entries == 0)) {
                        __RING_STAT_ADD(r, enq_fail, n);
                        return 0;
                }
                n = free_entries;
        }

        prod_next = prod_head + n;
        r->prod.head = prod_next;       /* no CAS needed: single producer */

        /* write entries in the ring */
        ENQUEUE_PTRS();
        rte_smp_wmb();

        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
                                (int)(n | RTE_RING_QUOT_EXCEED);
                __RING_STAT_ADD(r, enq_quota, n);
        }
        else {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
                __RING_STAT_ADD(r, enq_success, n);
        }

        r->prod.tail = prod_next;
        return ret;
}
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
        const unsigned max = n;
        int success;
        unsigned i;
        uint32_t mask = r->prod.mask;

        /* move cons.head atomically */
        do {
                /* restore n as it may change every loop */
                n = max;

                cons_head = r->cons.head;
                prod_tail = r->prod.tail;
                /* unsigned 32-bit subtraction: always between 0 and size(ring) - 1 */
                entries = (prod_tail - cons_head);

                /* set the actual number of entries for the dequeue */
                if (n > entries) {
                        if (behavior == RTE_RING_QUEUE_FIXED) {
                                __RING_STAT_ADD(r, deq_fail, n);
                                return -ENOENT;
                        }
                        if (unlikely(entries == 0)) {
                                __RING_STAT_ADD(r, deq_fail, n);
                                return 0;
                        }
                        n = entries;
                }

                cons_next = cons_head + n;
                success = rte_atomic32_cmpset(&r->cons.head, cons_head, cons_next);
        } while (unlikely(success == 0));

        /* copy entries into the table */
        DEQUEUE_PTRS();
        rte_smp_rmb();

        /* If there are other dequeues in progress that preceded us, wait for
         * them to complete before publishing our tail update. */
        while (unlikely(r->cons.tail != cons_head))
                rte_pause();

        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;

        return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
        unsigned i;
        uint32_t mask = r->prod.mask;

        cons_head = r->cons.head;
        prod_tail = r->prod.tail;
        /* unsigned 32-bit subtraction: always between 0 and size(ring) - 1 */
        entries = prod_tail - cons_head;

        if (n > entries) {
                if (behavior == RTE_RING_QUEUE_FIXED) {
                        __RING_STAT_ADD(r, deq_fail, n);
                        return -ENOENT;
                }
                if (unlikely(entries == 0)) {
                        __RING_STAT_ADD(r, deq_fail, n);
                        return 0;
                }
                n = entries;
        }

        cons_next = cons_head + n;
        r->cons.head = cons_next;       /* no CAS needed: single consumer */

        /* copy entries into the table */
        DEQUEUE_PTRS();
        rte_smp_rmb();

        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
        return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned n)
{
        return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned n)
{
        return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
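/*
 * Sketch (illustrative name and size): a ring created with RING_F_SP_ENQ and
 * RING_F_SC_DEQ is single-producer/single-consumer by default, so the generic
 * wrappers below resolve to the cheaper sp/sc paths instead of the CAS-based
 * mp/mc ones.
 */
static struct rte_ring *make_spsc_ring(void)
{
        return rte_ring_create("spsc_ring", 4096, SOCKET_ID_ANY,
                               RING_F_SP_ENQ | RING_F_SC_DEQ);
}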
static inline int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                      unsigned n)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue_bulk(r, obj_table, n);
        else
                return rte_ring_mp_enqueue_bulk(r, obj_table, n);
}

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
        return rte_ring_mp_enqueue_bulk(r, &obj, 1);
}

static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
        return rte_ring_sp_enqueue_bulk(r, &obj, 1);
}

static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue(r, obj);
        else
                return rte_ring_mp_enqueue(r, obj);
}
static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
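/*
 * Usage sketch for the fixed-count bulk calls above (array contents are
 * placeholders): either all n objects are moved or none are. On enqueue,
 * -ENOBUFS means nothing was stored while -EDQUOT means the objects were
 * stored but the watermark is now exceeded; on dequeue, -ENOENT means the
 * table was left untouched.
 */
#include <errno.h>

static int bulk_roundtrip(struct rte_ring *r)
{
        void *objs[32] = { NULL }; /* stand-ins for real object pointers */
        int ret;

        ret = rte_ring_mp_enqueue_bulk(r, objs, 32);
        if (ret == -ENOBUFS)
                return -1;      /* not enough room, nothing was enqueued */
        /* ret == -EDQUOT: enqueued anyway, but the ring is above the watermark */

        if (rte_ring_mc_dequeue_bulk(r, objs, 32) == -ENOENT)
                return -1;      /* fewer than 32 entries were available */

        return 0;
}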
static inline int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue_bulk(r, obj_table, n);
        else
                return rte_ring_mc_dequeue_bulk(r, obj_table, n);
}

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
        return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
}

static inline int __attribute__((always_inline))
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
        return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
}

static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue(r, obj_p);
        else
                return rte_ring_mc_dequeue(r, obj_p);
}
static inline int
rte_ring_full(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
}

static inline int
rte_ring_empty(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return !!(cons_tail == prod_tail);
}
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return ((prod_tail - cons_tail) & r->prod.mask);
}

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return ((cons_tail - prod_tail - 1) & r->prod.mask);
}
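/*
 * Sketch (hypothetical helper) of how the status queries above relate: the
 * usable capacity is size - 1, so for a consistent snapshot of an otherwise
 * idle ring count() + free_count() == size - 1, and full()/empty() are the
 * two extremes. Under concurrent enqueue/dequeue these values are only
 * approximate.
 */
#include <assert.h>

static void ring_status_demo(const struct rte_ring *r, unsigned size)
{
        unsigned used = rte_ring_count(r);
        unsigned avail = rte_ring_free_count(r);

        assert(used + avail == size - 1);
        assert(rte_ring_empty(r) == (used == 0));
        assert(rte_ring_full(r) == (avail == 0));
}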
static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                          unsigned n)
{
        return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                          unsigned n)
{
        return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                       unsigned n)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue_burst(r, obj_table, n);
        else
                return rte_ring_mp_enqueue_burst(r, obj_table, n);
}
static inline unsigned __attribute__((always_inline))
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue_burst(r, obj_table, n);
        else
                return rte_ring_mc_dequeue_burst(r, obj_table, n);
}
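/*
 * Usage sketch for the burst (variable-count) calls above: unlike the bulk
 * calls they return how many objects were actually moved, anywhere from 0 to
 * n, so callers typically loop over the remainder. The helper name is
 * illustrative.
 */
static unsigned drain_ring(struct rte_ring *r, void **objs, unsigned n)
{
        unsigned done = 0;

        while (done < n) {
                unsigned got = rte_ring_sc_dequeue_burst(r, objs + done, n - done);
                if (got == 0)
                        break;  /* the ring is (momentarily) empty */
                done += got;
        }
        return done;
}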
static void rte_smp_rmb(void)
static int rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
static int rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
static int rte_ring_dequeue(struct rte_ring *r, void **obj_p)
static int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n)
int rte_ring_set_water_mark(struct rte_ring *r, unsigned count)
static int rte_ring_empty(const struct rte_ring *r)
static int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n)
void rte_ring_list_dump(FILE *f)
#define RTE_RING_QUOT_EXCEED
static int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
static int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
#define RTE_RING_PAUSE_REP_COUNT
static int rte_ring_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n)
void rte_ring_free(struct rte_ring *r)
#define RTE_RING_NAMESIZE
static unsigned rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
static unsigned rte_ring_sp_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n)
static void rte_smp_wmb(void)
static int rte_ring_enqueue(struct rte_ring *r, void *obj)
static unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
static unsigned rte_ring_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n)
static int rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
void rte_ring_dump(FILE *f, const struct rte_ring *r)
static unsigned rte_ring_count(const struct rte_ring *r)
struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags)
char name[RTE_RING_NAMESIZE]
static int rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
static unsigned rte_ring_mp_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n)
struct rte_ring * rte_ring_lookup(const char *name)
static unsigned rte_ring_free_count(const struct rte_ring *r)
static int rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
static unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, unsigned flags)
static int rte_ring_full(const struct rte_ring *r)
static int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
ssize_t rte_ring_get_memsize(unsigned count)
#define __rte_cache_aligned
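/*
 * Watermark usage sketch (names and numbers illustrative): after
 * rte_ring_set_water_mark(), an enqueue that pushes the ring above the
 * threshold still stores the objects but reports it, as -EDQUOT for
 * single/bulk calls or via the RTE_RING_QUOT_EXCEED bit for burst calls,
 * so producers can apply backpressure before the ring is actually full.
 */
#include <errno.h>

static int arm_watermark(struct rte_ring *r)
{
        /* warn once 3/4 of a 1024-slot ring is in use */
        return rte_ring_set_water_mark(r, 768);
}

static int produce_with_backpressure(struct rte_ring *r, void *obj)
{
        int ret = rte_ring_enqueue(r, obj);

        if (ret == -ENOBUFS)
                return -1;      /* ring full: obj was NOT enqueued */
        if (ret == -EDQUOT) {
                /* obj WAS enqueued, but the watermark is exceeded: slow down */
        }
        return 0;
}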