EM-ODP  3.7.0
Event Machine on ODP
em_eo.c
1 /*
2  * Copyright (c) 2015, Nokia Solutions and Networks
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  * * Neither the name of the copyright holder nor the names of its
15  * contributors may be used to endorse or promote products derived
16  * from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "em_include.h"
32 
33 /**
34  * Params for eo_local_func_call_req().
35  * Init params with eo_local_func_call_param_init() before usage.
36  */
37 typedef struct {
38  eo_elem_t *eo_elem;
39  queue_elem_t *q_elem;
40  int delete_queues;
41  uint64_t ev_id;
42  void (*f_done_callback)(void *arg_ptr);
43  int num_notif;
44  const em_notif_t *notif_tbl; /* notif_tbl[num_notif] */
45  int exclude_current_core;
46  bool sync_operation;
48 
49 static void
50 eo_local_func_call_param_init(eo_local_func_call_param_t *param);
51 static em_status_t
52 eo_local_func_call_req(const eo_local_func_call_param_t *param);
53 
54 static em_status_t
55 check_eo_local_status(const loc_func_retval_t *loc_func_retvals);
56 
57 static void
58 eo_start_done_callback(void *args);
59 static void
60 eo_start_sync_done_callback(void *args);
61 
62 static void
63 eo_stop_done_callback(void *args);
64 static void
65 eo_stop_sync_done_callback(void *args);
66 
67 static em_status_t
68 eo_remove_queue_local(const eo_elem_t *eo_elem, const queue_elem_t *q_elem);
69 static void
70 eo_remove_queue_done_callback(void *args);
71 
72 static em_status_t
73 eo_remove_queue_sync_local(const eo_elem_t *eo_elem,
74  const queue_elem_t *q_elem);
75 static void
76 eo_remove_queue_sync_done_callback(void *args);
77 
78 static em_status_t
79 eo_remove_queue_all_local(const eo_elem_t *eo_elem, int delete_queues);
80 static void
81 eo_remove_queue_all_done_callback(void *args);
82 
83 static em_status_t
84 eo_remove_queue_all_sync_local(const eo_elem_t *eo_elem, int delete_queues);
85 static void
86 eo_remove_queue_all_sync_done_callback(void *args);
87 
88 static inline eo_elem_t *
89 eo_poolelem2eo(const objpool_elem_t *const eo_pool_elem)
90 {
91  return (eo_elem_t *)((uintptr_t)eo_pool_elem -
92  offsetof(eo_elem_t, eo_pool_elem));
93 }
94 
96 eo_init(eo_tbl_t eo_tbl[], eo_pool_t *eo_pool)
97 {
98  int ret;
99  const uint32_t objpool_subpools = MIN(4, OBJSUBPOOLS_MAX);
100 
101  memset(eo_tbl, 0, sizeof(eo_tbl_t));
102  memset(eo_pool, 0, sizeof(eo_pool_t));
103 
104  for (int i = 0; i < EM_MAX_EOS; i++) {
105  eo_elem_t *const eo_elem = &eo_tbl->eo_elem[i];
106  /* Store EO handle */
107  eo_elem->eo = eo_idx2hdl(i);
108  /* Initialize empty EO-queue list */
109  env_spinlock_init(&eo_elem->lock);
110  list_init(&eo_elem->queue_list);
111  eo_elem->stash = ODP_STASH_INVALID;
112  }
113 
114  ret = objpool_init(&eo_pool->objpool, objpool_subpools);
115  if (ret != 0)
116  return EM_ERR_LIB_FAILED;
117 
118  for (uint32_t i = 0; i < EM_MAX_EOS; i++)
119  objpool_add(&eo_pool->objpool, i % objpool_subpools,
120  &eo_tbl->eo_elem[i].eo_pool_elem);
121 
122  env_atomic32_init(&em_shm->eo_count);
123 
124  return EM_OK;
125 }
126 
127 em_eo_t
128 eo_alloc(void)
129 {
130  const eo_elem_t *eo_elem;
131  const objpool_elem_t *eo_pool_elem;
132 
133  eo_pool_elem = objpool_rem(&em_shm->eo_pool.objpool, em_core_id());
134  if (unlikely(eo_pool_elem == NULL))
135  return EM_EO_UNDEF;
136 
137  eo_elem = eo_poolelem2eo(eo_pool_elem);
138  env_atomic32_inc(&em_shm->eo_count);
139 
140  return eo_elem->eo;
141 }
142 
144 eo_free(em_eo_t eo)
145 {
146  eo_elem_t *eo_elem = eo_elem_get(eo);
147 
148  if (unlikely(eo_elem == NULL))
149  return EM_ERR_BAD_ID;
150 
151  eo_elem->state = EM_EO_STATE_UNDEF;
152 
153  objpool_add(&em_shm->eo_pool.objpool,
154  eo_elem->eo_pool_elem.subpool_idx, &eo_elem->eo_pool_elem);
155  env_atomic32_dec(&em_shm->eo_count);
156 
157  return EM_OK;
158 }
159 
160 /**
161  * Add a queue to an EO
 *
 * Checks the queue state change with queue_state_change__check()
 * (is_setup=1) and returns its error on failure. Otherwise copies the EO's
 * max_events, receive function (multi or single variant, selected by
 * eo_elem->use_multi_rcv), EO handle, EO context and EO element pointer
 * into the queue element, stores the new queue state and, while holding
 * eo_elem->lock, links the queue into the EO's queue list and increments
 * the EO's queue count.
 *
 * Returns EM_OK on success.
 *
 * NOTE(review): this listing has lost two lines of the function (embedded
 * numbers 163 and 167): presumably the return-type line and the
 * declaration/initialization of 'new_state'. Restore them from the
 * original file; the target state value cannot be determined from here.
162  */
164 eo_add_queue(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
165 {
166  queue_state_t old_state = q_elem->state;
168  em_status_t err;
169 
170  err = queue_state_change__check(old_state, new_state, 1/*is_setup*/);
171  if (unlikely(err != EM_OK))
172  return err;
173 
174  q_elem->max_events = (uint16_t)eo_elem->max_events;
175 
176  q_elem->flags.use_multi_rcv = eo_elem->use_multi_rcv ? true : false;
177  if (eo_elem->use_multi_rcv)
178  q_elem->receive_multi_func = eo_elem->receive_multi_func;
179  else
180  q_elem->receive_func = eo_elem->receive_func;
181 
182  q_elem->eo = (uint16_t)(uintptr_t)eo_elem->eo;
183  q_elem->eo_ctx = eo_elem->eo_ctx;
184  q_elem->eo_elem = eo_elem;
185  q_elem->state = new_state;
186 
187  /* Link the new queue into the EO's queue-list */
188  env_spinlock_lock(&eo_elem->lock);
189  list_add(&eo_elem->queue_list, &q_elem->queue_node);
190  env_atomic32_inc(&eo_elem->num_queues);
191  env_spinlock_unlock(&eo_elem->lock);
192 
193  return EM_OK;
194 }
195 
/*
 * Unlink a queue from an EO; the callers in this file hold eo_elem->lock
 * around the call.
 *
 * Checks the queue state change with queue_state_change__check()
 * (is_setup=0) and returns its error on failure. Otherwise removes the
 * queue node from the EO's queue list, decrements the EO's queue count,
 * stores the new queue state and clears the queue's EO handle and
 * EO element pointer. Returns EM_OK on success.
 *
 * NOTE(review): the line declaring/initializing 'new_state' (embedded
 * number 200) is missing from this listing; restore it from the original
 * file, the target state value cannot be determined from here.
 */
196 static inline em_status_t
197 eo_rem_queue_locked(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
198 {
199  queue_state_t old_state = q_elem->state;
201  em_status_t err;
202 
203  err = queue_state_change__check(old_state, new_state, 0/*!is_setup*/);
204  if (unlikely(err != EM_OK))
205  return err;
206 
207  list_rem(&eo_elem->queue_list, &q_elem->queue_node);
208  env_atomic32_dec(&eo_elem->num_queues);
209 
210  q_elem->state = new_state;
211  q_elem->eo = (uint16_t)(uintptr_t)EM_EO_UNDEF;
212  q_elem->eo_elem = NULL;
213 
214  return EM_OK;
215 }
216 
217 /**
218  * Remove a queue from an EO
219  */
221 eo_rem_queue(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
222 {
223  em_status_t err;
224 
225  env_spinlock_lock(&eo_elem->lock);
226  err = eo_rem_queue_locked(eo_elem, q_elem);
227  env_spinlock_unlock(&eo_elem->lock);
228 
229  if (unlikely(err != EM_OK))
230  return err;
231 
232  return EM_OK;
233 }
234 
235 /*
236  * Remove all queues associated with the EO.
237  * Note: does not delete the queues.
238  */
240 eo_rem_queue_all(eo_elem_t *const eo_elem)
241 {
242  em_status_t err = EM_OK;
243  queue_elem_t *q_elem;
244 
245  list_node_t *pos;
246  const list_node_t *list_node;
247 
248  env_spinlock_lock(&eo_elem->lock);
249 
250  /* Loop through all queues associated with the EO */
251  list_for_each(&eo_elem->queue_list, pos, list_node) {
252  q_elem = list_node_to_queue_elem(list_node);
253  /* remove the queue from the EO */
254  err = eo_rem_queue_locked(eo_elem, q_elem);
255  if (unlikely(err != EM_OK))
256  break;
257  } /* end loop */
258 
259  env_spinlock_unlock(&eo_elem->lock);
260 
261  return err;
262 }
263 
264 /*
265  * Delete all queues associated with the EO.
266  * The queue needs to be removed from the EO before the actual delete.
267  */
269 eo_delete_queue_all(eo_elem_t *const eo_elem)
270 {
271  em_status_t err = EM_OK;
272  queue_elem_t *q_elem;
273 
274  list_node_t *pos;
275  const list_node_t *list_node;
276 
277  env_spinlock_lock(&eo_elem->lock);
278 
279  /* Loop through all queues associated with the EO */
280  list_for_each(&eo_elem->queue_list, pos, list_node) {
281  q_elem = list_node_to_queue_elem(list_node);
282  /* remove the queue from the EO */
283  err = eo_rem_queue_locked(eo_elem, q_elem);
284  if (unlikely(err != EM_OK))
285  break;
286  /* delete the queue */
287  err = queue_delete(q_elem);
288  if (unlikely(err != EM_OK))
289  break;
290  } /* end loop */
291 
292  env_spinlock_unlock(&eo_elem->lock);
293 
294  return err;
295 }
296 
298 eo_start_local_req(eo_elem_t *const eo_elem,
299  int num_notif, const em_notif_t notif_tbl[])
300 {
302 
303  eo_local_func_call_param_init(&param);
304  param.eo_elem = eo_elem;
305  param.q_elem = NULL; /* no q_elem */
306  param.delete_queues = EM_FALSE;
307  param.ev_id = EO_START_LOCAL_REQ;
308  param.f_done_callback = eo_start_done_callback;
309  param.num_notif = num_notif;
310  param.notif_tbl = notif_tbl;
311  param.exclude_current_core = EM_FALSE; /* all cores */
312  param.sync_operation = false;
313 
314  return eo_local_func_call_req(&param);
315 }
316 
317 /**
318  * Callback function run when all start_local functions are finished,
319  * triggered by calling em_eo_start() when using local-start functions
320  */
321 static void
322 eo_start_done_callback(void *args)
323 {
324  const loc_func_retval_t *loc_func_retvals = args;
325  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
326  em_status_t ret;
327 
328  if (unlikely(eo_elem == NULL)) {
330  EM_ESCOPE_EO_START_DONE_CB,
331  "eo_elem is NULL!");
332  return;
333  }
334 
335  if (check_eo_local_status(loc_func_retvals) == EM_OK) {
336  ret = queue_enable_all(eo_elem); /* local starts OK */
337  if (ret == EM_OK)
338  eo_elem->state = EM_EO_STATE_RUNNING;
339  }
340 
341  /* free the storage for local func return values */
342  em_free(loc_func_retvals->event);
343 
344  /* Send events buffered during the EO-start/local-start functions */
346 }
347 
349 eo_start_sync_local_req(eo_elem_t *const eo_elem)
350 {
352 
353  eo_local_func_call_param_init(&param);
354  param.eo_elem = eo_elem;
355  param.q_elem = NULL; /* no q_elem */
356  param.delete_queues = EM_FALSE;
357  param.ev_id = EO_START_SYNC_LOCAL_REQ;
358  param.f_done_callback = eo_start_sync_done_callback;
359  param.num_notif = 0;
360  param.notif_tbl = NULL;
361  param.exclude_current_core = EM_TRUE; /* exclude this core */
362  param.sync_operation = true;
363 
364  return eo_local_func_call_req(&param);
365 }
366 
367 /**
368  * Callback function run when all start_local functions are finished,
369  * triggered by calling em_eo_start_sync() when using local-start functions
370  */
371 static void
372 eo_start_sync_done_callback(void *args)
373 {
374  em_locm_t *const locm = &em_locm;
375  const loc_func_retval_t *loc_func_retvals = args;
376  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
377  em_status_t ret;
378 
379  if (unlikely(eo_elem == NULL)) {
381  EM_ESCOPE_EO_START_SYNC_DONE_CB,
382  "eo_elem is NULL!");
383  return;
384  }
385 
386  if (check_eo_local_status(loc_func_retvals) == EM_OK) {
387  ret = queue_enable_all(eo_elem); /* local starts OK */
388  if (ret == EM_OK)
389  eo_elem->state = EM_EO_STATE_RUNNING;
390  }
391 
392  /* free the storage for local func return values */
393  em_free(loc_func_retvals->event);
394 
395  /* Enable the caller of the sync API func to proceed (on this core) */
396  locm->sync_api.in_progress = false;
397 
398  /*
399  * Events buffered during the EO-start/local-start functions are sent
400  * from em_eo_start_sync() after this.
401  */
402 }
403 
404 /**
405  * Called by em_send() & variants during an EO start-function.
406  *
407  * Events sent from within the EO-start functions are buffered and sent
408  * after the start-operation has completed. Otherwise it would not be
409  * possible to reliably send events from the start-functions to the
410  * EO's own queues.
411  */
412 int eo_start_buffer_events(const em_event_t events[], int num, em_queue_t queue)
413 {
414  eo_elem_t *const eo_elem = em_locm.start_eo_elem;
415  const uint16_t qidx = queue_hdl2idx(queue);
416  const evhdl_t *const evhdl_tbl = (const evhdl_t *const)events;
417  stash_entry_t entry_tbl[num];
418 
419  if (unlikely(eo_elem == NULL))
420  return 0;
421 
422  env_spinlock_lock(&eo_elem->lock);
423 
424  for (int i = 0; i < num; i++) {
425  entry_tbl[i].qidx = qidx;
426  entry_tbl[i].evptr = evhdl_tbl[i].evptr; /* ESV evgen dropped */
427  }
428 
429  /* Enqueue events to internal queue */
430  int ret = odp_stash_put_u64(eo_elem->stash, &entry_tbl[0].u64, num);
431 
432  if (unlikely(ret < 0))
433  ret = 0;
434 
435  env_spinlock_unlock(&eo_elem->lock);
436 
437  return ret;
438 }
439 
440 /**
441  * @brief Helper to eo_start_send_buffered_events()
442  */
443 static void eo_start_send_multi(em_event_t ev_tbl[], int num,
444  em_queue_t queue, em_event_group_t event_group)
445 {
446  int num_sent = 0;
447 
448  /* send events with same destination queue and event group */
449  if (event_group == EM_EVENT_GROUP_UNDEF)
450  num_sent = em_send_multi(ev_tbl, num, queue);
451  else
452  num_sent = em_send_group_multi(ev_tbl, num, queue, event_group);
453 
454  if (unlikely(num_sent != num)) {
455  /* User's eo-start saw successful em_send, free here */
456  em_free_multi(&ev_tbl[num_sent], num - num_sent);
457  INTERNAL_ERROR(EM_ERR_LIB_FAILED, EM_ESCOPE_EO_START,
458  "Q:%" PRI_QUEUE " req:%u sent:%u",
459  queue, num, num_sent);
460  }
461 }
462 
463 /**
464  * Send the buffered events at the end of the EO-start operation.
465  *
466  * Events sent from within the EO-start functions are buffered and sent
467  * after the start-operation has completed. Otherwise it would not be
468  * possible to reliably send events from the start-functions to the
469  * EO's own queues.
470  */
472 {
473  /* max events to send in a burst */
474  const unsigned int max_ev = 32;
475  stash_entry_t entry_tbl[max_ev];
476  em_event_t ev_tbl[max_ev];
477  event_hdr_t *ev_hdr_tbl[max_ev];
478 
479  env_spinlock_lock(&eo_elem->lock);
480 
481  /*
482  * Send the buffered events in bursts into the destination queue.
483  *
484  * This is startup: we can use some extra cycles to create the
485  * event-arrays to send in bursts.
486  */
487  int err = 0;
488  int num = 0;
489 
490  do {
491  num = odp_stash_get_u64(eo_elem->stash, &entry_tbl[0].u64 /*[out]*/, max_ev);
492  if (num <= 0) {
493  if (unlikely(num < 0))
494  INTERNAL_ERROR(EM_ERR_LIB_FAILED, EM_ESCOPE_EO_START,
495  "odp_stash_get_u64() fails: %d", num);
496  goto buffered_send_exit;
497  }
498 
499  for (int i = 0; i < num; i++)
500  ev_tbl[i] = (em_event_t)(uintptr_t)entry_tbl[i].evptr;
501 
502  event_to_hdr_multi(ev_tbl, ev_hdr_tbl, num);
503 
504  if (esv_enabled())
505  evstate_em2usr_multi(ev_tbl/*in/out*/, ev_hdr_tbl, num,
506  EVSTATE__EO_START_SEND_BUFFERED);
507 
508  int tbl_idx = 0; /* index into ..._tbl[] */
509 
510  /*
511  * Send in batches of 'batch_cnt' events.
512  * Each batch contains events from the same queue & evgrp.
513  */
514  do {
515  const int qidx = entry_tbl[tbl_idx].qidx;
516  const em_queue_t queue = queue_idx2hdl(qidx);
517  const em_event_group_t event_group = ev_hdr_tbl[tbl_idx]->egrp;
518  int batch_cnt = 1;
519 
520  for (int i = tbl_idx + 1; i < num &&
521  entry_tbl[i].qidx == qidx &&
522  ev_hdr_tbl[i]->egrp == event_group; i++) {
523  batch_cnt++;
524  }
525 
526  /* send events with same destination queue and event group */
527  eo_start_send_multi(&ev_tbl[tbl_idx], batch_cnt, queue, event_group);
528 
529  tbl_idx += batch_cnt;
530  } while (tbl_idx < num);
531  } while (num > 0);
532 
533 buffered_send_exit:
534  err = odp_stash_destroy(eo_elem->stash);
535 
536  eo_elem->stash = ODP_STASH_INVALID;
537  env_spinlock_unlock(&eo_elem->lock);
538 
539  if (unlikely(err)) {
540  INTERNAL_ERROR(EM_ERR_LIB_FAILED, EM_ESCOPE_EO_START,
541  "odp_stash_destroy() fails: %d", err);
542  }
543 }
544 
546 eo_stop_local_req(eo_elem_t *const eo_elem,
547  int num_notif, const em_notif_t notif_tbl[])
548 {
550 
551  eo_local_func_call_param_init(&param);
552  param.eo_elem = eo_elem;
553  param.q_elem = NULL; /* no q_elem */
554  param.delete_queues = EM_FALSE;
555  param.ev_id = EO_STOP_LOCAL_REQ;
556  param.f_done_callback = eo_stop_done_callback;
557  param.num_notif = num_notif;
558  param.notif_tbl = notif_tbl;
559  param.exclude_current_core = EM_FALSE; /* all cores */
560  param.sync_operation = false;
561 
562  return eo_local_func_call_req(&param);
563 }
564 
565 /**
566  * Callback function run when all stop_local functions are finished,
567  * triggered by calling eo_eo_stop().
568  */
569 static void
570 eo_stop_done_callback(void *args)
571 {
572  em_locm_t *const locm = &em_locm;
573  const loc_func_retval_t *loc_func_retvals = args;
574  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
575  void *const eo_ctx = eo_elem->eo_ctx;
576  queue_elem_t *const save_q_elem = locm->current.q_elem;
577  queue_elem_t tmp_q_elem;
578  em_eo_t eo;
579  em_status_t ret;
580 
581  if (unlikely(eo_elem == NULL)) {
583  EM_ESCOPE_EO_STOP_DONE_CB,
584  "eo_elem is NULL!");
585  return;
586  }
587 
588  eo = eo_elem->eo;
589  (void)check_eo_local_status(loc_func_retvals);
590 
591  /* Change state here to allow em_eo_delete() from EO global stop */
592  eo_elem->state = EM_EO_STATE_CREATED; /* == EO_STATE_STOPPED */
593 
594  /*
595  * Use a tmp q_elem as the 'current q_elem' to enable calling
596  * em_eo_current() from the EO stop functions.
597  * Before returning, restore the original 'current q_elem' from
598  * 'save_q_elem'.
599  */
600  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
601  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo;
602 
603  locm->current.q_elem = &tmp_q_elem;
604  /*
605  * Call the Global EO stop function now that all
606  * EO local stop functions are done.
607  */
608  ret = eo_elem->stop_func(eo_ctx, eo);
609  /* Restore the original 'current q_elem' */
610  locm->current.q_elem = save_q_elem;
611 
612  /*
613  * Note: the EO might not be available after this if the EO global stop
614  * called em_eo_delete()!
615  */
616 
617  if (unlikely(ret != EM_OK))
618  INTERNAL_ERROR(ret, EM_ESCOPE_EO_STOP_DONE_CB,
619  "EO:%" PRI_EO " stop-func failed", eo);
620 
621  /* free the storage for local func return values */
622  em_free(loc_func_retvals->event);
623 }
624 
626 eo_stop_sync_local_req(eo_elem_t *const eo_elem)
627 {
629 
630  eo_local_func_call_param_init(&param);
631  param.eo_elem = eo_elem;
632  param.q_elem = NULL; /* no q_elem */
633  param.delete_queues = EM_FALSE;
634  param.ev_id = EO_STOP_SYNC_LOCAL_REQ;
635  param.f_done_callback = eo_stop_sync_done_callback;
636  param.num_notif = 0;
637  param.notif_tbl = NULL;
638  param.exclude_current_core = EM_TRUE; /* exclude this core */
639  param.sync_operation = true;
640 
641  return eo_local_func_call_req(&param);
642 }
643 
644 /**
645  * Callback function run when all stop_local functions are finished,
646  * triggered by calling eo_eo_stop_sync().
647  */
648 static void
649 eo_stop_sync_done_callback(void *args)
650 {
651  em_locm_t *const locm = &em_locm;
652  const loc_func_retval_t *loc_func_retvals = args;
653  const eo_elem_t *eo_elem = loc_func_retvals->eo_elem;
654 
655  if (unlikely(eo_elem == NULL)) {
657  EM_ESCOPE_EO_STOP_SYNC_DONE_CB,
658  "eo_elem is NULL!");
659  /* Enable the caller of the sync API func to proceed */
660  locm->sync_api.in_progress = false;
661  return;
662  }
663 
664  (void)check_eo_local_status(loc_func_retvals);
665 
666  /* free the storage for local func return values */
667  em_free(loc_func_retvals->event);
668 
669  /* Enable the caller of the sync API func to proceed (on this core) */
670  locm->sync_api.in_progress = false;
671 }
672 
674 eo_remove_queue_local_req(eo_elem_t *const eo_elem, queue_elem_t *const q_elem,
675  int num_notif, const em_notif_t notif_tbl[])
676 {
678 
679  eo_local_func_call_param_init(&param);
680  param.eo_elem = eo_elem;
681  param.q_elem = q_elem;
682  param.delete_queues = EM_FALSE;
683  param.ev_id = EO_REM_QUEUE_LOCAL_REQ;
684  param.f_done_callback = eo_remove_queue_done_callback;
685  param.num_notif = num_notif;
686  param.notif_tbl = notif_tbl;
687  param.exclude_current_core = EM_FALSE; /* all cores */
688  param.sync_operation = false;
689 
690  return eo_local_func_call_req(&param);
691 }
692 
693 static em_status_t
694 eo_remove_queue_local(const eo_elem_t *eo_elem, const queue_elem_t *q_elem)
695 {
696  (void)eo_elem;
697  (void)q_elem;
698 
699  return EM_OK;
700 }
701 
702 static void
703 eo_remove_queue_done_callback(void *args)
704 {
705  const loc_func_retval_t *loc_func_retvals = args;
706  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
707  queue_elem_t *const q_elem = loc_func_retvals->q_elem;
708  em_status_t ret;
709 
710  if (unlikely(eo_elem == NULL || q_elem == NULL)) {
712  EM_ESCOPE_EO_REMOVE_QUEUE_DONE_CB,
713  "eo_elem/q_elem is NULL!");
714  return;
715  }
716 
717  (void)check_eo_local_status(loc_func_retvals);
718 
719  /* Remove the queue from the EO */
720  ret = eo_rem_queue(eo_elem, q_elem);
721 
722  if (unlikely(ret != EM_OK))
723  INTERNAL_ERROR(ret, EM_ESCOPE_EO_REMOVE_QUEUE_DONE_CB,
724  "EO:%" PRI_EO " remove Q:%" PRI_QUEUE " failed",
725  eo_elem->eo, q_elem->queue);
726 
727  /* free the storage for local func return values */
728  em_free(loc_func_retvals->event);
729 }
730 
732 eo_remove_queue_sync_local_req(eo_elem_t *const eo_elem,
733  queue_elem_t *const q_elem)
734 {
736 
737  eo_local_func_call_param_init(&param);
738  param.eo_elem = eo_elem;
739  param.q_elem = q_elem;
740  param.delete_queues = EM_FALSE;
741  param.ev_id = EO_REM_QUEUE_SYNC_LOCAL_REQ;
742  param.f_done_callback = eo_remove_queue_sync_done_callback;
743  param.num_notif = 0;
744  param.notif_tbl = NULL;
745  param.exclude_current_core = EM_TRUE; /* exclude this core */
746  param.sync_operation = true;
747 
748  return eo_local_func_call_req(&param);
749 }
750 
751 static em_status_t
752 eo_remove_queue_sync_local(const eo_elem_t *eo_elem, const queue_elem_t *q_elem)
753 {
754  (void)eo_elem;
755  (void)q_elem;
756 
757  return EM_OK;
758 }
759 
760 static void
761 eo_remove_queue_sync_done_callback(void *args)
762 {
763  em_locm_t *const locm = &em_locm;
764  const loc_func_retval_t *loc_func_retvals = args;
765  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
766  queue_elem_t *const q_elem = loc_func_retvals->q_elem;
767  em_status_t ret;
768 
769  if (unlikely(eo_elem == NULL || q_elem == NULL)) {
771  EM_ESCOPE_EO_REMOVE_QUEUE_SYNC_DONE_CB,
772  "eo_elem/q_elem is NULL!");
773  /* Enable the caller of the sync API func to proceed */
774  locm->sync_api.in_progress = false;
775  return;
776  }
777 
778  (void)check_eo_local_status(loc_func_retvals);
779 
780  /* Remove the queue from the EO */
781  ret = eo_rem_queue(eo_elem, q_elem);
782 
783  if (unlikely(ret != EM_OK))
784  INTERNAL_ERROR(ret,
785  EM_ESCOPE_EO_REMOVE_QUEUE_SYNC_DONE_CB,
786  "EO:%" PRI_EO " remove Q:%" PRI_QUEUE " failed",
787  eo_elem->eo, q_elem->queue);
788 
789  /* free the storage for local func return values */
790  em_free(loc_func_retvals->event);
791 
792  /* Enable the caller of the sync API func to proceed (on this core) */
793  locm->sync_api.in_progress = false;
794 }
795 
797 eo_remove_queue_all_local_req(eo_elem_t *const eo_elem, int delete_queues,
798  int num_notif, const em_notif_t notif_tbl[])
799 {
801 
802  eo_local_func_call_param_init(&param);
803  param.eo_elem = eo_elem;
804  param.q_elem = NULL; /* no q_elem */
805  param.delete_queues = delete_queues;
806  param.ev_id = EO_REM_QUEUE_ALL_LOCAL_REQ;
807  param.f_done_callback = eo_remove_queue_all_done_callback;
808  param.num_notif = num_notif;
809  param.notif_tbl = notif_tbl;
810  param.exclude_current_core = EM_FALSE; /* all cores */
811  param.sync_operation = false;
812 
813  return eo_local_func_call_req(&param);
814 }
815 
816 static em_status_t
817 eo_remove_queue_all_local(const eo_elem_t *eo_elem, int delete_queues)
818 {
819  (void)eo_elem;
820  (void)delete_queues;
821 
822  return EM_OK;
823 }
824 
825 static void
826 eo_remove_queue_all_done_callback(void *args)
827 {
828  const loc_func_retval_t *loc_func_retvals = args;
829  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
830  int delete_queues = loc_func_retvals->delete_queues;
831  em_status_t ret;
832 
833  if (unlikely(eo_elem == NULL)) {
835  EM_ESCOPE_EO_REMOVE_QUEUE_ALL_DONE_CB,
836  "eo_elem is NULL!");
837  return;
838  }
839 
840  (void)check_eo_local_status(loc_func_retvals);
841 
842  /* Remove or delete all the EO's queues */
843  if (delete_queues)
844  ret = eo_delete_queue_all(eo_elem);
845  else
846  ret = eo_rem_queue_all(eo_elem);
847 
848  if (unlikely(ret != EM_OK))
849  INTERNAL_ERROR(ret, EM_ESCOPE_EO_REMOVE_QUEUE_ALL_DONE_CB,
850  "EO:%" PRI_EO " removing all queues failed",
851  eo_elem->eo);
852 
853  /* free the storage for local func return values */
854  em_free(loc_func_retvals->event);
855 }
856 
858 eo_remove_queue_all_sync_local_req(eo_elem_t *const eo_elem, int delete_queues)
859 {
861 
862  eo_local_func_call_param_init(&param);
863  param.eo_elem = eo_elem;
864  param.q_elem = NULL; /* no q_elem */
865  param.delete_queues = delete_queues;
866  param.ev_id = EO_REM_QUEUE_ALL_SYNC_LOCAL_REQ;
867  param.f_done_callback = eo_remove_queue_all_sync_done_callback;
868  param.num_notif = 0;
869  param.notif_tbl = NULL;
870  param.exclude_current_core = EM_TRUE; /* exclude this core */
871  param.sync_operation = true;
872 
873  return eo_local_func_call_req(&param);
874 }
875 
876 static em_status_t
877 eo_remove_queue_all_sync_local(const eo_elem_t *eo_elem, int delete_queues)
878 {
879  (void)eo_elem;
880  (void)delete_queues;
881 
882  return EM_OK;
883 }
884 
885 static void
886 eo_remove_queue_all_sync_done_callback(void *args)
887 {
888  em_locm_t *const locm = &em_locm;
889  const loc_func_retval_t *loc_func_retvals = args;
890  eo_elem_t *const eo_elem = loc_func_retvals->eo_elem;
891  int delete_queues = loc_func_retvals->delete_queues;
892  em_status_t ret;
893 
894  if (unlikely(eo_elem == NULL)) {
896  EM_ESCOPE_EO_REMOVE_QUEUE_ALL_SYNC_DONE_CB,
897  "eo_elem is NULL!");
898  /* Enable the caller of the sync API func to proceed */
899  locm->sync_api.in_progress = false;
900  return;
901  }
902 
903  (void)check_eo_local_status(loc_func_retvals);
904 
905  /* Remove or delete all the EO's queues */
906  if (delete_queues)
907  ret = eo_delete_queue_all(eo_elem);
908  else
909  ret = eo_rem_queue_all(eo_elem);
910 
911  if (unlikely(ret != EM_OK))
912  INTERNAL_ERROR(ret,
913  EM_ESCOPE_EO_REMOVE_QUEUE_ALL_SYNC_DONE_CB,
914  "EO:%" PRI_EO " removing all queues failed",
915  eo_elem->eo);
916 
917  /* free the storage for local func return values */
918  em_free(loc_func_retvals->event);
919 
920  /* Enable the caller of the sync API func to proceed (on this core) */
921  locm->sync_api.in_progress = false;
922 }
923 
924 static em_status_t
925 check_eo_local_status(const loc_func_retval_t *loc_func_retvals)
926 {
927  const int cores = em_core_count();
928  static const char core_err[] = "coreXX:0x12345678 ";
929  char errmsg[cores * sizeof(core_err)];
930  int n = 0;
931  int c = 0;
932  int local_fail = 0;
933  em_status_t err;
934 
935  for (int i = 0; i < cores; i++) {
936  err = loc_func_retvals->core[i];
937  if (err != EM_OK) {
938  local_fail = 1;
939  break;
940  }
941  }
942 
943  if (!local_fail)
944  return EM_OK;
945 
946  for (int i = 0; i < cores; i++) {
947  err = loc_func_retvals->core[i];
948  if (err != EM_OK) {
949  n = snprintf(&errmsg[c], sizeof(core_err),
950  "core%02d:0x%08X ", i, err);
951  if ((unsigned int)n >= sizeof(core_err))
952  break;
953  c += n;
954  }
955  }
956  errmsg[cores * sizeof(core_err) - 1] = '\0';
957 
958  INTERNAL_ERROR(EM_ERR, EM_ESCOPE_EVENT_INTERNAL_LFUNC_CALL,
959  "\nLocal start function failed on cores:\n"
960  "%s", errmsg);
961  return EM_ERR;
962 }
963 
964 static void
965 eo_local_func_call_param_init(eo_local_func_call_param_t *param)
966 {
967  memset(param, 0, sizeof(*param));
968 }
969 
970 /**
971  * Request a function to be run on each core and call 'f_done_callback(arg_ptr)'
972  * when all those functions have completed.
 *
 * Builds a core mask of all cores (minus the calling core when
 * param->exclude_current_core is set), allocates an internal request event
 * and a separate loc_func_retval_t storage whose per-core entries start as
 * EM_OK, and sends the request with send_core_ctrl_events(). The retval
 * storage is given both to the request event and, as the argument, to the
 * 'done' callback.
 *
 * When the resulting core mask is empty the 'done' callback is called
 * directly from here and EM_OK is returned.
 *
 * NOTE(review): several lines of this function are missing from this
 * listing (embedded numbers 998-999, 1009-1010, 1035 and 1059); each gap is
 * marked below. Restore them from the original file before building.
973  */
974 static em_status_t
975 eo_local_func_call_req(const eo_local_func_call_param_t *param)
976 {
977  int err;
978  em_event_t event;
979  em_event_t tmp;
980  internal_event_t *i_event;
981  int core_count;
982  int free_count;
983  em_core_mask_t core_mask;
984  loc_func_retval_t *loc_func_retvals;
985  void *f_done_arg_ptr;
986 
987  core_count = em_core_count();
988  em_core_mask_zero(&core_mask);
989  em_core_mask_set_count(core_count, &core_mask);
990  free_count = core_count + 1; /* all cores + 'done' event */
991  if (param->exclude_current_core) {
992  /* EM _sync API func: exclude the calling core */
993  em_core_mask_clr(em_core_id(), &core_mask);
994  free_count -= 1;
995  }
996 
997  event = em_alloc(sizeof(internal_event_t),
/*
 * NOTE(review): lines 998-999 are missing here - presumably the remaining
 * em_alloc() arguments and the start of an allocation-failure check.
 * Also note: '%u' below is used with the uint64_t 'ev_id' - verify the
 * format specifier.
 */
1000  EM_ERR_ALLOC_FAILED, EM_ESCOPE_EO_LOCAL_FUNC_CALL_REQ,
1001  "Internal event (%u) allocation failed", param->ev_id);
1002  i_event = em_event_pointer(event);
1003  i_event->id = param->ev_id;
1004  i_event->loc_func.eo_elem = param->eo_elem;
1005  i_event->loc_func.q_elem = param->q_elem;
1006  i_event->loc_func.delete_queues = param->delete_queues;
1007 
1008  tmp = em_alloc(sizeof(loc_func_retval_t),
/*
 * NOTE(review): lines 1009-1010 are missing here - presumably the remaining
 * em_alloc() arguments and the start of an allocation-failure check.
 */
1011  EM_ERR_ALLOC_FAILED, EM_ESCOPE_EO_LOCAL_FUNC_CALL_REQ,
1012  "Internal loc_func_retval_t allocation failed");
1013  loc_func_retvals = em_event_pointer(tmp);
1014  loc_func_retvals->eo_elem = param->eo_elem;
1015  loc_func_retvals->q_elem = param->q_elem;
1016  loc_func_retvals->delete_queues = param->delete_queues;
1017  loc_func_retvals->event = tmp; /* store event handle for em_free() */
1018  env_atomic32_init(&loc_func_retvals->free_at_zero);
1019  env_atomic32_set(&loc_func_retvals->free_at_zero, free_count);
1020  for (int i = 0; i < core_count; i++)
1021  loc_func_retvals->core[i] = EM_OK;
1022 
1023  /* ptr to retval storage so loc func calls can record retval there */
1024  i_event->loc_func.retvals = loc_func_retvals;
1025 
1026  /* Give ptr to retval storage also to 'done' function */
1027  f_done_arg_ptr = loc_func_retvals;
1028 
1029  if (em_core_mask_iszero(&core_mask)) {
1030  /*
1031  * Special handling when calling sync APIs with one core in use.
1032  * Need to call both local- and done-funcs here and return.
1033  */
1034  env_atomic32_inc(&loc_func_retvals->free_at_zero);
/*
 * NOTE(review): line 1035 is missing here - per the comment above,
 * presumably the direct call of the local function handler.
 */
1036  em_free(event);
1037  param->f_done_callback(f_done_arg_ptr);
1038 
1039  return EM_OK;
1040  }
1041 
1042  err = send_core_ctrl_events(&core_mask, event,
1043  param->f_done_callback, f_done_arg_ptr,
1044  param->num_notif, param->notif_tbl,
1045  param->sync_operation);
1046  if (unlikely(err)) {
1047  char core_mask_str[EM_CORE_MASK_STRLEN];
1048  uint32_t unsent_cnt = err;
1049  uint32_t cnt;
1050 
/* The request event is freed here, on the failure path */
1051  em_free(event);
/*
 * Subtract the unsent events plus one from the reference count and
 * free the retval storage if the count reaches zero.
 */
1052  cnt = env_atomic32_sub_return(&loc_func_retvals->free_at_zero,
1053  unsent_cnt + 1);
1054  if (cnt == 0)
1055  em_free(tmp);
1056 
1057  em_core_mask_tostr(core_mask_str, EM_CORE_MASK_STRLEN,
1058  &core_mask);
/*
 * NOTE(review): line 1059 is missing here - presumably the start of the
 * error report (and possibly a return) for the failed send.
 */
1060  EM_ESCOPE_EO_LOCAL_FUNC_CALL_REQ,
1061  "send_core_ctrl_events(mask=%s) failed",
1062  core_mask_str);
1063  }
1064 
1065  return EM_OK;
1066 }
1067 
1068 /**
1069  * EM internal event handler (see em_internal_event.c&h)
1070  * Handle the internal event requesting a local function call.
1071  */
1072 void
1074 {
1075  em_locm_t *const locm = &em_locm;
1076  const uint64_t f_type = i_ev->loc_func.id;
1077  eo_elem_t *eo_elem = i_ev->loc_func.eo_elem;
1078  const queue_elem_t *q_elem = i_ev->loc_func.q_elem;
1079  int delete_queues = i_ev->loc_func.delete_queues;
1080  loc_func_retval_t *const loc_func_retvals = i_ev->loc_func.retvals;
1081  em_status_t status = EM_ERR;
1082  queue_elem_t *const save_q_elem = locm->current.q_elem;
1083  queue_elem_t tmp_q_elem;
1084 
1085  switch (f_type) {
1086  case EO_START_SYNC_LOCAL_REQ:
1087  if (em_core_count() == 1) {
1088  /*
1089  * Special handling when calling sync API with only one
1090  * core in use: start-local() func already called by
1091  * em_eo_start_sync() and this func called directly from
1092  * within eo_local_func_call_req().
1093  */
1094  status = EM_OK;
1095  break;
1096  }
1097  /* fallthrough */
1098  case EO_START_LOCAL_REQ:
1099  /*
1100  * Use a tmp q_elem as the 'current q_elem' to enable calling
1101  * em_eo_current() from the EO start functions.
1102  * Before returning, restore the original 'current q_elem' from
1103  * 'save_q_elem'.
1104  */
1105  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
1106  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo_elem->eo;
1107  locm->current.q_elem = &tmp_q_elem;
1108 
1109  locm->start_eo_elem = eo_elem;
1110  status = eo_elem->start_local_func(eo_elem->eo_ctx,
1111  eo_elem->eo);
1112  locm->start_eo_elem = NULL;
1113  /* Restore the original 'current q_elem' */
1114  locm->current.q_elem = save_q_elem;
1115  break;
1116 
1117  case EO_STOP_SYNC_LOCAL_REQ:
1118  if (em_core_count() == 1) {
1119  /*
1120  * Special handling when calling sync API with only one
1121  * core in use: stop-local() func already called by
1122  * em_eo_stop_sync() and this func called directly from
1123  * within eo_local_func_call_req().
1124  */
1125  status = EM_OK;
1126  break;
1127  }
1128  /* fallthrough */
1129  case EO_STOP_LOCAL_REQ:
1130  if (eo_elem->stop_local_func != NULL) {
1131  /*
1132  * Use a tmp q_elem as the 'current q_elem' to enable
1133  * calling em_eo_current() from the EO start functions.
1134  * Before returning, restore the original 'current
1135  * q_elem' from 'save_q_elem'.
1136  */
1137  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
1138  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo_elem->eo;
1139  locm->current.q_elem = &tmp_q_elem;
1140 
1141  status = eo_elem->stop_local_func(eo_elem->eo_ctx,
1142  eo_elem->eo);
1143  /* Restore the original 'current q_elem' */
1144  locm->current.q_elem = save_q_elem;
1145  } else {
1146  status = EM_OK; /* No local stop func given */
1147  }
1148  break;
1149 
1150  case EO_REM_QUEUE_LOCAL_REQ:
1151  status = eo_remove_queue_local(eo_elem, q_elem);
1152  break;
1153  case EO_REM_QUEUE_SYNC_LOCAL_REQ:
1154  status = eo_remove_queue_sync_local(eo_elem, q_elem);
1155  break;
1156  case EO_REM_QUEUE_ALL_LOCAL_REQ:
1157  status = eo_remove_queue_all_local(eo_elem, delete_queues);
1158  break;
1159  case EO_REM_QUEUE_ALL_SYNC_LOCAL_REQ:
1160  status = eo_remove_queue_all_sync_local(eo_elem, delete_queues);
1161  break;
1162  default:
1163  status = EM_FATAL(EM_ERR_BAD_ID);
1164  break;
1165  }
1166 
1167  if (status != EM_OK) {
1168  /* store failing status, egrp 'done' can check if all ok */
1169  loc_func_retvals->core[em_core_id()] = status;
1170 
1171  INTERNAL_ERROR(status, EM_ESCOPE_EVENT_INTERNAL_LFUNC_CALL,
1172  "EO:%" PRI_EO "-%s:Local func(%" PRIx64 ")fail",
1173  eo_elem->eo, eo_elem->name, f_type);
1174  }
1175 
1176  /*
1177  * In case of setup error, determine if 'loc_func_retvals' should be
1178  * freed here, in the setup code in eo_local_func_call_req() or
1179  * normally in a successful case in the
1180  * eo_start/stop_local__done_callback() function when the event group
1181  * completion notif is handled.
1182  */
1183  const uint32_t cnt =
1184  env_atomic32_sub_return(&loc_func_retvals->free_at_zero, 1);
1185  if (unlikely(cnt == 0)) {
1186  (void)check_eo_local_status(loc_func_retvals);
1187  em_free(loc_func_retvals->event);
1188  }
1189 }
1190 
1191 unsigned int
1192 eo_count(void)
1193 {
1194  return env_atomic32_get(&em_shm->eo_count);
1195 }
1196 
1197 size_t eo_get_name(const eo_elem_t *const eo_elem,
1198  char name[/*out*/], const size_t maxlen)
1199 {
1200  size_t len;
1201 
1202  len = strnlen(eo_elem->name, sizeof(eo_elem->name) - 1);
1203  if (maxlen - 1 < len)
1204  len = maxlen - 1;
1205 
1206  memcpy(name, eo_elem->name, len);
1207  name[len] = '\0';
1208 
1209  return len;
1210 }
1211 
1212 static const char *state_to_str(em_eo_state_t state)
1213 {
1214  const char *state_str;
1215 
1216  switch (state) {
1217  case EM_EO_STATE_UNDEF:
1218  state_str = "UNDEF";
1219  break;
1220  case EM_EO_STATE_CREATED:
1221  state_str = "CREATED";
1222  break;
1223  case EM_EO_STATE_STARTING:
1224  state_str = "STARTING";
1225  break;
1226  case EM_EO_STATE_RUNNING:
1227  state_str = "RUNNING";
1228  break;
1229  case EM_EO_STATE_STOPPING:
1230  state_str = "STOPPING";
1231  break;
1232  case EM_EO_STATE_ERROR:
1233  state_str = "ERROR";
1234  break;
1235  default:
1236  state_str = "UNKNOWN";
1237  break;
1238  }
1239 
1240  return state_str;
1241 }
1242 
1243 #define EO_INFO_HDR_FMT \
1244 "Number of EOs: %d\n\n" \
1245 "ID Name State Start-local Stop-local" \
1246 " Multi-rcv Max-events Err-hdl Q-num EO-ctx\n" \
1247 "---------------------------------------------------------------------------" \
1248 "-----------------------------------------------\n%s\n"
1249 
1250 #define EO_INFO_LEN 123
1251 #define EO_INFO_FMT "%-10" PRI_EO "%-32s%-10s%-13c%-12c%-11c%-12d%-9c%-7d%-6c\n"
1252 
1254 {
1255  unsigned int num_eo;
1256  eo_elem_t *eo_elem;
1257  int len = 0;
1258  int n_print = 0;
1259  em_eo_t eo = em_eo_get_first(&num_eo);
1260 
1261  /*
1262  * num_eo may not match the amount of EOs actually returned by iterating
1263  * using em_eo_get_next() if EOs are added or removed in parallel by
1264  * another core. Thus space for 10 extra EOs is reserved. If more than 10
1265  * EOs are added by other cores in parallel, we only print information of
1266  * the (num_eo + 10) EOs.
1267  *
1268  * The extra 1 byte is reserved for the terminating null byte.
1269  */
1270  const int eo_info_str_len = (num_eo + 10) * EO_INFO_LEN + 1;
1271  char eo_info_str[eo_info_str_len];
1272 
1273  while (eo != EM_EO_UNDEF) {
1274  eo_elem = eo_elem_get(eo);
1275  if (unlikely(eo_elem == NULL || !eo_allocated(eo_elem))) {
1276  eo = em_eo_get_next();
1277  continue;
1278  }
1279 
1280  n_print = snprintf(eo_info_str + len,
1281  eo_info_str_len - len,
1282  EO_INFO_FMT, eo, eo_elem->name,
1283  state_to_str(eo_elem->state),
1284  eo_elem->start_local_func ? 'Y' : 'N',
1285  eo_elem->stop_local_func ? 'Y' : 'N',
1286  eo_elem->use_multi_rcv ? 'Y' : 'N',
1287  eo_elem->max_events,
1288  eo_elem->error_handler_func ? 'Y' : 'N',
1289  env_atomic32_get(&eo_elem->num_queues),
1290  eo_elem->eo_ctx ? 'Y' : 'N');
1291 
1292  /* Not enough space to hold more eo info */
1293  if (n_print >= eo_info_str_len - len)
1294  break;
1295 
1296  len += n_print;
1297  eo = em_eo_get_next();
1298  }
1299 
1300  /* No EO */
1301  if (!len) {
1302  EM_PRINT("No EO has been created!\n");
1303  return;
1304  }
1305 
1306  /*
1307  * To prevent printing incomplete information of the last eo when there
1308  * is not enough space to hold all eo info.
1309  */
1310  eo_info_str[len] = '\0';
1311  EM_PRINT(EO_INFO_HDR_FMT, num_eo, eo_info_str);
1312 }
1313 
1314 #define EO_Q_INFO_HDR_FMT \
1315 "EO %" PRI_EO "(%s) has %d queue(s):\n\n" \
1316 "Handle Name Priority Type State Qgrp" \
1317 " Ctx\n" \
1318 "---------------------------------------------------------------------------" \
1319 "---------\n" \
1320 "%s\n"
1321 
1322 #define EO_Q_INFO_LEN 85
1323 #define EO_Q_INFO_FMT \
1324 "%-10" PRI_QUEUE "%-32s%-10d%-10s%-9s%-10" PRI_QGRP "%-3c\n" /*85 characters*/
1325 
1326 void eo_queue_info_print(em_eo_t eo)
1327 {
1328  unsigned int q_num;
1329  em_queue_t q;
1330  const queue_elem_t *q_elem;
1331  char q_name[EM_QUEUE_NAME_LEN];
1332  int len = 0;
1333  int n_print = 0;
1334  const eo_elem_t *eo_elem = eo_elem_get(eo);
1335 
1336  if (unlikely(eo_elem == NULL || !eo_allocated(eo_elem))) {
1337  EM_PRINT("EO %" PRI_EO " is not created!\n", eo);
1338  return;
1339  }
1340 
1341  q = em_eo_queue_get_first(&q_num, eo);
1342 
1343  /*
1344  * q_num may not match the amount of queues actually returned by iterating
1345  * using em_eo_queue_get_next() if queues are added or removed in parallel
1346  * by another core. Thus space for 10 extra queues is reserved. If more
1347  * than 10 queues are added to this EO by other cores, we only print info
1348  * of the (q_num + 10) queues.
1349  *
1350  * The extra 1 byte is reserved for the terminating null byte.
1351  */
1352  const int eo_q_info_str_len = (q_num + 10) * EO_Q_INFO_LEN + 1;
1353  char eo_q_info_str[eo_q_info_str_len];
1354 
1355  while (q != EM_QUEUE_UNDEF) {
1356  q_elem = queue_elem_get(q);
1357  if (unlikely(q_elem == NULL || !queue_allocated(q_elem))) {
1358  q = em_eo_queue_get_next();
1359  continue;
1360  }
1361 
1362  queue_get_name(q_elem, q_name, EM_QUEUE_NAME_LEN - 1);
1363 
1364  n_print = snprintf(eo_q_info_str + len,
1365  eo_q_info_str_len - len,
1366  EO_Q_INFO_FMT,
1367  q, q_name, q_elem->priority,
1368  queue_get_type_str(q_elem->type),
1369  queue_get_state_str(q_elem->state),
1370  q_elem->queue_group,
1371  q_elem->context ? 'Y' : 'N');
1372 
1373  /* Not enough space to hold more queue info */
1374  if (n_print >= eo_q_info_str_len - len)
1375  break;
1376 
1377  len += n_print;
1378  q = em_eo_queue_get_next();
1379  }
1380 
1381  /* EO has no queue */
1382  if (!len) {
1383  EM_PRINT("EO %" PRI_EO "(%s) has no queue!\n", eo, eo_elem->name);
1384  return;
1385  }
1386 
1387  /*
1388  * To prevent printing incomplete information of the last queue when
1389  * there is not enough space to hold all queue info.
1390  */
1391  eo_q_info_str[len] = '\0';
1392  EM_PRINT(EO_Q_INFO_HDR_FMT, eo, eo_elem->name, q_num, eo_q_info_str);
1393 }
1394 
1395 /**
1396  * @brief Create a stash used to buffer events sent during EO-start
1397  */
1398 odp_stash_t eo_start_stash_create(void)
1399 {
1400  unsigned int num_obj = 0;
1401  odp_stash_capability_t stash_capa;
1402  odp_stash_param_t stash_param;
1403  odp_stash_t stash = ODP_STASH_INVALID;
1404 
1405  int ret = odp_stash_capability(&stash_capa, ODP_STASH_TYPE_FIFO);
1406 
1407  if (ret != 0)
1408  return ODP_STASH_INVALID;
1409 
1410  odp_stash_param_init(&stash_param);
1411 
1412  stash_param.type = ODP_STASH_TYPE_FIFO;
1413  stash_param.put_mode = ODP_STASH_OP_MT;
1414  stash_param.get_mode = ODP_STASH_OP_MT;
1415 
1416  /* Stash size: use EM default queue size value from config file: */
1417  num_obj = em_shm->opt.queue.min_events_default;
1418  if (num_obj != 0)
1419  stash_param.num_obj = num_obj;
1420  /* else: use odp default as set by odp_stash_param_init() */
1421 
1422  stash_param.obj_size = sizeof(uint64_t);
1423  if (stash_param.num_obj > stash_capa.max_num.u64) {
1424  EM_LOG(EM_LOG_PRINT,
1425  "%s(): req stash.num_obj(%" PRIu64 ") > capa.max_num.u64(%" PRIu64 ").\n"
1426  " ==> using max value:%" PRIu64 "\n", __func__,
1427  stash_param.num_obj, stash_capa.max_num.u64, stash_capa.max_num.u64);
1428  stash_param.num_obj = stash_capa.max_num.u64;
1429  }
1430 
1431  stash_param.cache_size = 0; /* No core local caching */
1432 
1433  stash = odp_stash_create(NULL, &stash_param);
1434  if (unlikely(stash == ODP_STASH_INVALID))
1435  return ODP_STASH_INVALID;
1436 
1437  return stash;
1438 }
eo_elem_t::queue_list
list_node_t queue_list
Definition: em_eo_types.h:75
queue_elem_t::eo_ctx
void * eo_ctx
Definition: em_queue_types.h:241
queue_get_state_str
const char * queue_get_state_str(queue_state_t state)
Definition: em_queue.c:1673
queue_elem_t::queue_group
em_queue_group_t queue_group
Definition: em_queue_types.h:252
EM_EO_STATE_CREATED
@ EM_EO_STATE_CREATED
Definition: event_machine_types.h:299
em_core_mask_set_count
void em_core_mask_set_count(int count, em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:76
EM_OK
#define EM_OK
Definition: event_machine_types.h:329
eo_pool_t
Definition: em_eo_types.h:94
eo_elem_t::receive_multi_func
em_receive_multi_func_t receive_multi_func
Definition: em_eo_types.h:66
EM_ERR
@ EM_ERR
Definition: event_machine_hw_types.h:312
loc_func_retval_t::core
em_status_t core[EM_MAX_CORES]
Definition: em_internal_event_types.h:87
EM_EVENT_TYPE_SW
@ EM_EVENT_TYPE_SW
Definition: event_machine_hw_types.h:72
EM_QUEUE_STATE_BIND
@ EM_QUEUE_STATE_BIND
Definition: em_queue_types.h:135
em_core_mask_zero
void em_core_mask_zero(em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:43
EM_EO_STATE_STARTING
@ EM_EO_STATE_STARTING
Definition: event_machine_types.h:301
EM_EO_STATE_RUNNING
@ EM_EO_STATE_RUNNING
Definition: event_machine_types.h:303
loc_func_retval_t::free_at_zero
env_atomic32_t free_at_zero
Definition: em_internal_event_types.h:85
queue_elem_t::max_events
uint16_t max_events
Definition: em_queue_types.h:219
eo_elem_t::eo_pool_elem
objpool_elem_t eo_pool_elem
Definition: em_eo_types.h:83
eo_info_print_all
void eo_info_print_all(void)
Definition: em_eo.c:1253
EM_EVENT_UNDEF
#define EM_EVENT_UNDEF
Definition: event_machine_types.h:62
queue_get_type_str
const char * queue_get_type_str(em_queue_type_t type)
Definition: em_queue.c:1701
queue_elem_t::receive_multi_func
em_receive_multi_func_t receive_multi_func
Definition: em_queue_types.h:237
EM_POOL_DEFAULT
#define EM_POOL_DEFAULT
Definition: event_machine_hw_config.h:191
eo_elem_t::eo
em_eo_t eo
Definition: em_eo_types.h:81
queue_elem_t::eo_elem
eo_elem_t * eo_elem
Definition: em_queue_types.h:249
eo_elem_t::eo_ctx
void * eo_ctx
Definition: em_eo_types.h:71
queue_elem_t::type
uint8_t type
Definition: em_queue_types.h:216
em_locm
ENV_LOCAL em_locm_t em_locm
PRI_EO
#define PRI_EO
Definition: event_machine_types.h:97
em_free
void em_free(em_event_t event)
Definition: event_machine_event.c:261
eo_queue_info_print
void eo_queue_info_print(em_eo_t eo)
Definition: em_eo.c:1326
eo_elem_t::num_queues
env_atomic32_t num_queues
Definition: em_eo_types.h:77
stash_entry_t
Definition: em_event_types.h:86
EM_QUEUE_STATE_INIT
@ EM_QUEUE_STATE_INIT
Definition: em_queue_types.h:133
em_locm_t::current
em_locm_current_t current
Definition: em_mem.h:190
EM_EO_STATE_UNDEF
@ EM_EO_STATE_UNDEF
Definition: event_machine_types.h:297
EM_MAX_EOS
#define EM_MAX_EOS
Definition: event_machine_config.h:149
EM_EO_STATE_ERROR
@ EM_EO_STATE_ERROR
Definition: event_machine_types.h:307
EM_ERR_ALLOC_FAILED
@ EM_ERR_ALLOC_FAILED
Definition: event_machine_hw_types.h:287
EM_ERR_LIB_FAILED
@ EM_ERR_LIB_FAILED
Definition: event_machine_hw_types.h:291
em_locm_t::sync_api
sync_api_t sync_api
Definition: em_mem.h:236
EM_EO_UNDEF
#define EM_EO_UNDEF
Definition: event_machine_types.h:95
eo_elem_t::stop_local_func
em_stop_local_func_t stop_local_func
Definition: em_eo_types.h:59
queue_elem_t::queue_node
list_node_t queue_node
Definition: em_queue_types.h:255
eo_add_queue
em_status_t eo_add_queue(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
Definition: em_eo.c:164
eo_start_buffer_events
int eo_start_buffer_events(const em_event_t events[], int num, em_queue_t queue)
Definition: em_eo.c:412
eo_elem_t::stop_func
em_stop_func_t stop_func
Definition: em_eo_types.h:57
queue_elem_t::state
queue_state_t state
Definition: em_queue_types.h:210
evhdl_t
Definition: em_event_types.h:67
em_send_group_multi
int em_send_group_multi(const em_event_t events[], int num, em_queue_t queue, em_event_group_t event_group)
Definition: event_machine_event_group.c:556
eo_rem_queue
em_status_t eo_rem_queue(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
Definition: em_eo.c:221
em_core_mask_iszero
int em_core_mask_iszero(const em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:63
list_node_t
Definition: list.h:42
eo_elem_t::error_handler_func
em_error_handler_t error_handler_func
Definition: em_eo_types.h:69
em_core_mask_t
Definition: event_machine_hw_types.h:242
em_core_count
int em_core_count(void)
Definition: event_machine_core.c:40
queue_enable_all
em_status_t queue_enable_all(eo_elem_t *const eo_elem)
Definition: em_queue.c:1340
EM_QUEUE_NAME_LEN
#define EM_QUEUE_NAME_LEN
Definition: event_machine_config.h:125
internal_event_t::id
uint64_t id
Definition: em_internal_event_types.h:95
em_eo_get_next
em_eo_t em_eo_get_next(void)
Definition: event_machine_eo.c:1075
event_hdr
Definition: em_event_types.h:184
eo_elem_t
Definition: em_eo_types.h:47
eo_start_stash_create
odp_stash_t eo_start_stash_create(void)
Create a stash used to buffer events sent during EO-start.
Definition: em_eo.c:1398
EM_TRUE
#define EM_TRUE
Definition: event_machine_types.h:53
queue_elem_t::queue
uint32_t queue
Definition: em_queue_types.h:225
loc_func_retval_t::q_elem
queue_elem_t * q_elem
Definition: em_internal_event_types.h:73
eo_local_func_call_param_t
Definition: em_eo.c:37
RETURN_ERROR_IF
#define RETURN_ERROR_IF(cond, error, escope, fmt,...)
Definition: em_error.h:50
eo_elem_t::state
em_eo_state_t state
Definition: em_eo_types.h:51
queue_elem_t::context
void * context
Definition: em_queue_types.h:231
em_alloc
em_event_t em_alloc(uint32_t size, em_event_type_t type, em_pool_t pool)
Definition: event_machine_event.c:33
EM_ERR_BAD_ID
@ EM_ERR_BAD_ID
Definition: event_machine_hw_types.h:265
INTERNAL_ERROR
#define INTERNAL_ERROR(error, escope, fmt,...)
Definition: em_error.h:43
queue_state_t
uint8_t queue_state_t
Definition: em_queue_types.h:151
EM_FALSE
#define EM_FALSE
Definition: event_machine_types.h:54
sync_api_t::in_progress
bool in_progress
Definition: em_sync_api_types.h:65
queue_delete
em_status_t queue_delete(queue_elem_t *const queue_elem)
Definition: em_queue.c:652
EM_EO_STATE_STOPPING
@ EM_EO_STATE_STOPPING
Definition: event_machine_types.h:305
i_event__eo_local_func_call_req
void i_event__eo_local_func_call_req(const internal_event_t *i_ev)
Definition: em_eo.c:1073
queue_elem_t::receive_func
em_receive_func_t receive_func
Definition: em_queue_types.h:235
internal_event_t
Definition: em_internal_event_types.h:93
em_status_t
uint32_t em_status_t
Definition: event_machine_types.h:321
ODP_PACKED::q_elem
queue_elem_t * q_elem
Definition: em_mem.h:174
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
loc_func_retval_t::delete_queues
int delete_queues
Definition: em_internal_event_types.h:78
event_hdr::egrp
em_event_group_t egrp
Definition: em_event_types.h:265
EM_CORE_MASK_STRLEN
#define EM_CORE_MASK_STRLEN
Definition: event_machine_hw_types.h:247
queue_elem_t::priority
uint8_t priority
Definition: em_queue_types.h:213
objpool_elem_t
Definition: objpool.h:48
loc_func_retval_t::eo_elem
eo_elem_t * eo_elem
Definition: em_internal_event_types.h:71
em_shm
em_shm_t * em_shm
Definition: event_machine_init.c:41
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
queue_state_change__check
em_status_t queue_state_change__check(queue_state_t old_state, queue_state_t new_state, int is_setup)
Definition: em_queue.c:1230
loc_func_retval_t::event
em_event_t event
Definition: em_internal_event_types.h:80
em_include.h
eo_elem_t::lock
env_spinlock_t lock
Definition: em_eo_types.h:73
em_core_id
int em_core_id(void)
Definition: event_machine_core.c:34
em_core_mask_tostr
void em_core_mask_tostr(char *mask_str, int len, const em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:156
queue_elem_t::eo
uint16_t eo
Definition: em_queue_types.h:222
internal_event_t::loc_func
struct internal_event_t::@52 loc_func
evstate_em2usr_multi
void evstate_em2usr_multi(em_event_t ev_tbl[], event_hdr_t *const ev_hdr_tbl[], const int num, const uint16_t api_op)
Definition: em_event_state.c:970
EM_ERR_BAD_POINTER
@ EM_ERR_BAD_POINTER
Definition: event_machine_hw_types.h:271
em_notif_t
Definition: event_machine_types.h:268
eo_elem_t::start_local_func
em_start_local_func_t start_local_func
Definition: em_eo_types.h:55
eo_start_send_buffered_events
void eo_start_send_buffered_events(eo_elem_t *const eo_elem)
Definition: em_eo.c:471
loc_func_retval_t
Definition: em_internal_event_types.h:69
em_eo_queue_get_first
em_queue_t em_eo_queue_get_first(unsigned int *num, em_eo_t eo)
Definition: event_machine_eo.c:1093
em_eo_queue_get_next
em_queue_t em_eo_queue_get_next(void)
Definition: event_machine_eo.c:1140
eo_tbl_t
Definition: em_eo_types.h:89
em_free_multi
void em_free_multi(em_event_t events[], int num)
Definition: event_machine_event.c:370
em_locm_t
Definition: em_mem.h:188
em_eo_state_t
em_eo_state_t
Definition: event_machine_types.h:295
em_send_multi
int em_send_multi(const em_event_t events[], int num, em_queue_t queue)
Definition: event_machine_event.c:710
eo_elem_t::receive_func
em_receive_func_t receive_func
Definition: em_eo_types.h:64
send_core_ctrl_events
int send_core_ctrl_events(const em_core_mask_t *const mask, em_event_t ctrl_event, void(*f_done_callback)(void *arg_ptr), void *f_done_arg_ptr, int num_notif, const em_notif_t notif_tbl[], bool sync_operation)
Sends an internal control event to each core set in 'mask'.
Definition: em_internal_event.c:135
eo_tbl_t::eo_elem
eo_elem_t eo_elem[EM_MAX_EOS]
Definition: em_eo_types.h:91
EM_EVENT_GROUP_UNDEF
#define EM_EVENT_GROUP_UNDEF
Definition: event_machine_types.h:141
em_eo_get_first
em_eo_t em_eo_get_first(unsigned int *num)
Definition: event_machine_eo.c:1051
queue_elem_t
Definition: em_queue_types.h:180
em_event_pointer
void * em_event_pointer(em_event_t event)
Definition: event_machine_event.c:750
em_locm_t::start_eo_elem
eo_elem_t * start_eo_elem
Definition: em_mem.h:225
em_core_mask_clr
void em_core_mask_clr(int core, em_core_mask_t *mask)
Definition: event_machine_hw_specific.c:53
eo_elem_t::stash
odp_stash_t stash
Definition: em_eo_types.h:79