#include "cm_setup.h"
#include "cm_error_handler.h"
/* Number of event groups used by the test: sizes egrp_tbl[] and bounds the per-group loops. */
#define EVENT_GROUPS 30
/* Upper bound of the per-group MSG_DATA event loop in send_test_events(). */
#define EVENTS_PER_GROUP 128
/*
 * Pseudo-random count in the range 1..128000.
 * Expands to a rand() call, so every use of the macro yields a new value.
 */
#define RANDOM_COUNT (rand() % 128000 + 1)
/* Message kinds stored in egrp_test_t.msg and dispatched on by the receive function. */
#define MSG_START 1
#define MSG_DATA 2
#define MSG_NOTIF 3

/**
 * Payload of the events used by the event group abort test.
 */
typedef struct {
	/* One of MSG_START, MSG_DATA or MSG_NOTIF */
	uint64_t msg;
	/* Index of the related entry in the event group table */
	uint64_t egrp_id;
	/* Generation the event was created for; compared against egrp_data_tbl.gen */
	uint64_t egrp_gen;
} egrp_test_t;
/* Bookkeeping entry for one event group; eo_context_t holds EVENT_GROUPS of these. */
typedef struct {
/* Event group handle, stored by test_start() */
em_event_group_t grp;
/* Generation counter: zeroed in test_start(), incremented in send_test_events() */
uint64_t gen;
/* Spinlock taken around the generation handling in the sender and the receive function */
env_spinlock_t lock;
} egrp_data_tbl;
/*
 * Context of the test EO.
 *
 * NOTE(review): other code in this file accesses further members through this
 * context (e.g. group_counter[], groups_left, rcvd_group_events, increments,
 * assigns, aborted_egrps, del_notifs) that are not declared in the definition
 * visible here - confirm against the complete file.
 */
typedef struct {
/* EO handle, stored after em_eo_create() in test_start() */
em_eo_t eo;
/* Queue that the start, data and notification events are directed to */
em_queue_t paral_queue;
/* One bookkeeping entry per event group */
egrp_data_tbl egrp_tbl[EVENT_GROUPS];
} eo_context_t;
/*
 * Shared memory area of the test; test_init() reserves and zeroes
 * sizeof(egrp_shm_t) bytes under the name "EGrpSharedMem".
 *
 * NOTE(review): the code also uses a member named test_eo_ctx (the EO context)
 * that is not declared in the definition visible here - confirm.
 */
typedef struct {
/* Event pool, taken from appl_conf->pools[0] in test_start() */
em_pool_t pool;
/* Per-group received-event count at which the group is aborted; re-randomised each round */
uint64_t stop_count;
/* Event group count used for the round; re-randomised each round */
uint64_t target_count;
/* Round number, incremented on every MSG_START */
uint64_t round;
/* Number of EM-cores, copied from appl_conf->core_count */
unsigned int core_count;
} egrp_shm_t;
/*
 * Forward declarations of the EO callbacks and the file-local helpers that
 * are defined later in this file.
 *
 * NOTE(review): in this view the declarations of egroup_start() and
 * egroup_stop() have no return type, and the third "static void" declaration
 * has lost its function name and leading parameters - confirm against the
 * complete file before relying on these prototypes.
 */
egroup_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
egroup_stop(void *eo_context, em_eo_t eo);
static void
em_queue_t queue, void *q_ctx);
/* Sends a MSG_START event to the EO's queue */
static void
send_start_event(void);
static void
abort_event_group(em_event_group_t event_group);
static void
print_round_start_info(void);
static void
print_round_end_info(void);
static void
send_test_events(void);
/* Resets the per-round statistics and draws new random round parameters */
static void
init_counters(void);
/* Decrements groups_left; on reaching zero prints the summary and restarts */
static void
update_group_count(void);
/**
 * Program entry point: hands the command line over to cm_setup() and
 * returns its result as the process exit status.
 */
int main(int argc, char *argv[])
{
	const int status = cm_setup(argc, argv);

	return status;
}
/*
 * Tail of an error handler definition.
 *
 * NOTE(review): the function name, return type, leading parameters and the
 * first case label(s) of the switch are not visible in this view.
 *
 * Visible behaviour: for the event group abort/assign/increment error scopes
 * nothing is done and 'error' is returned as received; every other scope is
 * forwarded to test_error_handler() and its result is returned.
 */
va_list args)
{
switch (escope) {
break;
case EM_ESCOPE_EVENT_GROUP_ABORT:
break;
case EM_ESCOPE_EVENT_GROUP_ASSIGN:
break;
case EM_ESCOPE_EVENT_GROUP_INCREMENT:
break;
default:
/* All remaining scopes go to the common test error handler */
error = test_error_handler(eo, error, escope, args);
};
return error;
}
/*
 * Prepare and send the events of one test round.
 *
 * For each of the EVENT_GROUPS table entries the visible code fills a
 * MSG_NOTIF event, advances the entry's generation and resets its
 * group_counter while holding the entry's spinlock, and then loops
 * EVENTS_PER_GROUP times filling MSG_DATA events tagged with the new
 * generation.
 *
 * NOTE(review): several statements are incomplete in this view (the
 * allocation and send call heads, the declarations of ret and notif_tbl, and
 * the call that the "em_event_group_apply()" message refers to) - confirm
 * against the complete file.
 */
void
send_test_events(void)
{
em_event_t test_event;
egrp_test_t *egrp_event;
egrp_data_tbl *egrp_elem;
const em_queue_t paral_queue = egrp_shm->test_eo_ctx.paral_queue;
int i, j;
for (i = 0; i < EVENT_GROUPS; i++) {
egrp_elem = &egrp_shm->test_eo_ctx.egrp_tbl[i];
egrp_shm->pool);
"Event allocation failed!");
/* Notification event: tagged with gen + 1 because gen is incremented further below */
egrp_event->msg = MSG_NOTIF;
egrp_event->egrp_id = i;
egrp_event->egrp_gen = egrp_elem->gen + 1;
notif_tbl[0].
event = test_event;
notif_tbl[0].
queue = paral_queue;
/* The generation bump and the counter reset are done under the entry's lock */
env_spinlock_lock(&egrp_elem->lock);
egrp_shm->target_count, 1,
notif_tbl);
test_fatal_if(ret !=
EM_OK,
"em_event_group_apply():%" PRI_STAT "", ret);
egrp_elem->gen++;
env_atomic64_init(&egrp_shm->test_eo_ctx.group_counter[i]);
env_spinlock_unlock(&egrp_elem->lock);
for (j = 0; j < EVENTS_PER_GROUP; j++) {
test_event =
em_alloc(
sizeof(egrp_test_t),
"Event allocation failed!");
/* Data event: carries the group index and the generation just started */
egrp_event->msg = MSG_DATA;
egrp_event->egrp_id = i;
egrp_event->egrp_gen = egrp_elem->gen;
egrp_elem->grp);
if (likely(ret ==
EM_OK))
continue;
/* Failure path: fatal unless the application exit flag is set; stops sending */
test_fatal_if(!appl_shm->exit_flag,
ret, paral_queue);
return;
}
}
}
/**
 * Prepare the shared state for a new test round.
 *
 * Re-initialises every per-round statistics counter of the EO context, sets
 * groups_left to EVENT_GROUPS and draws fresh random values for the
 * target and stop counts (two rand() calls, target first).
 */
void
init_counters(void)
{
	/* Event reception statistics */
	env_atomic64_init(&egrp_shm->test_eo_ctx.rcvd_group_events);
	env_atomic64_init(&egrp_shm->test_eo_ctx.rcvd_expired_events);
	env_atomic64_init(&egrp_shm->test_eo_ctx.rcvd_notif_events);

	/* Event group increment / abort statistics */
	env_atomic64_init(&egrp_shm->test_eo_ctx.increments);
	env_atomic64_init(&egrp_shm->test_eo_ctx.failed_increments);
	env_atomic64_init(&egrp_shm->test_eo_ctx.failed_aborts);

	/* Event group assign statistics */
	env_atomic64_init(&egrp_shm->test_eo_ctx.assigns);
	env_atomic64_init(&egrp_shm->test_eo_ctx.failed_assigns);

	/* Abort results */
	env_atomic64_init(&egrp_shm->test_eo_ctx.aborted_egrps);
	env_atomic64_init(&egrp_shm->test_eo_ctx.del_notifs);

	/* Every group is still outstanding when the round begins */
	env_atomic64_set(&egrp_shm->test_eo_ctx.groups_left, EVENT_GROUPS);

	/* Each RANDOM_COUNT expansion calls rand() once: target first, then stop */
	const uint64_t new_target = RANDOM_COUNT;
	const uint64_t new_stop = RANDOM_COUNT;

	egrp_shm->target_count = new_target;
	egrp_shm->stop_count = new_stop;
}
/*
 * Set up the test's shared memory.
 *
 * When 'core' is 0 the area "EGrpSharedMem" of sizeof(egrp_shm_t) bytes is
 * reserved and, if that succeeded, zeroed; otherwise the existing area is
 * looked up by name.
 *
 * appl_conf is unused.
 *
 * NOTE(review): the declaration of 'core' and the call that reports the
 * NULL case (only its format string is left) are not visible in this view.
 */
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
egrp_shm = env_shared_reserve("EGrpSharedMem",
sizeof(egrp_shm_t));
} else {
egrp_shm = env_shared_lookup("EGrpSharedMem");
}
if (egrp_shm == NULL) {
"EventGroup test init failed on EM-core: %u\n",
} else if (core == 0) {
/* Only the reserving core clears the freshly reserved area */
memset(egrp_shm, 0, sizeof(egrp_shm_t));
}
}
/*
 * Start-up of the test application: selects the event pool, prints the
 * start banner, creates the EO "Evgrp-abort-test" with egroup_start(),
 * egroup_stop() and egroup_receive() as callbacks, initialises the event
 * group table and resets the round counter.
 *
 * appl_conf supplies the pools, the application name and the core, process
 * and thread counts.
 *
 * NOTE(review): many statements are incomplete in this view (queue creation
 * and add, event group creation, the EO start call, and the declarations of
 * ret, eo_start_ret and egrp) - confirm against the complete file.
 */
void test_start(const appl_conf_t *appl_conf)
{
em_eo_t eo;
em_queue_t queue;
eo_context_t *eo_ctx;
/*
 * NOTE(review): as visible here the else branch assigns core_count, so
 * core_count would only be set when no pool is configured. This looks like
 * the result of missing lines - confirm.
 */
if (appl_conf->num_pools >= 1)
egrp_shm->pool = appl_conf->pools[0];
else
egrp_shm->core_count = appl_conf->core_count;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
egrp_shm->pool);
"Undefined application event pool!");
/* The EO context lives in the shared memory area */
eo_ctx = &egrp_shm->test_eo_ctx;
eo =
em_eo_create(
"Evgrp-abort-test", egroup_start, NULL, egroup_stop,
NULL, egroup_receive, eo_ctx);
test_fatal_if(eo ==
EM_EO_UNDEF,
"EO creation failed!");
eo_ctx->eo = eo;
test_fatal_if(ret !=
EM_OK,
"EO add queue failed:%" PRI_STAT
"\n"
eo_ctx->paral_queue = queue;
int i;
/* Per group: store the handle, initialise the lock and start at generation 0 */
for (i = 0; i < EVENT_GROUPS; i++) {
"Event group creation failed!");
eo_ctx->egrp_tbl[i].grp = egrp;
env_spinlock_init(&eo_ctx->egrp_tbl[i].lock);
eo_ctx->egrp_tbl[i].gen = 0;
}
egrp_shm->round = 0;
test_fatal_if(ret !=
EM_OK || eo_start_ret !=
EM_OK,
"em_eo_start() failed! EO:%" PRI_EO "\n"
"ret:%" PRI_STAT ", EO-start-ret:%" PRI_STAT "",
eo, ret, eo_start_ret);
}
/*
 * Send a MSG_START event to the EO's queue (paral_queue).
 *
 * A failed send is fatal unless the application exit flag is set.
 *
 * NOTE(review): the allocation call head, the assignment of start_event_ptr
 * and the declaration of ret are not visible in this view.
 */
void
send_start_event(void)
{
em_event_t start_event;
egrp_test_t *start_event_ptr;
const em_queue_t paral_queue = egrp_shm->test_eo_ctx.paral_queue;
egrp_shm->pool);
"Event allocation failed!");
start_event_ptr->msg = MSG_START;
ret =
em_send(start_event, paral_queue);
if (unlikely(ret !=
EM_OK)) {
/* Send errors are tolerated only while the application is exiting */
test_fatal_if(!appl_shm->exit_flag,
"Event send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, paral_queue);
}
}
/**
 * Print the header of a test round: a separator line, the round number,
 * the number of event groups with their count, and the number of received
 * events after which a group is aborted.
 */
void
print_round_start_info(void)
{
	const uint64_t round_no = egrp_shm->round;
	const uint64_t group_count = egrp_shm->target_count;
	const uint64_t abort_after = egrp_shm->stop_count;

	APPL_PRINT("-----------------------------------------\n");
	APPL_PRINT("\n--- Round %" PRIu64 "\n", round_no);
	APPL_PRINT("\nCreated %i event group(s) with count of %" PRIu64 "\n",
		   EVENT_GROUPS, group_count);
	APPL_PRINT("Abort group when received %" PRIu64 " events\n\n",
		   abort_after);
}
/**
 * Print the statistics gathered during a test round.
 *
 * Each counter is read from the EO context right before the line that
 * reports it.
 */
void
print_round_end_info(void)
{
	/* Received data events */
	const uint64_t valid_events =
		env_atomic64_get(&egrp_shm->test_eo_ctx.rcvd_group_events);
	const uint64_t expired_events =
		env_atomic64_get(&egrp_shm->test_eo_ctx.rcvd_expired_events);

	APPL_PRINT("Evgrp events:\t\tValid:%" PRIu64 "\tExpired:%" PRIu64 "\n",
		   valid_events, expired_events);

	/* Event group increments */
	const uint64_t incr_ok =
		env_atomic64_get(&egrp_shm->test_eo_ctx.increments);
	const uint64_t incr_failed =
		env_atomic64_get(&egrp_shm->test_eo_ctx.failed_increments);

	APPL_PRINT("Evgrp increments:\tValid:%" PRIu64 "\tFailed:%" PRIu64 "\n",
		   incr_ok, incr_failed);

	/* Event group assigns */
	const uint64_t assign_ok =
		env_atomic64_get(&egrp_shm->test_eo_ctx.assigns);
	const uint64_t assign_failed =
		env_atomic64_get(&egrp_shm->test_eo_ctx.failed_assigns);

	APPL_PRINT("Evgrp assigns:\t\tValid:%" PRIu64 "\tFailed:%" PRIu64 "\n",
		   assign_ok, assign_failed);

	/* Aborts */
	const uint64_t aborted =
		env_atomic64_get(&egrp_shm->test_eo_ctx.aborted_egrps);

	APPL_PRINT("Aborted %" PRIu64 " event groups\n", aborted);

	const uint64_t abort_failures =
		env_atomic64_get(&egrp_shm->test_eo_ctx.failed_aborts);

	APPL_PRINT("Failed to abort %" PRIu64 " times\n", abort_failures);

	/* Notification events */
	const uint64_t notifs_received =
		env_atomic64_get(&egrp_shm->test_eo_ctx.rcvd_notif_events);

	APPL_PRINT("Received %" PRIu64 " notification events\n",
		   notifs_received);

	const uint64_t notifs_freed =
		env_atomic64_get(&egrp_shm->test_eo_ctx.del_notifs);

	APPL_PRINT("Freed %" PRIu64 " notification events\n", notifs_freed);
}
/*
 * Abort handling for one event group and the related bookkeeping.
 *
 * Visible behaviour: del_notifs is incremented once per returned
 * notification, aborted_egrps is incremented and update_group_count() is
 * called; in the else branch failed_aborts is incremented instead.
 *
 * NOTE(review): the braces are unbalanced in this view and the statements
 * that set returned_notifs and ready, as well as the conditions guarding the
 * branches, are not visible (presumably the abort and is-ready calls) -
 * confirm against the complete file.
 */
void
abort_event_group(em_event_group_t event_group)
{
int returned_notifs, ready, i;
for (i = 0; i < returned_notifs; i++) {
env_atomic64_inc(&egrp_shm->test_eo_ctx.del_notifs);
}
env_atomic64_inc(&egrp_shm->test_eo_ctx.aborted_egrps);
/* An aborted group also counts as finished for the round */
update_group_count();
APPL_ERROR("em_event_group_is_ready():\n"
"should succeed after event group abort\n");
exit(EXIT_FAILURE);
}
} else {
env_atomic64_inc(&egrp_shm->test_eo_ctx.failed_aborts);
}
}
/**
 * Mark one event group of the current round as finished.
 *
 * Atomically decrements groups_left; when the result is zero the round
 * summary is printed and a new start event is sent.
 */
void update_group_count(void)
{
	const int remaining =
		env_atomic64_sub_return(&egrp_shm->test_eo_ctx.groups_left, 1);

	/* Groups are still outstanding: nothing more to do */
	if (remaining != 0)
		return;

	print_round_end_info();
	send_start_event();
}
/*
 * Stop phase of the test application.
 *
 * Prints the calling function and core, spins for env_core_hz() / 100 when
 * more than one core is in use, then checks the status of stopping and
 * deleting the test EO.
 *
 * appl_conf is unused.
 *
 * NOTE(review): the declarations of 'core' and 'ret' and the calls that
 * produce the two checked status values (presumably the EO stop and delete
 * calls named in the messages) are not visible in this view - confirm.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
/* Short busy-wait on multi-core runs before the EO is handled */
if (egrp_shm->core_count > 1)
delay_spin(env_core_hz() / 100);
eo = egrp_shm->test_eo_ctx.eo;
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT
"", eo, ret);
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT
"", eo, ret);
}
/**
 * Termination phase of the test application.
 *
 * Prints the calling function and core; when 'core' is 0 the shared memory
 * area of the test is freed.
 *
 * @param appl_conf  Application configuration (unused)
 *
 * NOTE(review): 'core' is not declared inside the block as visible here -
 * confirm where it comes from.
 */
void test_term(const appl_conf_t *appl_conf)
{
	(void)appl_conf;

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	/* Every core other than 0 has nothing to release */
	if (core != 0)
		return;

	env_shared_free(egrp_shm);
}
/*
 * EO start callback (passed to em_eo_create() in test_start()).
 *
 * Ignores all of its arguments and sends the first start event.
 *
 * NOTE(review): the return type and a return statement are not visible in
 * this view - confirm against the complete file.
 */
egroup_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
(void)eo;
(void)eo_context;
(void)conf;
send_start_event();
}
/*
 * EO stop callback (passed to em_eo_create() in test_start()).
 *
 * Visible behaviour: checks the status of removing the EO's queues, then
 * walks all EVENT_GROUPS table entries and checks the status of a per-group
 * delete step.
 *
 * NOTE(review): the return type, the declarations of ret and notif_tbl, the
 * calls that produce the checked status values, and the statement guarded by
 * "num_notifs == 1" are not visible in this view; the braces are unbalanced
 * as a result - confirm against the complete file.
 */
egroup_stop(void *eo_context, em_eo_t eo)
{
eo_context_t *eo_ctx = eo_context;
em_event_group_t egrp;
int num_notifs;
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
for (int i = 0; i < EVENT_GROUPS; i++) {
egrp = eo_ctx->egrp_tbl[i].grp;
notif_tbl);
if (ret ==
EM_OK && num_notifs == 1)
}
test_fatal_if(ret !=
EM_OK,
"delete:%" PRI_STAT
" EO:%" PRI_EO "",
egrp, ret, eo);
}
}
/*
 * EO receive function (presumably egroup_receive(), the receive callback
 * passed to em_eo_create() in test_start()).
 *
 * Returns immediately when the application exit flag is set, otherwise
 * dispatches on rcvd_event->msg:
 *  - MSG_START: resets the counters, advances the round, prints the round
 *    header and sends the test events.
 *  - MSG_DATA:  checks the event's generation against the group's, updates
 *    the statistics and the per-group counter, and calls
 *    abort_event_group() when the counter equals stop_count.
 *  - MSG_NOTIF: exits the program if the notification belongs to another
 *    generation; otherwise counts it and calls update_group_count().
 *  - anything else is fatal.
 *
 * NOTE(review): the function name, its leading parameters and several
 * statements (the declaration of rcvd_event and ret, the expired-event
 * condition, the increment/assign calls, the assignment of current_egrp and
 * the re-send call) are not visible in this view; the braces are unbalanced
 * as a result - confirm against the complete file.
 */
static void
em_queue_t queue, void *q_ctx)
{
eo_context_t *eo_ctx = eo_context;
em_event_group_t current_egrp;
egrp_data_tbl *egrp_data;
uint64_t egrp_id;
uint64_t current_count;
(void)type;
(void)q_ctx;
(void)queue;
if (unlikely(appl_shm->exit_flag)) {
return;
}
switch (rcvd_event->msg) {
case MSG_START:
/* Begin a new round */
init_counters();
egrp_shm->round++;
print_round_start_info();
send_test_events();
break;
case MSG_DATA:
egrp_id = rcvd_event->egrp_id;
egrp_data = &eo_ctx->egrp_tbl[egrp_id];
env_spinlock_lock(&egrp_data->lock);
env_spinlock_unlock(&egrp_data->lock);
env_atomic64_inc(&eo_ctx->rcvd_expired_events);
return;
}
/* A non-expired data event must belong to the group's current generation */
test_fatal_if(rcvd_event->egrp_gen != egrp_data->gen,
"Current gen: %" PRIu64 ", received gen\t"
"%" PRIu64 " event: %p. Event group:\t"
rcvd_event->egrp_gen, (void *)rcvd_event,
egrp_data->grp);
env_spinlock_unlock(&egrp_data->lock);
env_atomic64_inc(&eo_ctx->rcvd_group_events);
/* With 50% probability: increment bookkeeping (the call itself is not visible) */
if (rand() % 2) {
env_atomic64_inc(&eo_ctx->increments);
else
env_atomic64_inc(&eo_ctx->failed_increments);
}
/* With 50% probability: assign bookkeeping using a randomly picked group */
if (rand() % 2) {
em_event_group_t rand_egrp =
eo_ctx->egrp_tbl[rand() % EVENT_GROUPS].grp;
env_atomic64_inc(&eo_ctx->assigns);
else
env_atomic64_inc(&eo_ctx->failed_assigns);
}
/* Count this event for its group; the group is aborted when stop_count is hit */
current_count =
env_atomic64_add_return(&eo_ctx->group_counter[egrp_id], 1);
if (current_count != egrp_shm->stop_count) {
env_spinlock_lock(&egrp_data->lock);
if (rcvd_event->egrp_gen != egrp_data->gen) {
} else {
current_egrp);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"Send:%" PRI_STAT "\t"
ret, eo_ctx->paral_queue);
}
}
env_spinlock_unlock(&egrp_data->lock);
} else {
abort_event_group(current_egrp);
}
break;
case MSG_NOTIF:
egrp_data = &eo_ctx->egrp_tbl[rcvd_event->egrp_id];
if (rcvd_event->egrp_gen != egrp_data->gen) {
APPL_ERROR("Receiving notification events from\n"
"previous rounds should not be possible");
exit(EXIT_FAILURE);
}
/* A valid notification marks its group as completed */
env_atomic64_inc(&eo_ctx->rcvd_notif_events);
update_group_count();
break;
default:
test_fatal_if(
EM_TRUE,
"Bad msg (%" PRIu64
")!",
rcvd_event->msg);
break;
}
}