11 #ifndef EM_DISPATCHER_INLINE_H_
12 #define EM_DISPATCHER_INLINE_H_
21 dispatch_multi_receive(em_event_t ev_tbl[],
event_hdr_t *ev_hdr_tbl[],
23 const bool check_local_qs);
25 dispatch_single_receive(em_event_t ev_tbl[],
event_hdr_t *ev_hdr_tbl[],
27 const bool check_local_qs);
32 static inline int pack_ev_tbl(em_event_t ev_tbl[],
const int num)
43 for (
int i = 0; i < num; i++) {
46 ev_tbl[pack] = ev_tbl[i];
54 static inline uint64_t debug_timestamp(
void)
77 dispatch_enter_cb(em_eo_t eo,
void **eo_ctx,
78 em_event_t ev_tbl[],
const int num_events,
79 em_queue_t *queue,
void **q_ctx)
85 for (
int i = 0; i < EM_CALLBACKS_MAX && num > 0; i++) {
86 dispatch_enter_fn = cb_tbl->tbl[i].disp_enter;
87 if (dispatch_enter_fn == NULL)
89 dispatch_enter_fn(eo, eo_ctx, ev_tbl, num, queue, q_ctx);
90 num = pack_ev_tbl(ev_tbl, num);
102 dispatch_exit_cb(em_eo_t eo)
108 dispatch_exit_fn = dispatch_exit_cb_tbl->tbl[i].disp_exit;
109 if (dispatch_exit_fn == NULL)
111 dispatch_exit_fn(eo);
120 em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->
queue;
121 void *queue_ctx = q_elem->
context;
122 void *eo_ctx = q_elem->
eo_ctx;
127 event_group_set_local(ev_hdr->
egrp, ev_hdr->
egrp_gen, 1);
129 if (EM_DISPATCH_CALLBACKS_ENABLE) {
130 em_event_t ev_tbl[1] = {
event};
132 num = dispatch_enter_cb(eo, &eo_ctx, ev_tbl, 1,
134 if (num && ev_tbl[0] != event) {
137 ev_hdr = event_to_hdr(event);
141 if (likely(num == 1)) {
147 eo_receive_func(eo_ctx, event, event_type,
151 if (EM_DISPATCH_CALLBACKS_ENABLE)
152 dispatch_exit_cb(eo);
163 event_group_count_decrement(1);
173 call_eo_receive_multi_fn(
const em_eo_t eo,
179 em_queue_t queue = (em_queue_t)(uintptr_t)q_elem->
queue;
180 void *queue_ctx = q_elem->
context;
181 void *eo_ctx = q_elem->
eo_ctx;
182 int num = num_events;
186 event_group_set_local(ev_hdr_tbl[0]->egrp, ev_hdr_tbl[0]->egrp_gen,
189 if (EM_DISPATCH_CALLBACKS_ENABLE)
190 num = dispatch_enter_cb(eo, &eo_ctx,
193 if (likely(num > 0)) {
198 eo_receive_multi_func(eo_ctx, ev_tbl, num, queue, queue_ctx);
201 if (EM_DISPATCH_CALLBACKS_ENABLE)
202 dispatch_exit_cb(eo);
213 event_group_count_decrement(num_events);
225 odp_event_t odp_event;
230 const int qidx = entry.qidx;
231 const em_queue_t queue = queue_idx2hdl(qidx);
236 odp_event = (odp_event_t)(uintptr_t)entry.evptr;
238 event = event_init_odp(odp_event,
true, &ev_hdr);
244 "EM info: %s(): localQ:%" PRI_QUEUE ":\n"
245 "Not ready - state:%d drop:1 event\n",
246 __func__, queue, q_elem ? q_elem->
state : 0);
250 if (q_elem->flags.use_multi_rcv)
251 dispatch_multi_receive(&event, &ev_hdr, 1, q_elem,
false);
253 dispatch_single_receive(&event, &ev_hdr, 1, q_elem,
false);
260 _dispatch_local_multi(
const stash_entry_t entry_tbl[],
const int num)
266 odp_event_t odp_evtbl[num];
267 em_event_t ev_tbl[num];
270 for (
int i = 0; i < num; i++)
271 odp_evtbl[i] = (odp_event_t)(uintptr_t)entry_tbl[i].evptr;
276 const int qidx = entry_tbl[idx].qidx;
277 const em_queue_t queue = queue_idx2hdl(qidx);
287 for (i = idx + 1; i < num && entry_tbl[i].qidx == qidx; i++)
293 event_init_odp_multi(&odp_evtbl[idx], ev_tbl, evhdr_tbl,
300 "EM info: %s(): localQ:%" PRI_QUEUE ":\n"
301 "Not ready - state:%d drop:%d events\n",
302 __func__, queue, q_elem ? q_elem->
state : 0, ev_cnt);
307 if (q_elem->flags.use_multi_rcv)
308 dispatch_multi_receive(ev_tbl, evhdr_tbl, ev_cnt, q_elem,
false);
310 dispatch_single_receive(ev_tbl, evhdr_tbl, ev_cnt, q_elem,
false);
317 dispatch_local_queues(
const stash_entry_t entry_tbl[],
const int num)
320 _dispatch_local(entry_tbl[0]);
322 _dispatch_local_multi(entry_tbl, num);
326 check_local_queues(
void)
342 locm->
debug_ts[EM_DEBUG_TSP_SCHED_ENTRY] = debug_timestamp();
344 int num = next_local_queue_events(entry_tbl ,
347 locm->
debug_ts[EM_DEBUG_TSP_SCHED_RETURN] = debug_timestamp();
352 dispatch_local_queues(entry_tbl, num);
363 count_same_evgroup(
event_hdr_t *ev_hdr_tbl[],
const unsigned int num)
365 if (unlikely(num < 2))
368 const em_event_group_t egrp = ev_hdr_tbl[0]->
egrp;
372 const int32_t egrp_gen = ev_hdr_tbl[0]->
egrp_gen;
375 egrp == ev_hdr_tbl[i]->
egrp &&
376 egrp_gen == ev_hdr_tbl[i]->
egrp_gen; i++)
380 egrp == ev_hdr_tbl[i]->
egrp; i++)
388 dispatch_multi_receive(em_event_t ev_tbl[],
event_hdr_t *ev_hdr_tbl[],
390 const bool check_local_qs)
393 const em_eo_t eo = (em_eo_t)(uintptr_t)q_elem->
eo;
402 const int egrp_cnt = count_same_evgroup(&ev_hdr_tbl[idx],
405 const int num = MIN(egrp_cnt, max);
406 const int rounds = egrp_cnt / num;
407 const int left_over = egrp_cnt % num;
409 if (check_local_qs) {
411 for (i = 0; i < rounds; i++) {
413 call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
417 check_local_queues();
422 call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
426 check_local_queues();
430 for (i = 0; i < rounds; i++) {
431 call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
438 call_eo_receive_multi_fn(eo, eo_rcv_multi_fn,
446 }
while (idx < num_events);
450 dispatch_single_receive(em_event_t ev_tbl[],
event_hdr_t *ev_hdr_tbl[],
452 const bool check_local_qs)
455 const em_eo_t eo = (em_eo_t)(uintptr_t)q_elem->
eo;
459 if (check_local_qs) {
460 for (i = 0; i < num_events; i++) {
462 call_eo_receive_fn(eo, eo_rcv_fn,
463 ev_tbl[i], ev_hdr_tbl[i],
465 check_local_queues();
468 for (i = 0; i < num_events; i++)
469 call_eo_receive_fn(eo, eo_rcv_fn,
470 ev_tbl[i], ev_hdr_tbl[i],
480 dispatch_events(odp_event_t odp_evtbl[],
const int num_events,
498 em_event_t ev_tbl[num_events];
501 event_init_odp_multi(odp_evtbl, ev_tbl, evhdr_tbl,
508 if (q_elem->flags.use_multi_rcv)
509 dispatch_multi_receive(ev_tbl, evhdr_tbl, num_events,
512 dispatch_single_receive(ev_tbl, evhdr_tbl, num_events,
524 output_queue_buffering_drain();
532 dispatch_poll_ctrl_queue(
void)
534 const unsigned int poll_interval =
em_shm->opt.dispatch.poll_ctrl_interval;
540 if (poll_interval > 1) {
548 odp_time_t now = odp_time_global();
550 odp_time_t poll_period =
em_shm->opt.dispatch.poll_ctrl_interval_time;
552 if (odp_time_cmp(period, poll_period) < 0)
572 uint64_t to_idle_delay_ns = 0;
575 to_idle_delay_ns = debug_timestamp() -
576 locm->
debug_ts[EM_DEBUG_TSP_SCHED_ENTRY];
578 to_idle_delay_ns = opt ? opt->
wait_ns :
579 em_shm->opt.dispatch.sched_wait_ns;
581 to_idle_delay_ns = opt->
wait_ns;
584 call_idle_hooks_to_idle(to_idle_delay_ns);
587 call_idle_hooks_while_idle();
604 call_idle_hooks_to_active();
614 dispatch_schedule(odp_queue_t *odp_queue , uint64_t sched_wait,
615 odp_event_t odp_evtbl[],
int num)
622 ret = odp_schedule_multi(odp_queue, sched_wait, odp_evtbl, num);
639 "Drop event(s) from non-EM Q");
642 "Drop event(s) from Q:%" PRI_QUEUE ": not ready, state=%d",
651 free_invalid_events(odp_event_t odp_evtbl[],
int num)
656 event_init_odp_multi(odp_evtbl, ev_tbl, ev_hdr_tbl,
665 dispatch_round(uint64_t sched_wait, uint16_t burst_size,
668 odp_queue_t odp_queue;
669 odp_event_t odp_evtbl[burst_size];
672 dispatch_poll_ctrl_queue();
674 num = dispatch_schedule(&odp_queue, sched_wait,
675 odp_evtbl, burst_size);
676 if (unlikely(num <= 0)) {
688 check_local_queues();
693 queue_elem_t *
const q_elem = odp_queue_context(odp_queue);
695 if (unlikely(is_invalid_queue(q_elem))) {
697 free_invalid_events(odp_evtbl, num);
708 atomic_group_dispatch(odp_evtbl, num, q_elem);
711 dispatch_events(odp_evtbl, num, q_elem);
723 check_poll_drain_round(
unsigned int interval, odp_time_t poll_drain_period)
730 odp_time_t now = odp_time_global();
736 if (odp_time_cmp(poll_drain_period, period) < 0) {
751 static inline uint64_t
752 dispatch_with_userfn(uint64_t rounds,
bool do_input_poll,
bool do_output_drain)
754 const bool do_forever = rounds == 0 ? true :
false;
757 const unsigned int poll_interval =
em_shm->opt.dispatch.poll_drain_interval;
758 const odp_time_t poll_period =
em_shm->opt.dispatch.poll_drain_interval_time;
760 em_shm->opt.dispatch.sched_wait : ODP_SCHED_NO_WAIT;
763 int dispatched_events;
765 bool do_poll_drain_round;
767 for (uint64_t i = 0; do_forever || i < rounds;) {
768 dispatched_events = 0;
770 do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
772 if (do_input_poll && do_poll_drain_round)
773 rx_events = input_poll();
777 dispatched_events += round_events;
779 }
while (dispatched_events < rx_events &&
780 round_events > 0 && (do_forever || i < rounds));
782 events += dispatched_events;
783 if (do_output_drain && do_poll_drain_round)
784 (void)output_drain();
793 static inline uint64_t
794 dispatch_no_userfn(uint64_t rounds)
796 const bool do_forever = rounds == 0 ? true :
false;
798 em_shm->opt.dispatch.sched_wait : ODP_SCHED_NO_WAIT;
805 for (uint64_t i = 0; i < rounds; i++)
816 static inline uint64_t
820 const bool do_input_poll,
const bool do_output_drain)
824 const unsigned int poll_interval =
em_shm->opt.dispatch.poll_drain_interval;
825 const odp_time_t poll_period =
em_shm->opt.dispatch.poll_drain_interval_time;
826 const uint64_t sched_wait = odp_schedule_wait_time(opt->
wait_ns);
828 bool do_poll_drain_round =
false;
830 const bool duration_forever =
832 const bool duration_rounds =
834 const bool duration_ns =
836 const bool duration_events =
838 const bool duration_noev_rounds =
840 const bool duration_noev_ns =
843 if (unlikely(duration_forever)) {
846 do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
848 if (do_input_poll && do_poll_drain_round)
852 (void)dispatch_round(sched_wait, burst_size, opt);
854 if (do_output_drain && do_poll_drain_round)
855 (void)output_drain();
862 uint64_t noev_rounds = 0;
864 uint64_t start_ns = 0;
865 uint64_t stop_ns = 0;
866 uint64_t noev_stop_ns = 0;
867 uint64_t time_ns = 0;
869 if (duration_ns || duration_noev_ns) {
870 start_ns = odp_time_local_ns();
871 stop_ns = start_ns + duration->
ns;
872 noev_stop_ns = start_ns + duration->no_events.
ns;
876 while ((!duration_rounds || rounds < duration->rounds) &&
877 (!duration_ns || time_ns < stop_ns) &&
878 (!duration_events || events < duration->events) &&
879 (!duration_noev_rounds || noev_rounds < duration->no_events.rounds) &&
880 (!duration_noev_ns || time_ns < noev_stop_ns)) {
882 do_poll_drain_round = check_poll_drain_round(poll_interval, poll_period);
884 if (do_input_poll && do_poll_drain_round)
888 int round_events = dispatch_round(sched_wait, burst_size, opt);
890 events += round_events;
893 if (do_output_drain && do_poll_drain_round)
894 (void)output_drain();
896 if (duration_noev_rounds) {
897 if (round_events == 0)
903 if (duration_ns || duration_noev_ns) {
904 time_ns = odp_time_local_ns();
906 if (duration_noev_ns && round_events > 0)
907 noev_stop_ns = time_ns + duration->no_events.
ns;
913 if (duration_ns || duration_noev_ns)
914 results->
ns = time_ns - start_ns;
923 static inline uint64_t
928 const uint64_t sched_wait = odp_schedule_wait_time(opt->
wait_ns);
931 const bool duration_forever =
933 const bool duration_rounds =
935 const bool duration_ns =
937 const bool duration_events =
939 const bool duration_noev_rounds =
941 const bool duration_noev_ns =
944 if (unlikely(duration_forever)) {
946 (
void)dispatch_round(sched_wait, burst_size, opt);
952 uint64_t noev_rounds = 0;
954 uint64_t start_ns = 0;
955 uint64_t stop_ns = 0;
956 uint64_t noev_stop_ns = 0;
957 uint64_t time_ns = 0;
959 if (duration_ns || duration_noev_ns) {
960 start_ns = odp_time_local_ns();
961 stop_ns = start_ns + duration->
ns;
962 noev_stop_ns = start_ns + duration->no_events.
ns;
966 while ((!duration_rounds || rounds < duration->rounds) &&
967 (!duration_ns || time_ns < stop_ns) &&
968 (!duration_events || events < duration->events) &&
969 (!duration_noev_rounds || noev_rounds < duration->no_events.rounds) &&
970 (!duration_noev_ns || time_ns < noev_stop_ns)) {
972 int round_events = dispatch_round(sched_wait, burst_size, opt);
974 events += round_events;
977 if (duration_noev_rounds) {
978 if (round_events == 0)
984 if (duration_ns || duration_noev_ns) {
985 time_ns = odp_time_local_ns();
986 if (duration_noev_ns && round_events > 0)
987 noev_stop_ns = time_ns + duration->no_events.
ns;
993 if (duration_ns || duration_noev_ns)
994 results->
ns = time_ns - start_ns;