#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/*
 * Build-time test configuration.
 *
 * The first three switches are tested with plain `if (...)` statements in
 * this file (0 = off, non-zero = on).
 */
#define ALLOC_FREE_PER_EVENT 0
#define CREATE_ALL_QUEUES_AT_STARTUP 0
#define MEASURE_LATENCY 1
/*
 * When non-zero, queue_step() sends this many events in total per step,
 * spread over the queues in use, and NUM_QUEUES is derived from it below.
 */
#define CONST_NUM_EVENTS 4096
/* Not referenced in the visible part of this file. */
#define MAX_CORES 64
/*
 * Queues are created in groups of NUM_EOS, one queue per EO.
 * The first NUM_SCHED_QUEUES queues of each group get QUEUE_TYPE; the
 * remaining NUM_LOCAL_QUEUES are reported as "local" in the printouts.
 */
#define NUM_EOS 4
#define NUM_SCHED_QUEUES (1)
#define NUM_LOCAL_QUEUES (NUM_EOS - NUM_SCHED_QUEUES)
COMPILE_TIME_ASSERT(NUM_SCHED_QUEUES + NUM_LOCAL_QUEUES == NUM_EOS,
INVALID_NUM_QUEUES_IN_LOOP);
/* Events sent per queue when CONST_NUM_EVENTS is 0. */
#define NUM_EVENTS 4
/* Upper limit for the number of queues used by the test. */
#if CONST_NUM_EVENTS > 0
#define NUM_QUEUES (CONST_NUM_EVENTS / NUM_EVENTS)
#else
#define NUM_QUEUES (NUM_EOS * 16 * 1024)
#endif
/* Size in bytes of the payload array carried in every perf_event_t. */
#define DATA_SIZE 128
/*
 * Samples taken per queue step. update_test_state() only prints results
 * once more than one sample has completed, so the first one is not printed.
 */
#define NUM_SAMPLES (1 + 8)
/* Per-core event count that ends a sample. */
#define EVENTS_PER_SAMPLE 0x100000
#define QUEUE_TYPE EM_QUEUE_TYPE_ATOMIC
/* Values stored in the per-core `core_state` variable. */
#define CORE_STATE_MEASURE 0
#define CORE_STATE_IDLE 1
/* Header and row formats used by print_test_statistics(). */
#define RESULT_PRINTF_HDR "Cycles/Event Events/s cpu-freq\n"
#define RESULT_PRINTF_FMT "%12.0f %7.0f M %5.0f MHz %" PRIu64 "\n"
#define RESULT_PRINTF_LATENCY_HDR \
"Cycles/ Events/ Latency:\n" \
" Event Sec sched-ave sched-max local-ave local-max cpu-freq\n"
#define RESULT_PRINTF_LATENCY_FMT \
"%6.0f %7.2f M %11.0f %10" PRIu64 " %10.0f %10" PRIu64 " %5.0f MHz %" PRIu64 "\n"
/*
 * Number of queues in use at each test step, indexed by test_status_t.step.
 * update_test_state() stops advancing once the queue count reaches
 * NUM_QUEUES, so entries larger than NUM_QUEUES are never reached.
 */
static const int queue_steps[] = {8, 16, 32, 64, 128, 256, 512, 1024, 2048,
4096, 8192, 16384, 32768, 65536, NUM_QUEUES};
/*
 * Test state shared by all cores.
 *
 * NOTE(review): other code in this file also accesses `ready_count` and
 * `freed_count` through this struct (env_atomic64_* calls), but those
 * members are not declared in this view - confirm against the full file.
 */
typedef struct {
/* Number of queues currently in use (set by queue_step()). */
int queues;
/* Index of the next entry to use in queue_steps[]. */
int step;
/* Samples completed for the current queue count. */
int samples;
/* Number of cores taking part, copied from appl_conf->core_count. */
int num_cores;
/* Set when all cores have finished a sample; cleared to start the next one. */
int reset_flag;
/* CPU frequency in MHz (cpu_hz / 1000000.0). */
double cpu_mhz;
/* CPU frequency in Hz as returned by env_core_hz(). */
uint64_t cpu_hz;
/* Running number of printed result rows. */
uint64_t print_count;
/* Set while circulating events are being drained before a new step. */
int free_flag;
} test_status_t;
/* Per-core measurement data, indexed by core id in perf_shm->core_stat[]. */
typedef struct {
/* Events received by this core during the current sample. */
uint64_t events;
/* Timestamps taken at the first and last event of the sample. */
env_time_t begin_time;
env_time_t end_time;
/* end_time - begin_time for the last finished sample. */
env_time_t diff_time;
struct {
/* Number of latency measurements accumulated in this sample. */
uint64_t events;
/* Despite the names, the *_ave members hold running sums; the average
 * is computed in print_test_statistics(). */
env_time_t sched_ave;
env_time_t sched_max;
env_time_t local_ave;
env_time_t local_max;
} latency;
} core_stat_t;
/* NOTE(review): the next line is the tail of a statement whose beginning is
 * not present in this view - presumably a COMPILE_TIME_ASSERT on the struct
 * size; confirm against the full file. */
CORE_STAT_SIZE_ERROR);
/* Per-EO context; start() stores the EO's own handle here. */
typedef struct {
em_eo_t eo_id;
} eo_context_t;
/* NOTE(review): tail of a statement whose beginning is not present in this
 * view - presumably a COMPILE_TIME_ASSERT on the struct size. */
EO_CONTEXT_T__SIZE_ERROR);
/*
 * Per-queue context: the queue itself and the queue that received events
 * are forwarded to.
 *
 * NOTE(review): create_and_link_queues() also assigns `prio` and `type`
 * members that are not declared in this view.
 */
typedef struct {
em_queue_t this_queue;
em_queue_t next_queue;
} queue_context_t;
/* NOTE(review): tail of a statement whose beginning is not present in this
 * view - presumably a COMPILE_TIME_ASSERT on the struct size. */
QUEUE_CONTEXT_SIZE_ERROR);
/* Payload of the events circulated by the test. */
typedef struct {
/* Timestamp written just before the event is sent; used for latency. */
env_time_t send_time;
/* Sequence number assigned when the event is first created. */
int seq;
/* Filler payload of DATA_SIZE bytes; not read in the visible code. */
uint8_t data[DATA_SIZE];
} perf_event_t;
/*
 * Shared-memory area of the test, reserved by core 0 in test_init() and
 * looked up by the other cores.
 *
 * NOTE(review): the rest of the file accesses test_status, core_stat[],
 * eo_context_tbl[], queue_context_tbl[] and eo[] through this struct, but
 * only `pool` is declared in this view - confirm against the full file.
 */
typedef struct {
/* Event pool used for the test events. */
em_pool_t pool;
} perf_shm_t;
/* NOTE(review): tail of a statement whose beginning is not present in this
 * view - presumably a COMPILE_TIME_ASSERT on the struct size. */
PERF_SHM_T__SIZE_ERROR);
/*
 * Measurement state of the calling core (CORE_STATE_MEASURE or
 * CORE_STATE_IDLE), maintained by update_test_state().
 */
static ENV_LOCAL int core_state = CORE_STATE_MEASURE;
/*
 * Forward declarations of the file-local functions.
 *
 * NOTE(review): the declarations of start(), stop() and the receive function
 * look truncated in this view (no return type / no function name before the
 * parameter list) - confirm against the full file before relying on them.
 */
static void
queue_step(void);
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
stop(void *eo_context, em_eo_t eo);
static void
em_queue_t queue, void *q_context);
static int
update_test_state(em_event_t event);
static void
create_and_link_queues(int start_queue, int num_queues);
static void
print_test_statistics(test_status_t *test_status, int print_header,
core_stat_t core_stat[]);
static inline em_event_t
alloc_free_per_event(em_event_t event);
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
env_time_t recv_time);
/**
 * Program entry point.
 *
 * Hands the command line straight to cm_setup() and uses its result as the
 * process exit status.
 */
int main(int argc, char *argv[])
{
	const int status = cm_setup(argc, argv);

	return status;
}
/*
 * Body of an error-handler routine; it uses `eo`, `error`, `escope` and
 * `args` and falls back to test_error_handler().
 *
 * NOTE(review): the function signature and the condition guarding the first
 * block are not present in this view, and the second `if` is cut off in the
 * middle of its condition, so the braces below do not pair up as written.
 * Confirm against the full file before changing any code here.
 */
{
/* Queue creation limit reached: report it and end the test via SIGINT. */
APPL_PRINT("\nUnable to create more queues\n\n"
"Test finished\n");
raise(SIGINT);
return error;
}
/* While the application is exiting, a queue-setup error from
 * EM_ESCOPE_EO_ADD_QUEUE_SYNC is reported and returned instead of being
 * passed on to the common handler. */
if (appl_shm->exit_flag &&
EM_ESCOPE(escope) &&
if (escope == EM_ESCOPE_EO_ADD_QUEUE_SYNC) {
APPL_PRINT("\nExit: suppress queue setup error\n\n");
return error;
}
}
/* Everything else goes to the common test error handler. */
return test_error_handler(eo, error, escope, args);
}
/*
 * Per-core init hook: core 0 reserves the shared memory area
 * "PerfQueuesSharedMem" and zeroes it, every other core looks it up by name.
 *
 * NOTE(review): `core` is used without a visible declaration, and the
 * statement under `if (perf_shm == NULL)` is cut off (only its format string
 * remains) - confirm against the full file.
 */
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
perf_shm = env_shared_reserve("PerfQueuesSharedMem",
sizeof(perf_shm_t));
} else {
perf_shm = env_shared_lookup("PerfQueuesSharedMem");
}
if (perf_shm == NULL)
"Perf test queues init failed on EM-core: %u\n",
else if (core == 0)
/* Only the reserving core clears the area. */
memset(perf_shm, 0, sizeof(perf_shm_t));
}
/*
 * Application start hook: records the event pool and CPU frequency in the
 * shared memory, sets up the EOs, creates the first batch of queues and
 * seeds the test with events through queue_step().
 *
 * NOTE(review): several statements look truncated in this view - dangling
 * string/argument lines with no call opening them, and `ret` / `start_ret`
 * used without a visible declaration or assignment. The banner's argument
 * list also holds one more value (perf_shm->pool) than its format string
 * has conversions. Confirm against the full file before editing the code.
 */
void test_start(const appl_conf_t *appl_conf)
{
eo_context_t *eo_ctx;
const int q_ctx_size = sizeof(perf_shm->queue_context_tbl);
int i;
/* Use the first application-provided pool for the test events. */
if (appl_conf->num_pools >= 1)
perf_shm->pool = appl_conf->pools[0];
else
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
" Max. NUM_QUEUES: %i\n"
" sizeof queue_context_tbl: %i kB\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
perf_shm->pool, NUM_QUEUES, q_ctx_size / 1024);
"Undefined application event pool!");
/* Cache the CPU frequency in Hz and MHz for the result printouts. */
perf_shm->test_status.cpu_hz = env_core_hz();
perf_shm->test_status.cpu_mhz = (double)perf_shm->test_status.cpu_hz /
1000000.0;
perf_shm->test_status.num_cores = appl_conf->core_count;
perf_shm->test_status.free_flag = 0;
env_atomic64_init(&perf_shm->test_status.ready_count);
env_atomic64_init(&perf_shm->test_status.freed_count);
/* One iteration per EO; the create call itself is not visible here. */
for (i = 0; i < NUM_EOS; i++) {
eo_ctx = &perf_shm->eo_context_tbl[i];
stop, NULL, receive_func,
eo_ctx);
"EO create failed:%d", i, NUM_EOS);
}
APPL_PRINT(" EOs created\n");
/* Either create every queue now or only the first step's worth. */
if (CREATE_ALL_QUEUES_AT_STARTUP)
create_and_link_queues(0, NUM_QUEUES);
else
create_and_link_queues(0, queue_steps[0]);
/* One iteration per EO; the start call itself is not visible here. */
for (i = 0; i < NUM_EOS; i++) {
"EO start(%d):%" PRI_STAT " %" PRI_STAT "",
i, ret, start_ret);
}
/* Send the initial events. */
queue_step();
}
/*
 * Application stop hook: walks the EO table twice, first checking the result
 * of stopping each EO, then the results of removing its queues and deleting
 * it.
 *
 * NOTE(review): `ret` is checked but neither declared nor assigned in this
 * view; the calls that produce it appear to be missing. Confirm against the
 * full file.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
int i;
(void)appl_conf;
APPL_PRINT(
"%s() on EM-core %d\n", __func__,
em_core_id());
/* Pass 1: stop result per EO. */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT
"",
eo, ret);
}
/* Pass 2: queue removal and delete results per EO. */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT
"",
eo, ret);
}
}
/**
 * Application termination hook: logs the calling core and releases the
 * shared memory area of the test.
 *
 * @param appl_conf  Application configuration (unused).
 */
void test_term(const appl_conf_t *appl_conf)
{
	/* The core id was used without being declared in this function;
	 * obtain it the same way test_start() and test_stop() do. */
	const int core = em_core_id();

	(void)appl_conf;

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	/* NOTE(review): test_init() reserves this area on core 0 only; if this
	 * hook can run on several cores, the free should probably be limited
	 * to core 0 as well - confirm against the caller. */
	env_shared_free(perf_shm);
}
/*
 * Seed the test with events for the current step and advance the step.
 *
 * With CONST_NUM_EVENTS set, CONST_NUM_EVENTS events are spread over the
 * queue_count queues of this step; otherwise NUM_EVENTS events are sent per
 * queue index from the previous queue count up to queue_count. In both
 * branches the destination index is rounded down to the start of its
 * NUM_EOS-sized group and then offset by qidx % NUM_SCHED_QUEUES, so with
 * NUM_SCHED_QUEUES == 1 events always enter at the first queue of a group.
 *
 * NOTE(review): the allocation calls, the queue-type check and the
 * declaration of `ret` are not present in this view (only their trailing
 * argument lines remain), and the "EM send" format strings are followed
 * directly by arguments without a separating comma. Confirm against the
 * full file before editing the code.
 */
static void
queue_step(void)
{
queue_context_t *q_ctx;
em_event_t event;
perf_event_t *perf_event;
const int first = perf_shm->test_status.queues;
const int step = perf_shm->test_status.step;
const int queue_count = queue_steps[step];
int i, j, idx, qidx;
if (CONST_NUM_EVENTS) {
for (i = 0; i < CONST_NUM_EVENTS; i++) {
"EM alloc failed (%i)", i);
perf_event->seq = i;
perf_event->send_time = env_time_global();
/* Pick the entry queue of the group that queue index qidx belongs to. */
qidx = i % queue_count;
idx = (qidx / NUM_EOS) * NUM_EOS;
idx = idx + qidx % NUM_SCHED_QUEUES;
q_ctx = &perf_shm->queue_context_tbl[idx];
"Illegal QueueType:%" PRI_QTYPE "",
q_ctx->this_queue);
ret =
em_send(event, q_ctx->this_queue);
/* A failed send is fatal unless the application is already exiting. */
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
ret, q_ctx->this_queue);
return;
}
}
} else {
/* Only the queues added since the previous step get new events. */
for (i = first; i < queue_count; i++) {
qidx = i % queue_count;
idx = (qidx / NUM_EOS) * NUM_EOS;
idx = idx + qidx % NUM_SCHED_QUEUES;
q_ctx = &perf_shm->queue_context_tbl[idx];
"Illegal QueueType:%" PRI_QTYPE "",
q_ctx->this_queue);
for (j = 0; j < NUM_EVENTS; j++) {
perf_shm->pool);
"EM alloc failed (%i)", i);
perf_event->seq = i * NUM_EVENTS + j;
perf_event->send_time = env_time_global();
ret =
em_send(event, q_ctx->this_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
ret, q_ctx->this_queue);
return;
}
}
}
}
/* Record the new queue count and move on to the next queue_steps[] entry. */
perf_shm->test_status.queues = queue_count;
perf_shm->test_status.step++;
APPL_PRINT("\n"
"Number of queues: %d - scheduled:%d + local:%d\n",
perf_shm->test_status.queues,
(perf_shm->test_status.queues * NUM_SCHED_QUEUES) / NUM_EOS,
(perf_shm->test_status.queues * NUM_LOCAL_QUEUES) / NUM_EOS);
if (CONST_NUM_EVENTS)
APPL_PRINT("Number of events: %d\n", CONST_NUM_EVENTS);
else
APPL_PRINT("Number of events: %d\n",
perf_shm->test_status.queues * NUM_EVENTS);
}
/*
 * EO start function: logs the EO handle and stores it in the EO context.
 *
 * NOTE(review): no return type and no return statement are present in this
 * view - confirm the signature and return value against the full file.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_id = eo;
}
/*
 * EO stop function: only logs the EO handle.
 *
 * NOTE(review): no return type and no return statement are present in this
 * view - confirm the signature and return value against the full file.
 */
stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
}
/*
 * Event receive function: updates the test state for every received event
 * and, unless the event was consumed, forwards it to the queue's next_queue.
 *
 * NOTE(review): the function name and its leading parameters are cut off in
 * this view (`event`, `type` and `eo_context` are used below). `perf_event`
 * is read without a visible assignment, and neither the declaration of `ret`
 * nor the send call that sets it is present. Confirm against the full file
 * before editing the code.
 */
static void
em_queue_t queue, void *q_context)
{
env_time_t recv_time;
perf_event_t *perf_event;
/* Do nothing once the application has been told to exit. */
if (unlikely(appl_shm->exit_flag)) {
return;
}
/* Take the receive timestamp before any other work. */
if (MEASURE_LATENCY) {
recv_time = env_time_global();
}
queue_context_t *q_ctx;
em_queue_t dst_queue;
int do_return;
(void)eo_context;
(void)type;
q_ctx = q_context;
/* A non-zero result means the event must not be forwarded. */
do_return = update_test_state(event);
if (unlikely(do_return))
return;
if (ALLOC_FREE_PER_EVENT)
event = alloc_free_per_event(event);
dst_queue = q_ctx->next_queue;
test_fatal_if(queue != q_ctx->this_queue, "Queue config error");
/* Account the latency of this hop, then restamp for the next one. */
if (MEASURE_LATENCY) {
measure_latency(perf_event, q_ctx, recv_time);
perf_event->send_time = env_time_global();
}
/* A failed send is fatal unless the application is already exiting. */
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, dst_queue);
}
}
/**
 * Advance the per-core and shared test state for one received event.
 *
 * Per sample, each core counts events from 1 to EVENTS_PER_SAMPLE:
 *  - events == 1: the sample starts; begin time taken, latency data cleared.
 *  - events == EVENTS_PER_SAMPLE: the sample ends for this core. The last
 *    core to get here sets reset_flag, prints the results (from the second
 *    sample on) and, after NUM_SAMPLES samples, prepares the next queue step.
 *  - reset_flag set: the core zeroes its counter and goes idle. When all
 *    cores are idle, either reset_flag is cleared (next sample starts) or,
 *    with CONST_NUM_EVENTS at the beginning of a new step, free_flag is set
 *    so that all circulating events are drained before queue_step() sends a
 *    fresh set.
 *
 * @param event  The received event. It is freed here while free_flag is set.
 *
 * @return 1 if the event was consumed (freed) and must not be forwarded by
 *         the caller, 0 otherwise.
 */
static inline int
update_test_state(em_event_t event)
{
	uint64_t events;
	uint64_t freed_count;
	uint64_t ready_count;
	/* Index of the calling core; was used below without being declared. */
	const int core = em_core_id();
	test_status_t *const tstat = &perf_shm->test_status;
	core_stat_t *const cstat = &perf_shm->core_stat[core];

	events = cstat->events;
	events++;

	if (unlikely(tstat->reset_flag)) {
		events = 0;
		if (CONST_NUM_EVENTS) {
			if (unlikely(tstat->free_flag)) {
				/*
				 * Release the event: the caller drops it when
				 * 1 is returned, so counting it as freed
				 * without freeing it would leak it.
				 */
				em_free(event);
				freed_count =
				env_atomic64_add_return(&tstat->freed_count, 1);
				if (freed_count == CONST_NUM_EVENTS) {
					/* Last event drained: start the new step. */
					env_atomic64_set(&tstat->freed_count,
							 0);
					tstat->reset_flag = 0;
					tstat->free_flag = 0;
					queue_step();
				}
				return 1;
			}
		}

		if (unlikely(core_state != CORE_STATE_IDLE)) {
			/* First event seen by this core after the reset. */
			core_state = CORE_STATE_IDLE;
			cstat->begin_time = ENV_TIME_NULL;

			ready_count =
			env_atomic64_add_return(&tstat->ready_count, 1);

			if (ready_count == (uint64_t)tstat->num_cores) {
				/* Every core is idle now. */
				env_atomic64_set(&tstat->ready_count, 0);

				if (CONST_NUM_EVENTS) {
					int sample = tstat->samples;
					int queues = tstat->queues;

					/*
					 * samples == 0 means a new step was
					 * just prepared: drain the old events
					 * first. Otherwise start next sample.
					 */
					if (sample == 0 && queues < NUM_QUEUES)
						tstat->free_flag = 1;
					else
						tstat->reset_flag = 0;
				} else {
					tstat->reset_flag = 0;
				}
			}
		}
	} else if (unlikely(events == 1)) {
		/* Sample start on this core. */
		cstat->begin_time = env_time_global();
		cstat->latency.events = 0;
		cstat->latency.sched_ave = ENV_TIME_NULL;
		cstat->latency.sched_max = ENV_TIME_NULL;
		cstat->latency.local_ave = ENV_TIME_NULL;
		cstat->latency.local_max = ENV_TIME_NULL;

		core_state = CORE_STATE_MEASURE;
	} else if (unlikely(events == EVENTS_PER_SAMPLE)) {
		/* Sample end on this core. */
		env_time_t begin_time, end_time;

		cstat->end_time = env_time_global();

		end_time = cstat->end_time;
		begin_time = cstat->begin_time;
		cstat->diff_time = env_time_diff(end_time, begin_time);

		ready_count = env_atomic64_add_return(&tstat->ready_count, 1);

		if (unlikely((int)ready_count == tstat->num_cores)) {
			/* Last core to finish the sample. */
			env_atomic64_set(&tstat->ready_count, 0);
			tstat->reset_flag = 1;
			tstat->samples++;

			/* The first sample of a step is not printed. */
			if (tstat->samples > 1) {
				int print_header = tstat->samples == 2 ? 1 : 0;

				print_test_statistics(tstat, print_header,
						      perf_shm->core_stat);
			}

			/* Step done: add queues for the next one if any remain. */
			if (tstat->samples == NUM_SAMPLES &&
			    tstat->queues < NUM_QUEUES) {
				if (!CREATE_ALL_QUEUES_AT_STARTUP) {
					int step = tstat->step;
					int first_q = tstat->queues;
					int num_qs = queue_steps[step] -
						     queue_steps[step - 1];

					create_and_link_queues(first_q, num_qs);
				}

				if (!CONST_NUM_EVENTS)
					queue_step();

				tstat->samples = 0;
			}
		}
	}

	cstat->events = events;

	return 0;
}
/*
 * Create `num_queues` queues starting at table index `start_queue` and link
 * them into rings of NUM_EOS queues: each queue's next_queue is the queue
 * created just before it, and the first queue of a group is pointed at the
 * group's last queue once the group is complete.
 *
 * Does nothing but print a message if num_queues is not a multiple of
 * NUM_EOS.
 *
 * NOTE(review): the queue creation call, the context/EO-add calls and the
 * declarations of `type`, `prio` and `ret` are not present in this view
 * (only trailing argument lines remain), the local `group` is unused, and
 * `prev_queue` is read for the first queue of a group before any visible
 * assignment. Confirm against the full file before editing the code.
 */
static void
create_and_link_queues(int start_queue, int num_queues)
{
int i, j;
em_queue_t queue, prev_queue;
em_queue_group_t group;
queue_context_t *q_ctx;
APPL_PRINT("\nCreate new queues: %d - scheduled:%d + local:%d\n",
num_queues,
(num_queues * NUM_SCHED_QUEUES) / NUM_EOS,
(num_queues * NUM_LOCAL_QUEUES) / NUM_EOS);
if (num_queues % NUM_EOS != 0) {
APPL_PRINT("%s() 'num_queues'=%d not multiple of NUM_EOS=%d\n",
__func__, num_queues, NUM_EOS);
return;
}
/* Outer loop: one group of NUM_EOS queues; inner loop: one queue per EO. */
for (i = start_queue; i < (start_queue + num_queues); i += NUM_EOS) {
for (j = 0; j < NUM_EOS; j++) {
if (j < NUM_SCHED_QUEUES) {
type = QUEUE_TYPE;
} else {
}
NULL);
APPL_PRINT("Max nbr of supported queues: %d\n",
i);
return;
}
q_ctx = &perf_shm->queue_context_tbl[i + j];
test_fatal_if(ret !=
EM_OK,
"em_queue_set_context():%" PRI_STAT "\n"
ret, perf_shm->eo[j], queue);
/* A failed add is fatal unless the application is already exiting. */
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"em_eo_add_queue_sync():%" PRI_STAT "\n"
ret, perf_shm->eo[j], queue);
return;
}
/* Link this queue to the one created before it. */
q_ctx->this_queue = queue;
q_ctx->next_queue = prev_queue;
q_ctx->prio = prio;
q_ctx->type = type;
prev_queue = queue;
}
/* Close the ring: the group's first queue forwards to its last one. */
q_ctx = &perf_shm->queue_context_tbl[i + 0];
q_ctx->next_queue = prev_queue;
}
num_queues,
perf_shm->queue_context_tbl[start_queue].this_queue,
perf_shm->queue_context_tbl[start_queue +
num_queues - 1].this_queue);
}
/**
 * Print one result row for the sample that just finished.
 *
 * Sums the per-core sample durations to get cycles per event and events per
 * second. With MEASURE_LATENCY enabled, the per-core latency sums and
 * maxima are combined as well and printed on the same row.
 *
 * @param test_status   Shared test status; its print_count is incremented.
 * @param print_header  Non-zero to print the column header before the row.
 * @param core_stat     Per-core statistics, one entry per core.
 */
static void
print_test_statistics(test_status_t *test_status, int print_header,
		      core_stat_t core_stat[])
{
	const int core_cnt = test_status->num_cores;
	const uint64_t hz = test_status->cpu_hz;
	const double mhz = test_status->cpu_mhz;
	const uint64_t events_total = (uint64_t)core_cnt * EVENTS_PER_SAMPLE;
	const uint64_t row_nbr = test_status->print_count++;
	env_time_t time_sum = ENV_TIME_NULL;
	double cycles_per_event = 0.0;
	double events_per_sec = 0.0;
	int c;

	/* Total time spent by all cores on this sample. */
	for (c = 0; c < core_cnt; c++) {
		time_sum = env_time_sum(time_sum, core_stat[c].diff_time);
	}

	if (likely(events_total > 0)) {
		cycles_per_event = env_time_to_cycles(time_sum, hz) /
				   (double)events_total;
	}
	/* Skip the division when no cycles were measured. */
	if (likely(cycles_per_event > 0)) {
		events_per_sec = mhz * core_cnt / cycles_per_event;
	}

	if (MEASURE_LATENCY) {
		uint64_t lat_events = 0;
		env_time_t sched_sum = ENV_TIME_NULL;
		env_time_t sched_peak = ENV_TIME_NULL;
		env_time_t local_sum = ENV_TIME_NULL;
		env_time_t local_peak = ENV_TIME_NULL;
		double sched_ave_cycles = 0.0;
		double local_ave_cycles = 0.0;

		/* Combine the per-core sums and pick the overall maxima. */
		for (c = 0; c < core_cnt; c++) {
			const core_stat_t *const cs = &core_stat[c];

			lat_events += cs->latency.events;
			sched_sum = env_time_sum(sched_sum,
						 cs->latency.sched_ave);
			local_sum = env_time_sum(local_sum,
						 cs->latency.local_ave);

			if (env_time_cmp(cs->latency.sched_max,
					 sched_peak) > 0)
				sched_peak = cs->latency.sched_max;

			if (env_time_cmp(cs->latency.local_max,
					 local_peak) > 0)
				local_peak = cs->latency.local_max;
		}

		if (likely(lat_events > 0)) {
			sched_ave_cycles = env_time_to_cycles(sched_sum, hz) /
					   (double)lat_events;
			local_ave_cycles = env_time_to_cycles(local_sum, hz) /
					   (double)lat_events;
		}

		if (print_header) {
			APPL_PRINT(RESULT_PRINTF_LATENCY_HDR);
		}
		APPL_PRINT(RESULT_PRINTF_LATENCY_FMT,
			   cycles_per_event, events_per_sec, sched_ave_cycles,
			   env_time_to_cycles(sched_peak, hz),
			   local_ave_cycles,
			   env_time_to_cycles(local_peak, hz),
			   mhz, row_nbr);
	} else {
		/* Throughput-only row. */
		if (print_header) {
			APPL_PRINT(RESULT_PRINTF_HDR);
		}
		APPL_PRINT(RESULT_PRINTF_FMT,
			   cycles_per_event, events_per_sec,
			   mhz, row_nbr);
	}
}
/*
 * Carries an event's send_time and seq values over and returns the event
 * handle; called from the receive function when ALLOC_FREE_PER_EVENT is set.
 *
 * NOTE(review): `perf_event` is used without a visible declaration and, as
 * shown here, the two fields are read and written back to the same object
 * with nothing in between. Going by the function name, the statements that
 * free the event and allocate a replacement appear to be missing from this
 * view - confirm against the full file.
 */
static inline em_event_t
alloc_free_per_event(em_event_t event)
{
env_time_t send_time = perf_event->send_time;
int seq = perf_event->seq;
perf_event->send_time = send_time;
perf_event->seq = seq;
return event;
}
/*
 * Add the latency of one received event (recv_time - send_time) to the
 * calling core's statistics: a running sum and a maximum, kept separately
 * for the "sched" and "local" categories.
 *
 * Nothing is recorded while a reset is pending or when the core's event
 * counter is 0 or has reached EVENTS_PER_SAMPLE, i.e. outside a running
 * sample.
 *
 * NOTE(review): `core` is used without a visible declaration and the `if`
 * that selects between the sched and local branches is not present in this
 * view (the `} else {` below has no matching opener; `q_ctx` is otherwise
 * unused). Confirm against the full file before editing the code.
 */
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
env_time_t recv_time)
{
core_stat_t *const cstat = &perf_shm->core_stat[core];
const env_time_t send_time = perf_event->send_time;
env_time_t latency;
/* Only measure while a sample is in progress on this core. */
if (perf_shm->test_status.reset_flag ||
cstat->events == 0 || cstat->events >= EVENTS_PER_SAMPLE)
return;
cstat->latency.events++;
latency = env_time_diff(recv_time, send_time);
/* The *_ave members accumulate sums; averaging happens when printing. */
cstat->latency.sched_ave =
env_time_sum(cstat->latency.sched_ave, latency);
if (env_time_cmp(latency, cstat->latency.sched_max) > 0)
cstat->latency.sched_max = latency;
} else {
cstat->latency.local_ave =
env_time_sum(cstat->latency.local_ave, latency);
if (env_time_cmp(latency, cstat->latency.local_max) > 0)
cstat->latency.local_max = latency;
}
}