#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
#define TEST_PRINT_COUNT 5
#define TEST_QGRP_NAME_LEN EM_QUEUE_GROUP_NAME_LEN
#define TEST_QGRP_NAME_BASE "QGrp"
#define MAX_CORES 64
#define EVENT_DATA_ALLOC_NBR (MAX_CORES * 16)
#define ROUND_UP(val, N) ((((val) + ((N) - 1)) / (N)) * (N))
typedef struct app_eo_ctx_t {
em_eo_t eo;
em_queue_t notif_queue;
em_queue_group_t notif_qgrp;
em_queue_t test_queue;
bool test_queue_added;
em_queue_group_t test_qgrp;
em_event_group_t event_group;
char test_qgrp_name[TEST_QGRP_NAME_LEN];
int test_qgrp_name_nbr;
uint64_t qgrp_modify_count;
uint64_t modify_threshold;
uint64_t print_threshold;
uint64_t tot_modify_count;
uint64_t tot_modify_count_check;
} app_eo_ctx_t;
typedef struct app_q_ctx_t {
} app_q_ctx_t;
typedef union app_event_t {
#define EVENT_NOTIF 1
#define EVENT_DATA 2
uint32_t id;
struct {
uint32_t id;
enum {
NOTIF_START_DONE,
NOTIF_RESTART,
NOTIF_QUEUE_GROUP_MODIFY_DONE_FIRST,
NOTIF_QUEUE_GROUP_MODIFY_DONE,
NOTIF_EVENT_GROUP_DATA_DONE
} type;
em_queue_group_t used_group;
} notif;
struct {
uint32_t id;
em_queue_group_t used_group;
} data;
} app_event_t;
typedef union core_stat_t {
struct {
uint64_t event_count;
};
} core_stat_t;
CORE_STAT_T__SIZE_ERROR);
typedef struct qgrp_shm_t {
unsigned int core_count;
} qgrp_shm_t;
QGRP_SHM_T__SIZE_ERROR);
== 0, OFFSETOF_EO_CTX_ERROR);
== 0, OFFSETOF_Q_CTX_ERROR);
== 0, OFFSETOF_CORE_STAT_ERROR);
static void
em_queue_t queue, void *queue_context);
static inline void
receive_event_notif(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue, app_q_ctx_t *q_ctx);
static void
notif_start_done(app_eo_ctx_t *eo_ctx, em_event_t event, em_queue_t queue);
static void
notif_queue_group_modify_done(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue);
static void
notif_event_group_data_done(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue);
static inline void
receive_event_data(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue, app_q_ctx_t *q_ctx);
static void await_exit_ack(void);
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
stop(void *eo_context, em_eo_t eo);
start_local(void *eo_context, em_eo_t eo);
stop_local(void *eo_context, em_eo_t eo);
static void
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
qgrp_shm = env_shared_reserve("QueueGroupSharedMem",
sizeof(qgrp_shm_t));
} else {
qgrp_shm = env_shared_lookup("QueueGroupSharedMem");
}
if (qgrp_shm == NULL) {
"Queue Group test init failed on EM-core: %u\n",
} else if (core == 0) {
memset(qgrp_shm, 0, sizeof(qgrp_shm_t));
}
}
void test_start(const appl_conf_t *appl_conf)
{
app_event_t *app_event;
em_event_t event;
em_queue_group_t default_group;
em_queue_t notif_queue;
em_event_group_t event_group;
em_eo_t eo;
if (appl_conf->num_pools >= 1)
qgrp_shm->pool = appl_conf->pools[0];
else
qgrp_shm->core_count = appl_conf->core_count;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
qgrp_shm->pool);
"Undefined application event pool!");
test_fatal_if(qgrp_shm->core_count > MAX_CORES,
"Test started on too many cores(%i)!\n"
"Max supported core count for this test is: %u\n",
qgrp_shm->core_count, MAX_CORES);
env_atomic32_init(&qgrp_shm->exit_ack);
env_atomic32_set(&qgrp_shm->exit_ack, 0);
start, start_local, stop, stop_local,
receive, &qgrp_shm->app_eo_ctx);
"Default queue group(%" PRI_QGRP ") not found!",
default_group);
"Notification queue creation failed!");
test_fatal_if(err !=
EM_OK,
"Notification queue add to EO failed:%" PRI_STAT "", err);
"Event group creation failed!");
qgrp_shm->app_eo_ctx.eo = eo;
qgrp_shm->app_eo_ctx.notif_queue = notif_queue;
qgrp_shm->app_eo_ctx.notif_qgrp = default_group;
qgrp_shm->app_eo_ctx.event_group = event_group;
APPL_PRINT(
"Starting EO:%" PRI_EO "\t"
"- Notification Queue=%" PRI_QUEUE "\n", eo, notif_queue);
qgrp_shm->pool);
"Notification event allocation failed");
memset(app_event, 0, sizeof(*app_event));
app_event->notif.id = EVENT_NOTIF;
app_event->notif.type = NOTIF_START_DONE;
app_event->notif.used_group = default_group;
notif_tbl[0].
event = event;
notif_tbl[0].
queue = notif_queue;
test_fatal_if(err !=
EM_OK,
"em_eo_start(%" PRI_EO "):%" PRI_STAT
"", eo, err);
test_fatal_if(start_err !=
EM_OK,
"EO start function:%" PRI_STAT "",
start_err);
}
void test_stop(const appl_conf_t *appl_conf)
{
(void)appl_conf;
APPL_PRINT("%s() on EM-core %02d\n", __func__, core);
await_exit_ack();
em_eo_t eo = qgrp_shm->app_eo_ctx.eo;
em_event_group_t egrp;
int num_notifs;
test_fatal_if(err !=
EM_OK,
"EO stop:%" PRI_STAT
" EO:%" PRI_EO "", err, eo);
egrp = qgrp_shm->app_eo_ctx.event_group;
if (err ==
EM_OK && num_notifs == 1)
}
test_fatal_if(err !=
EM_OK,
egrp, err, eo);
}
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
APPL_PRINT("%s() on EM-core %02d\n", __func__, core);
if (core == 0) {
env_shared_free(qgrp_shm);
}
}
static void
em_queue_t queue, void *queue_context)
{
app_eo_ctx_t *eo_ctx = eo_context;
app_q_ctx_t *q_ctx = queue_context;
"Unexpected event type: 0x%x", type);
if (unlikely(appl_shm->exit_flag)) {
uint32_t exit_ack = env_atomic32_get(&qgrp_shm->exit_ack);
if (exit_ack) {
return;
}
if (app_event->id == EVENT_NOTIF &&
(app_event->notif.type == NOTIF_QUEUE_GROUP_MODIFY_DONE_FIRST ||
app_event->notif.type == NOTIF_QUEUE_GROUP_MODIFY_DONE)) {
if (!exit_ack)
env_atomic32_set(&qgrp_shm->exit_ack, 1);
return;
}
}
switch (app_event->id) {
case EVENT_NOTIF:
receive_event_notif(eo_ctx, event, queue, q_ctx);
break;
case EVENT_DATA:
receive_event_data(eo_ctx, event, queue, q_ctx);
break;
default:
"Unknown event id(%u)!", app_event->id);
break;
}
}
static inline void
receive_event_notif(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue, app_q_ctx_t *q_ctx)
{
(void)q_ctx;
switch (app_event->notif.type) {
case NOTIF_RESTART:
APPL_PRINT("\n"
"***********************************************\n"
"!!! Restarting test !!!\n"
"***********************************************\n"
"\n\n\n");
eo_ctx->tot_modify_count_check = 0;
notif_start_done(eo_ctx, event, queue);
break;
case NOTIF_START_DONE:
notif_start_done(eo_ctx, event, queue);
break;
case NOTIF_QUEUE_GROUP_MODIFY_DONE_FIRST:
test_fatal_if(err !=
EM_OK,
"EO add queue:%" PRI_STAT "", err);
eo_ctx->test_queue_added = true;
notif_queue_group_modify_done(eo_ctx, event, queue);
break;
case NOTIF_QUEUE_GROUP_MODIFY_DONE:
notif_queue_group_modify_done(eo_ctx, event, queue);
break;
case NOTIF_EVENT_GROUP_DATA_DONE:
notif_event_group_data_done(eo_ctx, event, queue);
break;
default:
"Unknown notification type:%i!",
app_event->notif.type);
break;
}
}
static void
notif_start_done(app_eo_ctx_t *eo_ctx, em_event_t event, em_queue_t queue)
{
em_queue_group_t new_qgrp;
const char *new_qtype_str;
test_fatal_if(app_event->notif.used_group != qgrp_curr,
app_event->notif.used_group, qgrp_curr);
snprintf(&eo_ctx->test_qgrp_name[0],
sizeof(eo_ctx->test_qgrp_name), "%s%03i",
TEST_QGRP_NAME_BASE, eo_ctx->test_qgrp_name_nbr);
eo_ctx->test_qgrp_name[TEST_QGRP_NAME_LEN - 1] = '\0';
eo_ctx->test_qgrp_name_nbr = (eo_ctx->test_qgrp_name_nbr + 1)
% 1000;
app_event->notif.type = NOTIF_QUEUE_GROUP_MODIFY_DONE_FIRST;
app_event->notif.used_group = eo_ctx->notif_qgrp;
1, ¬if_tbl);
"Queue group creation failed!");
test_fatal_if(err !=
EM_OK,
"Qgrp delete:%" PRI_STAT "", err);
}
eo_ctx->test_qgrp = new_qgrp;
switch (eo_ctx->test_queue_type) {
new_qtype_str = "PARALLEL";
break;
new_qtype_str = "PARALLEL_ORDERED";
break;
default:
new_qtype_str = "ATOMIC";
break;
}
eo_ctx->test_queue_type = new_qtype;
eo_ctx->test_queue_type,
eo_ctx->test_qgrp, NULL);
"Test queue creation failed!");
eo_ctx->test_queue_added = false;
APPL_PRINT("\n"
"Created test queue:%" PRI_QUEUE " type:%s(%u)\t"
"queue group:%" PRI_QGRP " (name:\"%s\")\n",
eo_ctx->test_queue, new_qtype_str, eo_ctx->test_queue_type,
eo_ctx->test_qgrp, eo_ctx->test_qgrp_name);
memset(&qgrp_shm->app_q_ctx, 0, sizeof(qgrp_shm->app_q_ctx));
env_atomic64_init(&qgrp_shm->app_q_ctx.event_count);
test_fatal_if(err !=
EM_OK,
"Set queue context:%" PRI_STAT
"", err);
env_sync_mem();
}
static void
notif_queue_group_modify_done(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue)
{
test_fatal_if(app_event->notif.used_group != qgrp_curr,
app_event->notif.used_group, qgrp_curr);
APPL_PRINT("\n"
"*************************************\n"
"All cores removed from QueueGroup!\n"
"*************************************\n");
test_fatal_if(eo_ctx->tot_modify_count !=
eo_ctx->tot_modify_count_check,
"Modify count != actual count:\t"
"%" PRIu64 " vs %" PRIu64 "",
eo_ctx->tot_modify_count,
eo_ctx->tot_modify_count_check);
eo_ctx->test_queue);
test_fatal_if(err !=
EM_OK,
"Remove test queue:%" PRI_STAT "", err);
eo_ctx->test_queue_added = false;
APPL_PRINT(
"Deleting test queue:%" PRI_QUEUE ",\t"
"Qgrp ID:%" PRI_QGRP " (name:\"%s\")\n",
eo_ctx->test_queue, eo_ctx->test_qgrp,
eo_ctx->test_qgrp_name);
test_fatal_if(err !=
EM_OK,
"Delete test queue:%" PRI_STAT "", err);
app_event->notif.id = EVENT_NOTIF;
app_event->notif.type = NOTIF_RESTART;
app_event->notif.used_group = eo_ctx->notif_qgrp;
err =
em_send(event, eo_ctx->notif_queue);
if (unlikely(err !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"Send to notif queue:%" PRI_STAT "", err);
}
} else {
app_event->notif.id = EVENT_NOTIF;
app_event->notif.type = NOTIF_EVENT_GROUP_DATA_DONE;
app_event->notif.used_group = eo_ctx->notif_qgrp;
egroup_notif_tbl[0].
event = event;
egroup_notif_tbl[0].
queue = eo_ctx->notif_queue;
eo_ctx->modify_threshold, 1,
egroup_notif_tbl);
test_fatal_if(err !=
EM_OK,
"em_event_group_apply():%" PRI_STAT "", err);
for (int i = 0; i < EVENT_DATA_ALLOC_NBR; i++) {
em_event_t ev_data =
em_alloc(
sizeof(app_event_t),
qgrp_shm->pool);
"Event alloc failed!");
data_event->id = EVENT_DATA;
data_event->data.used_group = eo_ctx->test_qgrp;
eo_ctx->event_group);
if (unlikely(err !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"Send to test queue:%" PRI_STAT "",
err);
}
}
}
}
static void
notif_event_group_data_done(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue)
{
int core_count;
int i;
test_fatal_if(app_event->notif.used_group != qgrp_curr,
app_event->notif.used_group, qgrp_curr);
uint64_t mod_cnt = ++eo_ctx->qgrp_modify_count;
eo_ctx->tot_modify_count_check++;
test_fatal_if(err !=
EM_OK,
"Get queue group mask:%" PRI_STAT "", err);
next_core_mask( &core_mask, &eo_ctx->core_mask_max,
eo_ctx->tot_modify_count_check);
if (mod_cnt >= eo_ctx->print_threshold ||
&used_mask);
&core_mask);
APPL_PRINT("\n"
"****************************************\n"
"Received %" PRIu64
" events on Q:%" PRI_QUEUE ":\n"
" QueueGroup:%" PRI_QGRP ", Curr Coremask:%s\n"
"Now Modifying:\n"
" QueueGroup:%" PRI_QGRP ", New Coremask:%s\n"
"****************************************\n",
env_atomic64_get(&qgrp_shm->app_q_ctx.event_count),
eo_ctx->test_queue, eo_ctx->test_qgrp,
used_mask_str, eo_ctx->test_qgrp, core_mask_str);
eo_ctx->qgrp_modify_count = 0;
}
core_count = qgrp_shm->core_count;
for (i = 0; i < core_count; i++) {
const uint64_t ev_count = qgrp_shm->core_stat[i].event_count;
if (unlikely(ev_count == 0)) {
&used_mask);
"No events on core%i, mask:%s",
i, mstr);
}
} else if (unlikely(ev_count > 0)) {
&used_mask);
"Events:%" PRIu64 " on inv.core%i, mask:%s",
ev_count, i, mstr);
}
}
memset(qgrp_shm->core_stat, 0, sizeof(qgrp_shm->core_stat));
env_atomic64_set(&qgrp_shm->app_q_ctx.event_count, 0);
app_event->id = EVENT_NOTIF;
app_event->notif.type = NOTIF_QUEUE_GROUP_MODIFY_DONE;
app_event->notif.used_group = eo_ctx->notif_qgrp;
notif_tbl.
queue = eo_ctx->notif_queue;
1, ¬if_tbl);
test_fatal_if(err !=
EM_OK,
"em_queue_group_modify():%" PRI_STAT "", err);
}
static inline void
receive_event_data(app_eo_ctx_t *eo_ctx, em_event_t event,
em_queue_t queue, app_q_ctx_t *q_ctx)
{
const uint64_t event_count =
env_atomic64_add_return(&q_ctx->event_count, 1);
qgrp_shm->core_stat[core_id].event_count++;
test_fatal_if(app_event->data.used_group != qgrp_curr,
app_event->data.used_group, qgrp_curr);
test_fatal_if(err !=
EM_OK,
"Get queue group mask:%" PRI_STAT "", err);
"Core bit not set in core mask! core:%02i mask:%s",
core_id, mask_str);
}
if (event_count <= eo_ctx->modify_threshold - EVENT_DATA_ALLOC_NBR) {
eo_ctx->event_group);
if (unlikely(err !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"Send to test queue:%" PRI_STAT "", err);
}
} else if (event_count <= eo_ctx->modify_threshold) {
} else {
"Invalid event count(%u)!", event_count);
}
}
static void await_exit_ack(void)
{
env_time_t t_max = env_time_global_from_ns(20 * 1000000000ULL);
env_time_t t_now = ENV_TIME_NULL;
env_time_t t_start = env_time_global();
env_time_t t_end = env_time_sum(t_start, t_max);
uint64_t ns;
uint32_t exit_ack = 0;
long double sec;
do {
if (!exit_ack)
exit_ack = env_atomic32_get(&qgrp_shm->exit_ack);
t_now = env_time_global();
} while (!exit_ack && env_time_cmp(t_now, t_end) < 0);
ns = env_time_diff_ns(t_now, t_start);
sec = (long double)ns / 1000000000.0;
if (unlikely(!exit_ack)) {
"Timeout: No exit_ack within %Lfs!\n", sec);
return;
}
APPL_PRINT("exit_ack in %Lfs on EM-core:%02d => Tearing down\n",
}
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
app_eo_ctx_t *eo_ctx = eo_context;
uint64_t tot_modify_count = 0;
uint64_t tmp;
int ret;
(void)eo;
(void)conf;
APPL_PRINT("Queue Group Test - Global EO Start\n");
snprintf(&eo_ctx->test_qgrp_name[0],
sizeof(eo_ctx->test_qgrp_name),
"%s%03i", TEST_QGRP_NAME_BASE, 0);
if (unlikely(ret != 1)) {
&eo_ctx->core_mask_max);
"em_core_mask_get_bits(coremask=%s), ret=%i",
mask_str, ret);
}
do {
tot_modify_count += (tmp & 0xFF) + 1;
tmp = (tmp >> 4);
if (tmp < 0x10)
break;
} while (tmp);
tot_modify_count -= 1;
eo_ctx->tot_modify_count = tot_modify_count;
eo_ctx->tot_modify_count_check = 0;
eo_ctx->print_threshold = tot_modify_count / TEST_PRINT_COUNT;
if (eo_ctx->print_threshold == 0)
eo_ctx->print_threshold = 1;
eo_ctx->modify_threshold =
((256 * 15 * 0x1000) - 1) / tot_modify_count;
eo_ctx->modify_threshold = ROUND_UP(eo_ctx->modify_threshold,
EVENT_DATA_ALLOC_NBR);
APPL_PRINT("\n"
"*******************************************************\n"
"Test threshold values set:\n"
" Tot group modifies: %" PRIu64 "\n"
" Events received on group before modify: %" PRIu64 "\n"
" Group modify print threshold: %" PRIu64 "\n"
"*******************************************************\n"
"\n",
tot_modify_count, eo_ctx->modify_threshold,
eo_ctx->print_threshold);
}
stop(void *eo_context, em_eo_t eo)
{
app_eo_ctx_t *eo_ctx = eo_context;
test_fatal_if(err !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
err, eo);
if (eo_ctx->test_queue !=
EM_QUEUE_UNDEF && !eo_ctx->test_queue_added) {
test_fatal_if(err !=
EM_OK,
"Delete test queue:%" PRI_STAT "", err);
}
test_fatal_if(err !=
EM_OK,
"EO delete:%" PRI_STAT
" EO:%" PRI_EO "",
err, eo);
APPL_PRINT("Queue Group Test - Global EO Stop\n");
}
start_local(void *eo_context, em_eo_t eo)
{
(void)eo_context;
(void)eo;
APPL_PRINT("Queue Group Test - Local EO Start: EM-core:%02d\n",
}
stop_local(void *eo_context, em_eo_t eo)
{
(void)eo_context;
(void)eo;
APPL_PRINT("Queue Group Test - Local EO Stop: EM-core:%02d\n",
}
static void
{
uint64_t mask64 = ((uint64_t)(count % 256) + 1) << (4 * (count / 256));
}