44 static em_queue_group_t default_queue_group_create(
void);
45 static em_queue_group_t default_queue_group_join(
void);
47 static em_queue_group_t
48 queue_group_create_escope(
const char *name,
const em_core_mask_t *mask,
50 em_queue_group_t requested_queue_group,
56 static void q_grp_create_done_callback(
void *arg_ptr);
57 static void q_grp_create_sync_done_callback(
void *arg_ptr);
63 static void q_grp_modify_done_callback(
void *arg_ptr);
64 static void q_grp_modify_sync_done_callback(
void *arg_ptr);
68 static void q_grp_delete_done_callback(
void *arg_ptr);
69 static void q_grp_delete_sync_done_callback(
void *arg_ptr);
85 queue_group_poolelem2qgrpelem(
objpool_elem_t *
const queue_group_pool_elem)
92 read_config_file(
void)
95 bool val_bool =
false;
98 EM_PRINT(
"EM queue group config:\n");
103 conf_str =
"queue_group.create_core_queue_groups";
104 ret = em_libconfig_lookup_bool(&
em_shm->
libconfig, conf_str, &val_bool);
105 if (unlikely(!ret)) {
106 EM_LOG(EM_LOG_ERR,
"Config option '%s' not found\n", conf_str);
110 em_shm->opt.queue_group.create_core_queue_groups = val_bool;
111 EM_PRINT(
" %s: %s(%d)\n", conf_str, val_bool ?
"true" :
"false",
114 em_shm->opt.queue_group.create_core_queue_groups = val_bool;
125 const uint32_t objpool_subpools = MIN(4, OBJSUBPOOLS_MAX);
129 if (read_config_file())
140 env_spinlock_init(&queue_group_elem->
lock);
144 ret = objpool_init(&queue_group_pool->objpool, objpool_subpools);
150 objpool_add(&queue_group_pool->objpool, i % objpool_subpools,
157 em_queue_group_t default_queue_group = default_queue_group_create();
160 EM_LOG(EM_LOG_ERR,
"default_queue_group_create() failed!\n");
167 if (
em_shm->opt.queue_group.create_core_queue_groups) {
171 EM_LOG(EM_LOG_ERR,
"core_queue_groups_create():%" PRI_STAT
"\n", stat);
184 em_queue_group_t def_qgrp = default_queue_group_join();
187 EM_LOG(EM_LOG_ERR,
"default_queue_group_join() failed!\n");
195 if (
em_shm->opt.queue_group.create_core_queue_groups) {
199 EM_LOG(EM_LOG_ERR,
"core_queue_group_join():%" PRI_STAT
"\n", stat);
217 static em_queue_group_t
218 queue_group_alloc(em_queue_group_t queue_group)
227 qgrp_pool_elem = objpool_rem(&
em_shm->queue_group_pool.objpool,
229 if (unlikely(qgrp_pool_elem == NULL))
232 qgrp_elem = queue_group_poolelem2qgrpelem(qgrp_pool_elem);
237 qgrp_elem = queue_group_elem_get(queue_group);
238 if (unlikely(qgrp_elem == NULL))
241 env_spinlock_lock(&qgrp_elem->
lock);
243 if (queue_group_allocated(qgrp_elem)) {
244 env_spinlock_unlock(&qgrp_elem->
lock);
249 int ret = objpool_rem_elem(&
em_shm->queue_group_pool.objpool,
251 env_spinlock_unlock(&qgrp_elem->
lock);
252 if (unlikely(ret != 0))
269 queue_group_free(em_queue_group_t queue_group)
272 queue_group_elem_get(queue_group);
274 if (unlikely(queue_group_elem == NULL))
277 objpool_add(&
em_shm->queue_group_pool.objpool,
288 static em_queue_group_t default_queue_group_create(
void)
290 em_queue_group_t default_qgrp;
293 odp_thrmask_t zero_thrmask;
294 odp_schedule_group_t odp_sched_group;
301 if (unlikely(default_qgrp_elem == NULL))
308 odp_thrmask_zero(&zero_thrmask);
321 if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID))
334 static em_queue_group_t default_queue_group_join(
void)
337 odp_thrmask_t odp_joinmask;
339 const int odp_thr = odp_thread_id();
343 if (unlikely(!default_qgrp_elem))
347 odp_thrmask_zero(&odp_joinmask);
348 odp_thrmask_set(&odp_joinmask, odp_thr);
350 env_spinlock_lock(&default_qgrp_elem->
lock);
355 env_spinlock_unlock(&default_qgrp_elem->
lock);
378 env_spinlock_lock(&qgrp_elem->
lock);
380 int allocated = queue_group_allocated(qgrp_elem);
383 if (allocated && !ongoing_delete &&
386 q_grp_add_core(qgrp_elem);
388 env_spinlock_unlock(&qgrp_elem->
lock);
396 em_queue_group_t qgrp;
397 em_queue_group_t qgrp_req;
400 odp_thrmask_t zero_thrmask;
401 odp_schedule_group_t odp_sched_group;
405 for (
int i = 0; i < num_cores; i++) {
406 qgrp_req = qgrp_idx2hdl(i);
407 qgrp = queue_group_alloc(qgrp_req);
409 EM_DBG(
"queue_group_alloc() fails for core-qgrp:%d\n", i);
413 qgrp_elem = queue_group_elem_get(qgrp);
414 if (unlikely(qgrp_elem == NULL)) {
415 EM_DBG(
"qgrp_elem NULL for core-qgrp:%d\n", i);
423 odp_thrmask_zero(&zero_thrmask);
432 core_queue_grp_name(i, qgrp_name,
434 odp_sched_group = odp_schedule_group_create(qgrp_name,
436 if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID)) {
437 EM_DBG(
"odp_schedule_group_create() fails for core-qgrp:%d\n", i);
451 const int odp_thr = odp_thread_id();
453 core_queue_grp_name(core, qgrp_name,
sizeof(qgrp_name));
458 EM_DBG(
"%s(): core:%d, %s not found", __func__, core, qgrp_name);
464 if (unlikely(!qgrp_elem)) {
465 EM_DBG(
"%s(): qgrp_elem NULL for core-qgrp:%d\n",
471 odp_thrmask_t odp_joinmask;
473 odp_thrmask_zero(&odp_joinmask);
474 odp_thrmask_set(&odp_joinmask, odp_thr);
476 env_spinlock_lock(&qgrp_elem->
lock);
480 env_spinlock_unlock(&qgrp_elem->
lock);
483 EM_DBG(
"%s(): odp_schedule_group_join():%d, core-qgrp:%d\n",
484 __func__, ret, core);
497 static em_queue_group_t
498 queue_group_create_escope(
const char *name,
const em_core_mask_t *mask,
500 em_queue_group_t requested_queue_group,
503 em_queue_group_t queue_group;
505 odp_schedule_group_t odp_sched_group;
506 odp_thrmask_t zero_thrmask;
512 odp_thrmask_zero(&zero_thrmask);
521 queue_group = queue_group_alloc(requested_queue_group);
522 qgrp_elem = queue_group_elem_get(queue_group);
523 if (unlikely(qgrp_elem == NULL)) {
525 "Queue group alloc failed!");
531 odp_sched_group = odp_schedule_group_create(name, &zero_thrmask);
532 if (unlikely(odp_sched_group == ODP_SCHED_GROUP_INVALID)) {
533 queue_group_free(queue_group);
535 "ODP schedule group creation failed!");
539 env_spinlock_lock(&qgrp_elem->
lock);
550 q_grp_add_core(qgrp_elem);
554 if (escope == EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC)
555 q_grp_create_sync_done(qgrp_elem, mask);
557 q_grp_create_done(qgrp_elem, mask);
559 env_spinlock_unlock(&qgrp_elem->
lock);
562 if (unlikely(stat !=
EM_OK))
568 env_spinlock_unlock(&qgrp_elem->
lock);
570 stat = send_qgrp_addrem_reqs(qgrp_elem, mask, &add_mask, &rem_zero_mask,
571 num_notif, notif_tbl, escope);
572 if (unlikely(stat !=
EM_OK))
585 em_queue_group_t requested_queue_group)
587 return queue_group_create_escope(name, mask, num_notif, notif_tbl,
588 requested_queue_group,
589 EM_ESCOPE_QUEUE_GROUP_CREATE);
599 em_queue_group_t requested_queue_group)
601 return queue_group_create_escope(name, mask, 0, NULL,
602 requested_queue_group,
603 EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC);
636 const char **err_str)
638 if (unlikely(!queue_group_allocated(qgrp_elem))) {
639 *err_str =
"Queue group not allocated";
643 *err_str =
"Contending queue group delete ongoing";
646 if (unlikely(is_delete && !list_is_empty(&qgrp_elem->
queue_list))) {
647 *err_str =
"Queue group contains queues, cannot delete group";
667 for (
int i = 0; i < core_count; i++) {
691 for (
int i = 0; i < core_count; i++) {
705 static void addrem_events_free(em_event_t add_events[],
int add_count,
706 em_event_t rem_events[],
int rem_count)
708 for (
int i = 0; i < add_count; i++) {
712 for (
int i = 0; i < rem_count; i++) {
724 static em_status_t send_addrem_events(em_event_t addrem_events[],
726 em_event_group_t event_group)
729 const int first_qidx = queue_id2idx(FIRST_INTERNAL_UNSCHED_QUEUE);
733 for (
int i = 0; i < core_count; i++) {
740 queue_idx2hdl(first_qidx + i),
742 if (unlikely(err !=
EM_OK))
755 static int create_addrem_events(em_event_t addrem_events[],
int count,
756 uint64_t ev_id, em_queue_group_t queue_group)
760 if (unlikely(count < 1))
771 i_event->
q_grp.queue_group = queue_group;
773 for (
int i = 1; i < count; i++) {
788 void (**f_done_callback)(
void *arg_ptr) ,
789 bool *sync_operation )
791 *sync_operation =
false;
794 case EM_ESCOPE_QUEUE_GROUP_CREATE:
795 *f_done_callback = q_grp_create_done_callback;
797 case EM_ESCOPE_QUEUE_GROUP_CREATE_SYNC:
798 *f_done_callback = q_grp_create_sync_done_callback;
799 *sync_operation =
true;
801 case EM_ESCOPE_QUEUE_GROUP_MODIFY:
802 *f_done_callback = q_grp_modify_done_callback;
804 case EM_ESCOPE_QUEUE_GROUP_MODIFY_SYNC:
805 *f_done_callback = q_grp_modify_sync_done_callback;
806 *sync_operation =
true;
808 case EM_ESCOPE_QUEUE_GROUP_DELETE:
809 *f_done_callback = q_grp_delete_done_callback;
811 case EM_ESCOPE_QUEUE_GROUP_DELETE_SYNC:
812 *f_done_callback = q_grp_delete_sync_done_callback;
813 *sync_operation =
true;
816 *f_done_callback = NULL;
834 const em_queue_group_t queue_group = qgrp_elem->
queue_group;
837 const int addrem_count = add_count + rem_count;
838 em_event_t add_events[add_count];
839 em_event_t rem_events[rem_count];
840 em_event_group_t event_group;
845 em_event_t callback_args_event =
854 callback_args->qgrp_elem = qgrp_elem;
862 void (*f_done_callback)(
void *arg_ptr);
863 void *f_done_arg_ptr = callback_args_event;
864 bool sync_operation =
false;
866 ret = set_qgrp_done_func(escope, &f_done_callback,
879 f_done_callback, f_done_arg_ptr,
880 num_notif, notif_tbl,
887 for (
int i = 0; i < add_count; i++)
889 for (
int i = 0; i < rem_count; i++)
894 cnt = create_addrem_events(add_events , add_count,
895 QUEUE_GROUP_ADD_REQ, queue_group);
896 if (unlikely(cnt != add_count))
897 goto err_free_resources;
901 cnt = create_addrem_events(rem_events , rem_count,
902 QUEUE_GROUP_REM_REQ, queue_group);
903 if (unlikely(cnt != rem_count))
904 goto err_free_resources;
910 err = send_addrem_events(rem_events, rem_mask, event_group);
911 if (unlikely(err !=
EM_OK))
912 goto err_free_resources;
916 err = send_addrem_events(add_events, add_mask, event_group);
917 if (unlikely(err !=
EM_OK))
918 goto err_free_resources;
923 addrem_events_free(add_events, add_count,
924 rem_events, rem_count);
948 const char *err_str =
"";
949 const em_escope_t escope = is_delete ? EM_ESCOPE_QUEUE_GROUP_DELETE :
950 EM_ESCOPE_QUEUE_GROUP_MODIFY;
953 env_spinlock_lock(&qgrp_elem->
lock);
956 err = check_qgrp_state(qgrp_elem, is_delete, &err_str);
957 if (unlikely(err !=
EM_OK)) {
958 env_spinlock_unlock(&qgrp_elem->
lock);
972 int adds = count_qgrp_adds(&old_mask, new_mask, &add_mask );
973 int rems = count_qgrp_rems(&old_mask, new_mask, &rem_mask );
983 q_grp_add_core(qgrp_elem);
993 q_grp_delete_done(qgrp_elem, new_mask);
995 q_grp_modify_done(qgrp_elem, new_mask);
997 env_spinlock_unlock(&qgrp_elem->
lock);
1001 "notif sending failed");
1009 env_spinlock_unlock(&qgrp_elem->
lock);
1017 err = send_qgrp_addrem_reqs(qgrp_elem, new_mask, &add_mask, &rem_mask,
1018 num_notif, notif_tbl, escope);
1020 "qgrp rem req(s) sending failed");
1038 const em_queue_group_t queue_group = qgrp_elem->
queue_group;
1040 const char *err_str =
"";
1041 const em_escope_t escope = is_delete ? EM_ESCOPE_QUEUE_GROUP_DELETE_SYNC
1042 : EM_ESCOPE_QUEUE_GROUP_MODIFY_SYNC;
1048 env_spinlock_lock(&qgrp_elem->
lock);
1051 err = check_qgrp_state(qgrp_elem, is_delete, &err_str);
1052 if (unlikely(err !=
EM_OK)) {
1053 env_spinlock_unlock(&qgrp_elem->
lock);
1054 goto queue_group_modify_sync_error;
1067 q_grp_delete_done(qgrp_elem, new_mask);
1069 env_spinlock_unlock(&qgrp_elem->
lock);
1072 goto queue_group_modify_sync_error;
1082 int adds = count_qgrp_adds(&old_mask, new_mask, &add_mask );
1083 int rems = count_qgrp_rems(&old_mask, new_mask, &rem_mask );
1092 q_grp_add_core(qgrp_elem);
1097 q_grp_rem_core(qgrp_elem);
1101 if (adds == 0 && rems == 0) {
1103 q_grp_delete_done(qgrp_elem, new_mask);
1105 q_grp_modify_done(qgrp_elem, new_mask);
1107 env_spinlock_unlock(&qgrp_elem->
lock);
1109 goto queue_group_modify_sync_error;
1112 env_spinlock_unlock(&qgrp_elem->
lock);
1120 err = send_qgrp_addrem_reqs(qgrp_elem, new_mask, &add_mask, &rem_mask,
1122 if (unlikely(err !=
EM_OK)) {
1124 err = EM_FATAL(err);
1125 goto queue_group_modify_sync_error;
1139 queue_group_modify_sync_error:
1142 "Failure: Modify sync QGrp:%" PRI_QGRP ":%s",
1143 queue_group, err_str);
1155 int odp_thr = odp_thread_id();
1156 odp_thrmask_t odp_joinmask;
1158 odp_thrmask_zero(&odp_joinmask);
1159 odp_thrmask_set(&odp_joinmask, odp_thr);
1164 if (unlikely(ret)) {
1166 em_queue_group_t queue_group = qgrp_elem->
queue_group;
1171 "QGrp ADD core%02d: odp_schedule_group_join(thr:%d):%d\n"
1172 "QueueGroup:%" PRI_QGRP " core-mask:%s",
1173 em_core_id(), odp_thr, ret, queue_group, mask_str);
1185 int odp_thr = odp_thread_id();
1186 odp_thrmask_t odp_leavemask;
1188 odp_thrmask_zero(&odp_leavemask);
1189 odp_thrmask_set(&odp_leavemask, odp_thr);
1194 if (unlikely(ret)) {
1196 em_queue_group_t queue_group = qgrp_elem->
queue_group;
1201 "QGrp REM core%02d: odp_schedule_group_leave(thr:%d):%d\n"
1202 "QueueGroup:%" PRI_QGRP " core-mask:%s",
1203 em_core_id(), odp_thr, ret, queue_group, mask_str);
1209 em_queue_group_t qgrp = i_ev->
q_grp.queue_group;
1212 if (unlikely(!qgrp_elem))
1215 env_spinlock_lock(&qgrp_elem->
lock);
1216 q_grp_add_core(qgrp_elem);
1217 env_spinlock_unlock(&qgrp_elem->
lock);
1222 em_queue_group_t qgrp = i_ev->
q_grp.queue_group;
1225 if (unlikely(!qgrp_elem))
1228 env_spinlock_lock(&qgrp_elem->
lock);
1229 q_grp_rem_core(qgrp_elem);
1230 env_spinlock_unlock(&qgrp_elem->
lock);
1237 static void q_grp_create_done_callback(
void *arg_ptr)
1239 em_event_t
event = (em_event_t)arg_ptr;
1243 env_spinlock_lock(&qgrp_elem->
lock);
1244 q_grp_create_done(qgrp_elem, &args->new_mask);
1245 env_spinlock_unlock(&qgrp_elem->
lock);
1254 static void q_grp_create_sync_done_callback(
void *arg_ptr)
1256 em_event_t
event = (em_event_t)arg_ptr;
1260 env_spinlock_lock(&qgrp_elem->
lock);
1261 q_grp_create_sync_done(qgrp_elem, &args->new_mask);
1262 env_spinlock_unlock(&qgrp_elem->
lock);
1285 static void q_grp_modify_done_callback(
void *arg_ptr)
1287 em_event_t
event = (em_event_t)arg_ptr;
1291 env_spinlock_lock(&qgrp_elem->
lock);
1292 q_grp_modify_done(qgrp_elem, &args->new_mask);
1293 env_spinlock_unlock(&qgrp_elem->
lock);
1302 static void q_grp_modify_sync_done_callback(
void *arg_ptr)
1306 q_grp_modify_done_callback(arg_ptr);
1323 static void q_grp_delete_done_callback(
void *arg_ptr)
1325 em_event_t
event = (em_event_t)arg_ptr;
1329 env_spinlock_lock(&qgrp_elem->
lock);
1330 q_grp_delete_done(qgrp_elem, &args->new_mask);
1331 env_spinlock_unlock(&qgrp_elem->
lock);
1340 static void q_grp_delete_sync_done_callback(
void *arg_ptr)
1344 q_grp_delete_done_callback(arg_ptr);
1353 const unsigned int num_queues = env_atomic32_get(&qgrp_elem->
num_queues);
1354 const em_queue_group_t queue_group = qgrp_elem->
queue_group;
1362 "Delete QGrp:%" PRI_QGRP " mask not zero:%s",
1373 "Delete QGrp:%" PRI_QGRP ", masks modified during delete:%s vs. %s",
1374 queue_group, mstr1, mstr2);
1377 if (unlikely(!list_is_empty(&qgrp_elem->
queue_list) || num_queues))
1379 "Delete QGrp:%" PRI_QGRP ", contains %u queues, cannot delete!",
1380 queue_group, num_queues);
1384 if (unlikely(ret != 0))
1386 "Delete QGrp:%" PRI_QGRP ", ODP sched grp destroy fails:%d",
1399 env_spinlock_lock(&queue_group_elem->
lock);
1401 env_atomic32_inc(&queue_group_elem->
num_queues);
1402 env_spinlock_unlock(&queue_group_elem->
lock);
1408 env_spinlock_lock(&queue_group_elem->
lock);
1409 if (!list_is_empty(&queue_group_elem->
queue_list)) {
1411 env_atomic32_dec(&queue_group_elem->
num_queues);
1413 env_spinlock_unlock(&queue_group_elem->
lock);
1416 unsigned int queue_group_count(
void)
1421 #define QGRP_INFO_HDR_STR \
1422 "EM Queue group(s):%2u\n" \
1423 "ID Name EM-mask Cpumask " \
1424 " ODP-mask Q-num\n" \
1425 "------------------------------------------------------------------------------" \
1426 "------------------------------\n" \
1430 #define QGRP_INFO_LEN (108 + 1 )
1431 #define QGRP_INFO_FMT "%-10" PRI_QGRP "%-32s%-20s%-20s%-20s%-5d\n"
1433 static void queue_group_info_str(em_queue_group_t queue_group,
1434 char qgrp_info_str[])
1437 odp_thrmask_t odp_thrmask;
1438 odp_cpumask_t odp_cpumask;
1441 char odp_thrmask_str[ODP_THRMASK_STR_SIZE];
1442 char odp_cpumask_str[ODP_CPUMASK_STR_SIZE];
1449 if (unlikely(!qgrp_elem || !queue_group_allocated(qgrp_elem)))
1450 goto info_print_err;
1454 if (unlikely(err !=
EM_OK))
1455 goto info_print_err;
1462 goto info_print_err;
1463 ret = odp_thrmask_to_str(&odp_thrmask, odp_thrmask_str,
1464 sizeof(odp_thrmask_str));
1465 if (unlikely(ret <= 0))
1466 goto info_print_err;
1467 odp_thrmask_str[ret - 1] =
'\0';
1470 mask_em2phys(&core_mask, &odp_cpumask );
1471 ret = odp_cpumask_to_str(&odp_cpumask, odp_cpumask_str,
1472 sizeof(odp_cpumask_str));
1473 if (unlikely(ret <= 0))
1474 goto info_print_err;
1475 odp_cpumask_str[ret - 1] =
'\0';
1477 len = snprintf(qgrp_info_str, QGRP_INFO_LEN, QGRP_INFO_FMT,
1478 queue_group, qgrp_name, em_mask_str,
1479 odp_cpumask_str, odp_thrmask_str,
1482 qgrp_info_str[len] =
'\0';
1486 len = snprintf(qgrp_info_str, QGRP_INFO_LEN, QGRP_INFO_FMT,
1487 queue_group,
"err:n/a",
"n/a",
"n/a",
"n/a", 0);
1488 qgrp_info_str[len] =
'\0';
1493 em_queue_group_t qgrp;
1494 unsigned int qgrp_num;
1495 char single_qgrp_info_str[QGRP_INFO_LEN];
1511 const int all_qgrp_info_str_len = (qgrp_num + 10) * QGRP_INFO_LEN + 1;
1512 char all_qgrp_info_str[all_qgrp_info_str_len];
1515 queue_group_info_str(qgrp, single_qgrp_info_str);
1517 n_print = snprintf(all_qgrp_info_str + len,
1518 all_qgrp_info_str_len - len,
1519 "%s", single_qgrp_info_str);
1522 if (n_print >= all_qgrp_info_str_len - len)
1531 EM_PRINT(
"No EM queue group!\n");
1539 all_qgrp_info_str[len] =
'\0';
1540 EM_PRINT(QGRP_INFO_HDR_STR, qgrp_num, all_qgrp_info_str);
1543 #define QGRO_QUEUE_INFO_HDR_STR \
1544 "Queue group %" PRI_QGRP "(%s) has %d queue(s):\n\n" \
1545 "Id Name Priority Type State Ctx\n" \
1546 "--------------------------------------------------------------------------\n" \
1550 #define QGRP_Q_LEN 75
1551 #define QGRP_Q_INFO_FMT "%-10" PRI_QUEUE "%-32s%-10d%-10s%-9s%-3c\n"
1556 em_queue_t qgrp_queue;
1565 if (unlikely(!qgrp_elem || !queue_group_allocated(qgrp_elem))) {
1566 EM_PRINT(
"Queue group %" PRI_QGRP " is not created!\n", qgrp);
1582 const int q_info_len = (q_num + 10) * QGRP_Q_LEN + 1;
1583 char q_info_str[q_info_len];
1586 q_elem = queue_elem_get(qgrp_queue);
1588 if (unlikely(q_elem == NULL || !queue_allocated(q_elem))) {
1595 n_print = snprintf(q_info_str + len, q_info_len - len,
1596 QGRP_Q_INFO_FMT, qgrp_queue, q_name,
1603 if (n_print >= q_info_len - len)
1612 EM_PRINT(
"Queue group %" PRI_QGRP "(%s) has no queue!\n",
1621 q_info_str[len] =
'\0';
1622 EM_PRINT(QGRO_QUEUE_INFO_HDR_STR, qgrp, qgrp_name, q_num, q_info_str);