EM-ODP  3.7.0
Event Machine on ODP
event_machine_eo.c
1 /*
2  * Copyright (c) 2015-2023, Nokia Solutions and Networks
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  * * Neither the name of the copyright holder nor the names of its
15  * contributors may be used to endorse or promote products derived
16  * from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "em_include.h"
32 
33 /* Per core (thread) state of em_eo_get_next() */
34 static ENV_LOCAL unsigned int _eo_tbl_iter_idx;
35 /* Per core (thread) state of em_eo_queue_get_next() */
36 static ENV_LOCAL unsigned int _eo_q_iter_idx;
37 static ENV_LOCAL em_eo_t _eo_q_iter_eo;
38 
39 em_eo_t
40 em_eo_create(const char *name,
41  em_start_func_t start,
42  em_start_local_func_t local_start,
43  em_stop_func_t stop,
44  em_stop_local_func_t local_stop,
45  em_receive_func_t receive,
46  const void *eo_ctx)
47 {
48  em_eo_t eo;
49  eo_elem_t *eo_elem;
50 
51  if (unlikely(start == NULL || stop == NULL || receive == NULL)) {
52  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_CREATE,
53  "Mandatory EO function pointer(s) NULL!");
54  return EM_EO_UNDEF;
55  }
56 
57  eo = eo_alloc();
58  if (unlikely(eo == EM_EO_UNDEF)) {
59  INTERNAL_ERROR(EM_ERR_ALLOC_FAILED, EM_ESCOPE_EO_CREATE,
60  "EO alloc failed!");
61  return EM_EO_UNDEF;
62  }
63 
64  eo_elem = eo_elem_get(eo);
65  if (unlikely(eo_elem == NULL)) {
66  /* Fatal since eo_alloc() returned 'ok', should never happen */
67  INTERNAL_ERROR(EM_FATAL(EM_ERR_BAD_ID), EM_ESCOPE_EO_CREATE,
68  "Invalid EO:%" PRI_EO "", eo);
69  return EM_EO_UNDEF;
70  }
71 
72  env_spinlock_lock(&eo_elem->lock);
73 
74  /* Store the name */
75  if (name != NULL) {
76  strncpy(eo_elem->name, name, sizeof(eo_elem->name));
77  eo_elem->name[sizeof(eo_elem->name) - 1] = '\0';
78  } else {
79  eo_elem->name[0] = '\0';
80  }
81 
82  /* EO's queue list init */
83  list_init(&eo_elem->queue_list);
84  /* EO start: event buffering init */
85  eo_elem->stash = ODP_STASH_INVALID;
86 
87  eo_elem->state = EM_EO_STATE_CREATED;
88  eo_elem->start_func = start;
89  eo_elem->start_local_func = local_start;
90  eo_elem->stop_func = stop;
91  eo_elem->stop_local_func = local_stop;
92 
93  eo_elem->use_multi_rcv = EM_FALSE;
94  eo_elem->max_events = 1;
95  eo_elem->receive_func = receive;
96  eo_elem->receive_multi_func = NULL;
97 
98  eo_elem->error_handler_func = NULL;
99  eo_elem->eo_ctx = (void *)(uintptr_t)eo_ctx;
100  eo_elem->eo = eo;
101  env_atomic32_init(&eo_elem->num_queues);
102 
103  env_spinlock_unlock(&eo_elem->lock);
104 
105  return eo;
106 }
107 
109 {
110  if (unlikely(!param)) {
111  INTERNAL_ERROR(EM_FATAL(EM_ERR_BAD_ARG),
112  EM_ESCOPE_EO_MULTIRCV_PARAM_INIT,
113  "Param pointer NULL!");
114  return;
115  }
116  memset(param, 0, sizeof(em_eo_multircv_param_t));
119 }
120 
121 em_eo_t
122 em_eo_create_multircv(const char *name, const em_eo_multircv_param_t *param)
123 {
124  em_eo_t eo;
125  eo_elem_t *eo_elem;
126  int max_events;
127 
128  if (unlikely(!param ||
130  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_CREATE_MULTIRCV,
131  "Invalid param ptr:\n"
132  "Use em_eo_multircv_param_init() before create");
133  return EM_EO_UNDEF;
134  }
135 
136  if (unlikely(!param->start || !param->stop || !param->receive_multi)) {
137  INTERNAL_ERROR(EM_ERR_BAD_POINTER, EM_ESCOPE_EO_CREATE_MULTIRCV,
138  "Mandatory EO function pointer(s) NULL!");
139  return EM_EO_UNDEF;
140  }
141 
142  if (unlikely(param->max_events < 0)) {
143  INTERNAL_ERROR(EM_ERR_TOO_SMALL, EM_ESCOPE_EO_CREATE_MULTIRCV,
144  "Max number of events too small:%d",
145  param->max_events);
146  return EM_EO_UNDEF;
147  }
148  max_events = param->max_events;
149  if (max_events == 0) /* user requests default value */
150  max_events = EM_EO_MULTIRCV_MAX_EVENTS;
151 
152  eo = eo_alloc();
153  if (unlikely(eo == EM_EO_UNDEF)) {
154  INTERNAL_ERROR(EM_ERR_ALLOC_FAILED, EM_ESCOPE_EO_CREATE_MULTIRCV,
155  "EO alloc failed!");
156  return EM_EO_UNDEF;
157  }
158 
159  eo_elem = eo_elem_get(eo);
160  if (unlikely(eo_elem == NULL)) {
161  /* Fatal since eo_alloc() returned 'ok', should never happen */
162  INTERNAL_ERROR(EM_FATAL(EM_ERR_BAD_ID),
163  EM_ESCOPE_EO_CREATE_MULTIRCV,
164  "Invalid EO:%" PRI_EO "", eo);
165  return EM_EO_UNDEF;
166  }
167 
168  env_spinlock_lock(&eo_elem->lock);
169 
170  /* Store the name */
171  if (name) {
172  strncpy(eo_elem->name, name, sizeof(eo_elem->name));
173  eo_elem->name[sizeof(eo_elem->name) - 1] = '\0';
174  } else {
175  eo_elem->name[0] = '\0';
176  }
177 
178  /* EO's queue list init */
179  list_init(&eo_elem->queue_list);
180  /* EO start: event buffering init */
181  eo_elem->stash = ODP_STASH_INVALID;
182 
183  eo_elem->state = EM_EO_STATE_CREATED;
184  eo_elem->start_func = param->start;
185  eo_elem->start_local_func = param->local_start;
186  eo_elem->stop_func = param->stop;
187  eo_elem->stop_local_func = param->local_stop;
188 
189  eo_elem->use_multi_rcv = EM_TRUE;
190  eo_elem->max_events = max_events;
191  eo_elem->receive_func = NULL;
192  eo_elem->receive_multi_func = param->receive_multi;
193 
194  eo_elem->error_handler_func = NULL;
195  eo_elem->eo_ctx = (void *)(uintptr_t)param->eo_ctx;
196  eo_elem->eo = eo;
197  env_atomic32_init(&eo_elem->num_queues);
198 
199  env_spinlock_unlock(&eo_elem->lock);
200 
201  return eo;
202 }
203 
205 em_eo_delete(em_eo_t eo)
206 {
207  eo_elem_t *const eo_elem = eo_elem_get(eo);
208  em_status_t status;
209 
210  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG, EM_ESCOPE_EO_DELETE,
211  "Invalid EO:%" PRI_EO "!", eo);
212 
213  RETURN_ERROR_IF(!eo_allocated(eo_elem),
214  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_DELETE,
215  "EO not allocated:%" PRI_EO "", eo);
216 
218  eo_elem->state != EM_EO_STATE_ERROR,
219  EM_ERR_BAD_STATE, EM_ESCOPE_EO_DELETE,
220  "EO invalid state, cannot delete:%d", eo_elem->state);
221 
222  status = eo_delete_queue_all(eo_elem);
223 
224  RETURN_ERROR_IF(status != EM_OK, status, EM_ESCOPE_EO_DELETE,
225  "EO delete: delete queues failed!");
226 
227  /* Free EO back into the eo-pool and mark state=EO_STATE_UNDEF */
228  status = eo_free(eo);
229  RETURN_ERROR_IF(status != EM_OK, status, EM_ESCOPE_EO_DELETE,
230  "EO delete failed!");
231 
232  return status;
233 }
234 
235 size_t
236 em_eo_get_name(em_eo_t eo, char *name, size_t maxlen)
237 {
238  const eo_elem_t *eo_elem = eo_elem_get(eo);
239 
240  if (name == NULL || maxlen == 0) {
241  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_GET_NAME,
242  "Invalid ptr or maxlen (name=0x%" PRIx64 ", maxlen=%zu)",
243  name, maxlen);
244  return 0;
245  }
246 
247  name[0] = '\0';
248 
249  if (unlikely(eo_elem == NULL)) {
250  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_GET_NAME,
251  "Invalid EO%" PRI_EO "", eo);
252  return 0;
253  }
254 
255  if (unlikely(!eo_allocated(eo_elem))) {
256  INTERNAL_ERROR(EM_ERR_NOT_CREATED, EM_ESCOPE_EO_GET_NAME,
257  "EO not created:%" PRI_EO "", eo);
258  return 0;
259  }
260 
261  return eo_get_name(eo_elem, name, maxlen);
262 }
263 
264 em_eo_t
265 em_eo_find(const char *name)
266 {
267  if (name && *name) {
268  for (int i = 0; i < EM_MAX_EOS; i++) {
269  const eo_elem_t *eo_elem = &em_shm->eo_tbl.eo_elem[i];
270 
271  if (eo_elem->state != EM_EO_STATE_UNDEF &&
272  !strncmp(name, eo_elem->name, EM_EO_NAME_LEN - 1))
273  return eo_elem->eo;
274  }
275  }
276  return EM_EO_UNDEF;
277 }
278 
279 /**
280  * @brief Helper for em_eo_add_queue/_sync()
281  */
282 static em_status_t
283 eo_add_queue_escope(em_eo_t eo, em_queue_t queue,
284  int num_notif, const em_notif_t notif_tbl[],
285  em_escope_t escope)
286 { eo_elem_t *const eo_elem = eo_elem_get(eo);
287  queue_elem_t *const q_elem = queue_elem_get(queue);
288  em_queue_type_t q_type;
289  em_status_t err;
290  int valid;
291 
292  RETURN_ERROR_IF(eo_elem == NULL || q_elem == NULL,
293  EM_ERR_BAD_ARG, escope,
294  "Invalid args: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
295  eo, queue);
296  RETURN_ERROR_IF(!eo_allocated(eo_elem) || !queue_allocated(q_elem),
297  EM_ERR_NOT_CREATED, escope,
298  "Not created: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
299  eo, queue);
300 
301  q_type = em_queue_get_type(queue);
302  valid = q_type == EM_QUEUE_TYPE_ATOMIC ||
303  q_type == EM_QUEUE_TYPE_PARALLEL ||
304  q_type == EM_QUEUE_TYPE_PARALLEL_ORDERED ||
305  q_type == EM_QUEUE_TYPE_LOCAL;
306  RETURN_ERROR_IF(!valid, EM_ERR_BAD_TYPE, escope,
307  "Invalid queue type: %" PRI_QTYPE "", q_type);
308 
309  if (num_notif > 0) {
310  err = check_notif_tbl(num_notif, notif_tbl);
311  RETURN_ERROR_IF(err != EM_OK, err, escope,
312  "Invalid notif cfg given!");
313  }
314 
315  err = eo_add_queue(eo_elem, q_elem);
316  RETURN_ERROR_IF(err != EM_OK, err, escope,
317  "eo_add_queue(Q:%" PRI_QUEUE ") fails", queue);
318 
319  if (eo_elem->state == EM_EO_STATE_RUNNING) {
320  err = queue_enable(q_elem); /* otherwise enabled in eo-start */
321  RETURN_ERROR_IF(err != EM_OK, err, escope,
322  "queue_enable(Q:%" PRI_QUEUE ") fails", queue);
323  }
324 
325  if (num_notif > 0) {
326  /* Send notifications if requested */
327  err = send_notifs(num_notif, notif_tbl);
328  RETURN_ERROR_IF(err != EM_OK, err, escope,
329  "EO:%" PRI_EO " send notif fails", eo);
330  }
331 
332  return EM_OK;
333 }
334 
336 em_eo_add_queue(em_eo_t eo, em_queue_t queue,
337  int num_notif, const em_notif_t notif_tbl[])
338 {
339  return eo_add_queue_escope(eo, queue, num_notif, notif_tbl,
340  EM_ESCOPE_EO_ADD_QUEUE);
341 }
342 
344 em_eo_add_queue_sync(em_eo_t eo, em_queue_t queue)
345 {
346  /* No sync blocking needed when adding a queue to an EO */
347  return eo_add_queue_escope(eo, queue, 0, NULL,
348  EM_ESCOPE_EO_ADD_QUEUE_SYNC);
349 }
350 
352 em_eo_remove_queue(em_eo_t eo, em_queue_t queue,
353  int num_notif, const em_notif_t notif_tbl[])
354 {
355  eo_elem_t *const eo_elem = eo_elem_get(eo);
356  queue_elem_t *const q_elem = queue_elem_get(queue);
357  em_queue_type_t q_type;
358  em_status_t ret;
359  int valid;
360 
361  RETURN_ERROR_IF(eo_elem == NULL || q_elem == NULL,
362  EM_ERR_BAD_ARG, EM_ESCOPE_EO_REMOVE_QUEUE,
363  "Invalid args: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
364  eo, queue);
365  RETURN_ERROR_IF(!eo_allocated(eo_elem) || !queue_allocated(q_elem),
366  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_REMOVE_QUEUE,
367  "Not created: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
368  eo, queue);
369 
370  q_type = em_queue_get_type(queue);
371  valid = q_type == EM_QUEUE_TYPE_ATOMIC ||
372  q_type == EM_QUEUE_TYPE_PARALLEL ||
373  q_type == EM_QUEUE_TYPE_PARALLEL_ORDERED ||
374  q_type == EM_QUEUE_TYPE_LOCAL;
375  RETURN_ERROR_IF(!valid, EM_ERR_BAD_TYPE, EM_ESCOPE_EO_REMOVE_QUEUE,
376  "Invalid queue type: %" PRI_QTYPE "", q_type);
377 
378  ret = check_notif_tbl(num_notif, notif_tbl);
379  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_REMOVE_QUEUE,
380  "Invalid notif cfg given!");
381  RETURN_ERROR_IF(eo_elem != q_elem->eo_elem,
382  EM_ERR_BAD_POINTER, EM_ESCOPE_EO_REMOVE_QUEUE,
383  "Can't remove Q:%" PRI_QUEUE ", not added to this EO",
384  queue);
385 
386  /*
387  * Disable the queue if not already done, dispatcher will drop any
388  * further events. Need to handle events from the queue being processed
389  * in an EO receive function properly still.
390  */
391  if (q_elem->state == EM_QUEUE_STATE_READY) {
392  ret = queue_disable(q_elem);
393 
394  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_REMOVE_QUEUE,
395  "queue_disable(Q:%" PRI_QUEUE ") fails",
396  queue);
397  }
398 
399  /*
400  * Request each core to run locally the eo_remove_queue_local() function
401  * and when all are done call eo_remove_queue_done_callback().
402  * The callback will finally remove the queue from the EO when it's
403  * known that no core is anymore processing events from that EO/queue.
404  */
405  return eo_remove_queue_local_req(eo_elem, q_elem, num_notif, notif_tbl);
406 }
407 
409 em_eo_remove_queue_sync(em_eo_t eo, em_queue_t queue)
410 {
411  em_locm_t *const locm = &em_locm;
412  eo_elem_t *const eo_elem = eo_elem_get(eo);
413  queue_elem_t *const q_elem = queue_elem_get(queue);
414  em_queue_type_t q_type;
415  em_status_t ret;
416  int valid;
417 
418  RETURN_ERROR_IF(eo_elem == NULL || q_elem == NULL,
419  EM_ERR_BAD_ARG, EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
420  "Invalid args: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
421  eo, queue);
422  RETURN_ERROR_IF(!eo_allocated(eo_elem) || !queue_allocated(q_elem),
423  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
424  "Not created: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
425  eo, queue);
426 
427  q_type = em_queue_get_type(queue);
428  valid = q_type == EM_QUEUE_TYPE_ATOMIC ||
429  q_type == EM_QUEUE_TYPE_PARALLEL ||
430  q_type == EM_QUEUE_TYPE_PARALLEL_ORDERED ||
431  q_type == EM_QUEUE_TYPE_LOCAL;
433  EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
434  "Invalid queue type: %" PRI_QTYPE "", q_type);
435 
436  RETURN_ERROR_IF(eo_elem != q_elem->eo_elem,
437  EM_ERR_BAD_POINTER, EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
438  "Can't remove Q:%" PRI_QUEUE ", not added to this EO",
439  queue);
440 
441  /* Mark that a sync-API call is in progress */
442  locm->sync_api.in_progress = true;
443 
444  /*
445  * Disable the queue if not already done, dispatcher will drop any
446  * further events. Need to handle events from the queue being processed
447  * in an EO receive function properly still.
448  */
449  if (q_elem->state == EM_QUEUE_STATE_READY) {
450  ret = queue_disable(q_elem);
451 
452  if (unlikely(ret != EM_OK))
453  goto eo_remove_queue_sync_error;
454  }
455 
456  /*
457  * Request each core to run locally the eo_remove_queue_sync_local() function
458  * and when all are done call eo_remove_queue_sync_done_callback.
459  * The callback will finally remove the queue from the EO when it's
460  * known that no core is anymore processing events from that EO/queue.
461  */
462  ret = eo_remove_queue_sync_local_req(eo_elem, q_elem);
463  if (unlikely(ret != EM_OK))
464  goto eo_remove_queue_sync_error;
465 
466  /*
467  * Poll the core-local unscheduled control-queue for events.
468  * These events request the core to do a core-local operation (or nop).
469  * Poll and handle events until 'locm->sync_api.in_progress == false'
470  * indicating that this sync-API is 'done' on all concerned cores.
471  */
472  while (locm->sync_api.in_progress)
474 
475  return EM_OK;
476 
477 eo_remove_queue_sync_error:
478  locm->sync_api.in_progress = false;
479 
480  return INTERNAL_ERROR(ret, EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
481  "Failure: EO:%" PRI_EO " Q:%" PRI_QUEUE "",
482  eo, queue);
483 }
484 
486 em_eo_remove_queue_all(em_eo_t eo, int delete_queues,
487  int num_notif, const em_notif_t notif_tbl[])
488 {
489  eo_elem_t *const eo_elem = eo_elem_get(eo);
490  em_status_t ret;
491 
492  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG,
493  EM_ESCOPE_EO_REMOVE_QUEUE_ALL,
494  "Invalid EO:%" PRI_EO "", eo);
495  RETURN_ERROR_IF(!eo_allocated(eo_elem), EM_ERR_NOT_CREATED,
496  EM_ESCOPE_EO_REMOVE_QUEUE_ALL,
497  "EO:%" PRI_EO " not created", eo);
498  ret = check_notif_tbl(num_notif, notif_tbl);
499  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_REMOVE_QUEUE_ALL,
500  "Invalid notif cfg given!");
501 
502  ret = queue_disable_all(eo_elem);
503  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_REMOVE_QUEUE_ALL,
504  "queue_disable_all() failed!");
505 
506  /*
507  * Request each core to run locally the eo_remove_queue_all_local() function
508  * and when all are done call eo_remove_queue_all_done_callback().
509  * The callback will finally remove the queue from the EO when it's
510  * known that no core is anymore processing events from that EO/queue.
511  */
512  return eo_remove_queue_all_local_req(eo_elem, delete_queues,
513  num_notif, notif_tbl);
514 }
515 
517 em_eo_remove_queue_all_sync(em_eo_t eo, int delete_queues)
518 {
519  em_locm_t *const locm = &em_locm;
520  eo_elem_t *const eo_elem = eo_elem_get(eo);
521  em_status_t ret;
522 
523  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG,
524  EM_ESCOPE_EO_REMOVE_QUEUE_ALL_SYNC,
525  "Invalid EO:%" PRI_EO "", eo);
526  RETURN_ERROR_IF(!eo_allocated(eo_elem), EM_ERR_NOT_CREATED,
527  EM_ESCOPE_EO_REMOVE_QUEUE_ALL_SYNC,
528  "EO:%" PRI_EO " not created", eo);
529 
530  /* Mark that a sync-API call is in progress */
531  locm->sync_api.in_progress = true;
532 
533  ret = queue_disable_all(eo_elem);
534  if (unlikely(ret != EM_OK))
535  goto eo_remove_queue_all_sync_error;
536 
537  /*
538  * Request each core to run locally the eo_remove_queue_all_sync_local() function
539  * and when all are done call eo_remove_queue_all_sync_done_callback().
540  * The callback will finally remove the queue from the EO when it's
541  * known that no core is anymore processing events from that EO/queue.
542  */
543  ret = eo_remove_queue_all_sync_local_req(eo_elem, delete_queues);
544  if (unlikely(ret != EM_OK))
545  goto eo_remove_queue_all_sync_error;
546 
547  /*
548  * Poll the core-local unscheduled control-queue for events.
549  * These events request the core to do a core-local operation (or nop).
550  * Poll and handle events until 'locm->sync_api.in_progress == false'
551  * indicating that this sync-API is 'done' on all concerned cores.
552  */
553  while (locm->sync_api.in_progress)
555 
556  return EM_OK;
557 
558 eo_remove_queue_all_sync_error:
559  locm->sync_api.in_progress = false;
560 
561  return INTERNAL_ERROR(ret, EM_ESCOPE_EO_REMOVE_QUEUE_SYNC,
562  "Failure: EO:%" PRI_EO "", eo);
563 }
564 
567 {
568  eo_elem_t *const eo_elem = eo_elem_get(eo);
569 
570  RETURN_ERROR_IF(eo_elem == NULL || handler == NULL,
571  EM_ERR_BAD_ARG, EM_ESCOPE_EO_REGISTER_ERROR_HANDLER,
572  "Invalid args: EO:%" PRI_EO " handler:%p", eo, handler);
573  RETURN_ERROR_IF(!eo_allocated(eo_elem),
574  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_REGISTER_ERROR_HANDLER,
575  "EO:%" PRI_EO " not created", eo);
576 
577  env_spinlock_lock(&eo_elem->lock);
578  eo_elem->error_handler_func = handler;
579  env_spinlock_unlock(&eo_elem->lock);
580 
581  return EM_OK;
582 }
583 
586 {
587  eo_elem_t *const eo_elem = eo_elem_get(eo);
588 
589  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG,
590  EM_ESCOPE_EO_UNREGISTER_ERROR_HANDLER,
591  "Invalid EO id %" PRI_EO "", eo);
592  RETURN_ERROR_IF(!eo_allocated(eo_elem), EM_ERR_NOT_CREATED,
593  EM_ESCOPE_EO_UNREGISTER_ERROR_HANDLER,
594  "EO not created:%" PRI_EO "", eo);
595 
596  env_spinlock_lock(&eo_elem->lock);
597  eo_elem->error_handler_func = NULL;
598  env_spinlock_unlock(&eo_elem->lock);
599 
600  return EM_OK;
601 }
602 
604 em_eo_start(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf,
605  int num_notif, const em_notif_t notif_tbl[])
606 {
607  em_locm_t *const locm = &em_locm;
608  eo_elem_t *const eo_elem = eo_elem_get(eo);
609  queue_elem_t *const save_q_elem = locm->current.q_elem;
610  queue_elem_t tmp_q_elem;
611  em_status_t ret;
612 
613  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG, EM_ESCOPE_EO_START,
614  "Invalid EO id %" PRI_EO "", eo);
615  RETURN_ERROR_IF(!eo_allocated(eo_elem),
616  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_START,
617  "EO not created:%" PRI_EO "", eo);
619  EM_ERR_BAD_STATE, EM_ESCOPE_EO_START,
620  "EO invalid state, cannot start:%d", eo_elem->state);
621  ret = check_notif_tbl(num_notif, notif_tbl);
622  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_START,
623  "Invalid notif cfg given!");
624 
625  eo_elem->state = EM_EO_STATE_STARTING;
626 
627  /* Create a stash to buffer events sent during EO-start */
628  eo_elem->stash = eo_start_stash_create();
629  if (unlikely(eo_elem->stash == ODP_STASH_INVALID)) {
630  ret = INTERNAL_ERROR(EM_ERR, EM_ESCOPE_EO_START,
631  "EO:%" PRI_EO " start stash creation fails", eo);
632  goto eo_start_error;
633  }
634  /* This core is in the EO start function: buffer all sent events */
635  locm->start_eo_elem = eo_elem;
636  /*
637  * Use a tmp q_elem as the 'current q_elem' to enable calling
638  * em_eo_current() from the EO start functions.
639  * Before returning, restore the original 'current q_elem' from
640  * 'save_q_elem'.
641  */
642  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
643  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo;
644 
645  locm->current.q_elem = &tmp_q_elem;
646  /* Call the global EO start function */
647  ret = eo_elem->start_func(eo_elem->eo_ctx, eo, conf);
648  /* Restore the original 'current q_elem' */
649  locm->current.q_elem = save_q_elem;
650  locm->start_eo_elem = NULL;
651 
652  /* Store the return value of the actual EO global start function */
653  if (result != NULL)
654  *result = ret;
655 
656  if (unlikely(ret != EM_OK)) {
657  ret = INTERNAL_ERROR(EM_ERR, EM_ESCOPE_EO_START,
658  "EO:%" PRI_EO " start func fails:0x%08x",
659  eo, ret);
660  /* user error handler might change error from own eo-start */
661  if (ret != EM_OK)
662  goto eo_start_error;
663  }
664 
665  if (eo_elem->start_local_func != NULL) {
666  /*
667  * Notifications sent when the local start functions
668  * have completed.
669  */
670  ret = eo_start_local_req(eo_elem, num_notif, notif_tbl);
671 
672  if (unlikely(ret != EM_OK)) {
673  INTERNAL_ERROR(ret, EM_ESCOPE_EO_START,
674  "EO:%" PRI_EO " local start func fails",
675  eo);
676  /* Can't allow user err handler to change error here */
677  goto eo_start_error;
678  }
679  /*
680  * Note: Return here, queues will be enabled after the local
681  * start funcs complete.
682  * EO state changed to 'EM_EO_STATE_RUNNING' after successful
683  * completion of EO local starts on all cores.
684  */
685  return EM_OK;
686  }
687 
688  /*
689  * Enable all the EO's queues.
690  * Note: if local start functions are given then enable can be done only
691  * after they have been run on each core.
692  */
693  ret = queue_enable_all(eo_elem);
694  if (unlikely(ret != EM_OK))
695  goto eo_start_error;
696 
697  eo_elem->state = EM_EO_STATE_RUNNING;
698 
699  /* Send events buffered during the EO-start/local-start functions */
701 
702  if (num_notif > 0) {
703  /* Send notifications if requested */
704  ret = send_notifs(num_notif, notif_tbl);
705 
706  if (unlikely(ret != EM_OK)) {
707  ret = INTERNAL_ERROR(ret, EM_ESCOPE_EO_START,
708  "EO:%" PRI_EO " send notif fails",
709  eo);
710  /* user error handler might change error */
711  if (ret != EM_OK)
712  goto eo_start_error;
713  }
714  }
715 
716  return EM_OK;
717 
718 eo_start_error:
719  /* roll back state to allow EO delete */
720  eo_elem->state = EM_EO_STATE_ERROR;
721  return ret;
722 }
723 
725 em_eo_start_sync(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf)
726 {
727  em_locm_t *const locm = &em_locm;
728  eo_elem_t *const eo_elem = eo_elem_get(eo);
729  queue_elem_t *const save_q_elem = locm->current.q_elem;
730  queue_elem_t tmp_q_elem;
731  em_status_t ret;
732 
733  RETURN_ERROR_IF(eo_elem == NULL, EM_ERR_BAD_ARG, EM_ESCOPE_EO_START_SYNC,
734  "Invalid EO id %" PRI_EO "", eo);
735  RETURN_ERROR_IF(!eo_allocated(eo_elem),
736  EM_ERR_NOT_CREATED, EM_ESCOPE_EO_START_SYNC,
737  "EO not created:%" PRI_EO "", eo);
739  EM_ERR_BAD_STATE, EM_ESCOPE_EO_START_SYNC,
740  "EO invalid state, cannot start:%d", eo_elem->state);
741 
742  eo_elem->state = EM_EO_STATE_STARTING;
743 
744  /* Create a stash to buffer events sent during EO-start */
745  eo_elem->stash = eo_start_stash_create();
746  if (unlikely(eo_elem->stash == ODP_STASH_INVALID)) {
747  ret = INTERNAL_ERROR(EM_ERR, EM_ESCOPE_EO_START,
748  "EO:%" PRI_EO " start stash creation fails", eo);
749  /* roll back state to allow EO delete */
750  eo_elem->state = EM_EO_STATE_ERROR;
751  return ret;
752  }
753  /* This core is in the EO start function: buffer all sent events */
754  locm->start_eo_elem = eo_elem;
755  /*
756  * Use a tmp q_elem as the 'current q_elem' to enable calling
757  * em_eo_current() from the EO start functions.
758  * Before returning, restore the original 'current q_elem' from
759  * 'save_q_elem'.
760  */
761  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
762  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo;
763  locm->current.q_elem = &tmp_q_elem;
764  /* Call the global EO start function */
765  ret = eo_elem->start_func(eo_elem->eo_ctx, eo, conf);
766  /* Restore the original 'current q_elem' */
767  locm->current.q_elem = save_q_elem;
768  locm->start_eo_elem = NULL;
769 
770  /* Store the return value of the actual EO global start function */
771  if (result != NULL)
772  *result = ret;
773 
774  if (unlikely(ret != EM_OK)) {
775  ret = INTERNAL_ERROR(EM_ERR, EM_ESCOPE_EO_START_SYNC,
776  "EO:%" PRI_EO " start func fails:0x%08x",
777  eo, ret);
778  /* user error handler might change error from own eo-start */
779  if (ret != EM_OK) {
780  /* roll back state to allow EO delete */
781  eo_elem->state = EM_EO_STATE_ERROR;
782  return ret;
783  }
784  }
785 
786  if (eo_elem->start_local_func != NULL) {
787  /* Mark that a sync-API call is in progress */
788  locm->sync_api.in_progress = true;
789 
790  locm->start_eo_elem = eo_elem;
791  locm->current.q_elem = &tmp_q_elem;
792  /* Call the local start on this core */
793  ret = eo_elem->start_local_func(eo_elem->eo_ctx, eo);
794  /* Restore the original 'current q_elem' */
795  locm->current.q_elem = save_q_elem;
796  locm->start_eo_elem = NULL;
797 
798  if (unlikely(ret != EM_OK)) {
799  INTERNAL_ERROR(ret, EM_ESCOPE_EO_START_SYNC,
800  "EO:%" PRI_EO " local start func fails", eo);
801  /* Can't allow user err handler to change error here */
802  goto eo_start_sync_error;
803  }
804 
805  ret = eo_start_sync_local_req(eo_elem);
806  if (unlikely(ret != EM_OK)) {
807  INTERNAL_ERROR(ret, EM_ESCOPE_EO_START_SYNC,
808  "EO:%" PRI_EO " eo_start_sync_local_req", eo);
809  /* Can't allow user err handler to change error here */
810  goto eo_start_sync_error;
811  }
812 
813  /*
814  * Poll the core-local unscheduled control-queue for events.
815  * These events request the core to do a core-local operation (or nop).
816  * Poll and handle events until 'locm->sync_api.in_progress == false'
817  * indicating that this sync-API is 'done' on all concerned cores.
818  */
819  while (locm->sync_api.in_progress)
821 
822  /* Send events buffered during the EO-start/local-start funcs */
824  /*
825  * EO state changed to 'EO_STATE_RUNNING' after successful
826  * completion of EO local starts on all cores.
827  */
828  return EM_OK;
829  }
830 
831  /*
832  * Enable all the EO's queues.
833  * Note: if local start functions are given then enable can be done only
834  * after they have been run on each core.
835  */
836  ret = queue_enable_all(eo_elem);
837  if (unlikely(ret != EM_OK))
838  goto eo_start_sync_error;
839 
840  eo_elem->state = EM_EO_STATE_RUNNING;
841 
842  /* Send events buffered during the EO-start/local-start functions */
844  return EM_OK;
845 
846 eo_start_sync_error:
847  locm->sync_api.in_progress = false;
848  /* roll back state to allow EO delete */
849  eo_elem->state = EM_EO_STATE_ERROR;
850  return ret;
851 }
852 
854 em_eo_stop(em_eo_t eo, int num_notif, const em_notif_t notif_tbl[])
855 {
856  eo_elem_t *const eo_elem = eo_elem_get(eo);
857  em_status_t ret;
858 
859  RETURN_ERROR_IF(eo_elem == NULL || !eo_allocated(eo_elem),
860  EM_ERR_BAD_ARG, EM_ESCOPE_EO_STOP,
861  "Invalid EO:%" PRI_EO "", eo);
863  EM_ERR_BAD_STATE, EM_ESCOPE_EO_STOP,
864  "EO invalid state, cannot stop:%d", eo_elem->state);
865  ret = check_notif_tbl(num_notif, notif_tbl);
866  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_STOP,
867  "Invalid notif cfg given!");
868 
869  eo_elem->state = EM_EO_STATE_STOPPING;
870 
871  /*
872  * Disable all queues.
873  * It doesn't matter if some of the queues are already disabled.
874  */
875  queue_disable_all(eo_elem);
876 
877  /*
878  * Notifications sent when the local stop functions
879  * have completed. EO global stop called when all local stops have
880  * been completed. EO state changed to 'stopped' only after completing
881  * the EO global stop function.
882  */
883  ret = eo_stop_local_req(eo_elem, num_notif, notif_tbl);
884 
885  if (unlikely(ret != EM_OK)) {
886  eo_elem->state = EM_EO_STATE_ERROR;
887  INTERNAL_ERROR(ret, EM_ESCOPE_EO_STOP,
888  "EO:%" PRI_EO " local stop func fails", eo);
889  /* Can't allow user err handler to change error here */
890  return ret;
891  }
892 
893  return EM_OK;
894 }
895 
897 em_eo_stop_sync(em_eo_t eo)
898 {
899  em_locm_t *const locm = &em_locm;
900  eo_elem_t *const eo_elem = eo_elem_get(eo);
901  queue_elem_t *const save_q_elem = locm->current.q_elem;
902  queue_elem_t tmp_q_elem;
903  em_status_t ret;
904 
905  RETURN_ERROR_IF(eo_elem == NULL || !eo_allocated(eo_elem),
906  EM_ERR_BAD_ARG, EM_ESCOPE_EO_STOP_SYNC,
907  "Invalid EO:%" PRI_EO "", eo);
909  EM_ERR_BAD_STATE, EM_ESCOPE_EO_STOP_SYNC,
910  "EO invalid state, cannot stop:%d", eo_elem->state);
911 
912  /* Mark that a sync-API call is in progress */
913  locm->sync_api.in_progress = true;
914 
915  eo_elem->state = EM_EO_STATE_STOPPING;
916 
917  /*
918  * Disable all queues.
919  * It doesn't matter if some of the queues are already disabled.
920  */
921  ret = queue_disable_all(eo_elem);
922  if (unlikely(ret != EM_OK))
923  goto eo_stop_sync_error;
924 
925  /*
926  * Use a tmp q_elem as the 'current q_elem' to enable calling
927  * em_eo_current() from the EO stop functions.
928  * Before returning, restore the original 'current q_elem' from
929  * 'save_q_elem'.
930  */
931  memset(&tmp_q_elem, 0, sizeof(tmp_q_elem));
932  tmp_q_elem.eo = (uint16_t)(uintptr_t)eo;
933 
934  if (eo_elem->stop_local_func != NULL) {
935  locm->current.q_elem = &tmp_q_elem;
936  /* Call the local stop on this core */
937  ret = eo_elem->stop_local_func(eo_elem->eo_ctx, eo_elem->eo);
938  /* Restore the original 'current q_elem' */
939  locm->current.q_elem = save_q_elem;
940  if (unlikely(ret != EM_OK))
941  goto eo_stop_sync_error;
942  }
943 
944  /*
945  * Notifications sent when the local stop functions have completed.
946  * EO global stop called when all local stops have been completed.
947  * EO state changed to 'stopped' only after completing the EO global
948  * stop function.
949  */
950  ret = eo_stop_sync_local_req(eo_elem);
951 
952  if (unlikely(ret != EM_OK)) {
953  eo_elem->state = EM_EO_STATE_ERROR;
954  INTERNAL_ERROR(ret, EM_ESCOPE_EO_STOP_SYNC,
955  "EO:%" PRI_EO " local stop func fails", eo);
956  /* Can't allow user err handler to change error here */
957  goto eo_stop_sync_error;
958  }
959 
960  /*
961  * Poll the core-local unscheduled control-queue for events.
962  * These events request the core to do a core-local operation (or nop).
963  * Poll and handle events until 'locm->sync_api.in_progress == false'
964  * indicating that this sync-API is 'done' on all concerned cores.
965  */
966  while (locm->sync_api.in_progress)
968 
969  /* Change state here to allow em_eo_delete() from EO global stop */
970  eo_elem->state = EM_EO_STATE_CREATED; /* == stopped */
971 
972  locm->current.q_elem = &tmp_q_elem;
973  /*
974  * Call the Global EO stop function now that all
975  * EO local stop functions are done.
976  */
977  ret = eo_elem->stop_func(eo_elem->eo_ctx, eo);
978  /* Restore the original 'current q_elem' */
979  locm->current.q_elem = save_q_elem;
980 
981  RETURN_ERROR_IF(ret != EM_OK, ret, EM_ESCOPE_EO_STOP_SYNC,
982  "EO:%" PRI_EO " stop-func failed", eo);
983  /*
984  * Note: the EO might not be available after this if the EO global stop
985  * called em_eo_delete()!
986  */
987  return EM_OK;
988 
989 eo_stop_sync_error:
990  locm->sync_api.in_progress = false;
991  return INTERNAL_ERROR(ret, EM_ESCOPE_EO_STOP_SYNC,
992  "Failure: EO:%" PRI_EO "", eo);
993 }
994 
995 em_eo_t
997 {
998  return eo_current();
999 }
1000 
1001 void *
1003 {
1004  const eo_elem_t *eo_elem = eo_elem_get(eo);
1005  em_eo_state_t eo_state;
1006 
1007  if (unlikely(EM_CHECK_LEVEL > 0 && eo_elem == NULL)) {
1008  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_GET_CONTEXT,
1009  "Invalid EO:%" PRI_EO "", eo);
1010  return NULL;
1011  }
1012 
1013  if (unlikely(EM_CHECK_LEVEL >= 2 && !eo_allocated(eo_elem))) {
1014  INTERNAL_ERROR(EM_ERR_NOT_CREATED, EM_ESCOPE_EO_GET_CONTEXT,
1015  "EO:%" PRI_EO " not created!", eo);
1016  return NULL;
1017  }
1018 
1019  eo_state = eo_elem->state;
1020  if (unlikely(EM_CHECK_LEVEL > 0 && eo_state < EM_EO_STATE_CREATED)) {
1021  INTERNAL_ERROR(EM_ERR_BAD_STATE, EM_ESCOPE_EO_GET_CONTEXT,
1022  "Invalid EO state: EO:%" PRI_EO " state:%d",
1023  eo, eo_state);
1024  return NULL;
1025  }
1026 
1027  return eo_elem->eo_ctx;
1028 }
1029 
1031 em_eo_get_state(em_eo_t eo)
1032 {
1033  const eo_elem_t *eo_elem = eo_elem_get(eo);
1034 
1035  if (unlikely(EM_CHECK_LEVEL > 0 && eo_elem == NULL)) {
1036  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_GET_STATE,
1037  "Invalid EO:%" PRI_EO "", eo);
1038  return EM_EO_STATE_UNDEF;
1039  }
1040 
1041  if (unlikely(EM_CHECK_LEVEL >= 2 && !eo_allocated(eo_elem))) {
1042  INTERNAL_ERROR(EM_ERR_NOT_CREATED, EM_ESCOPE_EO_GET_STATE,
1043  "EO:%" PRI_EO " not created", eo);
1044  return EM_EO_STATE_UNDEF;
1045  }
1046 
1047  return eo_elem->state;
1048 }
1049 
1050 em_eo_t
1051 em_eo_get_first(unsigned int *num)
1052 {
1053  _eo_tbl_iter_idx = 0; /* reset iteration */
1054  const unsigned int eo_cnt = eo_count();
1055 
1056  if (num)
1057  *num = eo_cnt;
1058 
1059  if (eo_cnt == 0) {
1060  _eo_tbl_iter_idx = EM_MAX_EOS; /* UNDEF = _get_next() */
1061  return EM_EO_UNDEF;
1062  }
1063 
1064  /* find first */
1065  while (!eo_allocated(&em_shm->eo_tbl.eo_elem[_eo_tbl_iter_idx])) {
1066  _eo_tbl_iter_idx++;
1067  if (_eo_tbl_iter_idx >= EM_MAX_EOS)
1068  return EM_EO_UNDEF;
1069  }
1070 
1071  return eo_idx2hdl(_eo_tbl_iter_idx);
1072 }
1073 
1074 em_eo_t
1076 {
1077  if (_eo_tbl_iter_idx >= EM_MAX_EOS - 1)
1078  return EM_EO_UNDEF;
1079 
1080  _eo_tbl_iter_idx++;
1081 
1082  /* find next */
1083  while (!eo_allocated(&em_shm->eo_tbl.eo_elem[_eo_tbl_iter_idx])) {
1084  _eo_tbl_iter_idx++;
1085  if (_eo_tbl_iter_idx >= EM_MAX_EOS)
1086  return EM_EO_UNDEF;
1087  }
1088 
1089  return eo_idx2hdl(_eo_tbl_iter_idx);
1090 }
1091 
1092 em_queue_t
1093 em_eo_queue_get_first(unsigned int *num, em_eo_t eo)
1094 {
1095  const eo_elem_t *eo_elem = eo_elem_get(eo);
1096  const unsigned int max_queues = em_shm->opt.queue.max_num;
1097 
1098  if (unlikely(eo_elem == NULL || !eo_allocated(eo_elem))) {
1099  INTERNAL_ERROR(EM_ERR_BAD_ARG, EM_ESCOPE_EO_QUEUE_GET_FIRST,
1100  "Invalid EO:%" PRI_EO "", eo);
1101  if (num)
1102  *num = 0;
1103  return EM_QUEUE_UNDEF;
1104  }
1105 
1106  const unsigned int num_queues = env_atomic32_get(&eo_elem->num_queues);
1107 
1108  if (num)
1109  *num = num_queues;
1110 
1111  if (num_queues == 0) {
1112  _eo_q_iter_idx = max_queues; /* UNDEF = _get_next() */
1113  return EM_QUEUE_UNDEF;
1114  }
1115 
1116  /*
1117  * An 'eo_elem' contains a linked list with all it's queues. That list
1118  * might be modified while processing this iteration, so instead we just
1119  * go through the whole queue table.
1120  * This is potentially a slow implementation and perhaps worth
1121  * re-thinking?
1122  */
1123  const queue_tbl_t *const queue_tbl = &em_shm->queue_tbl;
1124 
1125  _eo_q_iter_idx = 0; /* reset list */
1126  _eo_q_iter_eo = eo;
1127 
1128  /* find first */
1129  while (!queue_allocated(&queue_tbl->queue_elem[_eo_q_iter_idx]) ||
1130  queue_tbl->queue_elem[_eo_q_iter_idx].eo != (uint16_t)(uintptr_t)_eo_q_iter_eo) {
1131  _eo_q_iter_idx++;
1132  if (_eo_q_iter_idx >= max_queues)
1133  return EM_QUEUE_UNDEF;
1134  }
1135 
1136  return queue_idx2hdl(_eo_q_iter_idx);
1137 }
1138 
1139 em_queue_t
1141 {
1142  const unsigned int max_queues = em_shm->opt.queue.max_num;
1143 
1144  if (_eo_q_iter_idx >= max_queues - 1)
1145  return EM_QUEUE_UNDEF;
1146 
1147  _eo_q_iter_idx++;
1148 
1149  const queue_tbl_t *const queue_tbl = &em_shm->queue_tbl;
1150 
1151  /* find next */
1152  while (!queue_allocated(&queue_tbl->queue_elem[_eo_q_iter_idx]) ||
1153  queue_tbl->queue_elem[_eo_q_iter_idx].eo != (uint16_t)(uintptr_t)_eo_q_iter_eo) {
1154  _eo_q_iter_idx++;
1155  if (_eo_q_iter_idx >= max_queues)
1156  return EM_QUEUE_UNDEF;
1157  }
1158 
1159  return queue_idx2hdl(_eo_q_iter_idx);
1160 }
1161 
1162 uint64_t em_eo_to_u64(em_eo_t eo)
1163 {
1164  return (uint64_t)eo;
1165 }
eo_elem_t::queue_list
list_node_t queue_list
Definition: em_eo_types.h:75
em_eo_remove_queue_sync
em_status_t em_eo_remove_queue_sync(em_eo_t eo, em_queue_t queue)
Definition: event_machine_eo.c:409
EM_EO_STATE_CREATED
@ EM_EO_STATE_CREATED
Definition: event_machine_types.h:299
EM_OK
#define EM_OK
Definition: event_machine_types.h:329
em_eo_start
em_status_t em_eo_start(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf, int num_notif, const em_notif_t notif_tbl[])
Definition: event_machine_eo.c:604
eo_elem_t::receive_multi_func
em_receive_multi_func_t receive_multi_func
Definition: em_eo_types.h:66
EM_ERR
@ EM_ERR
Definition: event_machine_hw_types.h:312
EM_EO_STATE_STARTING
@ EM_EO_STATE_STARTING
Definition: event_machine_types.h:301
EM_EO_STATE_RUNNING
@ EM_EO_STATE_RUNNING
Definition: event_machine_types.h:303
em_eo_multircv_param_t
Definition: event_machine_eo.h:338
em_eo_multircv_param_t::local_stop
em_stop_local_func_t local_stop
Definition: event_machine_eo.h:360
em_eo_remove_queue
em_status_t em_eo_remove_queue(em_eo_t eo, em_queue_t queue, int num_notif, const em_notif_t notif_tbl[])
Definition: event_machine_eo.c:352
EM_ERR_NOT_CREATED
@ EM_ERR_NOT_CREATED
Definition: event_machine_hw_types.h:274
eo_elem_t::eo
em_eo_t eo
Definition: em_eo_types.h:81
queue_elem_t::eo_elem
eo_elem_t * eo_elem
Definition: em_queue_types.h:249
EM_QUEUE_TYPE_PARALLEL
@ EM_QUEUE_TYPE_PARALLEL
Definition: event_machine_hw_types.h:117
eo_elem_t::eo_ctx
void * eo_ctx
Definition: em_eo_types.h:71
em_eo_get_name
size_t em_eo_get_name(em_eo_t eo, char *name, size_t maxlen)
Definition: event_machine_eo.c:236
EM_ERR_TOO_SMALL
@ EM_ERR_TOO_SMALL
Definition: event_machine_hw_types.h:296
em_stop_func_t
em_status_t(* em_stop_func_t)(void *eo_ctx, em_eo_t eo)
Definition: event_machine_eo.h:301
em_eo_get_context
void * em_eo_get_context(em_eo_t eo)
Definition: event_machine_eo.c:1002
em_locm
ENV_LOCAL em_locm_t em_locm
PRI_EO
#define PRI_EO
Definition: event_machine_types.h:97
EM_EO_MULTIRCV_MAX_EVENTS
#define EM_EO_MULTIRCV_MAX_EVENTS
Definition: event_machine_hw_config.h:249
eo_elem_t::num_queues
env_atomic32_t num_queues
Definition: em_eo_types.h:77
em_locm_t::current
em_locm_current_t current
Definition: em_mem.h:190
EM_EO_STATE_UNDEF
@ EM_EO_STATE_UNDEF
Definition: event_machine_types.h:297
EM_MAX_EOS
#define EM_MAX_EOS
Definition: event_machine_config.h:149
EM_EO_STATE_ERROR
@ EM_EO_STATE_ERROR
Definition: event_machine_types.h:307
EM_ERR_ALLOC_FAILED
@ EM_ERR_ALLOC_FAILED
Definition: event_machine_hw_types.h:287
em_locm_t::sync_api
sync_api_t sync_api
Definition: em_mem.h:236
EM_EO_UNDEF
#define EM_EO_UNDEF
Definition: event_machine_types.h:95
eo_elem_t::stop_local_func
em_stop_local_func_t stop_local_func
Definition: em_eo_types.h:59
em_eo_add_queue_sync
em_status_t em_eo_add_queue_sync(em_eo_t eo, em_queue_t queue)
Definition: event_machine_eo.c:344
EM_QUEUE_TYPE_ATOMIC
@ EM_QUEUE_TYPE_ATOMIC
Definition: event_machine_hw_types.h:112
em_eo_unregister_error_handler
em_status_t em_eo_unregister_error_handler(em_eo_t eo)
Definition: event_machine_eo.c:585
eo_add_queue
em_status_t eo_add_queue(eo_elem_t *const eo_elem, queue_elem_t *const q_elem)
Definition: em_eo.c:164
em_eo_multircv_param_t::local_start
em_start_local_func_t local_start
Definition: event_machine_eo.h:349
eo_elem_t::stop_func
em_stop_func_t stop_func
Definition: em_eo_types.h:57
queue_elem_t::state
queue_state_t state
Definition: em_queue_types.h:210
check_notif_tbl
em_status_t check_notif_tbl(const int num_notif, const em_notif_t notif_tbl[])
Check that the usage of a table of notifications is valid.
Definition: em_internal_event.c:477
em_queue_type_t
uint32_t em_queue_type_t
Definition: event_machine_types.h:168
eo_elem_t::error_handler_func
em_error_handler_t error_handler_func
Definition: em_eo_types.h:69
em_eo_current
em_eo_t em_eo_current(void)
Definition: event_machine_eo.c:996
queue_enable_all
em_status_t queue_enable_all(eo_elem_t *const eo_elem)
Definition: em_queue.c:1340
em_eo_remove_queue_all_sync
em_status_t em_eo_remove_queue_all_sync(em_eo_t eo, int delete_queues)
Definition: event_machine_eo.c:517
em_eo_get_next
em_eo_t em_eo_get_next(void)
Definition: event_machine_eo.c:1075
em_eo_multircv_param_t::start
em_start_func_t start
Definition: event_machine_eo.h:344
eo_elem_t
Definition: em_eo_types.h:47
eo_start_stash_create
odp_stash_t eo_start_stash_create(void)
Create a stash used to buffer events sent during EO-start.
Definition: em_eo.c:1398
EM_TRUE
#define EM_TRUE
Definition: event_machine_types.h:53
em_start_func_t
em_status_t(* em_start_func_t)(void *eo_ctx, em_eo_t eo, const em_eo_conf_t *conf)
Definition: event_machine_eo.h:227
poll_unsched_ctrl_queue
void poll_unsched_ctrl_queue(void)
Poll EM's internal unscheduled control queues during dispatch.
Definition: em_internal_event.c:540
RETURN_ERROR_IF
#define RETURN_ERROR_IF(cond, error, escope, fmt,...)
Definition: em_error.h:50
EM_EO_NAME_LEN
#define EM_EO_NAME_LEN
Definition: event_machine_config.h:155
eo_elem_t::state
em_eo_state_t state
Definition: em_eo_types.h:51
em_eo_delete
em_status_t em_eo_delete(em_eo_t eo)
Definition: event_machine_eo.c:205
EM_ERR_BAD_ID
@ EM_ERR_BAD_ID
Definition: event_machine_hw_types.h:265
queue_enable
em_status_t queue_enable(queue_elem_t *const q_elem)
Definition: em_queue.c:1319
INTERNAL_ERROR
#define INTERNAL_ERROR(error, escope, fmt,...)
Definition: em_error.h:43
em_eo_multircv_param_t::max_events
int max_events
Definition: event_machine_eo.h:373
queue_tbl_t
Definition: em_queue_types.h:271
EM_FALSE
#define EM_FALSE
Definition: event_machine_types.h:54
sync_api_t::in_progress
bool in_progress
Definition: em_sync_api_types.h:65
em_eo_register_error_handler
em_status_t em_eo_register_error_handler(em_eo_t eo, em_error_handler_t handler)
Definition: event_machine_eo.c:566
em_start_local_func_t
em_status_t(* em_start_local_func_t)(void *eo_ctx, em_eo_t eo)
Definition: event_machine_eo.h:259
EM_EO_STATE_STOPPING
@ EM_EO_STATE_STOPPING
Definition: event_machine_types.h:305
em_escope_t
uint32_t em_escope_t
Definition: event_machine_types.h:348
queue_disable
em_status_t queue_disable(queue_elem_t *const q_elem)
Definition: em_queue.c:1360
em_eo_start_sync
em_status_t em_eo_start_sync(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf)
Definition: event_machine_eo.c:725
em_status_t
uint32_t em_status_t
Definition: event_machine_types.h:321
ODP_PACKED::q_elem
queue_elem_t * q_elem
Definition: em_mem.h:174
EM_QUEUE_TYPE_LOCAL
@ EM_QUEUE_TYPE_LOCAL
Definition: event_machine_hw_types.h:131
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
EM_QUEUE_STATE_READY
@ EM_QUEUE_STATE_READY
Definition: em_queue_types.h:137
EM_CHECK_LEVEL
#define EM_CHECK_LEVEL
Definition: event_machine_config.h:253
em_error_handler_t
em_status_t(* em_error_handler_t)(em_eo_t eo, em_status_t error, em_escope_t escope, va_list args)
Definition: event_machine_error.h:94
em_eo_stop
em_status_t em_eo_stop(em_eo_t eo, int num_notif, const em_notif_t notif_tbl[])
Definition: event_machine_eo.c:854
em_shm
em_shm_t * em_shm
Definition: event_machine_init.c:41
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
em_include.h
EM_QUEUE_TYPE_PARALLEL_ORDERED
@ EM_QUEUE_TYPE_PARALLEL_ORDERED
Definition: event_machine_hw_types.h:122
em_eo_multircv_param_t::receive_multi
em_receive_multi_func_t receive_multi
Definition: event_machine_eo.h:364
eo_elem_t::lock
env_spinlock_t lock
Definition: em_eo_types.h:73
em_eo_multircv_param_t::eo_ctx
const void * eo_ctx
Definition: event_machine_eo.h:378
em_stop_local_func_t
em_status_t(* em_stop_local_func_t)(void *eo_ctx, em_eo_t eo)
Definition: event_machine_eo.h:280
queue_disable_all
em_status_t queue_disable_all(eo_elem_t *const eo_elem)
Definition: em_queue.c:1381
EM_ERR_BAD_TYPE
@ EM_ERR_BAD_TYPE
Definition: event_machine_hw_types.h:267
queue_elem_t::eo
uint16_t eo
Definition: em_queue_types.h:222
em_queue_get_type
em_queue_type_t em_queue_get_type(em_queue_t queue)
Definition: event_machine_queue.c:201
EM_CHECK_INIT_CALLED
#define EM_CHECK_INIT_CALLED
Definition: em_include.h:69
em_eo_multircv_param_t::stop
em_stop_func_t stop
Definition: event_machine_eo.h:355
send_notifs
em_status_t send_notifs(const int num_notif, const em_notif_t notif_tbl[])
Helper func to send notifications events.
Definition: em_internal_event.c:423
EM_ERR_BAD_POINTER
@ EM_ERR_BAD_POINTER
Definition: event_machine_hw_types.h:271
em_notif_t
Definition: event_machine_types.h:268
eo_elem_t::start_local_func
em_start_local_func_t start_local_func
Definition: em_eo_types.h:55
eo_start_send_buffered_events
void eo_start_send_buffered_events(eo_elem_t *const eo_elem)
Definition: em_eo.c:471
em_receive_func_t
void(* em_receive_func_t)(void *eo_ctx, em_event_t event, em_event_type_t type, em_queue_t queue, void *q_ctx)
Definition: event_machine_eo.h:149
em_eo_to_u64
uint64_t em_eo_to_u64(em_eo_t eo)
Definition: event_machine_eo.c:1162
em_eo_create
em_eo_t em_eo_create(const char *name, em_start_func_t start, em_start_local_func_t local_start, em_stop_func_t stop, em_stop_local_func_t local_stop, em_receive_func_t receive, const void *eo_ctx)
Definition: event_machine_eo.c:40
eo_elem_t::start_func
em_start_func_t start_func
Definition: em_eo_types.h:53
em_eo_multircv_param_init
void em_eo_multircv_param_init(em_eo_multircv_param_t *param)
Definition: event_machine_eo.c:108
em_eo_queue_get_first
em_queue_t em_eo_queue_get_first(unsigned int *num, em_eo_t eo)
Definition: event_machine_eo.c:1093
em_eo_queue_get_next
em_queue_t em_eo_queue_get_next(void)
Definition: event_machine_eo.c:1140
em_eo_stop_sync
em_status_t em_eo_stop_sync(em_eo_t eo)
Definition: event_machine_eo.c:897
EM_ERR_BAD_ARG
@ EM_ERR_BAD_ARG
Definition: event_machine_hw_types.h:261
em_eo_add_queue
em_status_t em_eo_add_queue(em_eo_t eo, em_queue_t queue, int num_notif, const em_notif_t notif_tbl[])
Definition: event_machine_eo.c:336
em_eo_multircv_param_t::__internal_check
uint32_t __internal_check
Definition: event_machine_eo.h:386
em_locm_t
Definition: em_mem.h:188
em_eo_state_t
em_eo_state_t
Definition: event_machine_types.h:295
ENV_LOCAL
#define ENV_LOCAL
Definition: environment.h:57
EM_ERR_BAD_STATE
@ EM_ERR_BAD_STATE
Definition: event_machine_hw_types.h:263
em_eo_conf_t
Definition: event_machine_types.h:242
em_eo_get_state
em_eo_state_t em_eo_get_state(em_eo_t eo)
Definition: event_machine_eo.c:1031
eo_elem_t::receive_func
em_receive_func_t receive_func
Definition: em_eo_types.h:64
em_eo_get_first
em_eo_t em_eo_get_first(unsigned int *num)
Definition: event_machine_eo.c:1051
queue_elem_t
Definition: em_queue_types.h:180
em_locm_t::start_eo_elem
eo_elem_t * start_eo_elem
Definition: em_mem.h:225
em_eo_find
em_eo_t em_eo_find(const char *name)
Definition: event_machine_eo.c:265
em_eo_remove_queue_all
em_status_t em_eo_remove_queue_all(em_eo_t eo, int delete_queues, int num_notif, const em_notif_t notif_tbl[])
Definition: event_machine_eo.c:486
em_eo_create_multircv
em_eo_t em_eo_create_multircv(const char *name, const em_eo_multircv_param_t *param)
Definition: event_machine_eo.c:122
eo_elem_t::stash
odp_stash_t stash
Definition: em_eo_types.h:79