EM-ODP  3.7.0
Event Machine on ODP
em_dispatcher_inline.h
Go to the documentation of this file.
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright (c) 2023, Nokia Solutions and Networks
4  */
5 
6 /**
7  * @file
8  * EM internal dispatcher functions
9  */
10 
11 #ifndef EM_DISPATCHER_INLINE_H_
12 #define EM_DISPATCHER_INLINE_H_
13 
15 
16 #ifdef __cplusplus
17 extern "C" {
18 #endif
19 
/*
 * Forward declarations: both dispatch helpers are defined further down in
 * this file but are already called by _dispatch_local() and
 * _dispatch_local_multi() before their definitions.
 */
20 static inline void
21 dispatch_multi_receive(em_event_t ev_tbl[], event_hdr_t *ev_hdr_tbl[],
22  const int num_events, queue_elem_t *const q_elem,
23  const bool check_local_qs);
24 static inline void
25 dispatch_single_receive(em_event_t ev_tbl[], event_hdr_t *ev_hdr_tbl[],
26  const int num_events, queue_elem_t *const q_elem,
27  const bool check_local_qs);
28 
29 /**
30  * Helper: Remove undef-event entries from ev_tbl[]
31  */
32 static inline int pack_ev_tbl(em_event_t ev_tbl[/*in,out*/], const int num)
33 {
34  if (num == 1) {
35  if (ev_tbl[0] != EM_EVENT_UNDEF)
36  return 1;
37  else
38  return 0;
39  }
40 
41  int pack = 0;
42 
43  for (int i = 0; i < num; i++) {
44  if (ev_tbl[i] != EM_EVENT_UNDEF) {
45  if (pack < i)
46  ev_tbl[pack] = ev_tbl[i];
47  pack++;
48  }
49  }
50 
51  return pack;
52 }
53 
54 static inline uint64_t debug_timestamp(void)
55 {
56  /* compile time selection */
57  return EM_DEBUG_TIMESTAMP_ENABLE == 1 ? odp_time_global_ns() : odp_time_global_strict_ns();
58 }
59 
60 /**
61  * Run all dispatch enter-callback functions.
62  *
63  * @note Neither EO-receive nor any further enter-callbacks will be called if
64  * all events have been dropped by the callbacks already run, i.e.
65  * no callback or EO-receive will be called with 'num=0'.
66  *
67  * @param eo EO handle
68  * @param eo_ctx EO context data
69  * @param[in,out] ev_tbl Event table
70  * @param num_events Number of events in the event table
71  * @param queue Queue from which this event came from
72  * @param q_ctx Queue context data
73  *
74  * @return The number of events in ev_tbl[] after all dispatch enter callbacks
75  */
76 static inline int
77 dispatch_enter_cb(em_eo_t eo, void **eo_ctx,
78  em_event_t ev_tbl[/*in,out*/], const int num_events,
79  em_queue_t *queue, void **q_ctx)
80 {
81  const hook_tbl_t *cb_tbl = em_shm->dispatch_enter_cb_tbl;
82  em_dispatch_enter_func_t dispatch_enter_fn;
83  int num = num_events;
84 
85  for (int i = 0; i < EM_CALLBACKS_MAX && num > 0; i++) {
86  dispatch_enter_fn = cb_tbl->tbl[i].disp_enter;
87  if (dispatch_enter_fn == NULL)
88  break;
89  dispatch_enter_fn(eo, eo_ctx, ev_tbl, num, queue, q_ctx);
90  num = pack_ev_tbl(ev_tbl, num);
91  }
92 
93  return num;
94 }
95 
96 /**
97  * Run all dispatch exit-callback functions.
98  *
99  * @param eo EO handle
100  */
101 static inline void
102 dispatch_exit_cb(em_eo_t eo)
103 {
104  const hook_tbl_t *dispatch_exit_cb_tbl = em_shm->dispatch_exit_cb_tbl;
105  em_dispatch_exit_func_t dispatch_exit_fn;
106 
107  for (int i = 0; i < EM_CALLBACKS_MAX; i++) {
108  dispatch_exit_fn = dispatch_exit_cb_tbl->tbl[i].disp_exit;
109  if (dispatch_exit_fn == NULL)
110  return;
111  dispatch_exit_fn(eo);
112  }
113 }
114 
/*
 * Deliver one event to an EO's single-event receive function.
 *
 * Sets the core-local receive count to 1 and the core-local event group from
 * the event header, runs the dispatch enter-callbacks (when
 * EM_DISPATCH_CALLBACKS_ENABLE), calls 'eo_receive_func' if the event is still
 * present, runs the exit-callbacks and finally decrements the event group
 * count by one if a core-local event group is set.
 *
 * eo:              EO handle, passed to the dispatch callbacks
 * eo_receive_func: the EO's receive function
 * event, ev_hdr:   the event and its header
 * q_elem:          queue element providing the queue handle, queue context
 *                  and EO context
 */
115 static inline void
116 call_eo_receive_fn(const em_eo_t eo, const em_receive_func_t eo_receive_func,
117  em_event_t event, event_hdr_t *ev_hdr, queue_elem_t *const q_elem)
118 {
119  em_locm_t *const locm = &em_locm;
120  em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->queue;
121  void *queue_ctx = q_elem->context;
122  void *eo_ctx = q_elem->eo_ctx;
123  int num = 1;
124 
125  locm->current.rcv_multi_cnt = 1;
126  /* Check and set core local event group (before dispatch callback(s)) */
127  event_group_set_local(ev_hdr->egrp, ev_hdr->egrp_gen, 1);
128 
129  if (EM_DISPATCH_CALLBACKS_ENABLE) {
 /* one-entry table so the enter-callbacks can replace or drop the event */
130  em_event_t ev_tbl[1] = {event};
131 
132  num = dispatch_enter_cb(eo, &eo_ctx, ev_tbl/*in,out*/, 1,
133  &queue, &queue_ctx);
134  if (num && ev_tbl[0] != event) {
135  /* user-callback changed event: update event & hdr */
136  event = ev_tbl[0];
137  ev_hdr = event_to_hdr(event);
138  }
139  }
140 
141  if (likely(num == 1)) {
142  em_event_type_t event_type = ev_hdr->event_type;
143  /*
144  * Call the EO receive function
145  * (only if the dispatch callback(s) did not free the event)
146  */
147  eo_receive_func(eo_ctx, event, event_type,
148  queue, queue_ctx);
149  }
150 
 /* exit-callbacks run even when the enter-callbacks dropped the event */
151  if (EM_DISPATCH_CALLBACKS_ENABLE)
152  dispatch_exit_cb(eo);
153 
154  /*
155  * Event belongs to an event_group, update the count and
156  * if requested send notifications
157  */
158  if (locm->current.egrp != EM_EVENT_GROUP_UNDEF) {
159  /*
160  * Atomically decrease the event group count.
161  * If the new count is zero, send notification events.
162  */
163  event_group_count_decrement(1);
164  }
 /* NOTE(review): listing line 165 is absent from this extract - confirm against the upstream file whether a statement belongs here */
166 }
167 
/*
 * Deliver a batch of events to an EO's multi-event receive function.
 *
 * Sets the core-local receive count to 'num_events' and the core-local event
 * group from the first event header, runs the dispatch enter-callbacks (when
 * EM_DISPATCH_CALLBACKS_ENABLE), calls 'eo_receive_multi_func' with the events
 * that remain, runs the exit-callbacks and finally decrements the event group
 * count if a core-local event group is set.
 */
168 /**
169  * @note All events belong to the same event group
170  * @note Event type dropped from multi-event receive - use em_event_get_type()
171  */
172 static inline void
173 call_eo_receive_multi_fn(const em_eo_t eo,
174  const em_receive_multi_func_t eo_receive_multi_func,
175  em_event_t ev_tbl[], event_hdr_t *ev_hdr_tbl[],
176  const int num_events, queue_elem_t *const q_elem)
177 {
178  em_locm_t *const locm = &em_locm;
179  em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->queue;
180  void *queue_ctx = q_elem->context;
181  void *eo_ctx = q_elem->eo_ctx;
182  int num = num_events;
183 
184  locm->current.rcv_multi_cnt = num_events;
185  /* Check and set core local event group (before dispatch callback(s)) */
186  event_group_set_local(ev_hdr_tbl[0]->egrp, ev_hdr_tbl[0]->egrp_gen,
187  num_events);
188 
 /* 'num' becomes the number of events left after the enter-callbacks */
189  if (EM_DISPATCH_CALLBACKS_ENABLE)
190  num = dispatch_enter_cb(eo, &eo_ctx,
191  ev_tbl/*in,out*/, num_events,
192  &queue, &queue_ctx);
193  if (likely(num > 0)) {
194  /*
195  * Call the EO multi-event receive function
196  * (only if the dispatch callback(s) did not free all events)
197  */
198  eo_receive_multi_func(eo_ctx, ev_tbl, num, queue, queue_ctx);
199  }
200 
201  if (EM_DISPATCH_CALLBACKS_ENABLE)
202  dispatch_exit_cb(eo);
203 
204  /*
205  * Event belongs to an event_group, update the count and
206  * if requested send notifications
207  */
208  if (locm->current.egrp != EM_EVENT_GROUP_UNDEF) {
209  /*
210  * Atomically decrease the event group count.
211  * If the new count is zero, send notification events.
212  */
 /* decrements by the original 'num_events', not by the post-callback 'num' */
213  event_group_count_decrement(num_events);
214  }
 /* NOTE(review): listing line 215 is absent from this extract - confirm against the upstream file whether a statement belongs here */
216 }
217 
218 /**
219  * @brief Helper to dispatch_local_queues() for a single event
220  */
221 static inline void
222 _dispatch_local(stash_entry_t entry)
223 {
224  em_locm_t *const locm = &em_locm;
225  odp_event_t odp_event;
226  em_event_t event;
227  event_hdr_t *ev_hdr;
228 
229  /* dst local queue */
230  const int qidx = entry.qidx;
231  const em_queue_t queue = queue_idx2hdl(qidx);
232  queue_elem_t *const q_elem = queue_elem_get(queue);
233 
234  locm->current.q_elem = q_elem; /* before event_init_... for ESV error prints */
235 
236  odp_event = (odp_event_t)(uintptr_t)entry.evptr;
237  /* Event might originate from outside (via polled pktio) of EM and need init */
238  event = event_init_odp(odp_event, true/*is_extev*/, &ev_hdr/*out*/);
239 
240  if (unlikely(!q_elem || q_elem->state != EM_QUEUE_STATE_READY)) {
241  em_free(event);
242  /* Consider removing the logging */
243  EM_LOG(EM_LOG_PRINT,
244  "EM info: %s(): localQ:%" PRI_QUEUE ":\n"
245  "Not ready - state:%d drop:1 event\n",
246  __func__, queue, q_elem ? q_elem->state : 0);
247  return;
248  }
249 
250  if (q_elem->flags.use_multi_rcv)
251  dispatch_multi_receive(&event, &ev_hdr, 1, q_elem, false);
252  else
253  dispatch_single_receive(&event, &ev_hdr, 1, q_elem, false);
254 }
255 
256 /**
257  * @brief Helper to dispatch_local_queues() for multiple events
258  */
259 static inline void
260 _dispatch_local_multi(const stash_entry_t entry_tbl[], const int num)
261 {
262  em_locm_t *const locm = &em_locm;
263  int idx = 0; /* index into ev_tbl[] & ev_hdr_tbl[] */
264  int ev_cnt; /* number of events to the same local-queue */
265 
266  odp_event_t odp_evtbl[num];
267  em_event_t ev_tbl[num];
268  event_hdr_t *evhdr_tbl[num];
269 
270  for (int i = 0; i < num; i++)
271  odp_evtbl[i] = (odp_event_t)(uintptr_t)entry_tbl[i].evptr;
272 
273  /* Loop through 'num' events and dispatch in batches to local queues */
274  do {
275  /* dst local queue */
276  const int qidx = entry_tbl[idx].qidx;
277  const em_queue_t queue = queue_idx2hdl(qidx);
278  queue_elem_t *const q_elem = queue_elem_get(queue);
279  int i;
280 
281  locm->current.q_elem = q_elem; /* before event_init_... for ESV error prints */
282 
283  /*
284  * Count events sent to the same local queue,
285  * i < num <= EM_QUEUE_LOCAL_MULTI_MAX_BURST
286  */
287  for (i = idx + 1; i < num && entry_tbl[i].qidx == qidx; i++)
288  ;
289 
290  ev_cnt = i - idx; /* '1 to num' events */
291 
292  /* Events might originate from outside (via polled pktio) of EM and need init */
293  event_init_odp_multi(&odp_evtbl[idx], ev_tbl/*out*/, evhdr_tbl/*out*/,
294  ev_cnt, true/*is_extev*/);
295 
296  if (unlikely(!q_elem || q_elem->state != EM_QUEUE_STATE_READY)) {
297  em_free_multi(ev_tbl, ev_cnt);
298  /* Consider removing the logging */
299  EM_LOG(EM_LOG_PRINT,
300  "EM info: %s(): localQ:%" PRI_QUEUE ":\n"
301  "Not ready - state:%d drop:%d events\n",
302  __func__, queue, q_elem ? q_elem->state : 0, ev_cnt);
303  idx += ev_cnt;
304  continue;
305  }
306 
307  if (q_elem->flags.use_multi_rcv)
308  dispatch_multi_receive(ev_tbl, evhdr_tbl, ev_cnt, q_elem, false);
309  else
310  dispatch_single_receive(ev_tbl, evhdr_tbl, ev_cnt, q_elem, false);
311 
312  idx += ev_cnt;
313  } while (idx < num);
314 }
315 
316 static inline void
317 dispatch_local_queues(const stash_entry_t entry_tbl[], const int num)
318 {
319  if (num == 1)
320  _dispatch_local(entry_tbl[0]);
321  else
322  _dispatch_local_multi(entry_tbl, num);
323 }
324 
/*
 * Drain this core's local queues: repeatedly fetch a batch of stash entries
 * and dispatch them until no more are returned, then restore
 * locm->current.q_elem to the scheduled queue element. Returns immediately
 * when the local queues are flagged empty.
 */
325 static inline void
326 check_local_queues(void)
327 {
328  em_locm_t *const locm = &em_locm;
329 
330  if (locm->local_queues.empty)
331  return;
332 
333  /*
334  * Check if the previous EO receive function sent events to a
335  * local queue ('EM_QUEUE_TYPE_LOCAL') - and if so, dispatch
336  * those events immediately.
337  */
 /* NOTE(review): listing line 338 is absent from this extract - presumably the declaration of 'entry_tbl' used below; confirm against the upstream file */
339 
340  for (;;) {
 /* NOTE(review): listing line 341 is absent from this extract - possibly a condition guarding the timestamp store below; confirm */
342  locm->debug_ts[EM_DEBUG_TSP_SCHED_ENTRY] = debug_timestamp();
343 
344  int num = next_local_queue_events(entry_tbl /*[out]*/,
 /* NOTE(review): listing lines 345-346 are absent - the call above is truncated (remaining argument(s) and closing parenthesis not visible); confirm */
347  locm->debug_ts[EM_DEBUG_TSP_SCHED_RETURN] = debug_timestamp();
348 
 /* no more local events: stop draining */
349  if (num <= 0)
350  break;
351 
352  dispatch_local_queues(entry_tbl, num);
353  }
354 
355  /* Restore */
356  locm->current.q_elem = locm->current.sched_q_elem;
357 }
358 
359 /**
360  * Count events (hdrs) sent/tagged with the same event group
361  */
/*
 * Returns how many headers at the start of ev_hdr_tbl[] share the event group
 * of ev_hdr_tbl[0]; one of the two loops below additionally requires a
 * matching 'egrp_gen'. For num < 2 the function returns 'num' without
 * looking at the table.
 */
362 static inline int
363 count_same_evgroup(event_hdr_t *ev_hdr_tbl[], const unsigned int num)
364 {
365  if (unlikely(num < 2))
366  return num;
367 
368  const em_event_group_t egrp = ev_hdr_tbl[0]->egrp;
369  unsigned int i = 1; /* 2nd hdr */
370 
 /* NOTE(review): listing line 371 is absent from this extract - the '} else {' below implies an opening 'if (...) {' here, presumably a compile-time switch; confirm against the upstream file */
372  const int32_t egrp_gen = ev_hdr_tbl[0]->egrp_gen;
373 
 /* compare both event group and generation */
374  for (; i < num &&
375  egrp == ev_hdr_tbl[i]->egrp &&
376  egrp_gen == ev_hdr_tbl[i]->egrp_gen; i++)
377  ;
378  } else {
 /* compare the event group only */
379  for (; i < num &&
380  egrp == ev_hdr_tbl[i]->egrp; i++)
381  ;
382  }
383 
384  return i;
385 }
386 
387 static inline void
388 dispatch_multi_receive(em_event_t ev_tbl[], event_hdr_t *ev_hdr_tbl[],
389  const int num_events, queue_elem_t *const q_elem,
390  const bool check_local_qs)
391 {
392  em_locm_t *const locm = &em_locm;
393  const em_eo_t eo = (em_eo_t)(uintptr_t)q_elem->eo;
394  const em_receive_multi_func_t eo_rcv_multi_fn =
395  q_elem->receive_multi_func;
396  int idx = 0; /* index into ev_hdr_tbl[] */
397  int i;
398  int j;
399 
400  do {
401  /* count same event groups: 1 to num_events */
402  const int egrp_cnt = count_same_evgroup(&ev_hdr_tbl[idx],
403  num_events - idx);
404  const int max = q_elem->max_events;
405  const int num = MIN(egrp_cnt, max);
406  const int rounds = egrp_cnt / num;
407  const int left_over = egrp_cnt % num;
408 
409  if (check_local_qs) {
410  j = idx;
411  for (i = 0; i < rounds; i++) {
412  locm->event_burst_cnt -= num;
413  call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
414  &ev_tbl[j],
415  &ev_hdr_tbl[j],
416  num, q_elem);
417  check_local_queues();
418  j += num;
419  }
420  if (left_over) {
421  locm->event_burst_cnt = 0;
422  call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
423  &ev_tbl[j],
424  &ev_hdr_tbl[j],
425  left_over, q_elem);
426  check_local_queues();
427  }
428  } else {
429  j = idx;
430  for (i = 0; i < rounds; i++) {
431  call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
432  &ev_tbl[j],
433  &ev_hdr_tbl[j],
434  num, q_elem);
435  j += num;
436  }
437  if (left_over) {
438  call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
439  &ev_tbl[j],
440  &ev_hdr_tbl[j],
441  left_over, q_elem);
442  }
443  }
444 
445  idx += egrp_cnt;
446  } while (idx < num_events);
447 }
448 
449 static inline void
450 dispatch_single_receive(em_event_t ev_tbl[], event_hdr_t *ev_hdr_tbl[],
451  const int num_events, queue_elem_t *const q_elem,
452  const bool check_local_qs)
453 {
454  em_locm_t *const locm = &em_locm;
455  const em_eo_t eo = (em_eo_t)(uintptr_t)q_elem->eo;
456  const em_receive_func_t eo_rcv_fn = q_elem->receive_func;
457  int i;
458 
459  if (check_local_qs) {
460  for (i = 0; i < num_events; i++) {
461  locm->event_burst_cnt--;
462  call_eo_receive_fn(eo, eo_rcv_fn,
463  ev_tbl[i], ev_hdr_tbl[i],
464  q_elem);
465  check_local_queues();
466  }
467  } else {
468  for (i = 0; i < num_events; i++)
469  call_eo_receive_fn(eo, eo_rcv_fn,
470  ev_tbl[i], ev_hdr_tbl[i],
471  q_elem);
472  }
473 }
474 
475 /**
476  * Dispatch events - call the EO-receive functions and pass the
477  * events for processing
478  */
/*
 * Records the scheduling context (context type and queue element) in the
 * core-local memory, initializes the ODP events as EM events and dispatches
 * them through the multi- or single-event receive path with local-queue
 * checking enabled. Afterwards the buffered output queues may be drained and
 * the current queue element pointers are cleared.
 */
479 static inline void
480 dispatch_events(odp_event_t odp_evtbl[], const int num_events,
481  queue_elem_t *const q_elem)
482 {
483  em_locm_t *const locm = &em_locm;
484  const em_queue_type_t q_type = q_elem->type;
 /* NOTE(review): listing line 485 is absent from this extract - presumably the declaration and default value of 'sched_ctx_type' assigned below; confirm against the upstream file */
486 
487  if (q_type == EM_QUEUE_TYPE_ATOMIC)
488  sched_ctx_type = EM_SCHED_CONTEXT_TYPE_ATOMIC;
489  else if (q_type == EM_QUEUE_TYPE_PARALLEL_ORDERED)
490  sched_ctx_type = EM_SCHED_CONTEXT_TYPE_ORDERED;
491 
492  locm->current.sched_context_type = sched_ctx_type;
493  locm->current.sched_q_elem = q_elem;
494  locm->current.q_elem = q_elem; /* before event_init_... for ESV error prints */
495  /* here: locm->current.egrp == EM_EVENT_GROUP_UNDEF */
496 
497  event_hdr_t *evhdr_tbl[num_events];
498  em_event_t ev_tbl[num_events];
499 
500  /* Events might originate from outside of EM and need init */
501  event_init_odp_multi(odp_evtbl, ev_tbl/*out*/, evhdr_tbl/*out*/,
502  num_events, true/*is_extev*/);
503 
504  /*
505  * Call the Execution Object (EO) receive function.
506  * Scheduling context may be released during this.
507  */
508  if (q_elem->flags.use_multi_rcv)
509  dispatch_multi_receive(ev_tbl, evhdr_tbl, num_events,
510  q_elem, true);
511  else
512  dispatch_single_receive(ev_tbl, evhdr_tbl, num_events,
513  q_elem, true);
514 
515  /*
516  * Check for buffered events sent to output queues during the previous
517  * dispatch rounds. Currently buffered only for ordered sched context,
518  * use local var 'sched_ctx_type' since the type might have been changed
519  * from _ORDERED by 'em_ordered_processing_end()'.
520  */
 /* NOTE(review): listing line 521 is absent from this extract - the start of the 'if (' condition continued on the next two lines; confirm against the upstream file */
522  sched_ctx_type == EM_SCHED_CONTEXT_TYPE_ORDERED &&
523  locm->output_queue_track.idx_cnt > 0)
524  output_queue_buffering_drain();
525 
526  locm->current.q_elem = NULL;
527  locm->current.sched_q_elem = NULL;
 /* NOTE(review): listing line 528 is absent from this extract - confirm whether a further reset statement belongs here */
529 }
530 
/*
 * Rate-limited poll of EM's internal unscheduled control queue(s).
 *
 * With a poll interval > 1 the function returns early unless both the
 * per-core dispatch counter has run down and at least
 * 'poll_ctrl_interval_time' has passed since the last run on this core.
 */
531 static inline void
532 dispatch_poll_ctrl_queue(void)
533 {
534  const unsigned int poll_interval = em_shm->opt.dispatch.poll_ctrl_interval;
535 
536  /*
537  * Rate limit how often this core checks the unsched ctrl queue.
538  */
539 
540  if (poll_interval > 1) {
541  em_locm_t *const locm = &em_locm;
542 
 /* count-based limit: only every 'poll_interval'th call gets past here */
543  locm->dispatch_cnt--;
544  if (locm->dispatch_cnt > 0)
545  return;
546  locm->dispatch_cnt = poll_interval;
547 
 /* time-based limit: skip if less than the configured period has elapsed */
548  odp_time_t now = odp_time_global();
549  odp_time_t period = odp_time_diff(now, locm->dispatch_last_run);
550  odp_time_t poll_period = em_shm->opt.dispatch.poll_ctrl_interval_time;
551 
552  if (odp_time_cmp(period, poll_period) < 0)
553  return;
554  locm->dispatch_last_run = now;
555  }
556 
557  /* Poll internal unscheduled ctrl queues */
 /* NOTE(review): listing line 558 is absent from this extract - presumably the actual poll call announced by the comment above; confirm against the upstream file */
559 }
560 
561 /*
562  * Change the core state to idle and call idle hooks. If the core state changes,
563  * call to_idle hooks. If the core state is already idle, call while_idle hooks.
564  */
/*
 * Does nothing unless EM_IDLE_HOOKS_ENABLE is set.
 *
 * opt: optional dispatch options (may be NULL), used as one source of the
 *      'to_idle_delay_ns' value passed to the to_idle hooks.
 */
565 static inline void
566 to_idle(const em_dispatch_opt_t *opt)
567 {
568  if (EM_IDLE_HOOKS_ENABLE) {
569  em_locm_t *const locm = &em_locm;
570 
571  if (locm->idle_state == IDLE_STATE_ACTIVE) {
572  uint64_t to_idle_delay_ns = 0;
573 
 /* NOTE(review): listing line 574 is absent from this extract - the '} else if' below implies an opening 'if (...) {' here; confirm against the upstream file */
 /* delay measured from the last recorded scheduler-entry timestamp */
575  to_idle_delay_ns = debug_timestamp() -
576  locm->debug_ts[EM_DEBUG_TSP_SCHED_ENTRY];
577  } else if (EM_SCHED_WAIT_ENABLE) {
 /* use the caller's wait time if given, else the configured default */
578  to_idle_delay_ns = opt ? opt->wait_ns :
579  em_shm->opt.dispatch.sched_wait_ns;
580  } else if (opt) {
581  to_idle_delay_ns = opt->wait_ns;
582  }
583 
584  call_idle_hooks_to_idle(to_idle_delay_ns);
585  locm->idle_state = IDLE_STATE_IDLE;
586  } else if (locm->idle_state == IDLE_STATE_IDLE) {
587  call_idle_hooks_while_idle();
588  }
589  }
590 }
591 
592 /*
593  * Change the core state to active and call idle hooks. If the core state
594  * changes call to_active hooks. If the core state is already active no idle
595  * hooks will be called.
596  */
/* Does nothing unless EM_IDLE_HOOKS_ENABLE is set. */
597 static inline void
598 to_active(void)
599 {
600  if (EM_IDLE_HOOKS_ENABLE) {
601  em_locm_t *const locm = &em_locm;
602 
603  if (locm->idle_state == IDLE_STATE_IDLE) {
604  call_idle_hooks_to_active();
 /* NOTE(review): listing line 605 is absent from this extract - presumably the idle_state update described in the header comment; confirm against the upstream file */
606  }
607  }
608 }
609 
610 /**
611  * @brief Dispatcher calls the scheduler and requests events for processing
612  */
/*
 * Thin wrapper around odp_schedule_multi() that also stores debug timestamps
 * for scheduler entry and return in the core-local memory.
 *
 * odp_queue:  [out] passed through to odp_schedule_multi()
 * sched_wait: wait value passed through to odp_schedule_multi()
 * odp_evtbl:  [out] event table passed through to odp_schedule_multi()
 * num:        event count argument passed through to odp_schedule_multi()
 *
 * Returns the return value of odp_schedule_multi().
 */
613 static inline int
614 dispatch_schedule(odp_queue_t *odp_queue /*out*/, uint64_t sched_wait,
615  odp_event_t odp_evtbl[/*out*/], int num)
616 {
617  int ret;
618 
 /* NOTE(review): listing line 619 is absent from this extract - possibly a condition guarding the timestamp store below; confirm against the upstream file */
620  em_locm.debug_ts[EM_DEBUG_TSP_SCHED_ENTRY] = debug_timestamp();
621 
622  ret = odp_schedule_multi(odp_queue, sched_wait, odp_evtbl, num);
623 
 /* NOTE(review): listing line 624 is absent from this extract - possibly a condition guarding the timestamp store below; confirm against the upstream file */
625  em_locm.debug_ts[EM_DEBUG_TSP_SCHED_RETURN] = debug_timestamp();
626 
627  return ret;
628 }
629 
630 static inline bool
631 is_invalid_queue(const queue_elem_t *const q_elem)
632 {
633  const bool not_emq = !q_elem || (EM_CHECK_LEVEL > 2 &&
634  q_elem->valid_check != QUEUE_ELEM_VALID);
635 
636  if (unlikely(not_emq || q_elem->state != EM_QUEUE_STATE_READY)) {
637  if (not_emq)
638  INTERNAL_ERROR(EM_ERR_BAD_POINTER, EM_ESCOPE_DISPATCH,
639  "Drop event(s) from non-EM Q");
640  else
641  INTERNAL_ERROR(EM_ERR_BAD_STATE, EM_ESCOPE_DISPATCH,
642  "Drop event(s) from Q:%" PRI_QUEUE ": not ready, state=%d",
643  q_elem->queue, q_elem->state);
644  return true; /* invalid queue */
645  }
646 
647  return false; /* not invalid, i.e. a valid queue */
648 }
649 
/*
 * Free events that were scheduled from an invalid queue: the ODP events are
 * first initialized as EM events and then freed in one call.
 *
 * odp_evtbl: ODP events to free
 * num:       number of events in odp_evtbl[]
 *
 * NOTE(review): the local table(s) hold EM_SCHED_MULTI_MAX_BURST entries, so
 * this assumes num <= EM_SCHED_MULTI_MAX_BURST - confirm against the callers.
 */
650 static inline void
651 free_invalid_events(odp_event_t odp_evtbl[], int num)
652 {
 /* NOTE(review): listing line 653 is absent from this extract - presumably the declaration of 'ev_hdr_tbl' used below; confirm against the upstream file */
654  em_event_t ev_tbl[EM_SCHED_MULTI_MAX_BURST];
655 
656  event_init_odp_multi(odp_evtbl, ev_tbl/*out*/, ev_hdr_tbl/*out*/,
657  num, true/*is_extev*/);
658  em_free_multi(ev_tbl, num);
659 }
660 
661 /*
662  * Run a dispatch round - query the scheduler for events and dispatch
663  */
664 static inline int
665 dispatch_round(uint64_t sched_wait, uint16_t burst_size,
666  const em_dispatch_opt_t *opt /*optional, can be NULL*/)
667 {
668  odp_queue_t odp_queue;
669  odp_event_t odp_evtbl[burst_size];
670  int num;
671 
672  dispatch_poll_ctrl_queue();
673 
674  num = dispatch_schedule(&odp_queue/*out*/, sched_wait,
675  odp_evtbl/*out[]*/, burst_size);
676  if (unlikely(num <= 0)) {
677  /*
678  * No scheduled events available, check if the local queues
679  * contain anything on this core - e.g. pktio or something
680  * outside the dispatch-context might have sent to a local queue
681  * Update the EM_IDLE_STATE and call idle hooks if they are
682  * enabled
683  */
684  if (em_locm.local_queues.empty) {
685  to_idle(opt);
686  } else {
687  to_active();
688  check_local_queues();
689  }
690  return 0;
691  }
692 
693  queue_elem_t *const q_elem = odp_queue_context(odp_queue);
694 
695  if (unlikely(is_invalid_queue(q_elem))) {
696  /* Free all events from an invalid queue */
697  free_invalid_events(odp_evtbl, num);
698  return 0;
699  }
700 
701  /*
702  * If scheduled events are available, update the EM_IDLE_STATE and
703  * call idle hooks if they are enabled.
704  */
705  to_active();
706 
707  if (q_elem->flags.in_atomic_group) {
708  atomic_group_dispatch(odp_evtbl, num, q_elem);
709  } else {
710  em_locm.event_burst_cnt = num;
711  dispatch_events(odp_evtbl, num, q_elem);
712  }
713 
714  return num;
715 }
716 
717 /*
718  * em_dispatch() helper: check if the user provided callback functions
719  * 'input_poll' and 'output_drain' should be called in
720  * this dispatch round
721  */
722 static inline bool
723 check_poll_drain_round(unsigned int interval, odp_time_t poll_drain_period)
724 {
725  if (interval > 1) {
726  em_locm_t *const locm = &em_locm;
727 
728  locm->poll_drain_dispatch_cnt--;
729  if (locm->poll_drain_dispatch_cnt == 0) {
730  odp_time_t now = odp_time_global();
731  odp_time_t period;
732 
733  period = odp_time_diff(now, locm->poll_drain_dispatch_last_run);
734  locm->poll_drain_dispatch_cnt = interval;
735 
736  if (odp_time_cmp(poll_drain_period, period) < 0) {
737  locm->poll_drain_dispatch_last_run = now;
738  return true;
739  }
740  }
741  } else {
742  return true;
743  }
744  return false;
745 }
746 
747 /*
748  * em_dispatch() helper: dispatch and call the user provided callback functions
749  * 'input_poll' and 'output_drain'
750  */
751 static inline uint64_t
752 dispatch_with_userfn(uint64_t rounds, bool do_input_poll, bool do_output_drain)
753 {
754  const bool do_forever = rounds == 0 ? true : false;
755  const em_input_poll_func_t input_poll = em_shm->conf.input.input_poll_fn;
756  const em_output_drain_func_t output_drain = em_shm->conf.output.output_drain_fn;
757  const unsigned int poll_interval = em_shm->opt.dispatch.poll_drain_interval;
758  const odp_time_t poll_period = em_shm->opt.dispatch.poll_drain_interval_time;
759  const uint64_t sched_wait = EM_SCHED_WAIT_ENABLE ?
760  em_shm->opt.dispatch.sched_wait : ODP_SCHED_NO_WAIT;
761  int rx_events = 0;
762  uint64_t events = 0;
763  int dispatched_events;
764  int round_events;
765  bool do_poll_drain_round;
766 
767  for (uint64_t i = 0; do_forever || i < rounds;) {
768  dispatched_events = 0;
769 
770  do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
771 
772  if (do_input_poll && do_poll_drain_round)
773  rx_events = input_poll();
774 
775  do {
776  round_events = dispatch_round(sched_wait, EM_SCHED_MULTI_MAX_BURST, NULL);
777  dispatched_events += round_events;
778  i++; /* inc rounds */
779  } while (dispatched_events < rx_events &&
780  round_events > 0 && (do_forever || i < rounds));
781 
782  events += dispatched_events; /* inc ret value*/
783  if (do_output_drain && do_poll_drain_round)
784  (void)output_drain();
785  }
786 
787  return events;
788 }
789 
790 /*
791  * em_dispatch() helper: dispatch without calling any user provided callbacks
792  */
793 static inline uint64_t
794 dispatch_no_userfn(uint64_t rounds)
795 {
796  const bool do_forever = rounds == 0 ? true : false;
797  const uint64_t sched_wait = EM_SCHED_WAIT_ENABLE ?
798  em_shm->opt.dispatch.sched_wait : ODP_SCHED_NO_WAIT;
799  uint64_t events = 0;
800 
801  if (do_forever) {
802  for (;/*ever*/;)
803  dispatch_round(sched_wait, EM_SCHED_MULTI_MAX_BURST, NULL);
804  } else {
805  for (uint64_t i = 0; i < rounds; i++)
806  events += dispatch_round(sched_wait, EM_SCHED_MULTI_MAX_BURST, NULL);
807  }
808 
809  return events;
810 }
811 
812 /*
813  * em_dispatch() helper: dispatch and call the user provided callback functions
814  * 'input_poll' and 'output_drain'
815  */
816 static inline uint64_t
817 dispatch_duration_with_userfn(const em_dispatch_duration_t *duration,
818  const em_dispatch_opt_t *opt,
819  em_dispatch_results_t *results /*out*/,
820  const bool do_input_poll, const bool do_output_drain)
821 {
822  const em_input_poll_func_t input_poll = em_shm->conf.input.input_poll_fn;
823  const em_output_drain_func_t output_drain = em_shm->conf.output.output_drain_fn;
824  const unsigned int poll_interval = em_shm->opt.dispatch.poll_drain_interval;
825  const odp_time_t poll_period = em_shm->opt.dispatch.poll_drain_interval_time;
826  const uint64_t sched_wait = odp_schedule_wait_time(opt->wait_ns);
827  const uint16_t burst_size = opt->burst_size;
828  bool do_poll_drain_round = false;
829 
830  const bool duration_forever =
831  duration->select == EM_DISPATCH_DURATION_FOREVER ? true : false;
832  const bool duration_rounds =
833  duration->select & EM_DISPATCH_DURATION_ROUNDS ? true : false;
834  const bool duration_ns =
835  duration->select & EM_DISPATCH_DURATION_NS ? true : false;
836  const bool duration_events =
837  duration->select & EM_DISPATCH_DURATION_EVENTS ? true : false;
838  const bool duration_noev_rounds =
839  duration->select & EM_DISPATCH_DURATION_NO_EVENTS_ROUNDS ? true : false;
840  const bool duration_noev_ns =
841  duration->select & EM_DISPATCH_DURATION_NO_EVENTS_NS ? true : false;
842 
843  if (unlikely(duration_forever)) {
844  for (;/*ever*/;) {
845  /* check if callback functions should be called */
846  do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
847 
848  if (do_input_poll && do_poll_drain_round)
849  (void)input_poll();
850 
851  /* dispatch one round */
852  (void)dispatch_round(sched_wait, burst_size, opt);
853 
854  if (do_output_drain && do_poll_drain_round)
855  (void)output_drain();
856  }
857  /* never return */
858  }
859 
860  uint64_t events = 0;
861  uint64_t rounds = 0;
862  uint64_t noev_rounds = 0;
863 
864  uint64_t start_ns = 0;
865  uint64_t stop_ns = 0;
866  uint64_t noev_stop_ns = 0;
867  uint64_t time_ns = 0;
868 
869  if (duration_ns || duration_noev_ns) {
870  start_ns = odp_time_local_ns();
871  stop_ns = start_ns + duration->ns;
872  noev_stop_ns = start_ns + duration->no_events.ns;
873  time_ns = start_ns;
874  }
875 
876  while ((!duration_rounds || rounds < duration->rounds) &&
877  (!duration_ns || time_ns < stop_ns) &&
878  (!duration_events || events < duration->events) &&
879  (!duration_noev_rounds || noev_rounds < duration->no_events.rounds) &&
880  (!duration_noev_ns || time_ns < noev_stop_ns)) {
881  /* check if callback functions should be called */
882  do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
883 
884  if (do_input_poll && do_poll_drain_round)
885  (void)input_poll();
886 
887  /* dispatch one round */
888  int round_events = dispatch_round(sched_wait, burst_size, opt);
889 
890  events += round_events;
891  rounds++;
892 
893  if (do_output_drain && do_poll_drain_round)
894  (void)output_drain();
895 
896  if (duration_noev_rounds) {
897  if (round_events == 0)
898  noev_rounds++;
899  else
900  noev_rounds = 0;
901  }
902 
903  if (duration_ns || duration_noev_ns) {
904  time_ns = odp_time_local_ns();
905 
906  if (duration_noev_ns && round_events > 0)
907  noev_stop_ns = time_ns + duration->no_events.ns;
908  }
909  }
910 
911  if (results) {
912  results->rounds = rounds;
913  if (duration_ns || duration_noev_ns)
914  results->ns = time_ns - start_ns;
915  else
916  results->ns = 0;
917  results->events = events;
918  }
919 
920  return events;
921 }
922 
923 static inline uint64_t
924 dispatch_duration_no_userfn(const em_dispatch_duration_t *duration,
925  const em_dispatch_opt_t *opt,
926  em_dispatch_results_t *results/*out*/)
927 {
928  const uint64_t sched_wait = odp_schedule_wait_time(opt->wait_ns);
929  const uint16_t burst_size = opt->burst_size;
930 
931  const bool duration_forever =
932  duration->select == EM_DISPATCH_DURATION_FOREVER ? true : false;
933  const bool duration_rounds =
934  duration->select & EM_DISPATCH_DURATION_ROUNDS ? true : false;
935  const bool duration_ns =
936  duration->select & EM_DISPATCH_DURATION_NS ? true : false;
937  const bool duration_events =
938  duration->select & EM_DISPATCH_DURATION_EVENTS ? true : false;
939  const bool duration_noev_rounds =
940  duration->select & EM_DISPATCH_DURATION_NO_EVENTS_ROUNDS ? true : false;
941  const bool duration_noev_ns =
942  duration->select & EM_DISPATCH_DURATION_NO_EVENTS_NS ? true : false;
943 
944  if (unlikely(duration_forever)) {
945  for (;/*ever*/;)
946  (void)dispatch_round(sched_wait, burst_size, opt);
947  /* never return */
948  }
949 
950  uint64_t events = 0;
951  uint64_t rounds = 0;
952  uint64_t noev_rounds = 0;
953 
954  uint64_t start_ns = 0;
955  uint64_t stop_ns = 0;
956  uint64_t noev_stop_ns = 0;
957  uint64_t time_ns = 0;
958 
959  if (duration_ns || duration_noev_ns) {
960  start_ns = odp_time_local_ns();
961  stop_ns = start_ns + duration->ns;
962  noev_stop_ns = start_ns + duration->no_events.ns;
963  time_ns = start_ns;
964  }
965 
966  while ((!duration_rounds || rounds < duration->rounds) &&
967  (!duration_ns || time_ns < stop_ns) &&
968  (!duration_events || events < duration->events) &&
969  (!duration_noev_rounds || noev_rounds < duration->no_events.rounds) &&
970  (!duration_noev_ns || time_ns < noev_stop_ns)) {
971  /* dispatch one round */
972  int round_events = dispatch_round(sched_wait, burst_size, opt);
973 
974  events += round_events;
975  rounds++;
976 
977  if (duration_noev_rounds) {
978  if (round_events == 0)
979  noev_rounds++;
980  else
981  noev_rounds = 0;
982  }
983 
984  if (duration_ns || duration_noev_ns) {
985  time_ns = odp_time_local_ns();
986  if (duration_noev_ns && round_events > 0)
987  noev_stop_ns = time_ns + duration->no_events.ns;
988  }
989  }
990 
991  if (results) {
992  results->rounds = rounds;
993  if (duration_ns || duration_noev_ns)
994  results->ns = time_ns - start_ns;
995  else
996  results->ns = 0;
997  results->events = events;
998  }
999 
1000  return events;
1001 }
1002 
1003 #ifdef __cplusplus
1004 }
1005 #endif
1006 
1007 #endif /* EM_DISPATCHER_INLINE_H_ */
queue_elem_t::eo_ctx
void * eo_ctx
Definition: em_queue_types.h:241
em_dispatch_duration_t::ns
uint64_t ns
Definition: event_machine_dispatcher.h:143
ODP_PACKED::sched_context_type
em_sched_context_type_t sched_context_type
Definition: em_mem.h:170
em_locm_t::local_queues
local_queues_t local_queues
Definition: em_mem.h:222
em_locm_t::output_queue_track
output_queue_track_t output_queue_track
Definition: em_mem.h:242
IDLE_STATE_ACTIVE
@ IDLE_STATE_ACTIVE
Definition: em_dispatcher_types.h:53
queue_elem_t::max_events
uint16_t max_events
Definition: em_queue_types.h:219
em_dispatch_exit_func_t
void(* em_dispatch_exit_func_t)(em_eo_t eo)
Definition: event_machine_dispatcher.h:548
em_input_poll_func_t
int(* em_input_poll_func_t)(void)
Definition: event_machine_hw_types.h:349
em_dispatch_duration_t::select
em_dispatch_duration_select_t select
Definition: event_machine_dispatcher.h:118
EM_EVENT_UNDEF
#define EM_EVENT_UNDEF
Definition: event_machine_types.h:62
event_hdr::egrp_gen
int32_t egrp_gen
Definition: em_event_types.h:260
queue_elem_t::receive_multi_func
em_receive_multi_func_t receive_multi_func
Definition: em_queue_types.h:237
em_dispatch_results_t::ns
uint64_t ns
Definition: event_machine_dispatcher.h:276
em_dispatch_duration_t
Definition: event_machine_dispatcher.h:110
em_locm_t::dispatch_cnt
unsigned int dispatch_cnt
Definition: em_mem.h:212
em_dispatch_results_t::rounds
uint64_t rounds
Definition: event_machine_dispatcher.h:266
em_dispatch_opt_t
EM dispatch options.
Definition: event_machine_dispatcher.h:190
event_hdr::event_type
em_event_type_t event_type
Definition: em_event_types.h:241
queue_elem_t::type
uint8_t type
Definition: em_queue_types.h:216
em_locm
ENV_LOCAL em_locm_t em_locm
em_free
void em_free(em_event_t event)
Definition: event_machine_event.c:261
em_locm_t::dispatch_last_run
odp_time_t dispatch_last_run
Definition: em_mem.h:214
EM_SCHED_MULTI_MAX_BURST
#define EM_SCHED_MULTI_MAX_BURST
Definition: event_machine_hw_config.h:226
stash_entry_t
Definition: em_event_types.h:86
em_dispatch_opt_t::burst_size
uint16_t burst_size
Definition: event_machine_dispatcher.h:210
ODP_PACKED::sched_q_elem
queue_elem_t * sched_q_elem
Definition: em_mem.h:176
em_locm_t::current
em_locm_current_t current
Definition: em_mem.h:190
EM_EVENT_GROUP_SAFE_MODE
#define EM_EVENT_GROUP_SAFE_MODE
Definition: event_machine_config.h:303
EM_QUEUE_TYPE_ATOMIC
@ EM_QUEUE_TYPE_ATOMIC
Definition: event_machine_hw_types.h:112
queue_elem_t::state
queue_state_t state
Definition: em_queue_types.h:210
em_queue_type_t
uint32_t em_queue_type_t
Definition: event_machine_types.h:168
EM_DISPATCH_DURATION_NS
@ EM_DISPATCH_DURATION_NS
Definition: event_machine_dispatcher.h:88
event_hdr
Definition: em_event_types.h:184
em_locm_t::event_burst_cnt
int event_burst_cnt
Definition: em_mem.h:198
em_locm_t::poll_drain_dispatch_last_run
odp_time_t poll_drain_dispatch_last_run
Definition: em_mem.h:219
EM_DISPATCH_DURATION_FOREVER
@ EM_DISPATCH_DURATION_FOREVER
Definition: event_machine_dispatcher.h:84
queue_elem_t::queue
uint32_t queue
Definition: em_queue_types.h:225
poll_unsched_ctrl_queue
void poll_unsched_ctrl_queue(void)
Poll EM's internal unscheduled control queues during dispatch.
Definition: em_internal_event.c:540
em_dispatch_results_t
Dispatch results.
Definition: event_machine_dispatcher.h:262
em_locm_t::idle_state
idle_state_t idle_state
Definition: em_mem.h:193
em_locm_t::debug_ts
uint64_t debug_ts[EM_DEBUG_TSP_LAST]
Definition: em_mem.h:239
queue_elem_t::context
void * context
Definition: em_queue_types.h:231
ODP_PACKED::egrp
em_event_group_t egrp
Definition: em_mem.h:178
INTERNAL_ERROR
#define INTERNAL_ERROR(error, escope, fmt,...)
Definition: em_error.h:43
queue_elem_t::in_atomic_group
uint8_t in_atomic_group
Definition: em_queue_types.h:201
EM_OUTPUT_QUEUE_IMMEDIATE
#define EM_OUTPUT_QUEUE_IMMEDIATE
Definition: event_machine_hw_config.h:261
EM_IDLE_HOOKS_ENABLE
#define EM_IDLE_HOOKS_ENABLE
Definition: event_machine_config.h:200
EM_SCHED_CONTEXT_TYPE_ATOMIC
@ EM_SCHED_CONTEXT_TYPE_ATOMIC
Definition: event_machine_types.h:285
em_dispatch_results_t::events
uint64_t events
Definition: event_machine_dispatcher.h:281
queue_elem_t::receive_func
em_receive_func_t receive_func
Definition: em_queue_types.h:235
ODP_PACKED::q_elem
queue_elem_t * q_elem
Definition: em_mem.h:174
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
EM_QUEUE_STATE_READY
@ EM_QUEUE_STATE_READY
Definition: em_queue_types.h:137
EM_SCHED_CONTEXT_TYPE_ORDERED
@ EM_SCHED_CONTEXT_TYPE_ORDERED
Definition: event_machine_types.h:289
EM_CHECK_LEVEL
#define EM_CHECK_LEVEL
Definition: event_machine_config.h:253
event_hdr::egrp
em_event_group_t egrp
Definition: em_event_types.h:265
event_machine_debug.h
EM_DISPATCH_DURATION_EVENTS
@ EM_DISPATCH_DURATION_EVENTS
Definition: event_machine_dispatcher.h:90
em_dispatch_opt_t::wait_ns
uint64_t wait_ns
Definition: event_machine_dispatcher.h:201
queue_elem_t::valid_check
uint16_t valid_check
Definition: em_queue_types.h:191
em_event_type_t
uint32_t em_event_type_t
Definition: event_machine_types.h:85
em_shm
em_shm_t * em_shm
Definition: event_machine_init.c:41
em_receive_multi_func_t
void(* em_receive_multi_func_t)(void *eo_ctx, em_event_t events[], int num, em_queue_t queue, void *q_ctx)
Definition: event_machine_eo.h:189
EM_QUEUE_TYPE_PARALLEL_ORDERED
@ EM_QUEUE_TYPE_PARALLEL_ORDERED
Definition: event_machine_hw_types.h:122
em_output_drain_func_t
int(* em_output_drain_func_t)(void)
Definition: event_machine_hw_types.h:367
queue_elem_t::eo
uint16_t eo
Definition: em_queue_types.h:222
EM_SCHED_CONTEXT_TYPE_NONE
@ EM_SCHED_CONTEXT_TYPE_NONE
Definition: event_machine_types.h:281
EM_DISPATCH_DURATION_NO_EVENTS_ROUNDS
@ EM_DISPATCH_DURATION_NO_EVENTS_ROUNDS
Definition: event_machine_dispatcher.h:93
hook_tbl_t
Definition: em_hook_types.h:76
EM_ERR_BAD_POINTER
@ EM_ERR_BAD_POINTER
Definition: event_machine_hw_types.h:271
em_shm_t::dispatch_exit_cb_tbl
hook_tbl_t * dispatch_exit_cb_tbl
Definition: em_mem.h:100
em_locm_t::poll_drain_dispatch_cnt
unsigned int poll_drain_dispatch_cnt
Definition: em_mem.h:217
em_receive_func_t
void(* em_receive_func_t)(void *eo_ctx, em_event_t event, em_event_type_t type, em_queue_t queue, void *q_ctx)
Definition: event_machine_eo.h:149
EM_QUEUE_LOCAL_MULTI_MAX_BURST
#define EM_QUEUE_LOCAL_MULTI_MAX_BURST
Definition: event_machine_hw_config.h:239
EM_DISPATCH_DURATION_ROUNDS
@ EM_DISPATCH_DURATION_ROUNDS
Definition: event_machine_dispatcher.h:86
em_dispatch_enter_func_t
void(* em_dispatch_enter_func_t)(em_eo_t eo, void **eo_ctx, em_event_t events[], int num, em_queue_t *queue, void **q_ctx)
Definition: event_machine_dispatcher.h:533
EM_CALLBACKS_MAX
#define EM_CALLBACKS_MAX
Definition: event_machine_config.h:234
EM_SCHED_WAIT_ENABLE
#define EM_SCHED_WAIT_ENABLE
Definition: event_machine_config.h:222
em_free_multi
void em_free_multi(em_event_t events[], int num)
Definition: event_machine_event.c:370
em_locm_t
Definition: em_mem.h:188
EM_ERR_BAD_STATE
@ EM_ERR_BAD_STATE
Definition: event_machine_hw_types.h:263
EM_EVENT_GROUP_UNDEF
#define EM_EVENT_GROUP_UNDEF
Definition: event_machine_types.h:141
queue_elem_t
Definition: em_queue_types.h:180
IDLE_STATE_IDLE
@ IDLE_STATE_IDLE
Definition: em_dispatcher_types.h:51
EM_DEBUG_TIMESTAMP_ENABLE
#define EM_DEBUG_TIMESTAMP_ENABLE
Definition: event_machine_config.h:325
em_sched_context_type_t
em_sched_context_type_t
Definition: event_machine_types.h:277
EM_DISPATCH_DURATION_NO_EVENTS_NS
@ EM_DISPATCH_DURATION_NO_EVENTS_NS
Definition: event_machine_dispatcher.h:95
ODP_PACKED::rcv_multi_cnt
int rcv_multi_cnt
Definition: em_mem.h:172