#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/* Number of queue groups requested by the init_queue_groups() call in test_start() */
#define MIN_NUM_QUEUE_GROUPS 1
/* Number of queue groups requested by the init_queue_groups() call in receive_perf_event() */
#define QUEUE_GROUP_STEP_SIZE 1
/* Number of queues created/deleted per test step and size used for the queue context table */
#define NUM_QUEUES 256
/* Number of events sent by test_step(); also the freed_count value that triggers the next step */
#define NUM_EVENTS (NUM_QUEUES * 10)
/* NOTE(review): not referenced in the visible code - presumably bounds per-core tables; confirm */
#define MAX_CORES 64
/* Size in bytes of the data[] payload in perf_event_t */
#define DATA_SIZE 128
/* The samples counter in test_status_t is wrapped back to 0 when it reaches this value */
#define NUM_SAMPLES 8
/* Per-core event count at which a core closes its measurement sample */
#define EVENTS_PER_SAMPLE 0x400000
/* NOTE(review): not referenced in the visible code - confirm whether it is still needed */
#define EVENTS_CHECK_READY_MASK 0xffff
/* Non-zero enables the receive timestamping and latency accumulation paths */
#define MEASURE_LATENCY 1
/* Values stored in the core_state variable */
#define CORE_STATE_MEASURE 0
#define CORE_STATE_IDLE 1
/*
 * Shared test bookkeeping, reached in the visible code through
 * perf_shm->test_status.
 * NOTE(review): the code also applies env_atomic64_*() to ready_count and
 * freed_count members of this type, which are not declared in the lines
 * shown here - confirm against the full definition.
 */
typedef union {
struct {
/* Number of queue groups registered so far (incremented once per created group) */
int num_qgrp;
/* Samples printed in the current round; wrapped to 0 at NUM_SAMPLES */
int samples;
/* EM-core count, copied from appl_conf->core_count in test_start() */
int num_cores;
/* While set together with reset_flag, received events are counted in freed_count */
int free_flag;
/* Set after a sample has been printed; cores then zero their event counters */
int reset_flag;
/* cpu_hz divided by 1000000.0 */
double mhz;
/* Value returned by env_core_hz() in test_start() */
uint64_t cpu_hz;
/* Running index printed with, and post-incremented by, each statistics line */
uint64_t print_count;
};
} test_status_t;
/* NOTE(review): the next line looks like the tail of a statement whose start is not visible here - confirm */
TEST_STATUS_T__SIZE_ERROR);
/*
 * Per-core measurement record; the visible code indexes an array of these
 * as perf_shm->core_stat[core].
 */
typedef union {
struct {
/* Events counted by this core in the current sample (zeroed while reset_flag is set) */
uint64_t num_events;
/* Timestamp taken when the first event of a sample is received */
env_time_t begin_time;
/* Timestamp taken when the EVENTS_PER_SAMPLE:th event is received */
env_time_t end_time;
/* end_time - begin_time of the last completed sample */
env_time_t diff_time;
/* Latency sums accumulated by update_latency(); cleared at the start of each sample */
env_time_t latency_hi;
env_time_t latency_lo;
};
} core_stat_t;
/* NOTE(review): the next line looks like the tail of a statement whose start is not visible here - confirm */
CORE_STAT_SIZE_ERROR);
/* Context handed to em_eo_create() for the test EO (perf_shm->eo_context). */
typedef union {
struct {
/* EO handle, stored by start() */
em_eo_t eo_id;
/* Event group used for the queue-group-creation notifications */
em_event_group_t event_group;
};
} eo_context_t;
/* NOTE(review): the next line looks like the tail of a statement whose start is not visible here - confirm */
EO_CONTEXT_T__SIZE_ERROR);
/*
 * Per-queue context, one entry of perf_shm->queue_context_tbl.
 * NOTE(review): create_and_link_queues() also assigns q_ctx->prio, which is
 * not declared in the lines shown here - confirm against the full definition.
 */
typedef union {
struct {
/* The queue this context is attached to; checked against the receiving queue */
em_queue_t this_queue;
/* Destination queue for an event received on this_queue */
em_queue_t next_queue;
};
} queue_context_t;
/* NOTE(review): the next line looks like the tail of a statement whose start is not visible here - confirm */
QUEUE_CONTEXT_SIZE_ERROR);
/*
 * Payload layout of the events used by the test.
 * NOTE(review): the code also reads and writes a qgrp_name member of this
 * type, which is not declared in the lines shown here - confirm.
 */
typedef struct {
/* Timestamp written with env_time_global() before the event is sent */
env_time_t send_time;
/* Event kind; the receive function dispatches on this value */
enum {
PERF_EVENT,
NOTIF_QUEUE_GROUP_CREATED,
NOTIF_ALL_QUEUE_GROUPS_CREATED
} type;
/* Sequence number / index assigned by the sender */
int seq;
union {
/* Opaque payload of DATA_SIZE bytes */
uint8_t data[DATA_SIZE];
};
} perf_event_t;
/*
 * Layout of the "PerfGroupsSharedMem" shared memory area (perf_shm).
 * NOTE(review): the code accesses further members through perf_shm
 * (test_status, eo_context, notif_queue, core_stat, queue_context_tbl,
 * queue_group_tbl, queue_tbl) that are not declared in the lines shown here -
 * confirm against the full definition.
 */
typedef struct {
/* Event pool, taken from appl_conf->pools[0] in test_start() */
em_pool_t pool;
/* Handle of the test EO returned by em_eo_create() */
em_eo_t eo;
} perf_shm_t;
/* NOTE(review): the next line looks like the tail of a statement whose start is not visible here - confirm */
PERF_SHM_T__SIZE_ERROR);
/*
 * Measurement state of the calling core (CORE_STATE_MEASURE or
 * CORE_STATE_IDLE); starts out as CORE_STATE_MEASURE.
 * NOTE(review): ENV_LOCAL presumably gives each core/thread its own copy - confirm.
 */
static ENV_LOCAL int core_state = CORE_STATE_MEASURE;
/* Forward declarations of the file-local functions defined further down. */
static void
init_queue_groups(int count);
static void
test_step(void);
static void
next_test_step(void);
/* NOTE(review): the two EO callback prototypes below are shown without a return type - confirm */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
stop(void *eo_context, em_eo_t eo);
/* NOTE(review): the function name and leading parameters of the next prototype are not visible - confirm */
static void
em_queue_t queue, void *q_context);
static void
receive_notif_queue_group_created(perf_event_t *perf, em_event_t event);
static void
receive_notif_all_queue_groups_created(em_event_t event);
static void
receive_perf_event(env_time_t recv_time, em_event_t event,
perf_event_t *perf, em_queue_t queue, void *q_context);
static inline void
update_latency(env_time_t recv_time, queue_context_t *q_ctx, perf_event_t *perf,
core_stat_t *const cstat);
static void
print_sample_statistics(test_status_t *const tstat);
static void
create_and_link_queues(int num_queues);
static int
unschedule_and_delete_queues(int num_queues);
/**
 * Program entry point.
 *
 * Passes the command line unchanged to cm_setup() and uses its result as
 * the return value.
 *
 * @param argc  Number of command line arguments.
 * @param argv  Command line argument vector.
 *
 * @return The value returned by cm_setup().
 */
int main(int argc, char *argv[])
{
	const int setup_rc = cm_setup(argc, argv);

	return setup_rc;
}
/*
 * Body of an error-handling routine that filters some errors before passing
 * the rest on to test_error_handler().
 * NOTE(review): the function signature and the beginning of the first
 * condition are not visible in the lines shown here - confirm.
 */
{
/* Queue group creation scope: report that no free queue groups are left and return the error */
escope == EM_ESCOPE_QUEUE_GROUP_CREATE) {
APPL_PRINT("\nNo more free queue groups left\n"
"\nTest finished\n\n");
return error;
}
/*
 * While appl_shm->exit_flag is set, errors from the queue add/remove/delete
 * scopes listed below are only logged and returned.
 * NOTE(review): the end of this outer condition is not visible - confirm.
 */
if (appl_shm->exit_flag &&
EM_ESCOPE(escope) &&
if (escope == EM_ESCOPE_EO_ADD_QUEUE_SYNC ||
escope == EM_ESCOPE_EO_REMOVE_QUEUE_SYNC ||
escope == EM_ESCOPE_EO_REMOVE_QUEUE_SYNC_DONE_CB ||
escope == EM_ESCOPE_QUEUE_DELETE) {
APPL_PRINT("\nExit: suppress queue setup error\n\n");
return error;
}
}
/* Everything else goes to test_error_handler() */
return test_error_handler(eo, error, escope, args);
}
/**
 * Initialization step: obtain the perf_shm shared memory area.
 *
 * When `core` is 0 the area "PerfGroupsSharedMem" is reserved and afterwards
 * zeroed; otherwise the same area is looked up by name.
 *
 * @param appl_conf  Application configuration (unused).
 *
 * NOTE(review): `core` is not declared in the lines shown here, and the
 * NULL check below has no visible call in front of its message string -
 * confirm against the full source.
 */
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
perf_shm = env_shared_reserve("PerfGroupsSharedMem",
sizeof(perf_shm_t));
} else {
perf_shm = env_shared_lookup("PerfGroupsSharedMem");
}
if (perf_shm == NULL)
"Perf test groups init failed on EM-core: %u",
else if (core == 0)
/* Only the reserving core clears the freshly reserved area */
memset(perf_shm, 0, sizeof(perf_shm_t));
}
/**
 * Start step: set up the shared test state and the test EO, then request
 * the first MIN_NUM_QUEUE_GROUPS queue group(s).
 *
 * @param appl_conf  Application configuration; supplies the event pool, the
 *                   core count and the values printed in the banner.
 *
 * NOTE(review): several statements in this function are only partially
 * visible (the `else` branch of the pool check, the calls that produce `ret`,
 * `start_fn_ret` and `notif_queue`, and one banner argument) - confirm
 * against the full source.
 */
void test_start(const appl_conf_t *appl_conf)
{
eo_context_t *eo_ctx;
em_queue_t notif_queue;
/* Size in bytes of NUM_QUEUES queue contexts; printed in kB in the banner */
int q_ctx_size = NUM_QUEUES * sizeof(queue_context_t);
/* Use the first application pool when at least one is configured */
if (appl_conf->num_pools >= 1)
perf_shm->pool = appl_conf->pools[0];
else
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
" Max. test queue groups: %i\n"
" sizeof queue_context_tbl: %i kB\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
q_ctx_size / 1024);
"Undefined application event pool!");
/* Record the core frequency in Hz and in MHz for the statistics output */
perf_shm->test_status.cpu_hz = env_core_hz();
perf_shm->test_status.mhz = ((double)perf_shm->test_status.cpu_hz) /
1000000.0;
perf_shm->test_status.reset_flag = 0;
perf_shm->test_status.num_cores = appl_conf->core_count;
perf_shm->test_status.num_qgrp = 0;
env_atomic64_init(&perf_shm->test_status.ready_count);
env_atomic64_init(&perf_shm->test_status.freed_count);
/* Create the test EO with start/stop callbacks and the shared EO context */
eo_ctx = &perf_shm->eo_context;
perf_shm->eo =
em_eo_create(
"perf test eo", start, NULL, stop, NULL,
receive_func, eo_ctx);
test_fatal_if(ret !=
EM_OK,
"EO add queue:%" PRI_STAT
"\n"
ret, perf_shm->eo, notif_queue);
/* Remember the notification queue; init_queue_groups() sends notifications to it */
perf_shm->notif_queue = notif_queue;
test_fatal_if(ret !=
EM_OK || start_fn_ret !=
EM_OK,
"EO start:%" PRI_STAT
" EO:%" PRI_EO "",
ret, perf_shm->eo);
env_sync_mem();
/* Kick off the test by requesting the initial queue group(s) */
init_queue_groups(MIN_NUM_QUEUE_GROUPS);
}
/**
 * Stop step: logs the call and checks the results of stopping and deleting
 * the test EO (perf_shm->eo).
 *
 * @param appl_conf  Application configuration (unused).
 *
 * NOTE(review): `ret` and `core` are not declared in the lines shown here
 * and the calls whose results the two checks below examine are not visible -
 * confirm against the full source.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo = perf_shm->eo;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
/* Fatal unless the EO stop result is EM_OK */
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT
"", eo, ret);
/* Fatal unless the EO delete result is EM_OK */
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT
"", eo, ret);
}
/**
 * Termination step of the test.
 *
 * Logs the call and, when `core` is 0, releases the shared memory area
 * referenced by perf_shm. For any other value of `core` only the log line
 * is produced.
 *
 * @param appl_conf  Application configuration (unused).
 */
void test_term(const appl_conf_t *appl_conf)
{
	(void)appl_conf;

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	/* Nothing more to do unless this is core 0 */
	if (core != 0)
		return;

	env_shared_free(perf_shm);
}
/**
 * Request creation of `count` new queue groups, named "grp_<index>" with
 * indices continuing from the current perf_shm->test_status.num_qgrp.
 *
 * A NOTIF_ALL_QUEUE_GROUPS_CREATED event is prepared for the event group in
 * perf_shm->eo_context, and one NOTIF_QUEUE_GROUP_CREATED event per new group
 * is prepared as that group's completion notification. All notifications
 * target perf_shm->notif_queue.
 *
 * @param count  Number of queue groups to create.
 *
 * NOTE(review): many statements here are only partially visible (event group
 * creation, event allocations, the em_event_group_apply() call, the queue
 * group creation call and the failure check around the exit path), and
 * `notif_tbl`, `completed_notif_tbl`, `mask` and `stat` are not declared in
 * the lines shown - confirm against the full source.
 */
static void
init_queue_groups(int count)
{
em_event_t event;
em_queue_group_t queue_group;
perf_event_t *notif_event;
int i, ret, num_qgrp;
size_t nlen;
num_qgrp = perf_shm->test_status.num_qgrp;
APPL_PRINT("\nCreating %d new queue group(s)\n", count);
test_fatal_if(perf_shm->eo_context.event_group ==
"Creating event group failed!");
perf_shm->pool);
"Event allocation failed!");
/* Notification that reports all requested queue groups as created */
notif_event->type = NOTIF_ALL_QUEUE_GROUPS_CREATED;
notif_event->seq = 0;
notif_event->send_time = env_time_global();
notif_tbl[0].
event = event;
notif_tbl[0].
queue = perf_shm->notif_queue;
1, notif_tbl);
test_fatal_if(stat !=
EM_OK,
"em_event_group_apply():%" PRI_STAT "", stat);
/* One completion notification per new queue group */
for (i = num_qgrp; i < (num_qgrp + count); i++) {
perf_shm->pool);
"Notification event allocation failed!");
notif_event->type = NOTIF_QUEUE_GROUP_CREATED;
notif_event->seq = i;
notif_event->send_time = env_time_global();
/* Build the group name; snprintf() returning >= nlen means it was truncated */
nlen = sizeof(notif_event->qgrp_name);
ret = snprintf(notif_event->qgrp_name, nlen, "grp_%d", i);
test_fatal_if(ret >= (int)nlen, "Too long queue group name");
notif_event->qgrp_name[nlen - 1] = '\0';
/* The per-group notification is tied to the shared event group */
completed_notif_tbl[0].
event = event;
completed_notif_tbl[0].
queue = perf_shm->notif_queue;
completed_notif_tbl[0].
egroup =
perf_shm->eo_context.event_group;
&mask, 1,
completed_notif_tbl);
/* NOTE(review): the condition guarding this exit path is not visible - confirm */
APPL_PRINT("Cannot create more queue groups - exit.\n");
exit(EXIT_SUCCESS);
}
}
}
/**
 * Send NUM_EVENTS PERF_EVENT events into the test queues.
 *
 * Event i is sent to the queue of queue context (i % NUM_QUEUES). On a send
 * failure the function returns early; the failure is fatal unless
 * appl_shm->exit_flag is set.
 *
 * NOTE(review): the event allocation statement and the declaration of `ret`
 * are not visible in the lines shown here - confirm against the full source.
 */
static void
test_step(void)
{
em_event_t event;
perf_event_t *perf;
queue_context_t *q_ctx;
int i;
for (i = 0; i < NUM_EVENTS; i++) {
perf_shm->pool);
"EM alloc failed (%i)", i);
perf->type = PERF_EVENT;
perf->seq = i;
/* Timestamp used by the receiver to compute the latency */
perf->send_time = env_time_global();
/* Spread the events over the queues in round-robin order */
q_ctx = &perf_shm->queue_context_tbl[i % NUM_QUEUES];
ret =
em_send(event, q_ctx->this_queue);
if (unlikely(ret !=
EM_OK)) {
/* A failed send is tolerated only while the application is exiting */
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, q_ctx->this_queue);
return;
}
}
}
/**
 * EO start callback (passed to em_eo_create() in test_start()).
 *
 * Logs the EO handle and stores it in the EO context.
 *
 * @param eo_context  EO context, used as eo_context_t.
 * @param eo          EO handle.
 * @param conf        EO configuration (unused).
 *
 * NOTE(review): neither the return type nor a return statement is visible
 * in the lines shown here - confirm against the full source.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_id = eo;
}
/**
 * EO stop callback (passed to em_eo_create() in test_start()).
 *
 * Logs the EO handle and checks the result of removing all queues from the EO.
 *
 * @param eo_context  EO context (unused).
 * @param eo          EO handle.
 *
 * NOTE(review): the return type, the declaration of `ret`, the call that
 * produces it and a return statement are not visible in the lines shown
 * here - confirm against the full source.
 */
stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
/**
 * EO receive function: dispatches a received event by perf->type.
 *
 * The receive timestamp is taken first when MEASURE_LATENCY is enabled.
 * Nothing is processed while appl_shm->exit_flag is set. The two notification
 * types go to their dedicated handlers; everything else is treated as a
 * performance event.
 *
 * NOTE(review): the function name, its leading parameters (eo_context, event,
 * type are referenced) and the statement that sets `perf` are not visible in
 * the lines shown here - confirm against the full source.
 */
static void
em_queue_t queue, void *q_context)
{
env_time_t recv_time;
/* Timestamp as early as possible so the latency excludes the dispatch below */
if (MEASURE_LATENCY)
recv_time = env_time_global();
(void)eo_context;
(void)type;
if (unlikely(appl_shm->exit_flag)) {
return;
}
if (unlikely(perf->type == NOTIF_QUEUE_GROUP_CREATED)) {
receive_notif_queue_group_created(perf, event);
return;
}
if (unlikely(perf->type == NOTIF_ALL_QUEUE_GROUPS_CREATED)) {
receive_notif_all_queue_groups_created(event);
return;
}
receive_perf_event(recv_time, event, perf, queue, q_context);
}
/**
 * Handle a NOTIF_QUEUE_GROUP_CREATED notification.
 *
 * Verifies that the group name carried in the event matches `name`, stores
 * the queue group in perf_shm->queue_group_tbl at the current group count
 * and increments that count.
 *
 * @param perf   Payload of the notification event (carries qgrp_name).
 * @param event  The notification event (not used in the lines shown here).
 *
 * NOTE(review): `queue_group` and `name` are not declared in the lines shown
 * and the lookup that precedes the "not found" message is not visible -
 * confirm against the full source.
 */
static void receive_notif_queue_group_created(perf_event_t *perf, em_event_t event)
{
int cmp;
test_status_t *const tstat = &perf_shm->test_status;
/* The new group is stored at the next free table slot */
const int tbl_idx = tstat->num_qgrp;
"QGrp:%s not found!", perf->qgrp_name);
cmp = strncmp(perf->qgrp_name, name, sizeof(name));
test_fatal_if(cmp != 0, "Qgrp name check fails! %s != %s",
perf->qgrp_name, name);
perf_shm->queue_group_tbl[tbl_idx] = queue_group;
tstat->num_qgrp++;
APPL_PRINT("New group name: %s\n"
"Queue group created - total num:%d\n",
name, tstat->num_qgrp);
}
/**
 * Handle the NOTIF_ALL_QUEUE_GROUPS_CREATED notification.
 *
 * Logs that the new queue group(s) are ready, checks the result of deleting
 * the event group in perf_shm->eo_context and starts the next test step.
 *
 * @param event  The notification event (not used in the lines shown here).
 *
 * NOTE(review): `ret` is not declared in the lines shown and the call that
 * produces it is not visible - confirm against the full source.
 */
static void receive_notif_all_queue_groups_created(em_event_t event)
{
APPL_PRINT("New queue group(s) ready\n");
test_fatal_if(ret !=
EM_OK,
"egrp:%" PRI_EGRP " delete:%" PRI_STAT
"",
perf_shm->eo_context.event_group, ret);
next_test_step();
}
/**
 * Process one PERF_EVENT: update the per-core sample state and prepare the
 * event for its next queue.
 *
 * @param recv_time  Receive timestamp taken by the caller.
 * @param event      The received event.
 * @param perf       Payload of the received event.
 * @param queue      Queue the event was received from.
 * @param q_context  Context of that queue, used as queue_context_t.
 *
 * NOTE(review): `ret` is not declared in the lines shown and the send call
 * whose result is checked at the end is not visible; `event` is not used in
 * the visible lines - confirm against the full source.
 */
static void receive_perf_event(env_time_t recv_time, em_event_t event,
perf_event_t *perf, em_queue_t queue,
void *q_context)
{
em_queue_t dest_queue;
uint64_t freed_count;
uint64_t ready_count;
queue_context_t *q_ctx;
test_status_t *const tstat = &perf_shm->test_status;
core_stat_t *const cstat = &perf_shm->core_stat[core];
uint64_t num_events = cstat->num_events;
q_ctx = q_context;
num_events++;
if (unlikely(tstat->reset_flag)) {
/* Reset phase: the per-core event counter is held at zero */
num_events = 0;
if (unlikely(tstat->free_flag)) {
/* Free phase: count this event; it is not forwarded (early return below) */
freed_count =
env_atomic64_add_return(&tstat->freed_count, 1);
if (freed_count == NUM_EVENTS) {
/* All NUM_EVENTS events accounted for: clear the flags and request more queue groups */
env_atomic64_init(&tstat->freed_count);
tstat->reset_flag = 0;
tstat->free_flag = 0;
init_queue_groups(QUEUE_GROUP_STEP_SIZE);
}
return;
}
if (unlikely(core_state != CORE_STATE_IDLE)) {
/* First event this core sees in the reset phase: go idle and report ready */
core_state = CORE_STATE_IDLE;
cstat->begin_time = ENV_TIME_NULL;
ready_count =
env_atomic64_add_return(&tstat->ready_count, 1);
if ((int)ready_count == tstat->num_cores) {
/* Last core to go idle: either enter the free phase (samples wrapped to 0) or resume measuring */
env_atomic64_init(&tstat->ready_count);
if (tstat->samples == 0)
tstat->free_flag = 1;
else
tstat->reset_flag = 0;
}
}
} else if (unlikely(num_events == 1)) {
/* First event of a new sample: start the clock and clear the latency sums */
cstat->begin_time = env_time_global();
cstat->latency_hi = ENV_TIME_NULL;
cstat->latency_lo = ENV_TIME_NULL;
core_state = CORE_STATE_MEASURE;
} else if (unlikely(num_events == EVENTS_PER_SAMPLE)) {
/* Last event of the sample on this core: record the duration and report ready */
cstat->end_time = env_time_global();
cstat->diff_time = env_time_diff(cstat->end_time, cstat->begin_time);
update_latency(recv_time, q_ctx, perf, cstat);
ready_count = env_atomic64_add_return(&tstat->ready_count, 1);
if (unlikely((int)ready_count == tstat->num_cores)) {
/* Last core to finish: print the sample and switch all cores to the reset phase */
print_sample_statistics(tstat);
tstat->samples++;
if (tstat->samples == NUM_SAMPLES)
tstat->samples = 0;
tstat->reset_flag = 1;
}
}
cstat->num_events = num_events;
dest_queue = q_ctx->next_queue;
/* The queue context must belong to the queue the event arrived on */
test_fatal_if(queue != q_ctx->this_queue, "Queue config error");
if (MEASURE_LATENCY) {
/* Latency is only accumulated mid-sample; the sample-closing event was handled above */
if (likely(num_events < EVENTS_PER_SAMPLE) &&
likely(!tstat->reset_flag))
update_latency(recv_time, q_ctx, perf, cstat);
/* Re-stamp the event for the next hop */
perf->send_time = env_time_global();
}
if (unlikely(ret !=
EM_OK)) {
/* A failed send is tolerated only while the application is exiting */
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, dest_queue);
}
}
/**
 * Add the latency of one event (recv_time - perf->send_time) to the
 * per-core latency sums in cstat.
 *
 * @param recv_time  Timestamp taken when the event was received.
 * @param q_ctx      Context of the receiving queue (not used in the lines shown here).
 * @param perf       Event payload carrying the send timestamp.
 * @param cstat      Per-core statistics to update.
 *
 * NOTE(review): the condition that selects between latency_hi and
 * latency_lo (the `if` belonging to the `else` below) is not visible in the
 * lines shown here - confirm against the full source.
 */
static inline void update_latency(env_time_t recv_time, queue_context_t *q_ctx,
perf_event_t *perf, core_stat_t *const cstat)
{
env_time_t diff_time;
diff_time = env_time_diff(recv_time, perf->send_time);
cstat->latency_hi = env_time_sum(cstat->latency_hi, diff_time);
else
cstat->latency_lo = env_time_sum(cstat->latency_lo, diff_time);
}
/**
 * Print the combined statistics of one completed measurement sample.
 *
 * Re-initializes the shared ready counter, then sums the per-core sample
 * durations (and, with MEASURE_LATENCY enabled, the per-core latency sums)
 * over the first tstat->num_cores entries of perf_shm->core_stat. The sums
 * are converted to CPU cycles, divided by the total number of events in the
 * sample and printed on one line. tstat->print_count is post-incremented as
 * part of the print call.
 *
 * @param tstat  Test status holding the core count, CPU frequency and counters.
 */
static void print_sample_statistics(test_status_t *const tstat)
{
	const uint64_t total_events = (uint64_t)tstat->num_cores * EVENTS_PER_SAMPLE;
	env_time_t time_sum = ENV_TIME_NULL;
	env_time_t lat_hi_sum = ENV_TIME_NULL;
	env_time_t lat_lo_sum = ENV_TIME_NULL;
	int core_idx = 0;

	env_atomic64_init(&tstat->ready_count);

	/* Accumulate the per-core results of the sample */
	while (core_idx < tstat->num_cores) {
		core_stat_t *per_core = &perf_shm->core_stat[core_idx];

		time_sum = env_time_sum(time_sum, per_core->diff_time);
		if (MEASURE_LATENCY) {
			lat_hi_sum = env_time_sum(lat_hi_sum, per_core->latency_hi);
			lat_lo_sum = env_time_sum(lat_lo_sum, per_core->latency_lo);
		}
		core_idx++;
	}

	const double total_cycles =
		(double)env_time_to_cycles(time_sum, tstat->cpu_hz);
	const double cycles_per_event = total_cycles / (double)total_events;
	const double events_per_sec =
		tstat->mhz * tstat->num_cores / cycles_per_event;

	if (!MEASURE_LATENCY) {
		/* Throughput-only output */
		APPL_PRINT("Cycles/Event: %.0f Events/s: %.2f M\t"
			   "@%.0f MHz(%" PRIu64 ")\n",
			   cycles_per_event, events_per_sec,
			   tstat->mhz, tstat->print_count++);
		return;
	}

	/* Average latency in cycles per event, separately for both sums */
	const double hi_cycles =
		(double)env_time_to_cycles(lat_hi_sum, tstat->cpu_hz);
	const double latency_per_hi = hi_cycles / (double)total_events;
	const double lo_cycles =
		(double)env_time_to_cycles(lat_lo_sum, tstat->cpu_hz);
	const double latency_per_lo = lo_cycles / (double)total_events;

	APPL_PRINT("Cycles/Event: %.0f Events/s: %.2f M\t"
		   "Latency: Hi-prio=%.0f Lo-prio=%.0f \t"
		   "@%.0f MHz(%" PRIu64 ")\n",
		   cycles_per_event, events_per_sec,
		   latency_per_hi, latency_per_lo,
		   tstat->mhz, tstat->print_count++);
}
/**
 * Advance the test by one step.
 *
 * When more than MIN_NUM_QUEUE_GROUPS queue groups exist, the NUM_QUEUES
 * queues of the previous step are unscheduled and deleted first; a non-zero
 * result from that call ends the step. NUM_QUEUES queues are then created
 * and linked, and test_step() is run unless appl_shm->exit_flag is set.
 */
static void
next_test_step(void)
{
	const int beyond_first_step =
		perf_shm->test_status.num_qgrp > MIN_NUM_QUEUE_GROUPS;

	/* Tear down the earlier queues; give up on this step if that fails */
	if (beyond_first_step && unschedule_and_delete_queues(NUM_QUEUES))
		return;

	create_and_link_queues(NUM_QUEUES);

	/* Do not send new events once the application is exiting */
	if (likely(!appl_shm->exit_flag))
		test_step();
}
/**
 * Create `num_queues` queues, attach their contexts, add them to the test
 * EO and link them into a ring.
 *
 * Queue i takes its queue group from perf_shm->queue_group_tbl at index
 * (i % num_qgrp). Each queue's next_queue is the previously created queue,
 * and after the loop the first queue's next_queue is set to the last one
 * created, closing the ring. The function returns early when adding a queue
 * to the EO fails; that failure is fatal unless appl_shm->exit_flag is set.
 *
 * @param num_queues  Number of queues to create.
 *
 * NOTE(review): `prio` and `ret` are not declared in the lines shown, and
 * the priority selection, the queue creation call, the context-set call and
 * the EO add-queue call are only partially visible - confirm against the
 * full source. As shown, the MEASURE_LATENCY test guards the queue_group
 * assignment, which is presumably an artefact of the missing lines.
 */
static void
create_and_link_queues(int num_queues)
{
em_queue_group_t queue_group;
em_queue_t queue, prev_queue;
queue_context_t *q_ctx;
int i;
for (i = 0; i < num_queues; i++) {
if (MEASURE_LATENCY && (i % 4 == 0))
/* Queue groups are assigned to the queues in round-robin order */
queue_group = perf_shm->queue_group_tbl[i %
perf_shm->test_status.num_qgrp];
prio, queue_group, NULL);
"em_queue_create failed.\t"
i, queue_group);
perf_shm->queue_tbl[i] = queue;
q_ctx = &perf_shm->queue_context_tbl[i];
EM_OK,
"em_queue_set_context failed.\t"
perf_shm->eo, queue);
if (unlikely(ret !=
EM_OK)) {
/* A failed add is tolerated only while the application is exiting */
test_fatal_if(!appl_shm->exit_flag,
"em_eo_add_queue_sync():%" PRI_STAT "\n"
ret, perf_shm->eo, queue);
return;
}
/* Link this queue to the previously created one */
q_ctx->this_queue = queue;
q_ctx->next_queue = prev_queue;
q_ctx->prio = prio;
prev_queue = queue;
}
/* Close the ring: the first queue forwards to the last queue created */
q_ctx = &perf_shm->queue_context_tbl[0];
q_ctx->next_queue = prev_queue;
APPL_PRINT("\nQueues: %i, Queue groups: %i\n",
num_queues, perf_shm->test_status.num_qgrp);
}
/**
 * Remove the first `num_queues` queues in perf_shm->queue_tbl from the test
 * EO and delete them.
 *
 * @param num_queues  Number of queues to process.
 *
 * @return 0 when all queues were handled, -1 as soon as a removal or a
 *         deletion reports a result other than EM_OK. Such a failure is
 *         fatal unless appl_shm->exit_flag is set.
 *
 * NOTE(review): `ret` is not declared in the lines shown, and the
 * remove-queue and delete-queue calls that produce it are not visible -
 * confirm against the full source.
 */
static int
unschedule_and_delete_queues(int num_queues)
{
em_queue_t queue;
int i;
for (i = 0; i < num_queues; i++) {
queue = perf_shm->queue_tbl[i];
/* Result check for removing the queue from the EO */
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"em_eo_remove_queue_sync failed:\t"
perf_shm->eo, queue);
return -1;
}
/* Result check for deleting the queue */
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"em_queue_delete():%" PRI_STAT "\t"
ret, perf_shm->eo, queue);
return -1;
}
}
return 0;
}