#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
#define DATA_EVENTS 256
#define EVENT_GROUP_INCREMENT 7
#define MAX_NBR_OF_CORES 256
#define DELAY_SPIN_COUNT 50000000
#define CFG_EV_CNT (DATA_EVENTS * (1 + EVENT_GROUP_INCREMENT))
#define SEND_MULTI_MAX 32
typedef struct {
#define MSG_START 1
#define MSG_START_ASSIGN 2
#define MSG_DATA 3
#define MSG_DONE 4
#define MSG_DONE_ASSIGN 5
#define MSG_ALL_DONE_CHAINED 6
uint64_t msg;
env_time_t start_time;
uint64_t increment;
} event_group_test_t;
typedef struct {
em_event_group_t event_group;
em_event_t event_start;
env_time_t total_time;
uint64_t total_rounds;
} egrp_context_t;
typedef union {
struct {
em_eo_t eo;
em_queue_t queue;
int event_count;
egrp_context_t egrp;
egrp_context_t egrp_assign;
em_event_group_t all_done_event_group;
};
} eo_context_t;
EVENT_GROUP_TEST_EO_CONTEXT_SIZE_ERROR);
typedef union {
struct {
uint64_t rcv_ev_cnt;
};
} core_stat_t;
CORE_STAT_T_SIZE_ERROR);
typedef struct {
em_pool_t pool;
unsigned int core_count;
uint64_t cpu_hz;
} egrp_shm_t;
egroup_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
egroup_stop(void *eo_context, em_eo_t eo);
static void
em_queue_t queue, void *q_ctx);
static uint64_t
sum_received_events(core_stat_t core_stats[], int len);
static void
update_test_time(env_time_t diff, egrp_context_t *const egrp_context);
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
egrp_shm = env_shared_reserve("EGrpSharedMem",
sizeof(egrp_shm_t));
} else {
egrp_shm = env_shared_lookup("EGrpSharedMem");
}
if (egrp_shm == NULL) {
"EventGroup test init failed on EM-core: %u\n",
} else if (core == 0) {
memset(egrp_shm, 0, sizeof(egrp_shm_t));
}
}
void test_start(const appl_conf_t *appl_conf)
{
em_event_t event;
em_eo_t eo;
em_queue_t queue;
eo_context_t *eo_ctx;
event_group_test_t *egroup_test;
if (appl_conf->num_pools >= 1)
egrp_shm->pool = appl_conf->pools[0];
else
egrp_shm->core_count = appl_conf->core_count;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
egrp_shm->pool);
"Undefined application event pool!");
egrp_shm->cpu_hz = env_core_hz();
eo_ctx = &egrp_shm->eo_context;
memset(eo_ctx, 0, sizeof(eo_context_t));
eo =
em_eo_create(
"group test appl", egroup_start, NULL, egroup_stop,
NULL, egroup_receive, eo_ctx);
eo_ctx->eo = eo;
NULL);
eo_ctx->queue = queue;
test_fatal_if(ret !=
EM_OK,
"EO add queue:%" PRI_STAT
".\n"
ret, eo, queue);
test_fatal_if(ret !=
EM_OK || eo_start_ret !=
EM_OK,
"em_eo_start() failed! EO:%" PRI_EO "\n"
"ret:%" PRI_STAT " EO-start-ret:%" PRI_STAT "",
eo, ret, eo_start_ret);
egrp_shm->pool);
egroup_test->msg = MSG_START;
test_fatal_if(ret !=
EM_OK,
"Event send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, queue);
egrp_shm->pool);
egroup_test->msg = MSG_START_ASSIGN;
test_fatal_if(ret !=
EM_OK,
"Event send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, queue);
}
void test_stop(const appl_conf_t *appl_conf)
{
eo_context_t *const eo_ctx = &egrp_shm->eo_context;
em_eo_t eo;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
eo = eo_ctx->eo;
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT
"", eo, ret);
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT
"", eo, ret);
}
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
if (core == 0) {
env_shared_free(egrp_shm);
}
}
egroup_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
em_event_t event;
event_group_test_t *egroup_test;
eo_context_t *eo_ctx = eo_context;
(void)conf;
}
memset(egrp_shm->core_stats, 0, sizeof(egrp_shm->core_stats));
memset(egrp_shm->core_stats_assign, 0,
sizeof(egrp_shm->core_stats_assign));
eo, eo_ctx->egrp.event_group);
eo, eo_ctx->egrp_assign.event_group);
eo_ctx->event_count = DATA_EVENTS;
egrp_shm->pool);
egroup_test->msg = MSG_ALL_DONE_CHAINED;
egroup_notif_tbl[0].
event = event;
egroup_notif_tbl[0].
queue = eo_ctx->queue;
egroup_notif_tbl);
test_fatal_if(ret !=
EM_OK,
"Event group apply:%" PRI_STAT
"", ret);
}
egroup_stop(void *eo_context, em_eo_t eo)
{
eo_context_t *eo_ctx = eo_context;
em_event_group_t egrp;
int num_notifs;
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
egrp = eo_ctx->egrp.event_group;
if (ret ==
EM_OK && num_notifs == 1)
}
test_fatal_if(ret !=
EM_OK,
egrp, ret, eo);
egrp = eo_ctx->egrp_assign.event_group;
if (ret ==
EM_OK && num_notifs == 1)
}
test_fatal_if(ret !=
EM_OK,
egrp, ret, eo);
egrp = eo_ctx->all_done_event_group;
if (ret ==
EM_OK && num_notifs == 1)
}
test_fatal_if(ret !=
EM_OK,
egrp, ret, eo);
}
static void
em_queue_t queue, void *q_ctx)
{
eo_context_t *eo_ctx = eo_context;
event_group_test_t *egroup_test;
env_time_t diff_time, start_time, end_time;
int i, j;
em_event_group_t apply_event_group;
uint64_t rcv_ev_cnt;
egrp_context_t *egrp_context;
core_stat_t *core_stats;
const char *egrp_str;
(void)type;
(void)q_ctx;
if (unlikely(appl_shm->exit_flag)) {
return;
}
switch (egroup_test->msg) {
case MSG_START:
case MSG_START_ASSIGN: {
const int ev_cnt = eo_ctx->event_count;
em_event_t ev_tbl[ev_cnt];
int num_sent = 0;
if (egroup_test->msg == MSG_START) {
APPL_PRINT("--- Start event group ---\n");
egroup_test->msg = MSG_DONE;
apply_event_group = eo_ctx->egrp.event_group;
} else {
APPL_PRINT("--- Start assigned event group ---\n");
egroup_test->msg = MSG_DONE_ASSIGN;
apply_event_group = eo_ctx->egrp_assign.event_group;
}
egroup_test->start_time = env_time_global();
egroup_test->increment = 0;
egroup_notif_tbl[0].
event = event;
egroup_notif_tbl[0].
queue = queue;
egroup_notif_tbl[0].
egroup = eo_ctx->all_done_event_group;
egroup_notif_tbl);
test_fatal_if(ret !=
EM_OK,
"Event group apply:%" PRI_STAT "", ret);
for (i = 0; i < ev_cnt; i++) {
event_group_test_t *egroup_test_2;
ev_tbl[i] =
em_alloc(
sizeof(event_group_test_t),
"Event allocation failed!");
egroup_test_2->msg = MSG_DATA;
egroup_test_2->start_time = ENV_TIME_NULL;
egroup_test_2->increment = EVENT_GROUP_INCREMENT;
}
const int send_rounds = ev_cnt / SEND_MULTI_MAX;
const int left_over = ev_cnt % SEND_MULTI_MAX;
em_event_group_t send_event_group;
if (apply_event_group == eo_ctx->egrp.event_group) {
send_event_group = apply_event_group;
} else {
}
for (i = 0, j = 0; i < send_rounds; i++, j += SEND_MULTI_MAX) {
num_sent +=
queue, send_event_group);
}
if (left_over) {
num_sent +=
queue, send_event_group);
}
if (unlikely(num_sent != ev_cnt)) {
for (i = num_sent; i < ev_cnt; i++)
test_fatal_if(!appl_shm->exit_flag,
"Event send multi failed:%d (%d)\n"
num_sent, ev_cnt, queue);
}
break;
}
case MSG_DATA: {
ret =
test_fatal_if(ret !=
EM_OK,
"Event group assign:%" PRI_STAT "", ret);
egrp_shm->core_stats_assign[core].rcv_ev_cnt += 1;
} else {
egrp_shm->core_stats[core].rcv_ev_cnt += 1;
}
if (egroup_test->increment) {
egroup_test->increment--;
test_fatal_if(ret !=
EM_OK,
"Event group incr:%" PRI_STAT "", ret);
const em_event_group_t event_group =
test_fatal_if(ret !=
EM_OK,
"Event group assign:%" PRI_STAT "", ret);
test_fatal_if(ret !=
EM_OK,
"Event group incr:%" PRI_STAT "", ret);
eo_ctx->egrp.event_group) {
} else {
}
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"Send:%" PRI_STAT "\n"
}
} else {
}
break;
}
case MSG_DONE:
case MSG_DONE_ASSIGN:
if (egroup_test->msg == MSG_DONE) {
egrp_context = &eo_ctx->egrp;
core_stats = egrp_shm->core_stats;
egrp_str = "\"Normal\"";
egroup_test->msg = MSG_START;
} else {
egrp_context = &eo_ctx->egrp_assign;
core_stats = egrp_shm->core_stats_assign;
egrp_str = "Assigned";
egroup_test->msg = MSG_START_ASSIGN;
}
end_time = env_time_global();
start_time = egroup_test->start_time;
diff_time = env_time_diff(end_time, start_time);
update_test_time(diff_time, egrp_context);
rcv_ev_cnt = sum_received_events(core_stats, egrp_shm->core_count);
APPL_PRINT("%s event group notification event received after\t"
"%" PRIu64 " data events.\n"
"Cycles curr:%" PRIu64 ", ave:%" PRIu64 "\n",
egrp_str, rcv_ev_cnt,
env_time_to_cycles(diff_time, egrp_shm->cpu_hz),
env_time_to_cycles(egrp_context->total_time,
egrp_shm->cpu_hz) /
egrp_context->total_rounds);
test_fatal_if(rcv_ev_cnt != CFG_EV_CNT,
"Incorrect nbr of data events before notif:\t"
"%" PRIu64 " != %" PRIu64 "!",
rcv_ev_cnt, CFG_EV_CNT);
egrp_context->event_start = event;
break;
case MSG_ALL_DONE_CHAINED:
APPL_PRINT("--- Chained event group done ---\n\n");
egroup_notif_tbl[0].
event = event;
egroup_notif_tbl[0].
queue = eo_ctx->queue;
egroup_notif_tbl);
test_fatal_if(ret !=
EM_OK,
"Event group apply:%" PRI_STAT "!", ret);
delay_spin(DELAY_SPIN_COUNT);
ret =
em_send(eo_ctx->egrp.event_start, queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
ret, queue);
}
ret =
em_send(eo_ctx->egrp_assign.event_start, queue);
if (unlikely(ret !=
EM_OK)) {
em_free(eo_ctx->egrp_assign.event_start);
test_fatal_if(!appl_shm->exit_flag,
ret, queue);
}
break;
default:
test_fatal_if(1, "Bad msg (%" PRIu64 ")!", egroup_test->msg);
break;
};
}
static uint64_t
sum_received_events(core_stat_t core_stats[], int len)
{
uint64_t rcv_ev_cnt = 0;
int i;
for (i = 0; i < len; i++) {
rcv_ev_cnt += core_stats[i].rcv_ev_cnt;
core_stats[i].rcv_ev_cnt = 0;
}
return rcv_ev_cnt;
}
static void
update_test_time(env_time_t diff_time, egrp_context_t *const egrp_context)
{
if (egrp_context->total_rounds == 1) {
egrp_context->total_time =
env_time_sum(egrp_context->total_time, diff_time);
egrp_context->total_time =
env_time_sum(egrp_context->total_time, diff_time);
} else if (egrp_context->total_rounds > 1) {
egrp_context->total_time =
env_time_sum(egrp_context->total_time, diff_time);
}
egrp_context->total_rounds++;
}