#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
#define NUM_EO 32
#define NUM_EVENT 37
#define NUM_SUB_EVENT 11
#define MAX_NBR_OF_CORES 128
#define PRINT_EVENT_COUNT 0xff0000
#define PRINT_ON_ALL_CORES 1
#define SEND_MULTI_MAX 32
typedef union {
struct {
uint64_t num_events;
uint64_t begin_cycles;
uint64_t end_cycles;
uint64_t print_count;
};
} test_stat_t;
TEST_STAT_T_SIZE_ERROR);
typedef struct {
em_queue_t dest_queue;
} q_ordered_context_t;
typedef struct {
uint32_t seq;
uint32_t sub_seq;
em_queue_t dest_queue;
} q_atomic_context_t;
typedef union {
q_ordered_context_t q_ordered_ctx;
q_atomic_context_t q_atomic_ctx;
} q_context_array_elem_t;
Q_CONTEXT_SIZE_ERROR);
typedef struct {
em_eo_t hdl;
em_queue_t ordered_queue;
} eo_ordered_context_t;
typedef struct {
em_eo_t hdl;
em_queue_t atomic_queue;
em_queue_t peer_ordered_queue;
} eo_atomic_context_t;
typedef union {
eo_ordered_context_t eo_ordered_ctx;
eo_atomic_context_t eo_atomic_ctx;
} eo_context_array_elem_t;
EO_CONTEXT_SIZE_ERROR);
#define EV_ID_ORDERED_EVENT 1
#define EV_ID_START_EVENT 2
typedef struct {
int ev_id;
uint32_t seq;
uint32_t sub_seq;
int out_of_order;
int last_in_order;
int is_copy;
em_event_t original;
} ordered_event_t;
typedef struct {
int ev_id;
em_queue_t ordered_queue;
} start_event_t;
typedef union {
int ev_id;
ordered_event_t ordered;
start_event_t start;
} test_event_t;
typedef struct {
em_pool_t pool;
q_context_array_elem_t q_ordered_ctx[NUM_EO / 2]
q_context_array_elem_t q_atomic_ctx[NUM_EO / 2]
eo_context_array_elem_t eo_ordered_ctx[NUM_EO / 2]
eo_context_array_elem_t eo_atomic_ctx[NUM_EO / 2]
} test_shm_t;
eo_ordered_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
eo_ordered_stop(void *eo_context, em_eo_t eo);
static void
eo_ordered_receive(
void *eo_context, em_event_t event,
em_event_type_t type,
em_queue_t queue, void *queue_ctx);
eo_atomic_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
eo_atomic_stop(void *eo_context, em_eo_t eo);
static void
eo_atomic_receive(
void *eo_context, em_event_t event,
em_event_type_t type,
em_queue_t queue, void *queue_ctx);
static void
initialize_events(start_event_t *const start_event);
static void
print_result(test_stat_t *const test_stat);
static int
get_queue_priority(const int index);
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
test_shm = env_shared_reserve("TestSharedMem",
sizeof(test_shm_t));
} else {
test_shm = env_shared_lookup("TestSharedMem");
}
if (test_shm == NULL)
else if (core == 0)
memset(test_shm, 0, sizeof(test_shm_t));
}
void test_start(const appl_conf_t *appl_conf)
{
em_eo_t eo_a, eo_b;
em_queue_t queue_a, queue_b;
eo_ordered_context_t *eo_ordered_ctx;
eo_atomic_context_t *eo_atomic_ctx;
q_ordered_context_t *q_ordered_ctx;
q_atomic_context_t *q_atomic_ctx;
int i;
if (appl_conf->num_pools >= 1)
test_shm->pool = appl_conf->pools[0];
else
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
test_shm->pool);
"Undefined application event pool!");
for (i = 0; i < NUM_EO / 2; i++) {
eo_ordered_ctx = &test_shm->eo_ordered_ctx[i].eo_ordered_ctx;
eo_atomic_ctx = &test_shm->eo_atomic_ctx[i].eo_atomic_ctx;
q_ordered_ctx = &test_shm->q_ordered_ctx[i].q_ordered_ctx;
q_atomic_ctx = &test_shm->q_atomic_ctx[i].q_atomic_ctx;
eo_ordered_stop, NULL, eo_ordered_receive,
eo_ordered_ctx);
get_queue_priority(i),
test_fatal_if(ret !=
EM_OK,
"Queue set context:%" PRI_STAT "\n"
ret, eo_a, queue_a);
test_fatal_if(ret !=
EM_OK,
"EO add queue:%" PRI_STAT "\n"
ret, eo_a, queue_a);
eo_ordered_ctx->hdl = eo_a;
eo_ordered_ctx->ordered_queue = queue_a;
eo_atomic_stop, NULL, eo_atomic_receive,
eo_atomic_ctx);
get_queue_priority(i),
test_fatal_if(ret !=
EM_OK,
"Queue set context:%" PRI_STAT "\n"
ret, eo_b, queue_b);
test_fatal_if(ret !=
EM_OK,
"EO add queue:%" PRI_STAT "\n"
ret, eo_b, queue_b);
eo_atomic_ctx->hdl = eo_b;
eo_atomic_ctx->atomic_queue = queue_b;
eo_atomic_ctx->peer_ordered_queue = queue_a;
q_ordered_ctx->dest_queue = queue_b;
q_atomic_ctx->seq = 0;
q_atomic_ctx->sub_seq = 0;
q_atomic_ctx->dest_queue = queue_a;
test_fatal_if(ret !=
EM_OK,
"EO start:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo_a);
test_fatal_if(ret !=
EM_OK,
"EO start:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo_b);
}
}
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
int i;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
for (i = 0; i < NUM_EO / 2; i++) {
eo = test_shm->eo_atomic_ctx[i].eo_atomic_ctx.hdl;
test_fatal_if(ret !=
EM_OK,
"EO stop:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
eo = test_shm->eo_ordered_ctx[i].eo_ordered_ctx.hdl;
test_fatal_if(ret !=
EM_OK,
"EO stop:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
}
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
if (core == 0) {
env_shared_free(test_shm);
}
}
eo_ordered_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
(void)eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
}
eo_atomic_start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_atomic_context_t *const eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
test_shm->pool);
start_event->ev_id = EV_ID_START_EVENT;
start_event->ordered_queue = eo_ctx->peer_ordered_queue;
ret =
em_send(event, eo_ctx->atomic_queue);
test_fatal_if(ret !=
EM_OK,
"start event send:%" PRI_STAT
"");
}
eo_ordered_stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"EO delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
eo_atomic_stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"EO delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
static void
eo_ordered_receive(
void *eo_context, em_event_t event,
em_event_type_t type,
em_queue_t queue, void *queue_ctx)
{
q_ordered_context_t *const q_ctx = queue_ctx;
ordered_event_t *ordered;
int interleave;
int out_of_order = 0;
uint32_t sub_seq;
int i;
(void)eo_context;
(void)type;
(void)queue;
test_fatal_if(test_event->ev_id != EV_ID_ORDERED_EVENT,
"Unexpected ev-id:%d", test_event->ev_id);
if (unlikely(appl_shm->exit_flag)) {
return;
}
ordered = &test_event->ordered;
ordered->out_of_order = 0;
ordered->last_in_order = 0;
ordered->is_copy = 0;
interleave = ordered->seq % (NUM_SUB_EVENT + 1);
for (i = 0, sub_seq = 0; i < NUM_SUB_EVENT; i++, sub_seq++) {
em_event_t sub_event =
em_alloc(
sizeof(ordered_event_t),
test_shm->pool);
"Sub-event alloc failed:%i", i);
ordered_event_t *const sub_ordered =
sub_ordered->ev_id = EV_ID_ORDERED_EVENT;
sub_ordered->seq = ordered->seq;
if (interleave == i) {
ordered->sub_seq = sub_seq;
sub_seq++;
ordered->last_in_order = 1;
em_event_t copy_event =
"Copy-event alloc failed:%i", i);
ordered_event_t *const copy_ordered =
memcpy(copy_ordered, ordered, sizeof(ordered_event_t));
copy_ordered->is_copy = 1;
copy_ordered->original = event;
ret =
em_send(copy_event, q_ctx->dest_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"event send:%" PRI_STAT "");
}
out_of_order = 1;
}
sub_ordered->sub_seq = sub_seq;
sub_ordered->out_of_order = out_of_order;
sub_ordered->last_in_order = 0;
sub_ordered->is_copy = 0;
ret =
em_send(sub_event, q_ctx->dest_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"event send:%" PRI_STAT "");
}
}
if (interleave == i) {
ordered->sub_seq = sub_seq;
ordered->out_of_order = 0;
ordered->last_in_order = 1;
ret =
em_send(event, q_ctx->dest_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"event send:%" PRI_STAT "");
}
}
}
static void
eo_atomic_receive(
void *eo_context, em_event_t event,
em_event_type_t type,
em_queue_t queue, void *queue_ctx)
{
eo_atomic_context_t *const eo_ctx = eo_context;
q_atomic_context_t *const q_ctx = queue_ctx;
ordered_event_t *ordered;
uint64_t num_events;
uint32_t seq, sub_seq;
int out_of_order, last_in_order;
(void)type;
if (unlikely(appl_shm->exit_flag)) {
if (test_event->ev_id == EV_ID_ORDERED_EVENT) {
ordered = &test_event->ordered;
if (ordered->is_copy)
}
return;
}
if (unlikely(test_event->ev_id == EV_ID_START_EVENT)) {
initialize_events(&test_event->start);
return;
}
test_fatal_if(test_event->ev_id != EV_ID_ORDERED_EVENT,
"Unexpected ev-id:%d", test_event->ev_id);
ordered = &test_event->ordered;
seq = ordered->seq;
sub_seq = ordered->sub_seq;
out_of_order = ordered->out_of_order;
last_in_order = ordered->last_in_order;
if (ordered->is_copy)
uint32_t q_ctx_seq = q_ctx->seq;
uint32_t q_ctx_sub_seq = q_ctx->sub_seq;
if (!out_of_order &&
unlikely(seq != q_ctx_seq || sub_seq != q_ctx_sub_seq))
"expected:%u-%u event-seq:%u-%u core:%d\n",
eo_ctx->hdl, queue, q_ctx_seq,
q_ctx_sub_seq, seq, sub_seq, core);
if (out_of_order) {
} else if (last_in_order) {
ordered->seq = q_ctx_seq + NUM_EVENT;
ordered->sub_seq = 0;
q_ctx->seq = q_ctx_seq + 1;
q_ctx->sub_seq = 0;
ret =
em_send(event, q_ctx->dest_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"event send:%" PRI_STAT "");
}
} else if (!out_of_order) {
q_ctx->sub_seq = q_ctx_sub_seq + 1;
}
num_events = test_shm->core_stat[core].num_events;
if (unlikely(num_events == 0)) {
test_shm->core_stat[core].begin_cycles = env_get_cycle();
num_events = 1;
} else if (unlikely(num_events > PRINT_EVENT_COUNT)) {
test_shm->core_stat[core].end_cycles = env_get_cycle();
test_shm->core_stat[core].print_count += 1;
if (PRINT_ON_ALL_CORES)
print_result(&test_shm->core_stat[core]);
else if (core == 0)
print_result(&test_shm->core_stat[core]);
test_shm->core_stat[core].begin_cycles = env_get_cycle();
num_events = 0;
} else {
num_events += 1;
}
test_shm->core_stat[core].num_events = num_events;
}
static void
initialize_events(start_event_t *const start_event)
{
em_event_t events[NUM_EVENT];
ordered_event_t *ordered;
int num_sent = 0;
int i, j;
for (i = 0; i < NUM_EVENT; i++) {
test_shm->pool);
"Event allocation failed:%i", i);
ordered->ev_id = EV_ID_ORDERED_EVENT;
ordered->seq = i;
ordered->sub_seq = 0;
}
const int send_rounds = NUM_EVENT / SEND_MULTI_MAX;
const int left_over = NUM_EVENT % SEND_MULTI_MAX;
for (i = 0, j = 0; i < send_rounds; i++, j += SEND_MULTI_MAX) {
start_event->ordered_queue);
}
if (left_over) {
start_event->ordered_queue);
}
test_fatal_if(num_sent != NUM_EVENT,
"Event send multi failed:%d (%d)\n"
num_sent, NUM_EVENT, start_event->ordered_queue);
}
static int
get_queue_priority(const int queue_index)
{
int remainder = queue_index % 5;
if (remainder <= 1)
else if (remainder <= 3)
else
}
static void
print_result(test_stat_t *const test_stat)
{
uint64_t diff;
uint32_t hz;
double mhz;
double cycles_per_event;
uint64_t print_count;
diff = env_cycles_diff(test_stat->end_cycles, test_stat->begin_cycles);
print_count = test_stat->print_count;
cycles_per_event = ((double)diff) / ((double)test_stat->num_events);
hz = env_core_hz();
mhz = ((double)hz) / 1000000.0;
APPL_PRINT("cycles per event %.2f @%.2f MHz (core-%02i %" PRIu64 ")\n",
cycles_per_event, mhz,
em_core_id(), print_count);
}