EM-ODP  3.7.0
Event Machine on ODP
em_queue.c
1 /*
2  * Copyright (c) 2015-2021, Nokia Solutions and Networks
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  * * Neither the name of the copyright holder nor the names of its
15  * contributors may be used to endorse or promote products derived
16  * from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "em_include.h"
32 
33 #define EM_Q_BASENAME "EM_Q_"
34 
35 /**
36  * Default queue create conf to use if not provided by the user
37  */
38 static const em_queue_conf_t default_queue_conf = {
40  .min_events = 0, /* use EM default value */
41  .conf_len = 0, /* .conf is ignored if this is 0 */
42  .conf = NULL
43 };
44 
45 static int
46 queue_init_prio_map(int minp, int maxp, int nump);
47 static void
48 queue_init_prio_legacy(int minp, int maxp);
49 static void
50 queue_init_prio_adaptive(int minp, int maxp, int nump);
51 static int
52 queue_init_prio_custom(int minp, int maxp);
53 
54 static inline int
55 queue_create_check_sched(const queue_setup_t *setup, const char **err_str);
56 
57 static int
58 queue_setup(queue_elem_t *q_elem, const queue_setup_t *setup,
59  const char **err_str);
60 static void
61 queue_setup_odp_common(const queue_setup_t *setup,
62  odp_queue_param_t *odp_queue_param);
63 static int
64 queue_setup_scheduled(queue_elem_t *q_elem, const queue_setup_t *setup,
65  const char **err_str);
66 static int
67 queue_setup_unscheduled(queue_elem_t *q_elem, const queue_setup_t *setup,
68  const char **err_str);
69 static int
70 queue_setup_local(queue_elem_t *q_elem, const queue_setup_t *setup,
71  const char **err_str);
72 static int
73 queue_setup_output(queue_elem_t *q_elem, const queue_setup_t *setup,
74  const char **err_str);
75 
76 static inline queue_elem_t *
77 queue_poolelem2queue(objpool_elem_t *const queue_pool_elem)
78 {
79  return (queue_elem_t *)((uintptr_t)queue_pool_elem -
80  offsetof(queue_elem_t, queue_pool_elem));
81 }
82 
83 static int
84 read_config_file(void)
85 {
86  const char *conf_str;
87  int val = 0;
88  int ret;
89 
90  EM_PRINT("EM-queue config:\n");
91 
92  /*
93  * Option: queue.max_num
94  */
95  conf_str = "queue.max_num";
96  ret = em_libconfig_lookup_int(&em_shm->libconfig, conf_str, &val);
97  if (unlikely(!ret)) {
98  EM_LOG(EM_LOG_ERR, "Config option '%s' not found.\n", conf_str);
99  return -1;
100  }
101  /* Need to have enough space for all queues */
102  if (val <= (EM_QUEUE_STATIC_NUM + MAX_INTERNAL_QUEUES)) {
103  EM_LOG(EM_LOG_ERR,
104  "Bad config value '%s = %d', Value must be larger than %d\n"
105  "to have space for static queues, internal ctrl queues and\n"
106  "at least one dynamic queue.\n",
107  conf_str, val, (EM_QUEUE_STATIC_NUM + MAX_INTERNAL_QUEUES));
108  return -1;
109  }
110 
111  if ((unsigned int)val > UINT16_MAX) {
112  EM_LOG(EM_LOG_ERR, "Bad config value '%s = %d', Value > UINT16_MAX\n",
113  conf_str, val);
114  return -1;
115  }
116  /* Check that odp supports the given number of queues */
117  if ((unsigned int)val > em_shm->queue_tbl.odp_queue_capability.max_queues) {
118  EM_LOG(EM_LOG_ERR, "Bad config value '%s = %d', Value > odp-max-queues:%u\n",
119  conf_str, val, em_shm->queue_tbl.odp_queue_capability.max_queues);
120  return -1;
121  }
122 
123  /* store & print the value */
124  em_shm->opt.queue.max_num = (unsigned int)val;
125  EM_PRINT(" %s: %d\n", conf_str, val);
126 
127  /*
128  * Option: queue.min_events_default
129  */
130  conf_str = "queue.min_events_default";
131  ret = em_libconfig_lookup_int(&em_shm->libconfig, conf_str, &val);
132  if (unlikely(!ret)) {
133  EM_LOG(EM_LOG_ERR, "Config option '%s' not found.\n", conf_str);
134  return -1;
135  }
136  if (val < 0) {
137  EM_LOG(EM_LOG_ERR, "Bad config value '%s = %d'\n",
138  conf_str, val);
139  return -1;
140  }
141  /* store & print the value */
142  em_shm->opt.queue.min_events_default = val;
143  EM_PRINT(" %s: %d\n", conf_str, val);
144 
145  /*
146  * Option: queue.prio_map_mode
147  */
148  conf_str = "queue.priority.map_mode";
149  ret = em_libconfig_lookup_int(&em_shm->libconfig, conf_str, &val);
150  if (unlikely(!ret)) {
151  EM_LOG(EM_LOG_ERR, "Config option '%s' not found\n", conf_str);
152  return -1;
153  }
154  if (val < 0 || val > 2) {
155  EM_LOG(EM_LOG_ERR, "Bad config value '%s = %d'\n", conf_str, val);
156  return -1;
157  }
158  em_shm->opt.queue.priority.map_mode = val;
159  EM_PRINT(" %s: %d\n", conf_str, val);
160 
161  if (val == 2) { /* custom map */
162  conf_str = "queue.priority.custom_map";
163  ret = em_libconfig_lookup_array(&em_shm->libconfig, conf_str,
164  em_shm->opt.queue.priority.custom_map,
166  if (unlikely(!ret)) {
167  EM_LOG(EM_LOG_ERR, "Config option '%s' not found or invalid\n", conf_str);
168  return -1;
169  }
170  EM_PRINT(" %s: [", conf_str);
171  for (int i = 0; i < EM_QUEUE_PRIO_NUM; i++) {
172  EM_PRINT("%d", em_shm->opt.queue.priority.custom_map[i]);
173  if (i < (EM_QUEUE_PRIO_NUM - 1))
174  EM_PRINT(",");
175  }
176  EM_PRINT("]\n");
177  }
178  return 0;
179 }
180 
181 /**
182  * Helper: initialize a queue pool (populate pool with q_elems)
183  */
184 static int
185 queue_pool_init(queue_tbl_t *const queue_tbl,
186  queue_pool_t *const queue_pool,
187  int min_qidx, int max_qidx)
188 {
189  const uint32_t objpool_subpools = MIN(4, OBJSUBPOOLS_MAX);
190  const int qs_per_pool = (max_qidx - min_qidx + 1);
191  int qs_per_subpool = qs_per_pool / objpool_subpools;
192  int qs_leftover = qs_per_pool % objpool_subpools;
193  uint32_t subpool_idx = 0;
194  int add_cnt = 0;
195 
196  if (objpool_init(&queue_pool->objpool, objpool_subpools) != 0)
197  return -1;
198 
199  for (int i = min_qidx; i <= max_qidx; i++) {
200  objpool_add(&queue_pool->objpool, subpool_idx,
201  &queue_tbl->queue_elem[i].queue_pool_elem);
202  add_cnt++;
203  if (add_cnt == qs_per_subpool + qs_leftover) {
204  subpool_idx++; /* add to next subpool */
205  qs_leftover = 0; /* added leftovers to subpool 0 */
206  add_cnt = 0;
207  }
208  }
209 
210  return 0;
211 }
212 
213 /**
214  * Initialize the EM queues
215  */
217 queue_init(queue_tbl_t *const queue_tbl,
218  queue_pool_t *const queue_pool,
219  queue_pool_t *const queue_pool_static)
220 {
221  odp_queue_capability_t *const odp_queue_capa =
222  &queue_tbl->odp_queue_capability;
223  odp_schedule_capability_t *const odp_sched_capa =
224  &queue_tbl->odp_schedule_capability;
225  int min;
226  int max;
227  int ret;
228 
229  memset(queue_tbl, 0, sizeof(queue_tbl_t));
230  memset(queue_pool, 0, sizeof(queue_pool_t));
231  memset(queue_pool_static, 0, sizeof(queue_pool_t));
232  env_atomic32_init(&em_shm->queue_count);
233  env_atomic32_init(&em_shm->queue_tbl.output_queue_count);
234 
235  /* Retrieve and store the ODP queue capabilities into 'queue_tbl' */
236  ret = odp_queue_capability(odp_queue_capa);
238  "odp_queue_capability():%d failed", ret);
239 
240  /* Retrieve and store the ODP schedule capabilities into 'queue_tbl' */
241  ret = odp_schedule_capability(odp_sched_capa);
243  "odp_schedule_capability():%d failed", ret);
244 
245  if (read_config_file())
246  return EM_ERR_LIB_FAILED;
247 
248  unsigned int max_queues = em_shm->opt.queue.max_num;
249 
250  size_t qelem_tbl_sz = sizeof(queue_elem_t) * max_queues;
251  size_t qname_tbl_sz = sizeof(char) * EM_QUEUE_NAME_LEN * max_queues;
252  size_t shm_sz = qelem_tbl_sz + qname_tbl_sz;
253 
254  void *shm_tbl = env_shared_reserve("EM q_elem tbl and names", shm_sz);
255 
257  "env_shared_reserve() failed when reserving \"EM q_elem tbl and names\"");
258  memset(shm_tbl, 0, shm_sz);
259 
260  /* Store the q_elem tbl and q_name tbl pointers, points into the allocated shared mem */
261  em_shm->queue_tbl.queue_elem = shm_tbl;
262  em_shm->queue_tbl.name = (char(*)[EM_QUEUE_NAME_LEN])((uintptr_t)shm_tbl + qelem_tbl_sz);
263 
264  /* Initialize the queue element table */
265  for (unsigned int i = 0; i < max_queues; i++)
266  queue_tbl->queue_elem[i].queue = (uint32_t)(uintptr_t)queue_idx2hdl(i);
267 
268  /* Initialize the static queue pool */
269  min = queue_id2idx(EM_QUEUE_STATIC_MIN);
270  max = queue_id2idx(LAST_INTERNAL_QUEUE);
271  if (queue_pool_init(queue_tbl, queue_pool_static, min, max) != 0)
272  return EM_ERR_LIB_FAILED;
273 
274  uint16_t last_dyn_queue = (uint16_t)(max_queues - 1 + EM_QUEUE_RANGE_OFFSET);
275 
276  /* Initialize the dynamic queue pool */
277  min = queue_id2idx(FIRST_DYN_QUEUE);
278  max = queue_id2idx(last_dyn_queue);
279  if (queue_pool_init(queue_tbl, queue_pool, min, max) != 0)
280  return EM_ERR_LIB_FAILED;
281 
282  /* Initialize priority mapping, adapt to values from ODP */
283  min = odp_schedule_min_prio();
284  max = odp_schedule_max_prio();
285  em_shm->queue_prio.num_runtime = max - min + 1;
286  ret = queue_init_prio_map(min, max, em_shm->queue_prio.num_runtime);
288  "mapping odp priorities failed: %d", ret);
289 
290  /* Initialize output queue free indexes */
291  for (unsigned int i = 0; i < EM_MAX_OUTPUT_QUEUES; i++)
292  em_shm->queue_tbl.output_queue_idx_free[i] = true;
293 
294  return EM_OK;
295 }
296 
297 /**
298  * Queue inits done during EM core local init (once at startup on each core).
299  *
300  * Initialize event storage for queues of type 'EM_QUEUE_TYPE_LOCAL'.
301  */
304 {
305  em_locm_t *const locm = &em_locm;
306  odp_stash_capability_t stash_capa;
307  odp_stash_param_t stash_param;
308  unsigned int num_obj = 0;
309  int core = em_core_id();
310  char name[ODP_STASH_NAME_LEN];
311 
312  int ret = odp_stash_capability(&stash_capa, ODP_STASH_TYPE_FIFO);
313 
314  if (ret != 0)
315  return EM_ERR_LIB_FAILED;
316 
317  odp_stash_param_init(&stash_param);
318 
319  stash_param.type = ODP_STASH_TYPE_FIFO;
320  stash_param.put_mode = ODP_STASH_OP_ST;
321  stash_param.get_mode = ODP_STASH_OP_ST;
322 
323  /* Stash size: use EM default queue size value from config file: */
324  num_obj = em_shm->opt.queue.min_events_default;
325  if (num_obj != 0)
326  stash_param.num_obj = num_obj;
327  /* else: use odp default as set by odp_stash_param_init() */
328 
329  stash_param.obj_size = sizeof(uint64_t);
330  if (stash_param.num_obj > stash_capa.max_num.u64) {
331  EM_LOG(EM_LOG_PRINT,
332  "%s(): req stash.num_obj(%" PRIu64 ") > capa.max_num.u64(%" PRIu64 ").\n"
333  " ==> using max value:%" PRIu64 "\n", __func__,
334  stash_param.num_obj, stash_capa.max_num.u64, stash_capa.max_num.u64);
335  stash_param.num_obj = stash_capa.max_num.u64;
336  }
337 
338  stash_param.cache_size = 0; /* No core local caching */
339 
340  locm->local_queues.empty = 1;
341 
342  for (int prio = 0; prio < EM_QUEUE_PRIO_NUM; prio++) {
343  snprintf(name, sizeof(name),
344  "local-q:c%02d:prio%d", core, prio);
345  name[sizeof(name) - 1] = '\0';
346 
347  locm->local_queues.prio[prio].empty_prio = 1;
348  locm->local_queues.prio[prio].stash =
349  odp_stash_create(name, &stash_param);
350  if (unlikely(locm->local_queues.prio[prio].stash ==
351  ODP_STASH_INVALID))
352  return EM_ERR_ALLOC_FAILED;
353  }
354 
355  memset(&locm->output_queue_track, 0,
356  sizeof(locm->output_queue_track));
357 
358  return EM_OK;
359 }
360 
361 /**
362  * Queue termination done during em_term_core().
363  *
364  * Flush & destroy event storage for queues of type 'EM_QUEUE_TYPE_LOCAL'.
365  */
368 {
370  em_event_t ev_tbl[EM_QUEUE_LOCAL_MULTI_MAX_BURST];
372  em_status_t stat = EM_OK;
373 
374  for (;;) {
375  int num = next_local_queue_events(entry_tbl /*[out]*/,
377  if (num <= 0)
378  break;
379 
380  for (int i = 0; i < num; i++)
381  ev_tbl[i] = (em_event_t)(uintptr_t)entry_tbl[i].evptr;
382 
383  event_to_hdr_multi(ev_tbl, ev_hdr_tbl, num);
384 
385  if (esv_enabled())
386  evstate_em2usr_multi(ev_tbl, ev_hdr_tbl, num,
387  EVSTATE__TERM_CORE__QUEUE_LOCAL);
388  em_free_multi(ev_tbl, num);
389  }
390 
391  for (int prio = 0; prio < EM_QUEUE_PRIO_NUM; prio++) {
392  int ret = odp_stash_destroy(em_locm.local_queues.prio[prio].stash);
393 
394  if (unlikely(ret != 0))
395  stat = EM_ERR_LIB_FAILED;
396  }
397 
398  return stat;
399 }
400 
401 /**
402  * Allocate a new EM queue
403  *
404  * @param queue EM queue handle if a specific EM queue is requested,
405  * EM_QUEUE_UNDEF if any EM queue will do.
406  * @param[out] err_str Output var for error message in case of failure.
407  *
408  * @return EM queue handle
409  * @retval EM_QUEUE_UNDEF on failure
410  */
411 em_queue_t queue_alloc(em_queue_t queue, const char **err_str /*out*/)
412 {
413  queue_elem_t *queue_elem;
414  objpool_elem_t *queue_pool_elem;
415 
416  if (queue == EM_QUEUE_UNDEF) {
417  /*
418  * Allocate a dynamic queue, i.e. take next available
419  */
420  queue_pool_elem = objpool_rem(&em_shm->queue_pool.objpool,
421  em_core_id());
422  if (unlikely(queue_pool_elem == NULL)) {
423  *err_str = "queue pool element alloc failed!";
424  return EM_QUEUE_UNDEF;
425  }
426  queue_elem = queue_poolelem2queue(queue_pool_elem);
427  } else {
428  /*
429  * Allocate a specific static-handle queue, handle given
430  */
431  internal_queue_t iq;
432 
433  iq.queue = queue;
434  if (iq.device_id != em_shm->conf.device_id ||
435  iq.queue_id < EM_QUEUE_STATIC_MIN ||
436  iq.queue_id > LAST_INTERNAL_QUEUE) {
437  *err_str = "Invalid queue requested or handle not from static range!";
438  return EM_QUEUE_UNDEF;
439  }
440 
441  queue_elem = queue_elem_get(queue);
442  if (unlikely(queue_elem == NULL)) {
443  *err_str = "queue_elem ptr NULL!";
444  return EM_QUEUE_UNDEF;
445  }
446  /* Verify that the queue is not allocated */
447  if (queue_allocated(queue_elem)) {
448  *err_str = "queue already allocated!";
449  return EM_QUEUE_UNDEF;
450  }
451  /* Remove the queue from the pool */
452  int ret = objpool_rem_elem(&em_shm->queue_pool_static.objpool,
453  &queue_elem->queue_pool_elem);
454  if (unlikely(ret != 0)) {
455  *err_str = "static queue pool element alloc failed!";
456  return EM_QUEUE_UNDEF;
457  }
458  }
459 
460  env_atomic32_inc(&em_shm->queue_count);
461  return (em_queue_t)(uintptr_t)queue_elem->queue;
462 }
463 
464 em_status_t queue_free(em_queue_t queue)
465 {
466  queue_elem_t *const queue_elem = queue_elem_get(queue);
467  objpool_t *objpool;
468  internal_queue_t iq;
469 
470  iq.queue = queue;
471 
472  if (unlikely(queue_elem == NULL))
473  return EM_ERR_BAD_ID;
474 
475  if (iq.queue_id >= EM_QUEUE_STATIC_MIN &&
476  iq.queue_id <= LAST_INTERNAL_QUEUE)
477  objpool = &em_shm->queue_pool_static.objpool;
478  else
479  objpool = &em_shm->queue_pool.objpool;
480 
481  queue_elem->state = EM_QUEUE_STATE_INVALID;
482 
483  objpool_add(objpool,
484  queue_elem->queue_pool_elem.subpool_idx,
485  &queue_elem->queue_pool_elem);
486 
487  env_atomic32_dec(&em_shm->queue_count);
488  return EM_OK;
489 }
490 
491 static int
492 queue_create_check_sched(const queue_setup_t *setup, const char **err_str)
493 {
494  const queue_group_elem_t *queue_group_elem = NULL;
495  const atomic_group_elem_t *ag_elem = NULL;
496 
497  queue_group_elem = queue_group_elem_get(setup->queue_group);
498  /* scheduled queues are always associated with a queue group */
499  if (unlikely(queue_group_elem == NULL || !queue_group_allocated(queue_group_elem))) {
500  *err_str = "Invalid queue group!";
501  return -1;
502  }
503 
504  if (setup->atomic_group != EM_ATOMIC_GROUP_UNDEF) {
505  ag_elem = atomic_group_elem_get(setup->atomic_group);
506  if (unlikely(ag_elem == NULL || !atomic_group_allocated(ag_elem))) {
507  *err_str = "Invalid atomic group!";
508  return -1;
509  }
510  }
511 
512  if (unlikely(setup->prio >= EM_QUEUE_PRIO_NUM)) {
513  *err_str = "Invalid queue priority!";
514  return -1;
515  }
516  return 0;
517 }
518 
519 static int
520 queue_create_check_args(const queue_setup_t *setup, const char **err_str)
521 {
522  /* scheduled queue */
523  if (setup->type == EM_QUEUE_TYPE_ATOMIC ||
524  setup->type == EM_QUEUE_TYPE_PARALLEL ||
525  setup->type == EM_QUEUE_TYPE_PARALLEL_ORDERED)
526  return queue_create_check_sched(setup, err_str);
527 
528  /* other queue types */
529  switch (setup->type) {
531  /* API arg checks for unscheduled queues */
532  if (unlikely(setup->prio != EM_QUEUE_PRIO_UNDEF)) {
533  *err_str = "Invalid priority for unsched queue!";
534  return -1;
535  }
536  if (unlikely(setup->queue_group != EM_QUEUE_GROUP_UNDEF)) {
537  *err_str = "Queue group not used with unsched queues!";
538  return -1;
539  }
540  if (unlikely(setup->atomic_group != EM_ATOMIC_GROUP_UNDEF)) {
541  *err_str = "Atomic group not used with unsched queues!";
542  return -1;
543  }
544  break;
545 
546  case EM_QUEUE_TYPE_LOCAL:
547  /* API arg checks for local queues */
548  if (unlikely(setup->queue_group != EM_QUEUE_GROUP_UNDEF)) {
549  *err_str = "Queue group not used with local queues!";
550  return -1;
551  }
552  if (unlikely(setup->atomic_group != EM_ATOMIC_GROUP_UNDEF)) {
553  *err_str = "Atomic group not used with local queues!";
554  return -1;
555  }
556  if (unlikely(setup->prio >= EM_QUEUE_PRIO_NUM)) {
557  *err_str = "Invalid queue priority!";
558  return -1;
559  }
560  break;
561 
563  /* API arg checks for output queues */
564  if (unlikely(setup->queue_group != EM_QUEUE_GROUP_UNDEF)) {
565  *err_str = "Queue group not used with output queues!";
566  return -1;
567  }
568  if (unlikely(setup->atomic_group != EM_ATOMIC_GROUP_UNDEF)) {
569  *err_str = "Atomic group not used with output queues!";
570  return -1;
571  }
572  if (unlikely(setup->conf == NULL ||
573  setup->conf->conf_len < sizeof(em_output_queue_conf_t) ||
574  setup->conf->conf == NULL)) {
575  *err_str = "Invalid output queue conf";
576  return -1;
577  }
578  break;
579 
580  default:
581  *err_str = "Unknown queue type";
582  return -1;
583  }
584 
585  return 0;
586 }
587 
588 /**
589  * Create an EM queue: alloc, setup and add to queue group list
590  */
591 em_queue_t
592 queue_create(const char *name, em_queue_type_t type, em_queue_prio_t prio,
593  em_queue_group_t queue_group, em_queue_t queue_req,
594  em_atomic_group_t atomic_group, const em_queue_conf_t *conf,
595  const char **err_str)
596 {
597  int err;
598 
599  /* Use default EM queue conf if none given */
600  if (conf == NULL)
601  conf = &default_queue_conf;
602 
603  queue_setup_t setup = {.name = name, .type = type, .prio = prio,
604  .atomic_group = atomic_group,
605  .queue_group = queue_group, .conf = conf};
606 
607  err = queue_create_check_args(&setup, err_str);
608  if (err) {
609  /* 'err_str' set by queue_create_check_args() */
610  return EM_QUEUE_UNDEF;
611  }
612 
613  /*
614  * Allocate the queue handle and obtain the corresponding queue-element
615  */
616  const char *alloc_err_str = "";
617 
618  em_queue_t queue = queue_alloc(queue_req, &alloc_err_str);
619 
620  if (unlikely(queue == EM_QUEUE_UNDEF)) {
621  *err_str = alloc_err_str;
622  return EM_QUEUE_UNDEF;
623  }
624  if (unlikely(queue_req != EM_QUEUE_UNDEF && queue_req != queue)) {
625  queue_free(queue);
626  *err_str = "Failed to allocate the requested queue!";
627  return EM_QUEUE_UNDEF;
628  }
629 
630  queue_elem_t *queue_elem = queue_elem_get(queue);
631 
632  if (unlikely(!queue_elem)) {
633  queue_free(queue);
634  *err_str = "Queue elem NULL!";
635  return EM_QUEUE_UNDEF;
636  }
637 
638  /*
639  * Setup/configure the queue
640  */
641  err = queue_setup(queue_elem, &setup, err_str);
642  if (unlikely(err)) {
643  queue_free(queue);
644  /* 'err_str' set by queue_setup() */
645  return EM_QUEUE_UNDEF;
646  }
647 
648  return queue;
649 }
650 
652 queue_delete(queue_elem_t *const queue_elem)
653 {
654  queue_state_t old_state;
655  queue_state_t new_state;
656  em_status_t ret;
657  em_queue_t queue = (em_queue_t)(uintptr_t)queue_elem->queue;
658  em_queue_type_t type = queue_elem->type;
659 
660  if (unlikely(!queue_allocated(queue_elem)))
661  return EM_ERR_BAD_STATE;
662 
663  old_state = queue_elem->state;
664  new_state = EM_QUEUE_STATE_INVALID;
665 
666  if (type != EM_QUEUE_TYPE_UNSCHEDULED &&
667  type != EM_QUEUE_TYPE_OUTPUT) {
668  /* verify scheduled queue state transition */
669  ret = queue_state_change__check(old_state, new_state,
670  0/*!is_setup*/);
671  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_QUEUE_DELETE,
672  "EM-Q:%" PRI_QUEUE " inv. state change:%d=>%d",
673  queue, old_state, new_state);
674  }
675 
676  if (type != EM_QUEUE_TYPE_UNSCHEDULED &&
677  type != EM_QUEUE_TYPE_LOCAL &&
678  type != EM_QUEUE_TYPE_OUTPUT) {
679  queue_group_elem_t *const queue_group_elem =
680  queue_group_elem_get(queue_elem->queue_group);
681 
682  RETURN_ERROR_IF(queue_group_elem == NULL ||
683  !queue_group_allocated(queue_group_elem),
684  EM_ERR_BAD_ID, EM_ESCOPE_QUEUE_DELETE,
685  "Invalid queue group: %" PRI_QGRP "",
686  queue_elem->queue_group);
687 
688  /* Remove the queue from the queue group list */
689  queue_group_rem_queue_list(queue_group_elem, queue_elem);
690  }
691 
692  if (type == EM_QUEUE_TYPE_OUTPUT) {
693  env_spinlock_t *const lock = &queue_elem->output.lock;
694  q_elem_output_t *const q_out = &queue_elem->output;
695 
696  env_spinlock_lock(lock);
697  /* Drain any remaining events from the output queue */
698  output_queue_drain(queue_elem);
699  env_spinlock_unlock(lock);
700 
701  /* delete the fn-args storage if allocated in create */
702  if (q_out->output_fn_args_event != EM_EVENT_UNDEF) {
705  }
706 
707  env_spinlock_lock(&em_shm->queue_tbl.output_queue_lock);
708  em_shm->queue_tbl.output_queue_idx_free[q_out->idx] = true;
709  env_atomic32_dec(&em_shm->queue_tbl.output_queue_count);
710  env_spinlock_unlock(&em_shm->queue_tbl.output_queue_lock);
711  }
712 
713  if (queue_elem->odp_queue != ODP_QUEUE_INVALID &&
714  !queue_elem->flags.is_pktin) {
715  int err = odp_queue_destroy(queue_elem->odp_queue);
716 
717  RETURN_ERROR_IF(err, EM_ERR_LIB_FAILED, EM_ESCOPE_QUEUE_DELETE,
718  "EM-Q:%" PRI_QUEUE ":odp_queue_destroy(" PRIu64 "):%d",
719  queue, odp_queue_to_u64(queue_elem->odp_queue),
720  err);
721  }
722 
723  queue_elem->odp_queue = ODP_QUEUE_INVALID;
724 
725  /* Zero queue name */
726  em_shm->queue_tbl.name[queue_hdl2idx(queue)][0] = '\0';
727 
728  /* Remove the queue from the atomic group it belongs to, if any */
729  atomic_group_remove_queue(queue_elem);
730 
731  return queue_free(queue);
732 }
733 
734 /**
735  * Setup an allocated/created queue before use.
736  */
737 static int
738 queue_setup(queue_elem_t *q_elem, const queue_setup_t *setup,
739  const char **err_str)
740 {
741  int ret;
742 
743  /* Set common queue-elem fields based on setup */
744  queue_setup_common(q_elem, setup);
745 
746  switch (setup->type) {
747  case EM_QUEUE_TYPE_ATOMIC: /* fallthrough */
748  case EM_QUEUE_TYPE_PARALLEL: /* fallthrough */
750  ret = queue_setup_scheduled(q_elem, setup, err_str);
751  break;
753  ret = queue_setup_unscheduled(q_elem, setup, err_str);
754  break;
755  case EM_QUEUE_TYPE_LOCAL:
756  ret = queue_setup_local(q_elem, setup, err_str);
757  break;
759  ret = queue_setup_output(q_elem, setup, err_str);
760  break;
761  default:
762  *err_str = "Queue setup: unknown queue type";
763  ret = -1;
764  break;
765  }
766 
767  if (unlikely(ret))
768  return -1;
769 
770  env_sync_mem();
771  return 0;
772 }
773 
774 /**
775  * Helper function to queue_setup()
776  *
777  * Set EM queue params common to all EM queues based on EM config
778  */
779 void queue_setup_common(queue_elem_t *q_elem /*out*/,
780  const queue_setup_t *setup)
781 {
782  const em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->queue;
783  char *const qname = &em_shm->queue_tbl.name[queue_hdl2idx(queue)][0];
784 
785  /* checks that the odp queue context points to an EM queue elem */
786  q_elem->valid_check = QUEUE_ELEM_VALID;
787 
788  /* Store queue name */
789  if (setup->name)
790  strncpy(qname, setup->name, EM_QUEUE_NAME_LEN);
791  else /* default unique name: "EM_Q_" + Q-id = e.g. EM_Q_1234 */
792  snprintf(qname, EM_QUEUE_NAME_LEN,
793  "%s%" PRI_QUEUE "", EM_Q_BASENAME, queue);
794  qname[EM_QUEUE_NAME_LEN - 1] = '\0';
795 
796  q_elem->flags.all = 0;
797  /* Init q_elem fields based on setup params and clear the rest */
798  q_elem->type = (uint8_t)setup->type;
799  q_elem->priority = (uint8_t)setup->prio;
800  q_elem->queue_group = setup->queue_group;
801 
802  /* Does this queue belong to an EM Atomic Group? */
803  if (setup->atomic_group == EM_ATOMIC_GROUP_UNDEF) {
804  q_elem->flags.in_atomic_group = false;
805  } else {
806  q_elem->flags.in_atomic_group = true;
807  q_elem->agrp.atomic_group = setup->atomic_group;
808  }
809 
810  /* Clear the rest */
811  q_elem->odp_queue = ODP_QUEUE_INVALID;
812  q_elem->state = EM_QUEUE_STATE_INVALID;
813  q_elem->context = NULL;
814  q_elem->eo = (uint16_t)(uintptr_t)EM_EO_UNDEF;
815  q_elem->eo_elem = NULL;
816  q_elem->eo_ctx = NULL;
817  q_elem->max_events = 0;
818  q_elem->receive_func = NULL;
819  q_elem->receive_multi_func = NULL; /* union */
820 }
821 
822 /**
823  * Helper function to queue_setup_...()
824  *
825  * Set common ODP queue params based on EM config
826  */
827 static void
828 queue_setup_odp_common(const queue_setup_t *setup,
829  odp_queue_param_t *odp_queue_param /*out*/)
830 {
831  /*
832  * Set ODP queue params according to EM queue conf flags
833  */
834  const em_queue_conf_t *conf = setup->conf;
835  em_queue_flag_t flags = conf->flags & EM_QUEUE_FLAG_MASK;
836 
837  if (flags != EM_QUEUE_FLAG_DEFAULT) {
838  if (flags & EM_QUEUE_FLAG_NONBLOCKING_WF)
839  odp_queue_param->nonblocking = ODP_NONBLOCKING_WF;
840  else if (flags & EM_QUEUE_FLAG_NONBLOCKING_LF)
841  odp_queue_param->nonblocking = ODP_NONBLOCKING_LF;
842 
843  if (flags & EM_QUEUE_FLAG_ENQ_NOT_MTSAFE)
844  odp_queue_param->enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
845  if (flags & EM_QUEUE_FLAG_DEQ_NOT_MTSAFE)
846  odp_queue_param->deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
847  }
848 
849  /*
850  * Set minimum queue size if other than 'default'(0)
851  */
852  if (conf->min_events == 0) {
853  /* use EM default value from config file: */
854  unsigned int size = em_shm->opt.queue.min_events_default;
855 
856  if (size != 0)
857  odp_queue_param->size = size;
858  /* else: use odp default as set by odp_queue_param_init() */
859  } else {
860  /* use user provided value: */
861  odp_queue_param->size = conf->min_events;
862  }
863 }
864 
865 /**
866  * Create an ODP queue for the newly created EM queue
867  */
868 static int create_odp_queue(queue_elem_t *q_elem,
869  const odp_queue_param_t *odp_queue_param)
870 {
871  char odp_name[ODP_QUEUE_NAME_LEN];
872  odp_queue_t odp_queue;
873 
874  (void)queue_get_name(q_elem, odp_name/*out*/, sizeof(odp_name));
875 
876  odp_queue = odp_queue_create(odp_name, odp_queue_param);
877  if (unlikely(odp_queue == ODP_QUEUE_INVALID))
878  return -1;
879 
880  /* Store the corresponding ODP Queue */
881  q_elem->odp_queue = odp_queue;
882 
883  return 0;
884 }
885 
886 /**
887  * Helper function to queue_setup()
888  *
889  * Set EM and ODP queue params for scheduled queues
890  */
891 static int
892 queue_setup_scheduled(queue_elem_t *q_elem /*in,out*/,
893  const queue_setup_t *setup, const char **err_str)
894 {
895  /* validity checks done earlier for queue_group */
896  queue_group_elem_t *qgrp_elem = queue_group_elem_get(setup->queue_group);
897  int err;
898 
899  if (unlikely(qgrp_elem == NULL)) {
900  *err_str = "Q-setup-sched: invalid queue group!";
901  return -1;
902  }
903 
904  q_elem->priority = (uint8_t)setup->prio;
905  q_elem->type = (uint8_t)setup->type;
906  q_elem->queue_group = setup->queue_group;
907 
908  /* Does this queue belong to an EM Atomic Group? */
909  if (setup->atomic_group == EM_ATOMIC_GROUP_UNDEF) {
910  q_elem->flags.in_atomic_group = false;
911  } else {
912  q_elem->flags.in_atomic_group = true;
913  q_elem->agrp.atomic_group = setup->atomic_group;
914  }
915 
916  q_elem->flags.scheduled = true;
917  q_elem->state = EM_QUEUE_STATE_INIT;
918 
919  /*
920  * Set up a scheduled ODP queue for the EM scheduled queue
921  */
922  odp_queue_param_t odp_queue_param;
923  /* Default values. Always changed unless error: */
924  odp_schedule_sync_t odp_schedule_sync = ODP_SCHED_SYNC_PARALLEL;
925  odp_schedule_prio_t odp_prio = odp_schedule_min_prio();
926 
927  /* Init odp queue params to default values */
928  odp_queue_param_init(&odp_queue_param);
929  /* Set common ODP queue params based on the EM Queue config */
930  queue_setup_odp_common(setup, &odp_queue_param /*out*/);
931 
932  err = scheduled_queue_type_em2odp(setup->type,
933  &odp_schedule_sync /*out*/);
934  if (unlikely(err)) {
935  *err_str = "Q-setup-sched: invalid queue type!";
936  return -2;
937  }
938 
939  err = prio_em2odp(setup->prio, &odp_prio /*out*/);
940  if (unlikely(err)) {
941  *err_str = "Q-setup-sched: invalid queue priority!";
942  return -3;
943  }
944 
945  odp_queue_param.type = ODP_QUEUE_TYPE_SCHED;
946  odp_queue_param.sched.prio = odp_prio;
947  odp_queue_param.sched.sync = odp_schedule_sync;
948  odp_queue_param.sched.group = qgrp_elem->odp_sched_group;
949 
950  /* Retrieve previously stored ODP scheduler capabilities */
951  const odp_schedule_capability_t *odp_sched_capa =
952  &em_shm->queue_tbl.odp_schedule_capability;
953 
954  /*
955  * Check nonblocking level against sched queue capabilities.
956  * Related ODP queue params set earlier in queue_setup_common().
957  */
958  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_LF &&
959  odp_sched_capa->lockfree_queues == ODP_SUPPORT_NO) {
960  *err_str = "Q-setup-sched: non-blocking, lock-free sched queues unavailable";
961  return -4;
962  }
963  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_WF &&
964  odp_sched_capa->waitfree_queues == ODP_SUPPORT_NO) {
965  *err_str = "Q-setup-sched: non-blocking, wait-free sched queues unavailable";
966  return -5;
967  }
968  if (odp_queue_param.enq_mode != ODP_QUEUE_OP_MT ||
969  odp_queue_param.deq_mode != ODP_QUEUE_OP_MT) {
970  *err_str = "Q-setup-sched: invalid flag: scheduled queues must be MT-safe";
971  return -6;
972  }
973 
974  /*
975  * Note: The ODP queue context points to the EM queue elem.
976  * The EM queue context set by the user using the API function
977  * em_queue_set_context() is accessed through the queue_elem_t::context
978  * and retrieved with em_queue_get_context() or passed by EM to the
979  * EO-receive function for scheduled queues.
980  */
981  odp_queue_param.context = q_elem;
982  /*
983  * Set the context data length (in bytes) for potential prefetching.
984  * The ODP implementation may use this value as a hint for the number
985  * of context data bytes to prefetch.
986  */
987  odp_queue_param.context_len = sizeof(*q_elem);
988 
989  err = create_odp_queue(q_elem, &odp_queue_param);
990  if (unlikely(err)) {
991  *err_str = "Q-setup-sched: scheduled odp queue creation failed!";
992  return -7;
993  }
994 
995  /*
996  * Add the scheduled queue to the queue group
997  */
998  queue_group_add_queue_list(qgrp_elem, q_elem);
999 
1000  return 0;
1001 }
1002 
1003 /*
1004  * Helper function to queue_setup()
1005  *
1006  * Set EM and ODP queue params for unscheduled queues
1007  */
1008 static int
1009 queue_setup_unscheduled(queue_elem_t *q_elem /*in,out*/,
1010  const queue_setup_t *setup, const char **err_str)
1011 {
1012  q_elem->priority = EM_QUEUE_PRIO_UNDEF;
1013  q_elem->type = EM_QUEUE_TYPE_UNSCHEDULED;
1015  /* unscheduled queues are not scheduled */
1016  q_elem->flags.scheduled = false;
1017  q_elem->state = EM_QUEUE_STATE_UNSCHEDULED;
1018 
1019  /*
1020  * Set up a plain ODP queue for the EM unscheduled queue.
1021  */
1022  odp_queue_param_t odp_queue_param;
1023  /* Retrieve previously stored ODP queue capabilities */
1024  const odp_queue_capability_t *odp_queue_capa =
1025  &em_shm->queue_tbl.odp_queue_capability;
1026 
1027  /* Init odp queue params to default values */
1028  odp_queue_param_init(&odp_queue_param);
1029  /* Set common ODP queue params based on the EM Queue config */
1030  queue_setup_odp_common(setup, &odp_queue_param);
1031 
1032  odp_queue_param.type = ODP_QUEUE_TYPE_PLAIN;
1033  /* don't order events enqueued into unsched queues */
1034  odp_queue_param.order = ODP_QUEUE_ORDER_IGNORE;
1035 
1036  /*
1037  * Check nonblocking level against plain queue capabilities.
1038  * Related ODP queue params set earlier in queue_setup_common().
1039  */
1040  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_LF &&
1041  odp_queue_capa->plain.lockfree.max_num == 0) {
1042  *err_str = "Q-setup-unsched: non-blocking, lock-free unsched queues unavailable";
1043  return -1;
1044  }
1045  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_WF &&
1046  odp_queue_capa->plain.waitfree.max_num == 0) {
1047  *err_str = "Q-setup-unsched: non-blocking, wait-free unsched queues unavailable";
1048  return -2;
1049  }
1050 
1051  /*
1052  * Note: The ODP queue context points to the EM queue elem.
1053  * The EM queue context set by the user using the API function
1054  * em_queue_set_context() is accessed through the queue_elem_t::context
1055  * and retrieved with em_queue_get_context().
1056  */
1057  odp_queue_param.context = q_elem;
1058  /*
1059  * Set the context data length (in bytes) for potential prefetching.
1060  * The ODP implementation may use this value as a hint for the number
1061  * of context data bytes to prefetch.
1062  */
1063  odp_queue_param.context_len = sizeof(*q_elem);
1064 
1065  int err = create_odp_queue(q_elem, &odp_queue_param);
1066 
1067  if (unlikely(err)) {
1068  *err_str = "Q-setup-unsched: plain odp queue creation failed!";
1069  return -3;
1070  }
1071 
1072  return 0;
1073 }
1074 
1075 /*
1076  * Helper function to queue_setup()
1077  *
1078  * Set EM queue params for (core-)local queues
1079  */
1080 static int
1081 queue_setup_local(queue_elem_t *q_elem, const queue_setup_t *setup,
1082  const char **err_str)
1083 {
1084  (void)err_str;
1085 
1086  q_elem->priority = (uint8_t)setup->prio;
1087  q_elem->type = EM_QUEUE_TYPE_LOCAL;
1089  /* local queues are not scheduled */
1090  q_elem->flags.scheduled = false;
1091  q_elem->state = EM_QUEUE_STATE_INIT;
1092 
1093  return 0;
1094 }
1095 
1096 /*
1097  * Helper function to queue_setup()
1098  *
1099  * Set EM queue params for output queues
1100  */
1101 static int
1102 queue_setup_output(queue_elem_t *q_elem, const queue_setup_t *setup,
1103  const char **err_str)
1104 {
1105  const em_queue_conf_t *qconf = setup->conf;
1106  const em_output_queue_conf_t *output_conf = qconf->conf;
1107  uint32_t nbr_output_queues;
1108 
1109  nbr_output_queues = env_atomic32_add_return(&em_shm->queue_tbl.output_queue_count, 1);
1110 
1111  if (unlikely(nbr_output_queues >= EM_MAX_OUTPUT_QUEUES)) {
1112  *err_str = "Q-setup-output: too many output queues";
1113  return -1;
1114  }
1115 
1116  q_elem->priority = EM_QUEUE_PRIO_UNDEF;
1117  q_elem->type = EM_QUEUE_TYPE_OUTPUT;
1119  /* output queues are not scheduled */
1120  q_elem->flags.scheduled = false;
1121  /* use unsched state for output queues */
1122  q_elem->state = EM_QUEUE_STATE_UNSCHEDULED;
1123 
1124  if (unlikely(output_conf->output_fn == NULL)) {
1125  *err_str = "Q-setup-output: invalid output function";
1126  return -2;
1127  }
1128 
1129  env_spinlock_lock(&em_shm->queue_tbl.output_queue_lock);
1130  for (unsigned int i = 0; i < EM_MAX_OUTPUT_QUEUES; i++) {
1131  if (em_shm->queue_tbl.output_queue_idx_free[i]) {
1132  em_shm->queue_tbl.output_queue_idx_free[i] = false;
1133  q_elem->output.idx = i;
1134  break;
1135  }
1136  }
1137  env_spinlock_unlock(&em_shm->queue_tbl.output_queue_lock);
1138 
1139  /* copy whole output conf */
1140  q_elem->output.output_conf = *output_conf;
1141  q_elem->output.output_fn_args_event = EM_EVENT_UNDEF;
1142  if (output_conf->args_len == 0) {
1143  /* 'output_fn_args' is ignored, if 'args_len' is 0 */
1144  q_elem->output.output_conf.output_fn_args = NULL;
1145  } else {
1146  em_event_t args_event;
1147  void *args_storage;
1148 
1149  /* alloc an event to copy the given fn-args into */
1150  args_event = em_alloc((uint32_t)output_conf->args_len,
1152  if (unlikely(args_event == EM_EVENT_UNDEF)) {
1153  *err_str = "Q-setup-output: alloc output_fn_args fails";
1154  return -3;
1155  }
1156  /* store the event handle for em_free() later */
1157  q_elem->output.output_fn_args_event = args_event;
1158  args_storage = em_event_pointer(args_event);
1159  memcpy(args_storage, output_conf->output_fn_args,
1160  output_conf->args_len);
1161  /* update the args ptr to point to the copied content */
1162  q_elem->output.output_conf.output_fn_args = args_storage;
1163  }
1164  env_spinlock_init(&q_elem->output.lock);
1165 
1166  /*
1167  * Set up a plain ODP queue for EM output queue (re-)ordering.
1168  *
1169  * EM output-queues need an odp-queue to ensure re-ordering if
1170  * events are sent into it from within an ordered context.
1171  */
1172  odp_queue_param_t odp_queue_param;
1173  /* Retrieve previously stored ODP queue capabilities */
1174  const odp_queue_capability_t *odp_queue_capa =
1175  &em_shm->queue_tbl.odp_queue_capability;
1176 
1177  /* Init odp queue params to default values */
1178  odp_queue_param_init(&odp_queue_param);
1179  /* Set common ODP queue params based on the EM Queue config */
1180  queue_setup_odp_common(setup, &odp_queue_param);
1181 
1182  odp_queue_param.type = ODP_QUEUE_TYPE_PLAIN;
1183  odp_queue_param.order = ODP_QUEUE_ORDER_KEEP;
1184 
1185  /* check nonblocking level against plain queue capabilities */
1186  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_LF &&
1187  odp_queue_capa->plain.lockfree.max_num == 0) {
1188  *err_str = "Q-setup-output: non-blocking, lock-free unsched queues unavailable";
1189  return -4;
1190  }
1191  if (odp_queue_param.nonblocking == ODP_NONBLOCKING_WF &&
1192  odp_queue_capa->plain.waitfree.max_num == 0) {
1193  *err_str = "Q-setup-output: non-blocking, wait-free unsched queues unavailable";
1194  return -5;
1195  }
1196 
1197  /* output-queue dequeue protected by q_elem->output.lock */
1198  odp_queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
1199 
1200  /* explicitly show here that output queues should not set odp-context */
1201  odp_queue_param.context = NULL;
1202 
1203  int err = create_odp_queue(q_elem, &odp_queue_param);
1204 
1205  if (unlikely(err)) {
1206  *err_str = "Q-setup-output: plain odp queue creation failed!";
1207  return -6;
1208  }
1209 
1210  return 0;
1211 }
1212 
1213 /**
1214  * Helper func for queue_state_change() - check that state change is valid
1215  *
1216  * Valid state transitions:
1217  * ---------------------------------
1218  * | |new-state|new-state |
1219  * |old_state|is_setup |is_teardown|
1220  * |---------|---------|-----------|
1221  * |INVALID | INIT | (NULL) |
1222  * |INIT | BIND | INVALID |
1223  * |BIND | READY | INIT |
1224  * |READY | (NULL) | BIND |
1225  * ---------------------------------
1226  * State change check is made easy because the following condition is true
1227  * for valid state transitions: abs(old-new)=1
1228  */
1231  int is_setup /* vs. is_teardown */)
1232 {
1233  uint32_t state_diff;
1234 
1235  if (is_setup)
1236  state_diff = new_state - old_state;
1237  else
1238  state_diff = old_state - new_state;
1239 
1240  return (state_diff == 1) ? EM_OK : EM_ERR_BAD_STATE;
1241 }
1242 
1243 static inline em_status_t
1244 queue_state_set(queue_elem_t *const q_elem, queue_state_t new_state)
1245 {
1246  const queue_state_t old_state = q_elem->state;
1247  const int is_setup = (new_state == EM_QUEUE_STATE_READY);
1248  em_status_t err;
1249 
1250  /* allow multiple queue_enable/disable() calls */
1251  if (new_state == old_state &&
1252  (new_state == EM_QUEUE_STATE_READY ||
1253  new_state == EM_QUEUE_STATE_BIND))
1254  return EM_OK;
1255 
1256  err = queue_state_change__check(old_state, new_state, is_setup);
1257  if (unlikely(err != EM_OK))
1258  return err;
1259 
1260  q_elem->state = new_state;
1261  return EM_OK;
1262 }
1263 
1264 /**
1265  * Change the queue state
1266  */
1269 {
1270  em_status_t err = queue_state_set(q_elem, new_state);
1271 
1272  RETURN_ERROR_IF(err != EM_OK, err, EM_ESCOPE_QUEUE_STATE_CHANGE,
1273  "EM-Q:%" PRI_QUEUE " inv. state: %d=>%d",
1274  q_elem->queue, q_elem->state, new_state);
1275  return EM_OK;
1276 }
1277 
1278 /**
1279  * Change the queue state for all queues associated with the given EO
1280  */
1283 {
1284  em_status_t err = EM_OK;
1285  queue_elem_t *q_elem = NULL;
1286  list_node_t *pos;
1287  const list_node_t *list_node;
1288 
1289  /*
1290  * Loop through all queues associated with the EO
1291  */
1292  env_spinlock_lock(&eo_elem->lock);
1293 
1294  list_for_each(&eo_elem->queue_list, pos, list_node) {
1295  q_elem = list_node_to_queue_elem(list_node);
1296  err = queue_state_set(q_elem, new_state);
1297  if (unlikely(err != EM_OK))
1298  break;
1299  } /* end loop */
1300 
1301  env_spinlock_unlock(&eo_elem->lock);
1302 
1303  if (unlikely(err != EM_OK)) {
1304  uint32_t queue_u32 = q_elem ? q_elem->queue : 0;
1305  queue_state_t state = q_elem ? q_elem->state : EM_QUEUE_STATE_INVALID;
1306 
1307  return INTERNAL_ERROR(err, EM_ESCOPE_QUEUE_STATE_CHANGE,
1308  "EM-Q:%" PRIx32 " inv. state: %d=>%d",
1309  queue_u32, state, new_state);
1310  }
1311 
1312  return EM_OK;
1313 }
1314 
1315 /**
1316  * Enable event reception of an EM queue
1317  */
1320 {
1321  em_status_t ret;
1322 
1323  RETURN_ERROR_IF(q_elem == NULL || !queue_allocated(q_elem),
1324  EM_ERR_BAD_ID, EM_ESCOPE_QUEUE_ENABLE,
1325  "Invalid queue");
1326 
1328 
1329  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_QUEUE_ENABLE,
1330  "queue_state_change()->READY fails EM-Q:%" PRI_QUEUE "",
1331  q_elem->queue);
1332 
1333  return EM_OK;
1334 }
1335 
1336 /**
1337  * Enable event reception of ALL queues belonging to an EO
1338  */
1341 {
1342  em_status_t ret;
1343 
1344  RETURN_ERROR_IF(eo_elem == NULL || !eo_allocated(eo_elem),
1345  EM_ERR_BAD_ID, EM_ESCOPE_QUEUE_ENABLE_ALL,
1346  "Invalid EO");
1347 
1349  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_QUEUE_ENABLE_ALL,
1350  "queue_state_change_all()->READY fails EO:%" PRI_EO "",
1351  eo_elem->eo);
1352 
1353  return EM_OK;
1354 }
1355 
1356 /**
1357  * Disable event reception of an EM queue
1358  */
1361 {
1362  em_status_t ret;
1363 
1364  RETURN_ERROR_IF(q_elem == NULL || !queue_allocated(q_elem),
1365  EM_ERR_BAD_ID, EM_ESCOPE_QUEUE_DISABLE,
1366  "Invalid queue");
1367 
1368  /* Change the state of the queue */
1369  ret = queue_state_change(q_elem, EM_QUEUE_STATE_BIND);
1370  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_QUEUE_DISABLE,
1371  "queue_state_change()->BIND fails, Q:%" PRI_QUEUE "",
1372  q_elem->queue);
1373 
1374  return EM_OK;
1375 }
1376 
1377 /**
1378  * Disable event reception of ALL queues belonging to an EO
1379  */
1382 {
1383  em_status_t ret;
1384 
1385  RETURN_ERROR_IF(eo_elem == NULL || !eo_allocated(eo_elem),
1386  EM_ERR_BAD_ID, EM_ESCOPE_QUEUE_DISABLE_ALL,
1387  "Invalid EO");
1388 
1390  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_QUEUE_DISABLE_ALL,
1391  "queue_state_change_all()->BIND: EO:%" PRI_EO "",
1392  eo_elem->eo);
1393 
1394  return EM_OK;
1395 }
1396 
1397 void print_queue_elem_info(void)
1398 {
1399  EM_PRINT("queue-elem size: %zu B\n",
1400  sizeof(queue_elem_t));
1401 
1402  EM_DBG("\t\toffset\tsize\n"
1403  "\t\t------\t----\n"
1404  "valid_check:\t%3zu B\t%2zu B\n"
1405  "flags:\t\t%3zu B\t%2zu B\n"
1406  "state:\t\t%3zu B\t%2zu B\n"
1407  "priority:\t%3zu B\t%2zu B\n"
1408  "type:\t\t%3zu B\t%2zu B\n"
1409  "max_events:\t%3zu B\t%2zu B\n"
1410  "eo:\t\t%3zu B\t%2zu B\n"
1411  "queue:\t\t%3zu B\t%2zu B\n"
1412  "odp_queue:\t%3zu B\t%2zu B\n"
1413  "context:\t%3zu B\t%2zu B\n"
1414  "union {\n"
1415  " rcv_fn:\t%3zu B\t%2zu B\n"
1416  " rcv_multi_fn:\t%3zu B\t%2zu B\n"
1417  "}\n"
1418  "eo_ctx:\t\t%3zu B\t%2zu B\n"
1419  "union {\n"
1420  " agrp {\t%3zu B\t%2zu B\n"
1421  " .atomic_grp:\t%3zu B\t%2zu B\n"
1422  " .agrp_node:\t%3zu B\t%2zu B\n"
1423  " }\n"
1424  " output {\t%3zu B\t%2zu B\n"
1425  " .conf:\t%3zu B\t%2zu B\n"
1426  " .args_event:\t%3zu B\t%2zu B\n"
1427  " .idx:\t%3zu B\t%2zu B\n"
1428  " .lock:\t%3zu B\t%2zu B\n"
1429  " }\n"
1430  "}\n"
1431  "eo_elem:\t%3zu B\t%2zu B\n"
1432  "queue_group:\t%3zu B\t%2zu B\n"
1433  "queue_node:\t%3zu B\t%2zu B\n"
1434  "qgrp_node:\t%3zu B\t%2zu B\n"
1435  "queue_pool_elem:%3zu B\t%2zu B\n"
1436  "end:\t\t%3zu B\t%2zu B\n",
1437  offsetof(queue_elem_t, valid_check), sizeof_field(queue_elem_t, valid_check),
1438  offsetof(queue_elem_t, flags), sizeof_field(queue_elem_t, flags),
1439  offsetof(queue_elem_t, state), sizeof_field(queue_elem_t, state),
1440  offsetof(queue_elem_t, priority), sizeof_field(queue_elem_t, priority),
1441  offsetof(queue_elem_t, type), sizeof_field(queue_elem_t, type),
1442  offsetof(queue_elem_t, max_events), sizeof_field(queue_elem_t, max_events),
1443  offsetof(queue_elem_t, eo), sizeof_field(queue_elem_t, eo),
1444  offsetof(queue_elem_t, queue), sizeof_field(queue_elem_t, queue),
1445  offsetof(queue_elem_t, odp_queue), sizeof_field(queue_elem_t, odp_queue),
1446  offsetof(queue_elem_t, context), sizeof_field(queue_elem_t, context),
1447  offsetof(queue_elem_t, receive_func), sizeof_field(queue_elem_t, receive_func),
1448  offsetof(queue_elem_t, receive_multi_func),
1449  sizeof_field(queue_elem_t, receive_multi_func),
1450  offsetof(queue_elem_t, eo_ctx), sizeof_field(queue_elem_t, eo_ctx),
1451  offsetof(queue_elem_t, agrp), sizeof_field(queue_elem_t, agrp),
1452  offsetof(queue_elem_t, agrp.atomic_group),
1453  sizeof_field(queue_elem_t, agrp.atomic_group),
1454  offsetof(queue_elem_t, agrp.agrp_node), sizeof_field(queue_elem_t, agrp.agrp_node),
1455  offsetof(queue_elem_t, output), sizeof_field(queue_elem_t, output),
1456  offsetof(queue_elem_t, output.output_conf),
1457  sizeof_field(queue_elem_t, output.output_conf),
1458  offsetof(queue_elem_t, output.output_fn_args_event),
1459  sizeof_field(queue_elem_t, output.output_fn_args_event),
1460  offsetof(queue_elem_t, output.idx), sizeof_field(queue_elem_t, output.idx),
1461  offsetof(queue_elem_t, output.lock), sizeof_field(queue_elem_t, output.lock),
1462  offsetof(queue_elem_t, eo_elem), sizeof_field(queue_elem_t, eo_elem),
1463  offsetof(queue_elem_t, queue_group), sizeof_field(queue_elem_t, queue_group),
1464  offsetof(queue_elem_t, queue_node), sizeof_field(queue_elem_t, queue_node),
1465  offsetof(queue_elem_t, qgrp_node), sizeof_field(queue_elem_t, qgrp_node),
1466  offsetof(queue_elem_t, queue_pool_elem), sizeof_field(queue_elem_t, queue_pool_elem),
1467  offsetof(queue_elem_t, end), sizeof_field(queue_elem_t, end));
1468 
1469  EM_PRINT("\n");
1470 }
1471 
1473 {
1474  const odp_queue_capability_t *queue_capa =
1475  &em_shm->queue_tbl.odp_queue_capability;
1476  const odp_schedule_capability_t *sched_capa =
1477  &em_shm->queue_tbl.odp_schedule_capability;
1478  char plain_sz[24] = "n/a";
1479  char plain_lf_sz[24] = "n/a";
1480  char plain_wf_sz[24] = "n/a";
1481  char sched_sz[24] = "nolimit";
1482  const unsigned int max_queues = em_shm->opt.queue.max_num;
1483 
1484  /* Last dynamic EM queue */
1485  uint16_t last_dyn_queue = (uint16_t)(max_queues - 1 + EM_QUEUE_RANGE_OFFSET);
1486 
1487  if (queue_capa->plain.max_size > 0)
1488  snprintf(plain_sz, sizeof(plain_sz), "%u",
1489  queue_capa->plain.max_size);
1490  if (queue_capa->plain.lockfree.max_size > 0)
1491  snprintf(plain_lf_sz, sizeof(plain_lf_sz), "%u",
1492  queue_capa->plain.lockfree.max_size);
1493  if (queue_capa->plain.waitfree.max_size > 0)
1494  snprintf(plain_wf_sz, sizeof(plain_wf_sz), "%u",
1495  queue_capa->plain.waitfree.max_size);
1496 
1497  if (sched_capa->max_queue_size > 0)
1498  snprintf(sched_sz, sizeof(sched_sz), "%u",
1499  sched_capa->max_queue_size);
1500 
1501  plain_sz[sizeof(plain_sz) - 1] = '\0';
1502  plain_lf_sz[sizeof(plain_lf_sz) - 1] = '\0';
1503  plain_wf_sz[sizeof(plain_wf_sz) - 1] = '\0';
1504  sched_sz[sizeof(sched_sz) - 1] = '\0';
1505 
1506  EM_PRINT("ODP Queue Capabilities\n"
1507  "----------------------\n"
1508  " Max number of ODP queues: %u\n"
1509  " Max number of ODP ordered locks per queue: %u\n"
1510  " Max number of ODP scheduling groups: %u\n"
1511  " Max number of ODP scheduling priorities: %u\n"
1512  " PLAIN queues:\n"
1513  " blocking: count: %6u size: %6s\n"
1514  " nonblocking-lf: count: %6u size: %6s\n"
1515  " nonblocking-wf: count: %6u size: %6s\n"
1516  " SCHED queues:\n"
1517  " blocking: count: %6u size: %6s\n"
1518  " nonblocking-lf: %ssupported\n"
1519  " nonblocking-wf: %ssupported\n\n",
1520  queue_capa->max_queues, sched_capa->max_ordered_locks,
1521  sched_capa->max_groups, sched_capa->max_prios,
1522  queue_capa->plain.max_num, plain_sz,
1523  queue_capa->plain.lockfree.max_num, plain_lf_sz,
1524  queue_capa->plain.waitfree.max_num, plain_wf_sz,
1525  sched_capa->max_queues, sched_sz,
1526  sched_capa->lockfree_queues == ODP_SUPPORT_NO ? "not " : "",
1527  sched_capa->waitfree_queues == ODP_SUPPORT_NO ? "not " : "");
1528 
1529  EM_PRINT("EM Queues\n"
1530  "---------\n"
1531  " Max number of EM queues: %d (0x%x)\n"
1532  " EM queue handle offset: %d (0x%x)\n"
1533  " EM queue range: [%d - %d] ([0x%x - 0x%x])\n"
1534  " static range: [%d - %d] ([0x%x - 0x%x])\n"
1535  " internal range: [%d - %d] ([0x%x - 0x%x])\n"
1536  " dynamic range: [%d - %d] ([0x%x - 0x%x])\n"
1537  "\n",
1538  max_queues, max_queues,
1540  EM_QUEUE_STATIC_MIN, last_dyn_queue,
1541  EM_QUEUE_STATIC_MIN, last_dyn_queue,
1544  FIRST_INTERNAL_QUEUE, LAST_INTERNAL_QUEUE,
1545  FIRST_INTERNAL_QUEUE, LAST_INTERNAL_QUEUE,
1546  FIRST_DYN_QUEUE, last_dyn_queue,
1547  FIRST_DYN_QUEUE, last_dyn_queue);
1548 }
1549 
1550 void print_queue_prio_info(void)
1551 {
1552  #define MAXPRIOBUF 128
1553  char buf[MAXPRIOBUF];
1554  int pos = 0;
1555 
1556  for (int i = 0; i < EM_QUEUE_PRIO_NUM; i++) {
1557  /* comma separated list of priorities */
1558  int num = snprintf(&buf[pos], MAXPRIOBUF - pos, "%d%c",
1559  em_shm->queue_prio.map[i],
1560  i < (EM_QUEUE_PRIO_NUM - 1) ? ',' : '\0');
1561  if (num < 0 || num >= (MAXPRIOBUF - pos))
1562  break;
1563  pos += num;
1564  }
1565 
1566  buf[MAXPRIOBUF - 1] = 0;
1567  EM_PRINT(" Current queue priority map: [%s]\n", buf);
1568 }
1569 
1570 unsigned int
1571 queue_count(void)
1572 {
1573  return env_atomic32_get(&em_shm->queue_count);
1574 }
1575 
1576 size_t queue_get_name(const queue_elem_t *const q_elem,
1577  char name[/*out*/], const size_t maxlen)
1578 {
1579  em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->queue;
1580  const char *queue_name = &em_shm->queue_tbl.name[queue_hdl2idx(queue)][0];
1581  size_t len = strnlen(queue_name, EM_QUEUE_NAME_LEN - 1);
1582 
1583  if (maxlen - 1 < len)
1584  len = maxlen - 1;
1585 
1586  if (len)
1587  memcpy(name, queue_name, len);
1588  name[len] = '\0';
1589 
1590  return len;
1591 }
1592 
1593 static void queue_init_prio_legacy(int minp, int maxp)
1594 {
1595  /* legacy mode - match the previous simple 3-level implementation */
1596 
1597  int def = odp_schedule_default_prio();
1598 
1599  /* needs to be synced with queue_prio_e values. Due to enum this can't be #if */
1600  COMPILE_TIME_ASSERT(EM_QUEUE_PRIO_HIGHEST < EM_QUEUE_PRIO_NUM,
1601  "queue_prio_e values / EM_QUEUE_PRIO_NUM mismatch!\n");
1602 
1603  /* init both ends first */
1604  for (int i = 0; i < EM_QUEUE_PRIO_NUM; i++)
1605  em_shm->queue_prio.map[i] = i < (EM_QUEUE_PRIO_NUM / 2) ? minp : maxp;
1606 
1607  /* then add NORMAL in the middle */
1609  /* if room: widen the normal range a bit */
1610  if (EM_QUEUE_PRIO_NORMAL - EM_QUEUE_PRIO_LOW > 1) /* legacy 4-2 */
1612  if (EM_QUEUE_PRIO_HIGH - EM_QUEUE_PRIO_NORMAL > 1) /* legacy 6-4 */
1614 }
1615 
1616 static void queue_init_prio_adaptive(int minp, int maxp, int nump)
1617 {
1618  double step = (double)nump / EM_QUEUE_PRIO_NUM;
1619  double cur = (double)minp;
1620 
1621  /* simple linear fit to available levels */
1622 
1623  for (int i = 0; i < EM_QUEUE_PRIO_NUM; i++) {
1624  em_shm->queue_prio.map[i] = (int)cur;
1625  cur += step;
1626  }
1627 
1628  /* last EM prio always highest ODP level */
1629  if (em_shm->queue_prio.map[EM_QUEUE_PRIO_NUM - 1] != maxp)
1630  em_shm->queue_prio.map[EM_QUEUE_PRIO_NUM - 1] = maxp;
1631 }
1632 
1633 static int queue_init_prio_custom(int minp, int maxp)
1634 {
1635  for (int i = 0; i < EM_QUEUE_PRIO_NUM; i++) {
1636  em_shm->queue_prio.map[i] = minp + em_shm->opt.queue.priority.custom_map[i];
1637  if (em_shm->queue_prio.map[i] > maxp || em_shm->queue_prio.map[i] < minp) {
1638  EM_PRINT("Invalid odp priority %d!\n", em_shm->queue_prio.map[i]);
1639  return -1;
1640  }
1641  }
1642  return 0;
1643 }
1644 
1645 static int queue_init_prio_map(int minp, int maxp, int nump)
1646 {
1647  /* EM normally uses 8 priority levels (EM_QUEUE_PRIO_NUM).
1648  * These are mapped to ODP runtime values depending on selected map mode
1649  */
1650 
1651  switch (em_shm->opt.queue.priority.map_mode) {
1652  case 0: /* legacy mode, use only 3 levels */
1653  queue_init_prio_legacy(minp, maxp);
1654  break;
1655  case 1: /* adapt to runtime (full spread) */
1656  queue_init_prio_adaptive(minp, maxp, nump);
1657  break;
1658  case 2: /** custom */
1659  if (queue_init_prio_custom(minp, maxp) != 0)
1660  return -1;
1661  break;
1662  default:
1663  EM_PRINT("Unknown map_mode %d!\n", em_shm->opt.queue.priority.map_mode);
1664  return -1;
1665  }
1666 
1667  EM_PRINT(" EM uses %d priorities, runtime %d (%d-%d)\n",
1668  EM_QUEUE_PRIO_NUM, nump, minp, nump - minp - 1);
1669  print_queue_prio_info();
1670  return 0;
1671 }
1672 
1674 {
1675  const char *str;
1676 
1677  switch (state) {
1679  str = "INVALID";
1680  break;
1681  case EM_QUEUE_STATE_INIT:
1682  str = "INIT";
1683  break;
1684  case EM_QUEUE_STATE_BIND:
1685  str = "BIND";
1686  break;
1687  case EM_QUEUE_STATE_READY:
1688  str = "READY";
1689  break;
1690  case EM_QUEUE_STATE_UNSCHEDULED:
1691  str = "UNSCH";
1692  break;
1693  default:
1694  str = "UNKNOWN";
1695  break;
1696  }
1697 
1698  return str;
1699 }
1700 
1702 {
1703  const char *type_str;
1704 
1705  switch (type) {
1706  case EM_QUEUE_TYPE_UNDEF:
1707  type_str = "UNDEF";
1708  break;
1709  case EM_QUEUE_TYPE_ATOMIC:
1710  type_str = "ATOMIC";
1711  break;
1713  type_str = "PARALLEL";
1714  break;
1716  type_str = "ORDERED";
1717  break;
1719  type_str = "UNSCH";
1720  break;
1721  case EM_QUEUE_TYPE_LOCAL:
1722  type_str = "LOCAL";
1723  break;
1724  case EM_QUEUE_TYPE_OUTPUT:
1725  type_str = "OUTPUT";
1726  break;
1727  default:
1728  type_str = "UNKNOWN";
1729  break;
1730  }
1731 
1732  return type_str;
1733 }
1734 
1735 #define QUEUE_INFO_HDR_STR \
1736 "Number of queues: %d\n\n" \
1737 "Handle Name Priority Type State Qgrp" \
1738 " Agrp EO Multi-rcv Max-events Ctx\n" \
1739 "---------------------------------------------------------------------------" \
1740 "----------------------------------------------------\n" \
1741 "%s\n"
1742 
1743 #define QUEUE_INFO_LEN 128
1744 
1745 #define QUEUE_INFO_FMT \
1746 "%-10" PRI_QUEUE "%-32s%-10" PRI_QPRIO "%-10s%-9s%-10" PRI_QGRP "%-10" PRI_AGRP \
1747 "%-10" PRI_EO "%-11c%-12d%-3c\n" /*128 bytes per queue*/
1748 
1750 {
1751  unsigned int q_num;
1752  const queue_elem_t *q_elem;
1753  char q_name[EM_QUEUE_NAME_LEN];
1754  int len = 0;
1755  int n_print = 0;
1756 
1757  em_queue_t q = em_queue_get_first(&q_num);
1758 
1759  /* q_num may not match the amount of queues actually returned by iterating
1760  * using em_queue_get_next() if queues are added or removed in parallel
1761  * by another core. Thus space for 10 extra queues is reserved. If more
1762  * than 10 queues are added by other cores in parallel, we print only info
1763  * of the (q_num + 10) queues.
1764  */
1765  const int q_info_buf_len = (q_num + 10) * QUEUE_INFO_LEN + 1/*Terminating null byte*/;
1766  char q_info_buf[q_info_buf_len];
1767 
1768  while (q != EM_QUEUE_UNDEF) {
1769  q_elem = queue_elem_get(q);
1770 
1771  if (unlikely(q_elem == NULL || !queue_allocated(q_elem))) {
1772  q = em_queue_get_next();
1773  continue;
1774  }
1775 
1776  em_atomic_group_t atomic_group = EM_ATOMIC_GROUP_UNDEF;
1777  em_eo_t eo = (em_eo_t)(uintptr_t)q_elem->eo;
1778 
1779  if (q_elem->flags.in_atomic_group)
1780  atomic_group = q_elem->agrp.atomic_group;
1781 
1782  queue_get_name(q_elem, q_name, EM_QUEUE_NAME_LEN - 1);
1783  n_print = snprintf(q_info_buf + len,
1784  q_info_buf_len - len,
1785  QUEUE_INFO_FMT,
1786  q, q_name, q_elem->priority,
1787  queue_get_type_str(q_elem->type),
1788  queue_get_state_str(q_elem->state),
1789  q_elem->queue_group, atomic_group, eo,
1790  q_elem->flags.use_multi_rcv ? 'Y' : 'N',
1791  q_elem->max_events,
1792  q_elem->context ? 'Y' : 'N');
1793 
1794  /* Not enough space to hold more queue info */
1795  if (n_print >= q_info_buf_len - len)
1796  break;
1797 
1798  len += n_print;
1799  q = em_queue_get_next();
1800  }
1801 
1802  /* No queue */
1803  if (len == 0) {
1804  EM_PRINT("No EM queue!\n");
1805  return;
1806  }
1807 
1808  /*
1809  * To prevent printing incomplete information of the last queue when
1810  * there is not enough space to hold all queue info.
1811  */
1812  q_info_buf[len] = '\0';
1813  EM_PRINT(QUEUE_INFO_HDR_STR, q_num, q_info_buf);
1814 }
EM_QUEUE_PRIO_HIGHEST
@ EM_QUEUE_PRIO_HIGHEST
Definition: event_machine_hw_types.h:155
EM_QUEUE_GROUP_UNDEF
#define EM_QUEUE_GROUP_UNDEF
Definition: event_machine_types.h:127
eo_elem_t::queue_list
list_node_t queue_list
Definition: em_eo_types.h:75
queue_elem_t::eo_ctx
void * eo_ctx
Definition: em_queue_types.h:241
queue_get_state_str
const char * queue_get_state_str(queue_state_t state)
Definition: em_queue.c:1673
queue_elem_t::queue_group
em_queue_group_t queue_group
Definition: em_queue_types.h:252
queue_state_change
em_status_t queue_state_change(queue_elem_t *const queue_elem, queue_state_t new_state)
Definition: em_queue.c:1268
EM_OK
#define EM_OK
Definition: event_machine_types.h:329
EM_EVENT_TYPE_SW
@ EM_EVENT_TYPE_SW
Definition: event_machine_hw_types.h:72
EM_QUEUE_STATE_BIND
@ EM_QUEUE_STATE_BIND
Definition: em_queue_types.h:135
em_locm_t::local_queues
local_queues_t local_queues
Definition: em_mem.h:222
EM_QUEUE_PRIO_UNDEF
#define EM_QUEUE_PRIO_UNDEF
Definition: event_machine_hw_types.h:157
em_locm_t::output_queue_track
output_queue_track_t output_queue_track
Definition: em_mem.h:242
EM_QUEUE_PRIO_NORMAL
@ EM_QUEUE_PRIO_NORMAL
Definition: event_machine_hw_types.h:153
EM_QUEUE_PRIO_NUM
#define EM_QUEUE_PRIO_NUM
Definition: event_machine_hw_config.h:103
em_queue_conf_t
Definition: event_machine_types.h:212
EM_QUEUE_STATE_INVALID
@ EM_QUEUE_STATE_INVALID
Definition: em_queue_types.h:126
queue_pool_t
Definition: em_queue_types.h:298
queue_elem_t::max_events
uint16_t max_events
Definition: em_queue_types.h:219
em_shm_t::queue_prio
struct em_shm_t::@53 queue_prio
queue_tbl_t::odp_queue_capability
odp_queue_capability_t odp_queue_capability
Definition: em_queue_types.h:273
queue_state_change_all
em_status_t queue_state_change_all(eo_elem_t *const eo_elem, queue_state_t new_state)
Definition: em_queue.c:1282
EM_EVENT_UNDEF
#define EM_EVENT_UNDEF
Definition: event_machine_types.h:62
queue_elem_t::is_pktin
uint8_t is_pktin
Definition: em_queue_types.h:203
queue_group_elem_t
Definition: em_queue_group_types.h:59
queue_get_type_str
const char * queue_get_type_str(em_queue_type_t type)
Definition: em_queue.c:1701
queue_elem_t::receive_multi_func
em_receive_multi_func_t receive_multi_func
Definition: em_queue_types.h:237
EM_POOL_DEFAULT
#define EM_POOL_DEFAULT
Definition: event_machine_hw_config.h:191
eo_elem_t::eo
em_eo_t eo
Definition: em_eo_types.h:81
em_output_queue_conf_t::args_len
size_t args_len
Definition: event_machine_hw_types.h:411
queue_elem_t::eo_elem
eo_elem_t * eo_elem
Definition: em_queue_types.h:249
EM_QUEUE_TYPE_PARALLEL
@ EM_QUEUE_TYPE_PARALLEL
Definition: event_machine_hw_types.h:117
em_output_queue_conf_t::output_fn
em_output_func_t output_fn
Definition: event_machine_hw_types.h:402
queue_elem_t::type
uint8_t type
Definition: em_queue_types.h:216
print_queue_capa
void print_queue_capa(void)
Definition: em_queue.c:1472
queue_group_elem_t::odp_sched_group
odp_schedule_group_t odp_sched_group
Definition: em_queue_group_types.h:63
em_locm
ENV_LOCAL em_locm_t em_locm
PRI_EO
#define PRI_EO
Definition: event_machine_types.h:97
em_free
void em_free(em_event_t event)
Definition: event_machine_event.c:261
em_output_queue_conf_t::output_fn_args
void * output_fn_args
Definition: event_machine_hw_types.h:406
stash_entry_t
Definition: em_event_types.h:86
queue_elem_t::queue_pool_elem
objpool_elem_t queue_pool_elem
Definition: em_queue_types.h:259
EM_QUEUE_STATE_INIT
@ EM_QUEUE_STATE_INIT
Definition: em_queue_types.h:133
EM_QUEUE_STATIC_NUM
#define EM_QUEUE_STATIC_NUM
Definition: event_machine_hw_config.h:136
EM_ERR_ALLOC_FAILED
@ EM_ERR_ALLOC_FAILED
Definition: event_machine_hw_types.h:287
EM_ERR_LIB_FAILED
@ EM_ERR_LIB_FAILED
Definition: event_machine_hw_types.h:291
EM_EO_UNDEF
#define EM_EO_UNDEF
Definition: event_machine_types.h:95
EM_QUEUE_TYPE_ATOMIC
@ EM_QUEUE_TYPE_ATOMIC
Definition: event_machine_hw_types.h:112
q_elem_output_::lock
env_spinlock_t lock
Definition: em_queue_types.h:174
queue_elem_t::state
queue_state_t state
Definition: em_queue_types.h:210
list_node_t
Definition: list.h:42
em_queue_type_t
uint32_t em_queue_type_t
Definition: event_machine_types.h:168
em_queue_get_first
em_queue_t em_queue_get_first(unsigned int *num)
Definition: event_machine_queue.c:307
queue_enable_all
em_status_t queue_enable_all(eo_elem_t *const eo_elem)
Definition: em_queue.c:1340
EM_QUEUE_FLAG_DEQ_NOT_MTSAFE
#define EM_QUEUE_FLAG_DEQ_NOT_MTSAFE
Definition: event_machine_hw_types.h:229
EM_QUEUE_NAME_LEN
#define EM_QUEUE_NAME_LEN
Definition: event_machine_config.h:125
q_elem_output_::output_conf
em_output_queue_conf_t output_conf
Definition: em_queue_types.h:168
EM_QUEUE_TYPE_UNSCHEDULED
@ EM_QUEUE_TYPE_UNSCHEDULED
Definition: event_machine_hw_types.h:127
event_hdr
Definition: em_event_types.h:184
eo_elem_t
Definition: em_eo_types.h:47
em_output_queue_conf_t
Definition: event_machine_hw_types.h:396
queue_tbl_t::odp_schedule_capability
odp_schedule_capability_t odp_schedule_capability
Definition: em_queue_types.h:275
EM_QUEUE_RANGE_OFFSET
#define EM_QUEUE_RANGE_OFFSET
Definition: event_machine_hw_config.h:115
em_shm_t::libconfig
libconfig_t libconfig
Definition: em_mem.h:146
EM_QUEUE_TYPE_UNDEF
@ EM_QUEUE_TYPE_UNDEF
Definition: event_machine_hw_types.h:107
em_shm_t::map
int map[EM_QUEUE_PRIO_NUM]
Definition: em_mem.h:150
queue_setup_t
Definition: em_queue_types.h:85
queue_elem_t::odp_queue
odp_queue_t odp_queue
Definition: em_queue_types.h:228
queue_elem_t::queue
uint32_t queue
Definition: em_queue_types.h:225
em_queue_conf_t::flags
em_queue_flag_t flags
Definition: event_machine_types.h:218
atomic_group_remove_queue
void atomic_group_remove_queue(queue_elem_t *const q_elem)
Definition: em_atomic_group.c:126
atomic_group_elem_t
Definition: em_atomic_group_types.h:46
RETURN_ERROR_IF
#define RETURN_ERROR_IF(cond, error, escope, fmt,...)
Definition: em_error.h:50
queue_elem_t::context
void * context
Definition: em_queue_types.h:231
em_alloc
em_event_t em_alloc(uint32_t size, em_event_type_t type, em_pool_t pool)
Definition: event_machine_event.c:33
em_queue_conf_t::conf_len
size_t conf_len
Definition: event_machine_types.h:229
EM_ERR_BAD_ID
@ EM_ERR_BAD_ID
Definition: event_machine_hw_types.h:265
queue_enable
em_status_t queue_enable(queue_elem_t *const q_elem)
Definition: em_queue.c:1319
INTERNAL_ERROR
#define INTERNAL_ERROR(error, escope, fmt,...)
Definition: em_error.h:43
queue_state_t
uint8_t queue_state_t
Definition: em_queue_types.h:151
queue_elem_t::in_atomic_group
uint8_t in_atomic_group
Definition: em_queue_types.h:201
queue_tbl_t
Definition: em_queue_types.h:271
em_queue_conf_t::conf
void * conf
Definition: event_machine_types.h:234
queue_setup_common
void queue_setup_common(queue_elem_t *q_elem, const queue_setup_t *setup)
Definition: em_queue.c:779
queue_delete
em_status_t queue_delete(queue_elem_t *const queue_elem)
Definition: em_queue.c:652
queue_create
em_queue_t queue_create(const char *name, em_queue_type_t type, em_queue_prio_t prio, em_queue_group_t queue_group, em_queue_t queue_req, em_atomic_group_t atomic_group, const em_queue_conf_t *conf, const char **err_str)
Definition: em_queue.c:592
queue_disable
em_status_t queue_disable(queue_elem_t *const q_elem)
Definition: em_queue.c:1360
objpool_t
Definition: objpool.h:64
queue_elem_t::receive_func
em_receive_func_t receive_func
Definition: em_queue_types.h:235
queue_elem_t::scheduled
uint8_t scheduled
Definition: em_queue_types.h:199
em_status_t
uint32_t em_status_t
Definition: event_machine_types.h:321
EM_QUEUE_TYPE_LOCAL
@ EM_QUEUE_TYPE_LOCAL
Definition: event_machine_hw_types.h:131
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
EM_QUEUE_STATE_READY
@ EM_QUEUE_STATE_READY
Definition: em_queue_types.h:137
PRI_QGRP
#define PRI_QGRP
Definition: event_machine_types.h:129
queue_elem_t::priority
uint8_t priority
Definition: em_queue_types.h:213
queue_elem_t::valid_check
uint16_t valid_check
Definition: em_queue_types.h:191
objpool_elem_t
Definition: objpool.h:48
EM_ATOMIC_GROUP_UNDEF
#define EM_ATOMIC_GROUP_UNDEF
Definition: event_machine_types.h:156
EM_QUEUE_PRIO_HIGH
@ EM_QUEUE_PRIO_HIGH
Definition: event_machine_hw_types.h:154
q_elem_output_
Definition: em_queue_types.h:166
em_shm
em_shm_t * em_shm
Definition: event_machine_init.c:41
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
queue_state_change__check
em_status_t queue_state_change__check(queue_state_t old_state, queue_state_t new_state, int is_setup)
Definition: em_queue.c:1230
q_elem_output_::output_fn_args_event
em_event_t output_fn_args_event
Definition: em_queue_types.h:170
em_include.h
EM_QUEUE_TYPE_PARALLEL_ORDERED
@ EM_QUEUE_TYPE_PARALLEL_ORDERED
Definition: event_machine_hw_types.h:122
EM_QUEUE_TYPE_OUTPUT
@ EM_QUEUE_TYPE_OUTPUT
Definition: event_machine_hw_types.h:137
eo_elem_t::lock
env_spinlock_t lock
Definition: em_eo_types.h:73
em_core_id
int em_core_id(void)
Definition: event_machine_core.c:34
queue_disable_all
em_status_t queue_disable_all(eo_elem_t *const eo_elem)
Definition: em_queue.c:1381
EM_QUEUE_FLAG_NONBLOCKING_WF
#define EM_QUEUE_FLAG_NONBLOCKING_WF
Definition: event_machine_hw_types.h:203
queue_elem_t::eo
uint16_t eo
Definition: em_queue_types.h:222
EM_QUEUE_FLAG_ENQ_NOT_MTSAFE
#define EM_QUEUE_FLAG_ENQ_NOT_MTSAFE
Definition: event_machine_hw_types.h:216
queue_alloc
em_queue_t queue_alloc(em_queue_t queue, const char **err_str)
Definition: em_queue.c:411
queue_init_local
em_status_t queue_init_local(void)
Definition: em_queue.c:303
EM_QUEUE_STATIC_MAX
#define EM_QUEUE_STATIC_MAX
Definition: event_machine_hw_config.h:131
EM_QUEUE_PRIO_LOW
@ EM_QUEUE_PRIO_LOW
Definition: event_machine_hw_types.h:152
evstate_em2usr_multi
void evstate_em2usr_multi(em_event_t ev_tbl[], event_hdr_t *const ev_hdr_tbl[], const int num, const uint16_t api_op)
Definition: em_event_state.c:970
EM_ESCOPE_INIT
#define EM_ESCOPE_INIT
Definition: event_machine_hw_types.h:463
print_queue_info
void print_queue_info(void)
Definition: em_queue.c:1749
EM_QUEUE_STATIC_MIN
#define EM_QUEUE_STATIC_MIN
Definition: event_machine_hw_config.h:126
queue_init
em_status_t queue_init(queue_tbl_t *const queue_tbl, queue_pool_t *const queue_pool, queue_pool_t *const queue_pool_static)
Definition: em_queue.c:217
EM_QUEUE_FLAG_MASK
#define EM_QUEUE_FLAG_MASK
Definition: event_machine_types.h:205
em_queue_conf_t::min_events
unsigned int min_events
Definition: event_machine_types.h:224
EM_QUEUE_LOCAL_MULTI_MAX_BURST
#define EM_QUEUE_LOCAL_MULTI_MAX_BURST
Definition: event_machine_hw_config.h:239
em_queue_prio_t
uint32_t em_queue_prio_t
Definition: event_machine_types.h:186
queue_term_local
em_status_t queue_term_local(void)
Definition: em_queue.c:367
q_elem_atomic_group_::atomic_group
em_atomic_group_t atomic_group
Definition: em_queue_types.h:158
em_free_multi
void em_free_multi(em_event_t events[], int num)
Definition: event_machine_event.c:370
em_locm_t
Definition: em_mem.h:188
em_queue_get_next
em_queue_t em_queue_get_next(void)
Definition: event_machine_queue.c:333
EM_ERR_BAD_STATE
@ EM_ERR_BAD_STATE
Definition: event_machine_hw_types.h:263
em_queue_flag_t
uint32_t em_queue_flag_t
Definition: event_machine_types.h:199
EM_MAX_OUTPUT_QUEUES
#define EM_MAX_OUTPUT_QUEUES
Definition: event_machine_config.h:131
internal_queue_t
Definition: em_queue_types.h:98
queue_elem_t
Definition: em_queue_types.h:180
em_event_pointer
void * em_event_pointer(em_event_t event)
Definition: event_machine_event.c:750
EM_QUEUE_FLAG_DEFAULT
#define EM_QUEUE_FLAG_DEFAULT
Definition: event_machine_hw_types.h:171
EM_QUEUE_FLAG_NONBLOCKING_LF
#define EM_QUEUE_FLAG_NONBLOCKING_LF
Definition: event_machine_hw_types.h:191
q_elem_output_::idx
uint32_t idx
Definition: em_queue_types.h:172