EM-ODP 3.7.0
Event Machine on ODP
em_queue_group.c
1 /*
2  * Copyright (c) 2015, Nokia Solutions and Networks
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  * * Neither the name of the copyright holder nor the names of its
15  * contributors may be used to endorse or promote products derived
16  * from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "em_include.h"
32 
33 /**
34  * Queue group create/modify/delete operations trigger an internal 'Done'-
35  * notification event when complete. This struct contains the 'done'-callback args.
36  */
37 typedef struct {
38  queue_group_elem_t *qgrp_elem;
39  em_core_mask_t new_mask;
40 } q_grp_done_callback_args_t;
41 
42 static em_status_t core_queue_groups_create(void);
43 static em_status_t core_queue_group_join(void);
44 static em_queue_group_t default_queue_group_create(void);
45 static em_queue_group_t default_queue_group_join(void);
46 
47 static em_queue_group_t
48 queue_group_create_escope(const char *name, const em_core_mask_t *mask,
49  int num_notif, const em_notif_t notif_tbl[],
50  em_queue_group_t requested_queue_group,
51  em_escope_t escope);
52 
53 static void q_grp_add_core(const queue_group_elem_t *qgrp_elem);
54 static void q_grp_rem_core(const queue_group_elem_t *qgrp_elem);
55 
56 static void q_grp_create_done_callback(void *arg_ptr);
57 static void q_grp_create_sync_done_callback(void *arg_ptr);
58 static void q_grp_create_done(const queue_group_elem_t *const qgrp_elem,
59  const em_core_mask_t *const new_mask);
60 static void q_grp_create_sync_done(const queue_group_elem_t *const qgrp_elem,
61  const em_core_mask_t *const new_mask);
62 
63 static void q_grp_modify_done_callback(void *arg_ptr);
64 static void q_grp_modify_sync_done_callback(void *arg_ptr);
65 static void q_grp_modify_done(const queue_group_elem_t *const qgrp_elem,
66  const em_core_mask_t *const new_mask);
67 
68 static void q_grp_delete_done_callback(void *arg_ptr);
69 static void q_grp_delete_sync_done_callback(void *arg_ptr);
70 static void q_grp_delete_done(queue_group_elem_t *const qgrp_elem,
71  const em_core_mask_t *const new_mask);
72 
73 static em_status_t
74 send_qgrp_addrem_reqs(queue_group_elem_t *qgrp_elem,
75  const em_core_mask_t *new_mask,
76  const em_core_mask_t *add_mask,
77  const em_core_mask_t *rem_mask,
78  int num_notif, const em_notif_t notif_tbl[],
79  em_escope_t escope);
80 
81 /**
82  * Return the queue group elem that includes the given objpool_elem_t
83  */
84 static inline queue_group_elem_t *
85 queue_group_poolelem2qgrpelem(objpool_elem_t *const queue_group_pool_elem)
86 {
87  return (queue_group_elem_t *)((uintptr_t)queue_group_pool_elem -
88  offsetof(queue_group_elem_t, queue_group_pool_elem));
89 }
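/*
 * The helper above uses the classic 'container-of' idiom: given a pointer
 * to an embedded member, subtract the member's offset to recover the
 * enclosing struct. A minimal generic sketch of the same idiom (an
 * illustration, not code from this file):
 *
 * @code
 *	#include <stddef.h>
 *	#include <stdint.h>
 *
 *	#define CONTAINER_OF(ptr, type, member) \
 *		((type *)((uintptr_t)(ptr) - offsetof(type, member)))
 *
 *	struct list_node { struct list_node *next; };
 *	struct item { int value; struct list_node node; };
 *
 *	// Given 'struct list_node *n' pointing at 'item.node',
 *	// recover the item:
 *	// struct item *it = CONTAINER_OF(n, struct item, node);
 * @endcode
 */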
90 
91 static int
92 read_config_file(void)
93 {
94  const char *conf_str;
95  bool val_bool = false;
96  int ret;
97 
98  EM_PRINT("EM queue group config:\n");
99 
100  /*
101  * Option: queue_group.create_core_queue_groups - enable/disable
102  */
103  conf_str = "queue_group.create_core_queue_groups";
104  ret = em_libconfig_lookup_bool(&em_shm->libconfig, conf_str, &val_bool);
105  if (unlikely(!ret)) {
106  EM_LOG(EM_LOG_ERR, "Config option '%s' not found\n", conf_str);
107  return -1;
108  }
109  /* store & print the value */
110  em_shm->opt.queue_group.create_core_queue_groups = val_bool;
111  EM_PRINT(" %s: %s(%d)\n", conf_str, val_bool ? "true" : "false",
112  val_bool);
113 
114 
115 
116  return 0;
117 }
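/*
 * For reference, the option parsed above corresponds to a config-file
 * entry roughly like the following (a hedged sketch in the libconfig
 * syntax used by EM-ODP config files; the surrounding structure of the
 * real em-odp.conf may differ):
 *
 * @code
 *	queue_group: {
 *		# Create a single-core EM queue group (and matching ODP
 *		# schedule group) for each EM core at startup.
 *		create_core_queue_groups = false
 *	}
 * @endcode
 */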
118 
119 /**
120  * Queue group inits done at global init (once at startup on one core)
121  */
122 em_status_t queue_group_init(queue_group_tbl_t *const queue_group_tbl,
123  queue_group_pool_t *const queue_group_pool)
124 {
125  const uint32_t objpool_subpools = MIN(4, OBJSUBPOOLS_MAX);
126  queue_group_elem_t *queue_group_elem;
127  int ret;
128 
129  if (read_config_file())
130  return EM_ERR_LIB_FAILED;
131 
132  memset(queue_group_tbl, 0, sizeof(queue_group_tbl_t));
133  memset(queue_group_pool, 0, sizeof(queue_group_pool_t));
134  env_atomic32_init(&em_shm->queue_group_count);
135 
136  for (int i = 0; i < EM_MAX_QUEUE_GROUPS; i++) {
137  queue_group_elem = &queue_group_tbl->queue_group_elem[i];
138  queue_group_elem->queue_group = qgrp_idx2hdl(i);
139  /* Initialize empty queue list */
140  env_spinlock_init(&queue_group_elem->lock);
141  list_init(&queue_group_elem->queue_list);
142  }
143 
144  ret = objpool_init(&queue_group_pool->objpool, objpool_subpools);
145  if (ret != 0)
146  return EM_ERR_LIB_FAILED;
147 
148  for (uint32_t i = 0; i < EM_MAX_QUEUE_GROUPS; i++) {
149  queue_group_elem = &queue_group_tbl->queue_group_elem[i];
150  objpool_add(&queue_group_pool->objpool, i % objpool_subpools,
151  &queue_group_elem->queue_group_pool_elem);
152  }
153 
154  /*
155  * Create the EM default queue group: EM_QUEUE_GROUP_DEFAULT, "default"
156  */
157  em_queue_group_t default_queue_group = default_queue_group_create();
158 
159  if (default_queue_group != EM_QUEUE_GROUP_DEFAULT) {
160  EM_LOG(EM_LOG_ERR, "default_queue_group_create() failed!\n");
161  return EM_ERR_LIB_FAILED;
162  }
163 
164  /*
165  * Create EM single-core queue groups if enabled by config.
166  */
167  if (em_shm->opt.queue_group.create_core_queue_groups) {
168  em_status_t stat = core_queue_groups_create();
169 
170  if (stat != EM_OK) {
171  EM_LOG(EM_LOG_ERR, "core_queue_groups_create():%" PRI_STAT "\n", stat);
172  return stat;
173  }
174  }
175 
176  return EM_OK;
177 }
178 
179 em_status_t queue_group_init_local(void)
180 {
181  /*
182  * Update the EM default queue group with this core's information
183  */
184  em_queue_group_t def_qgrp = default_queue_group_join();
185 
186  if (def_qgrp != EM_QUEUE_GROUP_DEFAULT) {
187  EM_LOG(EM_LOG_ERR, "default_queue_group_join() failed!\n");
188  return EM_ERR_LIB_FAILED;
189  }
190 
191  /*
192  * Update the single-core queue group with this core's information
193  * if enabled by config.
194  */
195  if (em_shm->opt.queue_group.create_core_queue_groups) {
196  em_status_t stat = core_queue_group_join();
197 
198  if (stat != EM_OK) {
199  EM_LOG(EM_LOG_ERR, "core_queue_group_join():%" PRI_STAT "\n", stat);
200  return EM_ERR_LIB_FAILED;
201  }
202  }
203 
204  return EM_OK;
205 }
206 
207 /**
208  * Allocate a new EM queue group
209  *
210  * @param queue_group EM queue group handle if a specific EM queue group is
211  * requested, EM_QUEUE_GROUP_UNDEF if any EM queue group
212  * will do.
213  *
214  * @return EM queue group handle
215  * @retval EM_QUEUE_GROUP_UNDEF on failure
216  */
217 static em_queue_group_t
218 queue_group_alloc(em_queue_group_t queue_group)
219 {
220  queue_group_elem_t *qgrp_elem;
221  objpool_elem_t *qgrp_pool_elem;
222 
223  if (queue_group == EM_QUEUE_GROUP_UNDEF) {
224  /*
225  * Allocate any queue group, i.e. take next available
226  */
227  qgrp_pool_elem = objpool_rem(&em_shm->queue_group_pool.objpool,
228  em_core_id());
229  if (unlikely(qgrp_pool_elem == NULL))
230  return EM_QUEUE_GROUP_UNDEF;
231 
232  qgrp_elem = queue_group_poolelem2qgrpelem(qgrp_pool_elem);
233  } else {
234  /*
235  * Allocate a specific queue group, handle given as argument
236  */
237  qgrp_elem = queue_group_elem_get(queue_group);
238  if (unlikely(qgrp_elem == NULL))
239  return EM_QUEUE_GROUP_UNDEF;
240 
241  env_spinlock_lock(&qgrp_elem->lock);
242  /* Verify that the queue group is not allocated */
243  if (queue_group_allocated(qgrp_elem)) {
244  env_spinlock_unlock(&qgrp_elem->lock);
245  return EM_QUEUE_GROUP_UNDEF;
246  }
247 
248  /* Remove the queue group from the pool */
249  int ret = objpool_rem_elem(&em_shm->queue_group_pool.objpool,
250  &qgrp_elem->queue_group_pool_elem);
251  env_spinlock_unlock(&qgrp_elem->lock);
252  if (unlikely(ret != 0))
253  return EM_QUEUE_GROUP_UNDEF;
254  }
255 
256  env_atomic32_inc(&em_shm->queue_group_count);
257  return qgrp_elem->queue_group;
258 }
259 
260 /**
261  * Free an EM queue group
262  *
263  * @param queue_group EM queue group handle
264  *
265  * @return EM status
266  * @retval EM_ERR_BAD_ID if the queue group handle is invalid
267  */
268 static em_status_t
269 queue_group_free(em_queue_group_t queue_group)
270 {
271  queue_group_elem_t *const queue_group_elem =
272  queue_group_elem_get(queue_group);
273 
274  if (unlikely(queue_group_elem == NULL))
275  return EM_ERR_BAD_ID;
276 
277  objpool_add(&em_shm->queue_group_pool.objpool,
278  queue_group_elem->queue_group_pool_elem.subpool_idx,
279  &queue_group_elem->queue_group_pool_elem);
280 
281  env_atomic32_dec(&em_shm->queue_group_count);
282  return EM_OK;
283 }
284 
285 /**
286  * Create the EM default queue group 'EM_QUEUE_GROUP_DEFAULT'
287  */
288 static em_queue_group_t default_queue_group_create(void)
289 {
290  em_queue_group_t default_qgrp;
291  queue_group_elem_t *default_qgrp_elem;
292  em_core_mask_t *mask;
293  odp_thrmask_t zero_thrmask;
294  odp_schedule_group_t odp_sched_group;
295 
296  default_qgrp = queue_group_alloc(EM_QUEUE_GROUP_DEFAULT);
297  if (unlikely(default_qgrp != EM_QUEUE_GROUP_DEFAULT))
298  return EM_QUEUE_GROUP_UNDEF; /* sanity check */
299 
300  default_qgrp_elem = queue_group_elem_get(EM_QUEUE_GROUP_DEFAULT);
301  if (unlikely(default_qgrp_elem == NULL))
302  return EM_QUEUE_GROUP_UNDEF; /* sanity check */
303 
304  mask = &default_qgrp_elem->core_mask;
305  em_core_mask_zero(mask);
307 
308  odp_thrmask_zero(&zero_thrmask);
309 
310  /*
311  * Create a new odp schedule group for the EM default queue group.
312  * Don't use the ODP_SCHED_GROUP_WORKER or other predefined ODP groups
313  * since those groups can't be modified.
314  * Create the group without any cores/threads and update it during
315  * em_init_core() -> default_queue_group_join() calls for each
316  * EM core.
317  */
318  default_qgrp_elem->odp_sched_group = ODP_SCHED_GROUP_INVALID;
319  odp_sched_group = odp_schedule_group_create(EM_QUEUE_GROUP_DEFAULT_NAME,
320  &zero_thrmask);
321  if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID))
322  return EM_QUEUE_GROUP_UNDEF;
323  /* Store the created odp sched group as the EM default queue group */
324  default_qgrp_elem->odp_sched_group = odp_sched_group;
325 
326  return EM_QUEUE_GROUP_DEFAULT;
327 }
328 
329 /**
330  * Update the EM default queue group during each core's local init: mark
331  * the core in the group's core mask and add the core's ODP thread-id to
332  * the ODP scheduling mask. Run by each call to em_init_core().
333  */
334 static em_queue_group_t default_queue_group_join(void)
335 {
336  queue_group_elem_t *default_qgrp_elem;
337  odp_thrmask_t odp_joinmask;
338  const int core_id = em_locm.core_id;
339  const int odp_thr = odp_thread_id();
340  int ret;
341 
342  default_qgrp_elem = queue_group_elem_get(EM_QUEUE_GROUP_DEFAULT);
343  if (unlikely(!default_qgrp_elem))
344  return EM_QUEUE_GROUP_UNDEF;
345 
346  /* Set this thread in the odp schedule group join-mask */
347  odp_thrmask_zero(&odp_joinmask);
348  odp_thrmask_set(&odp_joinmask, odp_thr);
349 
350  env_spinlock_lock(&default_qgrp_elem->lock);
351  em_core_mask_set(core_id, &default_qgrp_elem->core_mask);
352  /* Join this thread into the "EM default" schedule group */
353  ret = odp_schedule_group_join(default_qgrp_elem->odp_sched_group,
354  &odp_joinmask);
355  env_spinlock_unlock(&default_qgrp_elem->lock);
356 
357  if (unlikely(ret))
358  return EM_QUEUE_GROUP_UNDEF;
359 
360  return EM_QUEUE_GROUP_DEFAULT;
361 }
362 
363 /**
364  * @brief The calling core joins all available queue groups
365  *
366  * Main use case: em_term(). To flush the scheduler with only the last
367  * EM-core still running, all queue groups must be modified to include
368  * this last core in their core masks.
369  */
370 void queue_group_join_all(void)
371 {
372  em_queue_group_t qgrp = em_queue_group_get_first(NULL);
373  const int core_id = em_locm.core_id;
374 
375  while (qgrp != EM_QUEUE_GROUP_UNDEF) {
376  queue_group_elem_t *qgrp_elem = queue_group_elem_get(qgrp);
377 
378  env_spinlock_lock(&qgrp_elem->lock);
379 
380  int allocated = queue_group_allocated(qgrp_elem);
381  bool ongoing_delete = qgrp_elem->ongoing_delete;
382 
383  if (allocated && !ongoing_delete &&
384  !em_core_mask_isset(core_id, &qgrp_elem->core_mask)) {
385  em_core_mask_set(core_id, &qgrp_elem->core_mask);
386  q_grp_add_core(qgrp_elem);
387  }
388  env_spinlock_unlock(&qgrp_elem->lock);
389 
390  qgrp = em_queue_group_get_next();
391  }
392 }
393 
394 static em_status_t core_queue_groups_create(void)
395 {
396  em_queue_group_t qgrp;
397  em_queue_group_t qgrp_req;
398  queue_group_elem_t *qgrp_elem;
399  em_core_mask_t *mask;
400  odp_thrmask_t zero_thrmask;
401  odp_schedule_group_t odp_sched_group;
402  const int num_cores = em_core_count();
403  char qgrp_name[EM_QUEUE_GROUP_NAME_LEN];
404 
405  for (int i = 0; i < num_cores; i++) {
406  qgrp_req = qgrp_idx2hdl(i);
407  qgrp = queue_group_alloc(qgrp_req);
408  if (unlikely(qgrp == EM_QUEUE_GROUP_UNDEF || qgrp != qgrp_req)) {
409  EM_DBG("queue_group_alloc() fails for core-qgrp:%d\n", i);
410  return EM_ERR_ALLOC_FAILED;
411  }
412 
413  qgrp_elem = queue_group_elem_get(qgrp);
414  if (unlikely(qgrp_elem == NULL)) {
415  EM_DBG("qgrp_elem NULL for core-qgrp:%d\n", i);
416  return EM_ERR_BAD_POINTER;
417  }
418 
419  mask = &qgrp_elem->core_mask;
420  em_core_mask_zero(mask);
421  em_core_mask_set(i, mask);
422 
423  odp_thrmask_zero(&zero_thrmask);
424 
425  /*
426  * Create a new odp schedule group for each EM core.
427  * Create the group without the core/thread set and update it
428  * during em_init_core() -> core_queue_group_join()
429  * calls for each EM core.
430  */
431  qgrp_elem->odp_sched_group = ODP_SCHED_GROUP_INVALID;
432  core_queue_grp_name(i/*core*/, qgrp_name/*out*/,
433  sizeof(qgrp_name));
434  odp_sched_group = odp_schedule_group_create(qgrp_name,
435  &zero_thrmask);
436  if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID)) {
437  EM_DBG("odp_schedule_group_create() fails for core-qgrp:%d\n", i);
438  return EM_ERR_LIB_FAILED;
439  }
440  /* Store the created odp sched group for this EM queue group */
441  qgrp_elem->odp_sched_group = odp_sched_group;
442  }
443 
444  return EM_OK;
445 }
446 
447 static em_status_t core_queue_group_join(void)
448 {
449  char qgrp_name[EM_QUEUE_GROUP_NAME_LEN];
450  int core = em_core_id();
451  const int odp_thr = odp_thread_id();
452 
453  core_queue_grp_name(core, qgrp_name/*out*/, sizeof(qgrp_name));
454 
455  em_queue_group_t qgrp = em_queue_group_find(qgrp_name);
456 
457  if (unlikely(qgrp == EM_QUEUE_GROUP_UNDEF)) {
458  EM_DBG("%s(): core:%d, %s not found\n", __func__, core, qgrp_name);
459  return EM_ERR_NOT_FOUND;
460  }
461 
462  queue_group_elem_t *qgrp_elem = queue_group_elem_get(qgrp);
463 
464  if (unlikely(!qgrp_elem)) {
465  EM_DBG("%s(): qgrp_elem NULL for core-qgrp:%d\n",
466  __func__, core);
467  return EM_ERR_BAD_POINTER;
468  }
469 
470  /* Set this thread in the odp schedule group join-mask */
471  odp_thrmask_t odp_joinmask;
472 
473  odp_thrmask_zero(&odp_joinmask);
474  odp_thrmask_set(&odp_joinmask, odp_thr);
475 
476  env_spinlock_lock(&qgrp_elem->lock);
477  /* Join this thread into the core-local schedule group */
478  int ret = odp_schedule_group_join(qgrp_elem->odp_sched_group,
479  &odp_joinmask);
480  env_spinlock_unlock(&qgrp_elem->lock);
481 
482  if (unlikely(ret)) {
483  EM_DBG("%s(): odp_schedule_group_join():%d, core-qgrp:%d\n",
484  __func__, ret, core);
485  return EM_ERR_LIB_FAILED;
486  }
487 
488  return EM_OK;
489 }
490 
491 /**
492  * Allow creating a queue group with a specific handle if requested and
493  * available, use EM_QUEUE_GROUP_UNDEF to take any free handle.
494  * Called from queue_group_create() and queue_group_create_sync() with an
495  * appropriate escope.
496  */
497 static em_queue_group_t
498 queue_group_create_escope(const char *name, const em_core_mask_t *mask,
499  int num_notif, const em_notif_t notif_tbl[],
500  em_queue_group_t requested_queue_group,
501  em_escope_t escope)
502 {
503  em_queue_group_t queue_group;
504  queue_group_elem_t *qgrp_elem;
505  odp_schedule_group_t odp_sched_group;
506  odp_thrmask_t zero_thrmask;
507  em_status_t stat;
508  em_core_mask_t add_mask;
509  em_core_mask_t rem_zero_mask;
510  const int core = em_core_id();
511 
512  odp_thrmask_zero(&zero_thrmask);
513  em_core_mask_zero(&rem_zero_mask);
514  em_core_mask_zero(&add_mask);
515  em_core_mask_copy(&add_mask, mask);
516 
517  /*
518  * Allocate the queue group element,
519  * if 'requested_queue_group' == EM_QUEUE_GROUP_UNDEF take any handle.
520  */
521  queue_group = queue_group_alloc(requested_queue_group);
522  qgrp_elem = queue_group_elem_get(queue_group);
523  if (unlikely(qgrp_elem == NULL)) {
524  INTERNAL_ERROR(EM_ERR_ALLOC_FAILED, escope,
525  "Queue group alloc failed!");
526  /* No free queue group found */
527  return EM_QUEUE_GROUP_UNDEF;
528  }
529 
530  /* Create empty schedule group, each core adds itself via an add-req */
531  odp_sched_group = odp_schedule_group_create(name, &zero_thrmask);
532  if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID)) {
533  queue_group_free(queue_group);
534  INTERNAL_ERROR(EM_ERR_LIB_FAILED, escope,
535  "ODP schedule group creation failed!");
536  return EM_QUEUE_GROUP_UNDEF;
537  }
538 
539  env_spinlock_lock(&qgrp_elem->lock);
540 
541  /* Initialize the data of the newly allocated queue group */
542  qgrp_elem->odp_sched_group = odp_sched_group;
543  em_core_mask_copy(&qgrp_elem->core_mask, mask); /* set new mask */
544  list_init(&qgrp_elem->queue_list);
545  env_atomic32_init(&qgrp_elem->num_queues);
546  qgrp_elem->ongoing_delete = false;
547 
548  if (em_core_mask_isset(core, &add_mask)) {
549  em_core_mask_clr(core, &add_mask);
550  q_grp_add_core(qgrp_elem);
551  }
552 
553  if (em_core_mask_iszero(&add_mask)) {
554  if (escope == EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC)
555  q_grp_create_sync_done(qgrp_elem, mask);
556  else
557  q_grp_create_done(qgrp_elem, mask);
558 
559  env_spinlock_unlock(&qgrp_elem->lock);
560 
561  stat = send_notifs(num_notif, notif_tbl);
562  if (unlikely(stat != EM_OK))
563  INTERNAL_ERROR(stat, escope, "Sending notifs failed!");
564 
565  return queue_group;
566  }
567 
568  env_spinlock_unlock(&qgrp_elem->lock);
569 
570  stat = send_qgrp_addrem_reqs(qgrp_elem, mask, &add_mask, &rem_zero_mask,
571  num_notif, notif_tbl, escope);
572  if (unlikely(stat != EM_OK))
573  INTERNAL_ERROR(stat, escope, "qgrp add/rem-req(s) send failed");
574 
575  return queue_group;
576 }
577 
578 /**
579  * Allow creating a queue group with a specific handle
580  * if requested and available.
581  */
582 em_queue_group_t
583 queue_group_create(const char *name, const em_core_mask_t *mask,
584  int num_notif, const em_notif_t notif_tbl[],
585  em_queue_group_t requested_queue_group)
586 {
587  return queue_group_create_escope(name, mask, num_notif, notif_tbl,
588  requested_queue_group,
589  EM_ESCOPE_QUEUE_GROUP_CREATE);
590 }
591 
592 /**
593  * Allow creating a queue group synchronously with a specific handle
594  * if requested and available.
595  * No need for sync blocking when creating a new queue group.
596  */
597 em_queue_group_t
598 queue_group_create_sync(const char *name, const em_core_mask_t *mask,
599  em_queue_group_t requested_queue_group)
600 {
601  return queue_group_create_escope(name, mask, 0, NULL,
602  requested_queue_group,
603  EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC);
604 }
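/*
 * A usage sketch of the public APIs that funnel into the create-helpers
 * above (illustrative only; error handling abbreviated):
 *
 * @code
 *	em_core_mask_t mask;
 *
 *	em_core_mask_zero(&mask);
 *	em_core_mask_set(0, &mask);	// include EM core 0
 *	em_core_mask_set(1, &mask);	// include EM core 1
 *
 *	// Synchronous create: returns once the group is ready for use
 *	em_queue_group_t qgrp = em_queue_group_create_sync("my-grp", &mask);
 *
 *	if (qgrp == EM_QUEUE_GROUP_UNDEF)
 *		; // handle the error
 * @endcode
 */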
605 
606 /*
607  * queue_group_create/modify/_sync() helper:
608  * Can only set core mask bits for running cores - verify this.
609  */
610 em_status_t queue_group_check_mask(const em_core_mask_t *mask)
611 {
612  const int core_count = em_core_count();
613  em_core_mask_t max_mask;
614  em_core_mask_t check_mask;
615 
616  /*
617  * 'mask' can contain set bits only for cores running EM,
618  * 'max_mask' contains all allowed set bits. Check that mask
619  * only contains set bits that are also found in max_mask.
620  */
621  em_core_mask_zero(&max_mask);
622  em_core_mask_set_count(core_count, &max_mask);
623  em_core_mask_or(&check_mask, mask, &max_mask);
624 
625  if (unlikely(!em_core_mask_equal(&check_mask, &max_mask)))
626  return EM_ERR_TOO_LARGE;
627 
628  return EM_OK;
629 }
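/*
 * The OR-operation above implements a subset test: 'mask' is valid iff
 * every bit set in 'mask' is also set in 'max_mask', i.e.
 * (mask | max_mask) == max_mask. For example, with 4 running cores
 * (max_mask = 0xF):
 *	mask = 0x05 (cores 0,2): 0x05 | 0x0F = 0x0F == max_mask -> EM_OK
 *	mask = 0x14 (cores 2,4): 0x14 | 0x0F = 0x1F != max_mask -> EM_ERR_TOO_LARGE
 */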
630 
631 /*
632  * queue_group_modify/_sync() helper: check Queue Group state
633  */
634 static em_status_t
635 check_qgrp_state(const queue_group_elem_t *qgrp_elem, bool is_delete,
636  const char **err_str/*out*/)
637 {
638  if (unlikely(!queue_group_allocated(qgrp_elem))) {
639  *err_str = "Queue group not allocated";
640  return EM_ERR_BAD_ID;
641  }
642  if (unlikely(qgrp_elem->ongoing_delete)) {
643  *err_str = "Contending queue group delete ongoing";
644  return EM_ERR_BAD_STATE;
645  }
646  if (unlikely(is_delete && !list_is_empty(&qgrp_elem->queue_list))) {
647  *err_str = "Queue group contains queues, cannot delete group";
648  return EM_ERR_NOT_FREE;
649  }
650 
651  return EM_OK;
652 }
653 
654 /*
655  * queue_group_modify/_sync() helper: count cores to be added to the queue group
656  */
657 static int count_qgrp_adds(const em_core_mask_t *old_mask,
658  const em_core_mask_t *new_mask,
659  em_core_mask_t *add_mask /*out*/)
660 {
661  int core_count = em_core_count();
662  int adds = 0;
663 
664  em_core_mask_zero(add_mask);
665 
666  /* Count added cores */
667  for (int i = 0; i < core_count; i++) {
668  if (!em_core_mask_isset(i, old_mask) &&
669  em_core_mask_isset(i, new_mask)) {
670  em_core_mask_set(i, add_mask);
671  adds++;
672  }
673  }
674 
675  return adds;
676 }
677 
678 /*
679  * queue_group_modify/_sync() helper: count cores to be removed from the queue group
680  */
681 static int count_qgrp_rems(const em_core_mask_t *old_mask,
682  const em_core_mask_t *new_mask,
683  em_core_mask_t *rem_mask /*out*/)
684 {
685  int core_count = em_core_count();
686  int rems = 0;
687 
688  em_core_mask_zero(rem_mask);
689 
690  /* Count removed cores */
691  for (int i = 0; i < core_count; i++) {
692  if (em_core_mask_isset(i, old_mask) &&
693  !em_core_mask_isset(i, new_mask)) {
694  em_core_mask_set(i, rem_mask);
695  rems++;
696  }
697  }
698 
699  return rems;
700 }
701 
702 /**
703  * @brief send_qgrp_addrem_reqs() helper: free unsent add/rem req events
704  */
705 static void addrem_events_free(em_event_t add_events[], int add_count,
706  em_event_t rem_events[], int rem_count)
707 {
708  for (int i = 0; i < add_count; i++) {
709  if (add_events[i] != EM_EVENT_UNDEF)
710  em_free(add_events[i]);
711  }
712  for (int i = 0; i < rem_count; i++) {
713  if (rem_events[i] != EM_EVENT_UNDEF)
714  em_free(rem_events[i]);
715  }
716 }
717 
718 /**
719  * @brief send_qgrp_addrem_reqs() helper: send add or rem req events to cores
720  * in 'mask', send with an event group to trigger 'done' notification.
721  *
722  * Mark each sent event in the array as 'undef' to help detect sent vs. unsent
723  */
724 static em_status_t send_addrem_events(em_event_t addrem_events[],
725  const em_core_mask_t *mask,
726  em_event_group_t event_group)
727 {
728  const int core_count = em_core_count();
729  const int first_qidx = queue_id2idx(FIRST_INTERNAL_UNSCHED_QUEUE);
730  int ev_idx = 0;
731  em_status_t err;
732 
733  for (int i = 0; i < core_count; i++) {
734  if (em_core_mask_isset(i, mask)) {
735  /*
736  * Send an add/rem-req to each core-specific queue,
737  * track completion using an event group.
738  */
739  err = em_send_group(addrem_events[ev_idx],
740  queue_idx2hdl(first_qidx + i),
741  event_group);
742  if (unlikely(err != EM_OK))
743  return err;
744  addrem_events[ev_idx] = EM_EVENT_UNDEF;
745  ev_idx++;
746  }
747  }
748 
749  return EM_OK;
750 }
751 
752 /**
753  * @brief send_qgrp_addrem_reqs() helper: create the add/rem req events
754  */
755 static int create_addrem_events(em_event_t addrem_events[/*out*/], int count,
756  uint64_t ev_id, em_queue_group_t queue_group)
757 {
758  internal_event_t *i_event;
759 
760  if (unlikely(count < 1))
761  return 0;
762 
763  addrem_events[0] = em_alloc(sizeof(internal_event_t),
764  EM_EVENT_TYPE_SW, EM_POOL_DEFAULT);
765  if (unlikely(addrem_events[0] == EM_EVENT_UNDEF))
766  return 0;
767 
768  /* Init the add/rem-req internal ctrl event(s) */
769  i_event = em_event_pointer(addrem_events[0]);
770  i_event->id = ev_id;
771  i_event->q_grp.queue_group = queue_group;
772 
773  for (int i = 1; i < count; i++) {
774  addrem_events[i] = em_event_clone(addrem_events[0],
775  EM_POOL_UNDEF);
776  if (unlikely(addrem_events[i] == EM_EVENT_UNDEF))
777  return i;
778  }
779 
780  return count;
781 }
782 
783 /**
784  * @brief send_qgrp_addrem_reqs() helper: set callback based on err-scope (=id)
785  */
786 static int
787 set_qgrp_done_func(em_escope_t escope,
788  void (**f_done_callback)(void *arg_ptr) /*out*/,
789  bool *sync_operation /*out*/)
790 {
791  *sync_operation = false;
792 
793  switch (escope) {
794  case EM_ESCOPE_QUEUE_GROUP_CREATE:
795  *f_done_callback = q_grp_create_done_callback;
796  break;
797  case EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC:
798  *f_done_callback = q_grp_create_sync_done_callback;
799  *sync_operation = true;
800  break;
801  case EM_ESCOPE_QUEUE_GROUP_MODIFY:
802  *f_done_callback = q_grp_modify_done_callback;
803  break;
804  case EM_ESCOPE_QUEUE_GROUP_MODIFY_SYNC:
805  *f_done_callback = q_grp_modify_sync_done_callback;
806  *sync_operation = true;
807  break;
808  case EM_ESCOPE_QUEUE_GROUP_DELETE:
809  *f_done_callback = q_grp_delete_done_callback;
810  break;
811  case EM_ESCOPE_QUEUE_GROUP_DELETE_SYNC:
812  *f_done_callback = q_grp_delete_sync_done_callback;
813  *sync_operation = true;
814  break;
815  default:
816  *f_done_callback = NULL;
817  return -1;
818  }
819 
820  return 0;
821 }
822 
823 /**
824  * @brief queue_group_create/modify/_sync() helper: send qgrp addrem-req events to cores
825  */
826 static em_status_t
827 send_qgrp_addrem_reqs(queue_group_elem_t *qgrp_elem,
828  const em_core_mask_t *new_mask,
829  const em_core_mask_t *add_mask,
830  const em_core_mask_t *rem_mask,
831  int num_notif, const em_notif_t notif_tbl[],
832  em_escope_t escope)
833 {
834  const em_queue_group_t queue_group = qgrp_elem->queue_group;
835  const int add_count = em_core_mask_count(add_mask);
836  const int rem_count = em_core_mask_count(rem_mask);
837  const int addrem_count = add_count + rem_count; /* Subset of cores*/
838  em_event_t add_events[add_count];
839  em_event_t rem_events[rem_count];
840  em_event_group_t event_group;
841  em_status_t err;
842  int cnt;
843  int ret;
844 
845  em_event_t callback_args_event =
846  em_alloc(sizeof(q_grp_done_callback_args_t),
847  EM_EVENT_TYPE_SW, EM_POOL_DEFAULT);
848  if (unlikely(callback_args_event == EM_EVENT_UNDEF))
849  return EM_ERR_ALLOC_FAILED;
850 
851  /* Init the 'done'-callback function arguments */
852  q_grp_done_callback_args_t *callback_args =
853  em_event_pointer(callback_args_event);
854  callback_args->qgrp_elem = qgrp_elem;
855  em_core_mask_copy(&callback_args->new_mask, new_mask);
856 
857  /*
858  * Set the 'qgrp operation done'-callback func based on given
859  * escope (identifies operation).
860  * f_done_callback(f_done_arg_ptr)
861  */
862  void (*f_done_callback)(void *arg_ptr);
863  void *f_done_arg_ptr = callback_args_event;
864  bool sync_operation = false;
865 
866  ret = set_qgrp_done_func(escope, &f_done_callback/*out*/,
867  &sync_operation/*out*/);
868  if (unlikely(ret)) {
869  em_free(callback_args_event);
870  return EM_ERR_NOT_FOUND;
871  }
872 
873  /*
874  * Create an event group to track completion of all sent add/rem-reqs.
875  * Set up notifications to be sent when all cores are done handling the
876  * queue group add/rem-reqs.
877  */
878  event_group = internal_done_w_notif_req(addrem_count,
879  f_done_callback, f_done_arg_ptr,
880  num_notif, notif_tbl,
881  sync_operation);
882  if (unlikely(event_group == EM_EVENT_GROUP_UNDEF)) {
883  em_free(callback_args_event);
884  return EM_ERR_NOT_FREE;
885  }
886 
887  for (int i = 0; i < add_count; i++)
888  add_events[i] = EM_EVENT_UNDEF;
889  for (int i = 0; i < rem_count; i++)
890  rem_events[i] = EM_EVENT_UNDEF;
891 
892  /* Create internal events for queue group add-reqs */
893  if (add_count) {
894  cnt = create_addrem_events(add_events /*out*/, add_count,
895  QUEUE_GROUP_ADD_REQ, queue_group);
896  if (unlikely(cnt != add_count))
897  goto err_free_resources;
898  }
899  /* Create internal events for queue group rem-reqs */
900  if (rem_count) {
901  cnt = create_addrem_events(rem_events /*out*/, rem_count,
902  QUEUE_GROUP_REM_REQ, queue_group);
903  if (unlikely(cnt != rem_count))
904  goto err_free_resources;
905  }
906 
907  /*
908  * Send rem-req events to the concerned cores
909  */
910  err = send_addrem_events(rem_events, rem_mask, event_group);
911  if (unlikely(err != EM_OK))
912  goto err_free_resources;
913  /*
914  * Send add-req events to the concerned cores
915  */
916  err = send_addrem_events(add_events, add_mask, event_group);
917  if (unlikely(err != EM_OK))
918  goto err_free_resources;
919 
920  return EM_OK;
921 
922 err_free_resources:
923  addrem_events_free(add_events, add_count,
924  rem_events, rem_count);
925  evgrp_abort_delete(event_group);
926  em_free(callback_args_event);
927 
928  return EM_ERR_OPERATION_FAILED;
929 }
930 
931 /**
932  * Called by em_queue_group_modify() with is_delete=false and by
933  * em_queue_group_delete() with is_delete=true
934  *
935  * @param qgrp_elem Queue group element
936  * @param new_mask New core mask
937  * @param num_notif Number of entries in notif_tbl (0 for no notification)
938  * @param notif_tbl Array of notifications to send as the operation completes
939  * @param is_delete Is this modify triggered by em_queue_group_delete()?
940  */
941 em_status_t
942 queue_group_modify(queue_group_elem_t *const qgrp_elem,
943  const em_core_mask_t *new_mask,
944  int num_notif, const em_notif_t notif_tbl[],
945  bool is_delete)
946 {
947  em_status_t err;
948  const char *err_str = "";
949  const em_escope_t escope = is_delete ? EM_ESCOPE_QUEUE_GROUP_DELETE :
950  EM_ESCOPE_QUEUE_GROUP_MODIFY;
951  const int core = em_core_id();
952 
953  env_spinlock_lock(&qgrp_elem->lock);
954 
955  /* Check Queue Group state */
956  err = check_qgrp_state(qgrp_elem, is_delete, &err_str/*out*/);
957  if (unlikely(err != EM_OK)) {
958  env_spinlock_unlock(&qgrp_elem->lock);
959  return INTERNAL_ERROR(err, escope, err_str);
960  }
961 
962  em_core_mask_t old_mask;
963 
964  /* store previous mask */
965  em_core_mask_copy(&old_mask, &qgrp_elem->core_mask);
966  /* update with new_mask */
967  em_core_mask_copy(&qgrp_elem->core_mask, new_mask);
968 
969  /* Count added & removed cores */
970  em_core_mask_t add_mask;
971  em_core_mask_t rem_mask;
972  int adds = count_qgrp_adds(&old_mask, new_mask, &add_mask /*out*/);
973  int rems = count_qgrp_rems(&old_mask, new_mask, &rem_mask /*out*/);
974  /*
975  * Remove the calling core from the add-mask. The core adds itself.
976  * Don't do the same for the rem-mask: we want to send a rem-event to
977  * this core to ensure the core is not currently processing from that
978  * queue-group.
979  */
980  if (adds > 0 && em_core_mask_isset(core, &add_mask)) {
981  em_core_mask_clr(core, &add_mask);
982  adds--;
983  q_grp_add_core(qgrp_elem);
984  }
985 
986  /*
987  * If the new mask is equal to the one in use:
988  * send notifs immediately and return.
989  */
990  if (em_core_mask_equal(&old_mask, new_mask) || (adds == 0 && rems == 0)) {
991  /* New mask == curr mask, or both zero, send notifs & return */
992  if (is_delete)
993  q_grp_delete_done(qgrp_elem, new_mask);
994  else
995  q_grp_modify_done(qgrp_elem, new_mask);
996 
997  env_spinlock_unlock(&qgrp_elem->lock);
998 
999  err = send_notifs(num_notif, notif_tbl);
1000  RETURN_ERROR_IF(err != EM_OK, err, escope,
1001  "notif sending failed");
1002  return EM_OK;
1003  }
1004 
1005  /* Catch contending queue group operations while delete is ongoing */
1006  if (is_delete)
1007  qgrp_elem->ongoing_delete = true;
1008 
1009  env_spinlock_unlock(&qgrp_elem->lock);
1010 
1011  /*
1012  * Send add/rem-req events to all other concerned cores.
1013  * Note: if .ongoing_delete = true:
1014  * Treat errors as EM_FATAL because failures will leave
1015  * .ongoing_delete = true for the group until restart of EM.
1016  */
1017  err = send_qgrp_addrem_reqs(qgrp_elem, new_mask, &add_mask, &rem_mask,
1018  num_notif, notif_tbl, escope);
1019  RETURN_ERROR_IF(err != EM_OK, is_delete ? EM_FATAL(err) : err, escope,
1020  "qgrp add/rem-req(s) sending failed");
1021 
1022  return EM_OK;
1023 }
1024 
1025 /**
1026  * Called by em_queue_group_modify_sync() with is_delete=false and by
1027  * em_queue_group_delete_sync() with is_delete=true
1028  *
1029  * @param qgrp_elem Queue group element
1030  * @param new_mask New core mask
1031  * @param is_delete Is this modify triggered by em_queue_group_delete_sync()?
1032  */
1033 em_status_t
1034 queue_group_modify_sync(queue_group_elem_t *const qgrp_elem,
1035  const em_core_mask_t *new_mask, bool is_delete)
1036 {
1037  em_locm_t *const locm = &em_locm;
1038  const em_queue_group_t queue_group = qgrp_elem->queue_group;
1039  em_status_t err = EM_OK;
1040  const char *err_str = "";
1041  const em_escope_t escope = is_delete ? EM_ESCOPE_QUEUE_GROUP_DELETE_SYNC
1042  : EM_ESCOPE_QUEUE_GROUP_MODIFY_SYNC;
1043  const int core = em_core_id();
1044 
1045  /* Mark that a sync-API call is in progress */
1046  locm->sync_api.in_progress = true;
1047 
1048  env_spinlock_lock(&qgrp_elem->lock);
1049 
1050  /* Check Queue Group state */
1051  err = check_qgrp_state(qgrp_elem, is_delete, &err_str/*out*/);
1052  if (unlikely(err != EM_OK)) {
1053  env_spinlock_unlock(&qgrp_elem->lock);
1054  goto queue_group_modify_sync_error;
1055  }
1056 
1057  em_core_mask_t old_mask;
1058 
1059  /* store previous mask */
1060  em_core_mask_copy(&old_mask, &qgrp_elem->core_mask);
1061  /* update with new_mask */
1062  em_core_mask_copy(&qgrp_elem->core_mask, new_mask);
1063 
1064  if (em_core_mask_equal(&old_mask, new_mask)) {
1065  /* New mask == curr mask, or both zero */
1066  if (is_delete)
1067  q_grp_delete_done(qgrp_elem, new_mask);
1068 
1069  env_spinlock_unlock(&qgrp_elem->lock);
1070 
1071  err = EM_OK;
1072  goto queue_group_modify_sync_error; /* no error, just return */
1073  }
1074 
1075  /* Catch contending queue group operations while delete is ongoing */
1076  if (is_delete)
1077  qgrp_elem->ongoing_delete = true;
1078 
1079  /* Count added & removed cores */
1080  em_core_mask_t add_mask;
1081  em_core_mask_t rem_mask;
1082  int adds = count_qgrp_adds(&old_mask, new_mask, &add_mask /*out*/);
1083  int rems = count_qgrp_rems(&old_mask, new_mask, &rem_mask /*out*/);
1084 
1085  /*
1086  * Remove the calling core from the add/rem-mask and -count since no
1087  * add/rem-req event should be sent to it during this _sync operation.
1088  */
1089  if (adds > 0 && em_core_mask_isset(core, &add_mask)) {
1090  em_core_mask_clr(core, &add_mask);
1091  adds--;
1092  q_grp_add_core(qgrp_elem);
1093  }
1094  if (rems > 0 && em_core_mask_isset(core, &rem_mask)) {
1095  em_core_mask_clr(core, &rem_mask);
1096  rems--;
1097  q_grp_rem_core(qgrp_elem);
1098  }
1099 
1100  /* No cores to send rem-reqs to, mark operation done and return */
1101  if (adds == 0 && rems == 0) {
1102  if (is_delete)
1103  q_grp_delete_done(qgrp_elem, new_mask);
1104  else
1105  q_grp_modify_done(qgrp_elem, new_mask);
1106 
1107  env_spinlock_unlock(&qgrp_elem->lock);
1108  err = EM_OK;
1109  goto queue_group_modify_sync_error; /* no error, just return */
1110  }
1111 
1112  env_spinlock_unlock(&qgrp_elem->lock);
1113 
1114  /*
1115  * Send add/rem-req events to all other concerned cores.
1116  * Note: if .ongoing_delete = true:
1117  * Treat errors as EM_FATAL because failures will leave
1118  * .ongoing_delete = true for the group until restart of EM.
1119  */
1120  err = send_qgrp_addrem_reqs(qgrp_elem, new_mask, &add_mask, &rem_mask,
1121  0, NULL, escope);
1122  if (unlikely(err != EM_OK)) {
1123  if (is_delete)
1124  err = EM_FATAL(err);
1125  goto queue_group_modify_sync_error;
1126  }
1127 
1128  /*
1129  * Poll the core-local unscheduled control-queue for events.
1130  * These events request the core to do a core-local operation (or not).
1131  * Poll and handle events until 'locm->sync_api.in_progress == false'
1132  * indicating that this sync-API is 'done' on all concerned cores.
1133  */
1134  while (locm->sync_api.in_progress)
1135  poll_unsched_ctrl_queue();
1136 
1137  return EM_OK;
1138 
1139 queue_group_modify_sync_error:
1140  locm->sync_api.in_progress = false;
1141  RETURN_ERROR_IF(err != EM_OK, err, escope,
1142  "Failure: Modify sync QGrp:%" PRI_QGRP ":%s",
1143  queue_group, err_str);
1144  return EM_OK;
1145 }
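/*
 * A usage sketch of the public sync-APIs implemented above (illustrative;
 * assumes 'qgrp' was created earlier and contains no queues when deleted):
 *
 * @code
 *	em_core_mask_t new_mask;
 *
 *	em_core_mask_zero(&new_mask);
 *	em_core_mask_set(0, &new_mask);	// shrink the group to core 0 only
 *
 *	em_status_t stat = em_queue_group_modify_sync(qgrp, &new_mask);
 *
 *	// Delete is internally a modify to a zero core mask followed by
 *	// teardown of the underlying ODP schedule group:
 *	if (stat == EM_OK)
 *		stat = em_queue_group_delete_sync(qgrp);
 * @endcode
 */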
1146 
1147 /**
1148  * @brief Add the calling core to the odp schedule group that is used by
1149  * the given EM queue group.
1150  *
1151  * @param qgrp_elem Queue group element
1152  */
1153 static void q_grp_add_core(const queue_group_elem_t *qgrp_elem)
1154 {
1155  int odp_thr = odp_thread_id();
1156  odp_thrmask_t odp_joinmask;
1157 
1158  odp_thrmask_zero(&odp_joinmask);
1159  odp_thrmask_set(&odp_joinmask, odp_thr);
1160 
1161  /* Join this thread into the queue group's odp schedule group */
1162  int ret = odp_schedule_group_join(qgrp_elem->odp_sched_group,
1163  &odp_joinmask);
1164  if (unlikely(ret)) {
1165  char mask_str[EM_CORE_MASK_STRLEN];
1166  em_queue_group_t queue_group = qgrp_elem->queue_group;
1167 
1168  em_core_mask_tostr(mask_str, EM_CORE_MASK_STRLEN,
1169  &qgrp_elem->core_mask);
1170  INTERNAL_ERROR(EM_ERR_LIB_FAILED, EM_ESCOPE_QUEUE_GROUP_ADD_CORE,
1171  "QGrp ADD core%02d: odp_schedule_group_join(thr:%d):%d\n"
1172  "QueueGroup:%" PRI_QGRP " core-mask:%s",
1173  em_core_id(), odp_thr, ret, queue_group, mask_str);
1174  }
1175 }
1176 
1177 /**
1178  * @brief Remove the calling core from the odp schedule group that is used by
1179  * the given EM queue group.
1180  *
1181  * @param qgrp_elem Queue group element
1182  */
1183 static void q_grp_rem_core(const queue_group_elem_t *qgrp_elem)
1184 {
1185  int odp_thr = odp_thread_id();
1186  odp_thrmask_t odp_leavemask;
1187 
1188  odp_thrmask_zero(&odp_leavemask);
1189  odp_thrmask_set(&odp_leavemask, odp_thr);
1190 
1191  /* Remove this thread from the queue group's odp schedule group */
1192  int ret = odp_schedule_group_leave(qgrp_elem->odp_sched_group,
1193  &odp_leavemask);
1194  if (unlikely(ret)) {
1195  char mask_str[EM_CORE_MASK_STRLEN];
1196  em_queue_group_t queue_group = qgrp_elem->queue_group;
1197 
1198  em_core_mask_tostr(mask_str, EM_CORE_MASK_STRLEN,
1199  &qgrp_elem->core_mask);
1200  INTERNAL_ERROR(EM_ERR_LIB_FAILED, EM_ESCOPE_QUEUE_GROUP_REM_CORE,
1201  "QGrp REM core%02d: odp_schedule_group_leave(thr:%d):%d\n"
1202  "QueueGroup:%" PRI_QGRP " core-mask:%s",
1203  em_core_id(), odp_thr, ret, queue_group, mask_str);
1204  }
1205 }
1206 
1207 void i_event__qgrp_add_core_req(const internal_event_t *i_ev)
1208 {
1209  em_queue_group_t qgrp = i_ev->q_grp.queue_group;
1210  queue_group_elem_t *qgrp_elem = queue_group_elem_get(qgrp);
1211 
1212  if (unlikely(!qgrp_elem))
1213  return;
1214 
1215  env_spinlock_lock(&qgrp_elem->lock);
1216  q_grp_add_core(qgrp_elem);
1217  env_spinlock_unlock(&qgrp_elem->lock);
1218 }
1219 
1220 void i_event__qgrp_rem_core_req(const internal_event_t *i_ev)
1221 {
1222  em_queue_group_t qgrp = i_ev->q_grp.queue_group;
1223  queue_group_elem_t *qgrp_elem = queue_group_elem_get(qgrp);
1224 
1225  if (unlikely(!qgrp_elem))
1226  return;
1227 
1228  env_spinlock_lock(&qgrp_elem->lock);
1229  q_grp_rem_core(qgrp_elem);
1230  env_spinlock_unlock(&qgrp_elem->lock);
1231 }
1232 
1233 /**
1234  * Callback function when a em_queue_group_create()
1235  * completes with the internal DONE-event
1236  */
1237 static void q_grp_create_done_callback(void *arg_ptr)
1238 {
1239  em_event_t event = (em_event_t)arg_ptr;
1240  const q_grp_done_callback_args_t *args = em_event_pointer(event);
1241  queue_group_elem_t *const qgrp_elem = args->qgrp_elem;
1242 
1243  env_spinlock_lock(&qgrp_elem->lock);
1244  q_grp_create_done(qgrp_elem, &args->new_mask);
1245  env_spinlock_unlock(&qgrp_elem->lock);
1246 
1247  em_free(event);
1248 }
1249 
1250 /**
1251  * Callback function when a em_queue_group_create_sync()
1252  * completes with the internal DONE-event
1253  */
1254 static void q_grp_create_sync_done_callback(void *arg_ptr)
1255 {
1256  em_event_t event = (em_event_t)arg_ptr;
1257  const q_grp_done_callback_args_t *args = em_event_pointer(event);
1258  queue_group_elem_t *const qgrp_elem = args->qgrp_elem;
1259 
1260  env_spinlock_lock(&qgrp_elem->lock);
1261  q_grp_create_sync_done(qgrp_elem, &args->new_mask);
1262  env_spinlock_unlock(&qgrp_elem->lock);
1263 
1264  em_free(event);
1265 }
1266 
1267 static void q_grp_create_done(const queue_group_elem_t *const qgrp_elem,
1268  const em_core_mask_t *const new_mask)
1269 {
1270  (void)qgrp_elem;
1271  (void)new_mask;
1272 }
1273 
1274 static void q_grp_create_sync_done(const queue_group_elem_t *const qgrp_elem,
1275  const em_core_mask_t *const new_mask)
1276 {
1277  (void)qgrp_elem;
1278  (void)new_mask;
1279 }
1280 
1281 /**
1282  * Callback function when a em_queue_group_modify()
1283  * completes with the internal DONE-event
1284  */
1285 static void q_grp_modify_done_callback(void *arg_ptr)
1286 {
1287  em_event_t event = (em_event_t)arg_ptr;
1288  const q_grp_done_callback_args_t *args = em_event_pointer(event);
1289  queue_group_elem_t *const qgrp_elem = args->qgrp_elem;
1290 
1291  env_spinlock_lock(&qgrp_elem->lock);
1292  q_grp_modify_done(qgrp_elem, &args->new_mask);
1293  env_spinlock_unlock(&qgrp_elem->lock);
1294 
1295  em_free(event);
1296 }
1297 
1298 /**
1299  * Callback function when a em_queue_group_modify_sync()
1300  * completes with the internal DONE-event
1301  */
1302 static void q_grp_modify_sync_done_callback(void *arg_ptr)
1303 {
1304  em_locm_t *const locm = &em_locm;
1305 
1306  q_grp_modify_done_callback(arg_ptr);
1307 
1308  /* Enable the caller of the sync API func to proceed (on this core) */
1309  locm->sync_api.in_progress = false;
1310 }
1311 
1312 static void q_grp_modify_done(const queue_group_elem_t *const qgrp_elem,
1313  const em_core_mask_t *const new_mask)
1314 {
1315  (void)qgrp_elem;
1316  (void)new_mask;
1317 }
1318 
1319 /**
1320  * Callback function when a em_queue_group_modify(delete flag set)
1321  * completes with the internal DONE-event
1322  */
1323 static void q_grp_delete_done_callback(void *arg_ptr)
1324 {
1325  em_event_t event = (em_event_t)arg_ptr;
1326  const q_grp_done_callback_args_t *args = em_event_pointer(event);
1327  queue_group_elem_t *const qgrp_elem = args->qgrp_elem;
1328 
1329  env_spinlock_lock(&qgrp_elem->lock);
1330  q_grp_delete_done(qgrp_elem, &args->new_mask);
1331  env_spinlock_unlock(&qgrp_elem->lock);
1332 
1333  em_free(event);
1334 }
1335 
1336 /**
1337  * Callback function when a em_queue_group_modify_sync(delete flag set)
1338  * completes with the internal DONE-event
1339  */
1340 static void q_grp_delete_sync_done_callback(void *arg_ptr)
1341 {
1342  em_locm_t *const locm = &em_locm;
1343 
1344  q_grp_delete_done_callback(arg_ptr);
1345 
1346  /* Enable the caller of the sync API func to proceed (on this core) */
1347  locm->sync_api.in_progress = false;
1348 }
1349 
1350 static void q_grp_delete_done(queue_group_elem_t *const qgrp_elem,
1351  const em_core_mask_t *const new_mask)
1352 {
1353  const unsigned int num_queues = env_atomic32_get(&qgrp_elem->num_queues);
1354  const em_queue_group_t queue_group = qgrp_elem->queue_group;
1355 
1356  /* Sanity check: new core mask for delete is always zero */
1357  if (unlikely(!em_core_mask_iszero(new_mask))) {
1358  char mstr[EM_CORE_MASK_STRLEN];
1359 
1360  em_core_mask_tostr(mstr, EM_CORE_MASK_STRLEN, new_mask);
1361  INTERNAL_ERROR(EM_FATAL(EM_ERR_BAD_STATE), EM_ESCOPE_QUEUE_GROUP_DELETE,
1362  "Delete QGrp:%" PRI_QGRP " mask not zero:%s",
1363  queue_group, mstr);
1364  }
1365  /* Sanity check: grp must not have been modified since start of delete */
1366  if (unlikely(!em_core_mask_equal(&qgrp_elem->core_mask, new_mask))) {
1367  char mstr1[EM_CORE_MASK_STRLEN];
1368  char mstr2[EM_CORE_MASK_STRLEN];
1369 
1370  em_core_mask_tostr(mstr1, EM_CORE_MASK_STRLEN, &qgrp_elem->core_mask);
1371  em_core_mask_tostr(mstr2, EM_CORE_MASK_STRLEN, new_mask);
1372  INTERNAL_ERROR(EM_FATAL(EM_ERR_BAD_STATE), EM_ESCOPE_QUEUE_GROUP_DELETE,
1373  "Delete QGrp:%" PRI_QGRP ", masks modified during delete:%s vs. %s",
1374  queue_group, mstr1, mstr2);
1375  }
1376 
1377  if (unlikely(!list_is_empty(&qgrp_elem->queue_list) || num_queues))
1378  INTERNAL_ERROR(EM_FATAL(EM_ERR_NOT_FREE), EM_ESCOPE_QUEUE_GROUP_DELETE,
1379  "Delete QGrp:%" PRI_QGRP ", contains %u queues, cannot delete!",
1380  queue_group, num_queues);
1381 
1382  int ret = odp_schedule_group_destroy(qgrp_elem->odp_sched_group);
1383 
1384  if (unlikely(ret != 0))
1385  INTERNAL_ERROR(EM_FATAL(EM_ERR_LIB_FAILED), EM_ESCOPE_QUEUE_GROUP_DELETE,
1386  "Delete QGrp:%" PRI_QGRP ", ODP sched grp destroy fails:%d",
1387  queue_group, ret);
1388 
1389  qgrp_elem->odp_sched_group = ODP_SCHED_GROUP_INVALID;
1390  qgrp_elem->ongoing_delete = false;
1391 
1392  /* Free the queue group */
1393  queue_group_free(qgrp_elem->queue_group);
1394 }
1395 
1396 void queue_group_add_queue_list(queue_group_elem_t *const queue_group_elem,
1397  queue_elem_t *const queue_elem)
1398 {
1399  env_spinlock_lock(&queue_group_elem->lock);
1400  list_add(&queue_group_elem->queue_list, &queue_elem->qgrp_node);
1401  env_atomic32_inc(&queue_group_elem->num_queues);
1402  env_spinlock_unlock(&queue_group_elem->lock);
1403 }
1404 
1405 void queue_group_rem_queue_list(queue_group_elem_t *const queue_group_elem,
1406  queue_elem_t *const queue_elem)
1407 {
1408  env_spinlock_lock(&queue_group_elem->lock);
1409  if (!list_is_empty(&queue_group_elem->queue_list)) {
1410  list_rem(&queue_group_elem->queue_list, &queue_elem->qgrp_node);
1411  env_atomic32_dec(&queue_group_elem->num_queues);
1412  }
1413  env_spinlock_unlock(&queue_group_elem->lock);
1414 }
1415 
1416 unsigned int queue_group_count(void)
1417 {
1418  return env_atomic32_get(&em_shm->queue_group_count);
1419 }
1420 
1421 #define QGRP_INFO_HDR_STR \
1422 "EM Queue group(s):%2u\n" \
1423 "ID        Name                            EM-mask             Cpumask             " \
1424 "ODP-mask            Q-num\n" \
1425 "------------------------------------------------------------------------------" \
1426 "------------------------------\n" \
1427 "%s\n"
1428 
1429 /* Info len (in bytes) per queue group, calculated from QGRP_INFO_FMT */
1430 #define QGRP_INFO_LEN (108 + 1 /* Terminating null byte */)
1431 #define QGRP_INFO_FMT "%-10" PRI_QGRP "%-32s%-20s%-20s%-20s%-5d\n" /*108 bytes*/
1432 
1433 static void queue_group_info_str(em_queue_group_t queue_group,
1434  char qgrp_info_str[/*out*/])
1435 {
1436  em_core_mask_t core_mask;
1437  odp_thrmask_t odp_thrmask;
1438  odp_cpumask_t odp_cpumask;
1439  char qgrp_name[EM_QUEUE_GROUP_NAME_LEN];
1440  char em_mask_str[EM_CORE_MASK_STRLEN];
1441  char odp_thrmask_str[ODP_THRMASK_STR_SIZE];
1442  char odp_cpumask_str[ODP_CPUMASK_STR_SIZE];
1443  em_status_t err;
1444  int ret;
1445  int len = 0;
1446 
1447  const queue_group_elem_t *qgrp_elem = queue_group_elem_get(queue_group);
1448 
1449  if (unlikely(!qgrp_elem || !queue_group_allocated(qgrp_elem)))
1450  goto info_print_err;
1451 
1452  em_queue_group_get_name(queue_group, qgrp_name, sizeof(qgrp_name));
1453  err = em_queue_group_get_mask(queue_group, &core_mask);
1454  if (unlikely(err != EM_OK))
1455  goto info_print_err;
1456  em_core_mask_tostr(em_mask_str, sizeof(em_mask_str), &core_mask);
1457 
1458  /* ODP thread mask */
1459  ret = odp_schedule_group_thrmask(qgrp_elem->odp_sched_group,
1460  &odp_thrmask /*out*/);
1461  if (unlikely(ret))
1462  goto info_print_err;
1463  ret = odp_thrmask_to_str(&odp_thrmask, odp_thrmask_str,
1464  sizeof(odp_thrmask_str));
1465  if (unlikely(ret <= 0))
1466  goto info_print_err;
1467  odp_thrmask_str[ret - 1] = '\0';
1468 
1469  /* Physical mask */
1470  mask_em2phys(&core_mask, &odp_cpumask /*out*/);
1471  ret = odp_cpumask_to_str(&odp_cpumask, odp_cpumask_str,
1472  sizeof(odp_cpumask_str));
1473  if (unlikely(ret <= 0))
1474  goto info_print_err;
1475  odp_cpumask_str[ret - 1] = '\0';
1476 
1477  len = snprintf(qgrp_info_str, QGRP_INFO_LEN, QGRP_INFO_FMT,
1478  queue_group, qgrp_name, em_mask_str,
1479  odp_cpumask_str, odp_thrmask_str,
1480  env_atomic32_get(&qgrp_elem->num_queues));
1481 
1482  qgrp_info_str[len] = '\0';
1483  return;
1484 
1485 info_print_err:
1486  len = snprintf(qgrp_info_str, QGRP_INFO_LEN, QGRP_INFO_FMT,
1487  queue_group, "err:n/a", "n/a", "n/a", "n/a", 0);
1488  qgrp_info_str[len] = '\0';
1489 }
1490 
1491 void queue_group_info_print_all(void)
1492 {
1493  em_queue_group_t qgrp;
1494  unsigned int qgrp_num;
1495  char single_qgrp_info_str[QGRP_INFO_LEN];
1496  int len = 0;
1497  int n_print = 0;
1498 
1499  qgrp = em_queue_group_get_first(&qgrp_num);
1500 
1501  /*
1502  * qgrp_num may not match the number of queue groups actually returned
1503  * by iterating with em_queue_group_get_next() if queue groups are added
1504  * or removed in parallel by another core. Thus, space for 10 extra queue
1505  * groups is reserved. If more than 10 queue groups are added by other
1506  * cores in parallel, only information for the first (qgrp_num + 10)
1507  * queue groups is printed.
1508  *
1509  * The extra 1 byte is reserved for the terminating null byte.
1510  */
1511  const int all_qgrp_info_str_len = (qgrp_num + 10) * QGRP_INFO_LEN + 1;
1512  char all_qgrp_info_str[all_qgrp_info_str_len];
1513 
1514  while (qgrp != EM_QUEUE_GROUP_UNDEF) {
1515  queue_group_info_str(qgrp, single_qgrp_info_str);
1516 
1517  n_print = snprintf(all_qgrp_info_str + len,
1518  all_qgrp_info_str_len - len,
1519  "%s", single_qgrp_info_str);
1520 
1521  /* Not enough space to hold more queue group info */
1522  if (n_print >= all_qgrp_info_str_len - len)
1523  break;
1524 
1525  len += n_print;
1526  qgrp = em_queue_group_get_next();
1527  }
1528 
1529  /* No EM queue group */
1530  if (len == 0) {
1531  EM_PRINT("No EM queue group!\n");
1532  return;
1533  }
1534 
1535  /*
1536  * To prevent printing incomplete information of the last queue group
1537  * when there is not enough space to hold all queue group info.
1538  */
1539  all_qgrp_info_str[len] = '\0';
1540  EM_PRINT(QGRP_INFO_HDR_STR, qgrp_num, all_qgrp_info_str);
1541 }
1542 
1543 #define QGRP_QUEUE_INFO_HDR_STR \
1544 "Queue group %" PRI_QGRP "(%s) has %d queue(s):\n\n" \
1545 "Id        Name                            Priority  Type      State    Ctx\n" \
1546 "--------------------------------------------------------------------------\n" \
1547 "%s\n"
1548 
1549 /* Info len (in bytes) per queue group queue, calculated from QGRP_Q_INFO_FMT */
1550 #define QGRP_Q_LEN 75
1551 #define QGRP_Q_INFO_FMT "%-10" PRI_QUEUE "%-32s%-10d%-10s%-9s%-3c\n" /*75 bytes*/
1552 
1553 void queue_group_queues_print(em_queue_group_t qgrp)
1554 {
1555  unsigned int q_num;
1556  em_queue_t qgrp_queue;
1557  const queue_elem_t *q_elem;
1558  char qgrp_name[EM_QUEUE_GROUP_NAME_LEN];
1559  char q_name[EM_QUEUE_NAME_LEN];
1560  int len = 0;
1561  int n_print = 0;
1562 
1563  const queue_group_elem_t *qgrp_elem = queue_group_elem_get(qgrp);
1564 
1565  if (unlikely(!qgrp_elem || !queue_group_allocated(qgrp_elem))) {
1566  EM_PRINT("Queue group %" PRI_QGRP " is not created!\n", qgrp);
1567  return;
1568  }
1569 
1570  em_queue_group_get_name(qgrp, qgrp_name, sizeof(qgrp_name));
1571  qgrp_queue = em_queue_group_queue_get_first(&q_num, qgrp);
1572 
1573  /*
1574  * q_num may not match the number of queues actually returned by iterating
1575  * with em_queue_group_queue_get_next() if queues are added or removed
1576  * in parallel by another core. Thus, space for 10 extra queues is reserved.
1577  * If more than 10 extra queues are added to this queue group by other
1578  * cores in parallel, only the first (q_num + 10) queues are printed.
1579  *
1580  * The extra 1 byte is reserved for the terminating null byte.
1581  */
1582  const int q_info_len = (q_num + 10) * QGRP_Q_LEN + 1;
1583  char q_info_str[q_info_len];
1584 
1585  while (qgrp_queue != EM_QUEUE_UNDEF) {
1586  q_elem = queue_elem_get(qgrp_queue);
1587 
1588  if (unlikely(q_elem == NULL || !queue_allocated(q_elem))) {
1589  qgrp_queue = em_queue_group_queue_get_next();
1590  continue;
1591  }
1592 
1593  queue_get_name(q_elem, q_name, EM_QUEUE_NAME_LEN - 1);
1594 
1595  n_print = snprintf(q_info_str + len, q_info_len - len,
1596  QGRP_Q_INFO_FMT, qgrp_queue, q_name,
1597  q_elem->priority,
1598  queue_get_type_str(q_elem->type),
1599  queue_get_state_str(q_elem->state),
1600  q_elem->context ? 'Y' : 'N');
1601 
1602  /* Not enough space to hold more queue info */
1603  if (n_print >= q_info_len - len)
1604  break;
1605 
1606  len += n_print;
1607  qgrp_queue = em_queue_group_queue_get_next();
1608  }
1609 
1610  /* No queue belonging to the queue group */
1611  if (!len) {
1612  EM_PRINT("Queue group %" PRI_QGRP "(%s) has no queue!\n",
1613  qgrp, qgrp_name);
1614  return;
1615  }
1616 
1617  /*
1618  * To prevent printing incomplete information of the last queue when
1619  * there is not enough space to hold all queue info.
1620  */
1621  q_info_str[len] = '\0';
1622  EM_PRINT(QGRP_QUEUE_INFO_HDR_STR, qgrp, qgrp_name, q_num, q_info_str);
1623 }
Definition: event_machine_queue_group.c:333
PRI_QGRP
#define PRI_QGRP
Definition: event_machine_types.h:129
EM_ERR_OPERATION_FAILED
@ EM_ERR_OPERATION_FAILED
Definition: event_machine_hw_types.h:289
EM_CORE_MASK_STRLEN
#define EM_CORE_MASK_STRLEN
Definition: event_machine_hw_types.h:247
queue_elem_t::priority
uint8_t priority
Definition: em_queue_types.h:213
objpool_elem_t
Definition: objpool.h:48
em_shm_t::queue_group_count
env_atomic32_t queue_group_count
Definition: em_mem.h:138
EM_QUEUE_GROUP_DEFAULT_NAME
#define EM_QUEUE_GROUP_DEFAULT_NAME
Definition: event_machine_hw_config.h:158
em_core_mask_copy
void em_core_mask_copy(em_core_mask_t *dst, const em_core_mask_t *src)
Definition: event_machine_hw_specific.c:82
em_shm
em_shm_t * em_shm
Definition: event_machine_init.c:41
EM_MAX_QUEUE_GROUPS
#define EM_MAX_QUEUE_GROUPS
Definition: event_machine_hw_config.h:142
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
em_include.h
em_core_id
int em_core_id(void)
Definition: event_machine_core.c:34
em_core_mask_tostr
void em_core_mask_tostr(char *mask_str, int len, const em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:156
EM_ERR_NOT_FOUND
@ EM_ERR_NOT_FOUND
Definition: event_machine_hw_types.h:278
EM_POOL_UNDEF
#define EM_POOL_UNDEF
Definition: event_machine_hw_types.h:60
send_notifs
em_status_t send_notifs(const int num_notif, const em_notif_t notif_tbl[])
Helper func to send notifications events.
Definition: em_internal_event.c:423
queue_group_modify
em_status_t queue_group_modify(queue_group_elem_t *const qgrp_elem, const em_core_mask_t *new_mask, int num_notif, const em_notif_t notif_tbl[], bool is_delete)
Definition: em_queue_group.c:942
EM_ERR_BAD_POINTER
@ EM_ERR_BAD_POINTER
Definition: event_machine_hw_types.h:271
em_notif_t
Definition: event_machine_types.h:268
queue_group_create
em_queue_group_t queue_group_create(const char *name, const em_core_mask_t *mask, int num_notif, const em_notif_t notif_tbl[], em_queue_group_t queue_group)
Definition: em_queue_group.c:583
queue_group_modify_sync
em_status_t queue_group_modify_sync(queue_group_elem_t *const qgrp_elem, const em_core_mask_t *new_mask, bool is_delete)
Definition: em_queue_group.c:1034
queue_group_tbl_t::queue_group_elem
queue_group_elem_t queue_group_elem[EM_MAX_QUEUE_GROUPS]
Definition: em_queue_group_types.h:88
em_core_mask_count
int em_core_mask_count(const em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:87
internal_done_w_notif_req
em_event_group_t internal_done_w_notif_req(int event_group_count, void(*f_done_callback)(void *arg_ptr), void *f_done_arg_ptr, int num_notif, const em_notif_t notif_tbl[], bool sync_operation)
Helper func: Allocate & set up the internal 'done' event with function callbacks and notification eve...
Definition: em_internal_event.c:335
em_locm_t
Definition: em_mem.h:188
queue_group_pool_t
Definition: em_queue_group_types.h:94
EM_ERR_BAD_STATE
@ EM_ERR_BAD_STATE
Definition: event_machine_hw_types.h:263
queue_group_elem_t::queue_list
list_node_t queue_list
Definition: em_queue_group_types.h:67
queue_elem_t::qgrp_node
list_node_t qgrp_node
Definition: em_queue_types.h:257
EM_EVENT_GROUP_UNDEF
#define EM_EVENT_GROUP_UNDEF
Definition: event_machine_types.h:141
evgrp_abort_delete
void evgrp_abort_delete(em_event_group_t event_group)
internal_done_w_notif_req() 'companion' to abort and delete the event group created by the mentioned ...
Definition: em_internal_event.c:406
queue_group_elem_t::lock
env_spinlock_t lock
Definition: em_queue_group_types.h:75
queue_elem_t
Definition: em_queue_types.h:180
em_event_pointer
void * em_event_pointer(em_event_t event)
Definition: event_machine_event.c:750
queue_group_check_mask
em_status_t queue_group_check_mask(const em_core_mask_t *mask)
Check that only running EM cores are set in mask.
Definition: em_queue_group.c:610
em_queue_group_get_mask
em_status_t em_queue_group_get_mask(em_queue_group_t queue_group, em_core_mask_t *mask)
Definition: event_machine_queue_group.c:260
em_core_mask_clr
void em_core_mask_clr(int core, em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:53
em_queue_group_queue_get_first
em_queue_t em_queue_group_queue_get_first(unsigned int *num, em_queue_group_t queue_group)
Definition: event_machine_queue_group.c:389