#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/*
 * Compile-time test configuration.
 */
/* Non-zero: the receive function passes every event through
 * alloc_free_per_event() before forwarding it. */
#define ALLOC_FREE_PER_EVENT 0
/* Non-zero: test_start() creates all NUM_QUEUES scheduled queues up front.
 * Zero: only queue_steps[0] queues are created at start and
 * update_test_state() adds more each time a queue step completes. */
#define CREATE_ALL_QUEUES_AT_STARTUP 0
/* Non-zero: per-event latency is recorded and the latency result table is
 * printed instead of the plain one. */
#define MEASURE_LATENCY 1
/* Non-zero: queue_step() puts exactly this many events in flight, spread over
 * the queues in use. Zero: NUM_EVENTS events are sent per queue instead. */
#define CONST_NUM_EVENTS 4096
/* NOTE(review): not referenced by the code shown in this file; presumably an
 * upper bound for per-core tables - confirm. */
#define MAX_CORES 64
/* Number of EOs; create_and_link_queues() handles scheduled queues in groups
 * of NUM_EOS. */
#define NUM_EOS 4
/* Events per queue: divides CONST_NUM_EVENTS to get NUM_QUEUES, and is the
 * per-queue event count in queue_step() when CONST_NUM_EVENTS is 0. */
#define NUM_EVENTS 4
/* Upper limit for the number of scheduled queues. */
#if CONST_NUM_EVENTS > 0
#define NUM_QUEUES (CONST_NUM_EVENTS / NUM_EVENTS)
#else
#define NUM_QUEUES (NUM_EOS * 16 * 1024)
#endif
/* Size in bytes of the data[] array carried in every perf_event_t. */
#define DATA_SIZE 128
/* Samples taken per queue step. update_test_state() prints results only from
 * the second sample on, so the first sample of a step is never reported. */
#define NUM_SAMPLES (1 + 8)
/* Number of events a core counts before it closes its sample. */
#define EVENTS_PER_SAMPLE 0x400000
/* Queue type assigned to the scheduled queues in create_and_link_queues(). */
#define QUEUE_TYPE EM_QUEUE_TYPE_ATOMIC
/* Values of the per-core 'core_state' variable. */
#define CORE_STATE_MEASURE 0
#define CORE_STATE_IDLE 1
/* Header and row format of the result table without latency figures. */
#define RESULT_PRINTF_HDR "Cycles/Event Events/s cpu-freq\n"
#define RESULT_PRINTF_FMT "%12.0f %7.0f M %5.0f MHz %" PRIu64 "\n"
/* Header and row format of the result table with latency figures. */
#define RESULT_PRINTF_LATENCY_HDR \
"Cycles/ Events/ Latency:\n" \
" Event Sec sched-ave sched-max output-ave output-max cpu-freq\n"
#define RESULT_PRINTF_LATENCY_FMT \
"%6.0f %7.2f M %11.0f %10" PRIu64 " %10.0f %10" PRIu64 " %5.0f MHz %" PRIu64 "\n"
/* Number of scheduled queues per output queue. */
#define NUM_SCHED_TO_OUTPUT_QUEUE 8
/*
 * Scheduled-queue count for each test step, indexed by test_status_t.step.
 * update_test_state() only moves to the next step while the current queue
 * count is below NUM_QUEUES.
 */
static const int queue_steps[] = {8, 16, 32, 64, 128, 256, 512, 1024, 2048,
4096, 8192, 16384, 32768, 65536, NUM_QUEUES};
/*
 * Global test state, kept in shared memory and updated by all cores.
 *
 * NOTE(review): other code in this file also uses the members 'ready_count'
 * and 'freed_count' (through env_atomic64_*()), which are not declared here.
 */
typedef struct {
/* Number of scheduled queues in use; set by queue_step() */
int queues;
/* Index into queue_steps[] of the next step; incremented by queue_step() */
int step;
/* Samples completed in the current step */
int samples;
/* Number of cores taking part, copied from appl_conf->core_count */
int num_cores;
/* Set once every core has finished a sample; while set, cores zero their
 * event counters and go idle */
int reset_flag;
/* Core frequency in MHz (cpu_hz / 1000000.0) */
double cpu_mhz;
/* Core frequency in Hz, as returned by env_core_hz() */
uint64_t cpu_hz;
/* Running number of printed result rows */
uint64_t print_count;
/* CONST_NUM_EVENTS mode only: while set, update_test_state() counts each
 * received event in 'freed_count' and tells the caller not to forward it */
int free_flag;
} test_status_t;
/*
 * Per-core measurement data for the current sample.
 */
typedef struct {
/* Events counted by this core in the current sample */
uint64_t events;
/* Timestamp taken when the first event of the sample was counted */
env_time_t begin_time;
/* Timestamp taken when EVENTS_PER_SAMPLE events had been counted */
env_time_t end_time;
/* end_time - begin_time */
env_time_t diff_time;
struct {
/* Number of latency measurements accumulated */
uint64_t events;
/* Sum of latencies for the scheduled-queue leg; divided by the event
 * count only when the results are printed */
env_time_t sched_ave;
/* Largest single latency seen for the scheduled-queue leg */
env_time_t sched_max;
/* Sum of latencies for the output-queue leg */
env_time_t output_ave;
/* Largest single latency seen for the output-queue leg */
env_time_t output_max;
} latency;
} core_stat_t;
/* NOTE(review): the line below looks like the tail of a compile-time size
 * assertion whose beginning is missing. */
CORE_STAT_SIZE_ERROR);
/*
 * Per-EO context.
 */
typedef struct {
/* Handle of the EO that owns this context; written by start() */
em_eo_t eo_id;
} eo_context_t;
/* NOTE(review): the line below looks like the tail of a compile-time size
 * assertion whose beginning is missing. */
EO_CONTEXT_T__SIZE_ERROR);
/*
 * Per-queue context, used for both scheduled and output queues.
 *
 * NOTE(review): create_and_link_queues() also assigns 'prio' and 'type'
 * members, which are not declared here.
 */
typedef struct {
/* Handle of the queue this context belongs to */
em_queue_t this_queue;
} queue_context_t;
/*
 * Argument block handed to output_fn() for an output queue.
 */
typedef struct {
/* Context of the output queue; read by output_fn() */
queue_context_t *q_ctx;
} output_func_args_t;
/* NOTE(review): the line below looks like the tail of a compile-time size
 * assertion whose beginning is missing. */
QUEUE_CONTEXT_SIZE_ERROR);
/*
 * Payload of the events circulated by the test.
 */
typedef struct {
/* Timestamp written just before each send; measure_latency() subtracts it
 * from the receive timestamp */
env_time_t send_time;
/* Sequence number assigned in queue_step() */
int seq;
/* Scheduled queue that output_fn() sends the event back to */
em_queue_t sched_queue;
/* Output queue that the receive function forwards the event to */
em_queue_t output_queue;
/* Payload bytes; not read or written by the code shown in this file */
uint8_t data[DATA_SIZE];
} perf_event_t;
/*
 * Shared-memory block of the test, reserved by name in test_init().
 *
 * NOTE(review): the rest of the file accesses many members that are not
 * declared here (test_status, core_stat, eo, eo_context_tbl,
 * sched_queue_context_tbl, output_queue_context_tbl, output_queues,
 * nbr_output_queues) - the definition looks truncated.
 */
typedef struct {
/* Event pool, taken from appl_conf->pools[0] in test_start() */
em_pool_t pool;
} perf_shm_t;
/* NOTE(review): the line below looks like the tail of a compile-time size
 * assertion whose beginning is missing. */
PERF_SHM_T__SIZE_ERROR);
/*
 * Measurement state of the calling core (CORE_STATE_MEASURE or
 * CORE_STATE_IDLE), maintained by update_test_state().
 * NOTE(review): ENV_LOCAL presumably gives each core its own copy - confirm.
 */
static ENV_LOCAL int core_state = CORE_STATE_MEASURE;
/*
 * Forward declarations of the file-local functions.
 *
 * NOTE(review): the prototypes of start() and stop() below carry no return
 * type or storage class, and one 'static void' is followed only by the tail
 * of a parameter list - these declarations look truncated.
 */
static void
queue_step(void);
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
stop(void *eo_context, em_eo_t eo);
static void
em_queue_t queue, void *q_context);
static int
update_test_state(em_event_t event);
static void
create_and_link_queues(int start_queue, int num_queues);
static void
print_test_statistics(test_status_t *test_status, int print_header,
core_stat_t core_stat[]);
static inline em_event_t
alloc_free_per_event(em_event_t event);
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
env_time_t recv_time);
static int output_fn(const em_event_t events[], const unsigned int num,
const em_queue_t output_queue, void *output_fn_args)
{
env_time_t recv_time;
perf_event_t *perf_event;
em_queue_t dst_queue;
output_func_args_t *fn_args = output_fn_args;
queue_context_t *q_ctx = fn_args->q_ctx;
(void)output_queue;
if (MEASURE_LATENCY)
recv_time = env_time_global();
for (unsigned int i = 0; i < num; i++) {
dst_queue = perf_event->sched_queue;
if (MEASURE_LATENCY) {
measure_latency(perf_event, q_ctx, recv_time);
perf_event->send_time = env_time_global();
}
ret =
em_send(events[i], dst_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, dst_queue);
}
}
return (int)num;
}
/**
 * Program entry point: passes the command line straight to the common
 * setup code and returns its result as the exit status.
 */
int main(int argc, char *argv[])
{
	const int exit_status = cm_setup(argc, argv);

	return exit_status;
}
/*
 * NOTE(review): this is the body of a function whose header (and the
 * condition guarding the first brace block) is missing from the file.
 * Judging by the names used (eo, error, escope, args) and the final call to
 * test_error_handler(), it is presumably the test-specific EM error handler -
 * confirm and restore the missing lines.
 */
{
/* Out of queues: report it and end the test by raising SIGINT.
 * NOTE(review): raise()/SIGINT need <signal.h>, which this file does not
 * include directly - presumably it arrives through a project header. */
APPL_PRINT("\nUnable to create more queues\n\n"
"Test finished\n");
raise(SIGINT);
return error;
}
/* While the application is exiting, a queue-setup error from
 * EM_ESCOPE_EO_ADD_QUEUE_SYNC is logged and returned without escalation.
 * NOTE(review): the 'if' condition below is incomplete. */
if (appl_shm->exit_flag &&
EM_ESCOPE(escope) &&
if (escope == EM_ESCOPE_EO_ADD_QUEUE_SYNC) {
APPL_PRINT("\nExit: suppress queue setup error\n\n");
return error;
}
}
/* Everything else goes to the common test error handler */
return test_error_handler(eo, error, escope, args);
}
/**
 * Per-core init hook: attaches the calling core to the shared test memory.
 *
 * Core 0 reserves the region "PerfQueuesSharedMem" and zeroes it; every
 * other core looks the region up by name. A core that ends up without the
 * region reports the failure through test_fatal_if().
 *
 * @param appl_conf  Application configuration (unused).
 */
void test_init(const appl_conf_t *appl_conf)
{
	const int core = em_core_id();

	(void)appl_conf;

	if (core == 0) {
		perf_shm = env_shared_reserve("PerfQueuesSharedMem",
					      sizeof(perf_shm_t));
	} else {
		perf_shm = env_shared_lookup("PerfQueuesSharedMem");
	}

	test_fatal_if(perf_shm == NULL,
		      "Perf test queues init failed on EM-core: %u\n",
		      (unsigned int)core);

	/* Only the reserving core clears the region, and only if it exists */
	if (perf_shm != NULL && core == 0)
		memset(perf_shm, 0, sizeof(perf_shm_t));
}
/*
 * Application start hook: stores the event pool, prints the configuration
 * banner, initialises the shared test status and counters, creates the first
 * batch of queues and triggers the first queue step.
 *
 * NOTE(review): several statements in this function look truncated:
 *  - the 'else' after the pool assignment is followed directly by the banner
 *    APPL_PRINT(), so as written the banner is printed only when no pool is
 *    configured;
 *  - the banner is passed one more argument (perf_shm->pool) than its format
 *    string has conversions;
 *  - string literals such as "Undefined application event pool!" and
 *    "EO create failed:%d" are left without the call they belong to;
 *  - the EO create and EO start calls themselves are missing.
 */
void test_start(const appl_conf_t *appl_conf)
{
eo_context_t *eo_ctx;
const int sched_q_ctx_size = sizeof(perf_shm->sched_queue_context_tbl);
const int output_q_ctx_size = sizeof(perf_shm->output_queue_context_tbl);
int i;
/* Use the first application pool for the test events */
if (appl_conf->num_pools >= 1)
perf_shm->pool = appl_conf->pools[0];
else
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
" Max. NUM_QUEUES: %i\n"
" sizeof sched_queue_context_tbl: %i kB\n"
" sizeof output_queue_context_tbl: %i kB\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
perf_shm->pool, NUM_QUEUES, sched_q_ctx_size / 1024,
output_q_ctx_size / 1024);
"Undefined application event pool!");
/* Record the core frequency (Hz and MHz) and the number of cores */
perf_shm->test_status.cpu_hz = env_core_hz();
perf_shm->test_status.cpu_mhz = (double)perf_shm->test_status.cpu_hz /
1000000.0;
perf_shm->test_status.num_cores = appl_conf->core_count;
perf_shm->test_status.free_flag = 0;
/* Initialise the shared counters */
env_atomic64_init(&perf_shm->test_status.ready_count);
env_atomic64_init(&perf_shm->test_status.freed_count);
env_atomic32_init(&perf_shm->nbr_output_queues);
/* One context per EO; the creation call itself is missing here */
for (i = 0; i < NUM_EOS; i++) {
eo_ctx = &perf_shm->eo_context_tbl[i];
stop, NULL, receive_func,
eo_ctx);
"EO create failed:%d", i, NUM_EOS);
}
APPL_PRINT(" EOs created\n");
/* Create either every queue now or only the first step's worth */
if (CREATE_ALL_QUEUES_AT_STARTUP)
create_and_link_queues(0, NUM_QUEUES);
else
create_and_link_queues(0, queue_steps[0]);
for (i = 0; i < NUM_EOS; i++) {
"EO start(%d):%" PRI_STAT " %" PRI_STAT "",
i, ret, start_ret);
}
/* Send the first set of events */
queue_step();
}
/*
 * Application stop hook: walks over all EOs and all created output queues
 * and checks the status of each teardown step.
 *
 * NOTE(review): the calls that produce 'ret' (judging by the messages: EO
 * stop, EO remove-queue-all, EO delete and a per-queue call for the output
 * queues) are missing, 'ret' is not declared, and the last test_fatal_if()
 * has no format string - the function looks truncated.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
em_queue_t queue;
int i;
(void)appl_conf;
APPL_PRINT(
"%s() on EM-core %d\n", __func__,
em_core_id());
/* First pass: stop every EO */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT
"",
eo, ret);
}
/* Second pass: remove each EO's queues, then delete the EO */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT
"",
eo, ret);
}
/* Finally handle every output queue created so far */
int nrb_queues = env_atomic32_get(&perf_shm->nbr_output_queues);
for (i = 0; i < nrb_queues; i++) {
queue = perf_shm->output_queues[i];
test_fatal_if(ret !=
EM_OK,
queue, ret);
}
}
/**
 * Per-core termination hook: logs the call and releases the shared test
 * memory.
 *
 * test_init() reserves the region on core 0 only (the other cores merely
 * look it up), so the region is released on core 0 only as well instead of
 * once per core.
 *
 * @param appl_conf  Application configuration (unused).
 */
void test_term(const appl_conf_t *appl_conf)
{
	const int core = em_core_id();

	(void)appl_conf;

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	if (core == 0)
		env_shared_free(perf_shm);
}
/*
 * Start the next test step: fill in and send the events for the queue count
 * queue_steps[step], then record the new queue count, advance the step index
 * and print the new totals.
 *
 * With CONST_NUM_EVENTS set, CONST_NUM_EVENTS events are spread round-robin
 * over all scheduled queues of the step. Otherwise NUM_EVENTS events are sent
 * to each queue added since the previous step (indices first..queue_count-1).
 *
 * NOTE(review): the lines that allocate 'event' and set 'perf_event' are
 * missing (only the tails "perf_shm->pool);" and the "EM alloc failed"
 * message remain), 'ret' is not declared and the "EM send" format strings
 * have lost their continuation - the function looks truncated.
 */
static void
queue_step(void)
{
queue_context_t *q_ctx_s;
queue_context_t *q_ctx_o;
em_event_t event;
perf_event_t *perf_event;
const int first = perf_shm->test_status.queues;
const int step = perf_shm->test_status.step;
const int queue_count = queue_steps[step];
int i, j;
/* One output queue serves NUM_SCHED_TO_OUTPUT_QUEUE scheduled queues */
const int output_queue_count = queue_count / NUM_SCHED_TO_OUTPUT_QUEUE;
test_fatal_if(output_queue_count < 1, "Need to have at least 1 output Queue");
if (CONST_NUM_EVENTS) {
for (i = 0; i < CONST_NUM_EVENTS; i++) {
"EM alloc failed (%i)", i);
perf_event->seq = i;
perf_event->send_time = env_time_global();
/* Spread the events round-robin over the scheduled and output queues */
q_ctx_s = &perf_shm->sched_queue_context_tbl[i % queue_count];
q_ctx_o = &perf_shm->output_queue_context_tbl[i % output_queue_count];
perf_event->sched_queue = q_ctx_s->this_queue;
perf_event->output_queue = q_ctx_o->this_queue;
ret =
em_send(event, q_ctx_s->this_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
ret, q_ctx_s->this_queue);
return;
}
}
} else {
/* Only the queues added since the previous step get new events */
for (i = first; i < queue_count; i++) {
int output_idx = i / NUM_SCHED_TO_OUTPUT_QUEUE;
q_ctx_s = &perf_shm->sched_queue_context_tbl[i];
q_ctx_o = &perf_shm->output_queue_context_tbl[output_idx];
for (j = 0; j < NUM_EVENTS; j++) {
perf_shm->pool);
"EM alloc failed (%i)", i);
perf_event->seq = i * NUM_EVENTS + j;
perf_event->send_time = env_time_global();
perf_event->sched_queue = q_ctx_s->this_queue;
perf_event->output_queue = q_ctx_o->this_queue;
ret =
em_send(event, q_ctx_s->this_queue);
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
ret, q_ctx_s->this_queue);
return;
}
}
}
}
/* Publish the new queue count and move on to the next step index */
perf_shm->test_status.queues = queue_count;
perf_shm->test_status.step++;
APPL_PRINT("\n"
"Number of queues: %d - scheduled:%d + output:%d\n",
queue_count + output_queue_count, queue_count, output_queue_count);
if (CONST_NUM_EVENTS)
APPL_PRINT("Number of events: %d\n", CONST_NUM_EVENTS);
else
APPL_PRINT("Number of events: %d\n",
perf_shm->test_status.queues * NUM_EVENTS);
}
/*
 * Logs that the EO is starting and stores the EO handle in the EO context
 * passed in 'eo_context' (an eo_context_t). 'conf' is unused.
 * Presumably registered as the EO start function in test_start() - confirm.
 *
 * NOTE(review): the definition has no return type and returns no value; the
 * line carrying the return type and the final return statement appear to be
 * missing.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_id = eo;
}
/*
 * Logs that the EO is stopping; 'eo_context' is unused. Passed next to
 * receive_func in test_start(), presumably as the EO stop function - confirm.
 *
 * NOTE(review): the definition has no return type and returns no value; the
 * line carrying the return type and the final return statement appear to be
 * missing.
 */
stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
}
/*
 * Event receive path for the scheduled queues: updates the test state for
 * the event, optionally records its latency, and forwards it to the output
 * queue stored in the event payload.
 *
 * NOTE(review): the first line of the function header (name and leading
 * parameters) is missing; test_start() refers to a 'receive_func', which is
 * presumably this function. The body also uses 'event', 'type',
 * 'eo_context', 'perf_event' and 'ret' without a visible declaration or
 * assignment - confirm and restore the missing lines.
 */
static void
em_queue_t queue, void *q_context)
{
env_time_t recv_time;
/* Take the receive timestamp before doing anything else */
if (MEASURE_LATENCY)
recv_time = env_time_global();
queue_context_t *q_ctx;
em_queue_t dst_queue;
int do_return;
(void)eo_context;
(void)type;
(void)queue;
/* Application is exiting: return without forwarding the event */
if (unlikely(appl_shm->exit_flag)) {
return;
}
q_ctx = q_context;
dst_queue = perf_event->output_queue;
/* A non-zero result means the event must not be forwarded */
do_return = update_test_state(event);
if (unlikely(do_return))
return;
if (ALLOC_FREE_PER_EVENT)
event = alloc_free_per_event(event);
if (MEASURE_LATENCY) {
measure_latency(perf_event, q_ctx, recv_time);
/* Restart the clock for the output-queue leg */
perf_event->send_time = env_time_global();
}
ret =
em_send(event, perf_event->output_queue);
if (unlikely(ret !=
EM_OK)) {
/* Send failures are only tolerated while the application is exiting */
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT
" Queue:%" PRI_QUEUE "",
ret, dst_queue);
}
}
/**
 * Advance the per-core and global test state for one received event.
 *
 * Three situations are handled:
 *  - reset_flag set: the core zeroes its event counter. In CONST_NUM_EVENTS
 *    mode with free_flag set, the event is freed and counted; the core that
 *    frees the last one clears both flags and starts the next queue step.
 *    Otherwise the core goes idle once, and the last core to go idle either
 *    requests the free phase or clears reset_flag.
 *  - first event of a sample: the start time is taken and the latency
 *    statistics are cleared.
 *  - event number EVENTS_PER_SAMPLE: the sample is closed; the last core to
 *    finish sets reset_flag, prints the results (from the second sample on)
 *    and, after NUM_SAMPLES samples, prepares the next queue step.
 *
 * @param event  The event being processed. It is freed here only when 1 is
 *               returned.
 *
 * @return 1 if the event was consumed and the caller must not forward it,
 *         0 if the caller should go on and forward the event.
 */
static inline int
update_test_state(em_event_t event)
{
	const int core = em_core_id();
	test_status_t *const tstat = &perf_shm->test_status;
	core_stat_t *const cstat = &perf_shm->core_stat[core];
	uint64_t events = cstat->events + 1;
	uint64_t freed_count;
	uint64_t ready_count;

	if (unlikely(tstat->reset_flag)) {
		events = 0;
		if (CONST_NUM_EVENTS) {
			if (unlikely(tstat->free_flag)) {
				/*
				 * The caller drops the event when 1 is
				 * returned, so it has to be released here or
				 * the pool leaks one event per call.
				 */
				em_free(event);
				freed_count =
				env_atomic64_add_return(&tstat->freed_count, 1);
				if (freed_count == CONST_NUM_EVENTS) {
					/* Last old event gone: start the next step */
					env_atomic64_set(&tstat->freed_count,
							 0);
					tstat->reset_flag = 0;
					tstat->free_flag = 0;
					queue_step();
				}
				return 1;
			}
		}
		if (unlikely(core_state != CORE_STATE_IDLE)) {
			core_state = CORE_STATE_IDLE;
			cstat->begin_time = ENV_TIME_NULL;
			ready_count =
			env_atomic64_add_return(&tstat->ready_count, 1);
			if (ready_count == (uint64_t)tstat->num_cores) {
				/* Every core is idle now */
				env_atomic64_set(&tstat->ready_count, 0);
				if (CONST_NUM_EVENTS) {
					int sample = tstat->samples;
					int queues = tstat->queues;

					/*
					 * A new step is pending: have the
					 * old events freed before new ones
					 * are sent.
					 */
					if (sample == 0 && queues < NUM_QUEUES)
						tstat->free_flag = 1;
					else
						tstat->reset_flag = 0;
				} else {
					tstat->reset_flag = 0;
				}
			}
		}
	} else if (unlikely(events == 1)) {
		/* First event of a new sample on this core */
		cstat->begin_time = env_time_global();
		cstat->latency.events = 0;
		cstat->latency.sched_ave = ENV_TIME_NULL;
		cstat->latency.sched_max = ENV_TIME_NULL;
		cstat->latency.output_ave = ENV_TIME_NULL;
		cstat->latency.output_max = ENV_TIME_NULL;
		core_state = CORE_STATE_MEASURE;
	} else if (unlikely(events == EVENTS_PER_SAMPLE)) {
		env_time_t begin_time, end_time;

		/* This core has finished its sample */
		cstat->end_time = env_time_global();
		end_time = cstat->end_time;
		begin_time = cstat->begin_time;
		cstat->diff_time = env_time_diff(end_time, begin_time);

		ready_count = env_atomic64_add_return(&tstat->ready_count, 1);
		if (unlikely(ready_count == (uint64_t)tstat->num_cores)) {
			/* Last core to finish: close the sample for everyone */
			env_atomic64_set(&tstat->ready_count, 0);
			tstat->reset_flag = 1;
			tstat->samples++;

			/* The first sample of a step is not reported */
			if (tstat->samples > 1) {
				int print_header = tstat->samples == 2 ? 1 : 0;

				print_test_statistics(tstat, print_header,
						      perf_shm->core_stat);
			}

			if (tstat->samples == NUM_SAMPLES &&
			    tstat->queues < NUM_QUEUES) {
				if (!CREATE_ALL_QUEUES_AT_STARTUP) {
					/* Create the queues added by the next step */
					int step = tstat->step;
					int first_q = tstat->queues;
					int num_qs = queue_steps[step] -
						     queue_steps[step - 1];

					create_and_link_queues(first_q, num_qs);
				}
				/*
				 * With CONST_NUM_EVENTS the next step starts
				 * later, once the old events have been freed.
				 */
				if (!CONST_NUM_EVENTS)
					queue_step();
				tstat->samples = 0;
			}
		}
	}

	cstat->events = events;
	return 0;
}
/*
 * Create 'num_queues' scheduled queues starting at table index
 * 'start_queue', NUM_EOS at a time (one per EO), followed by
 * num_queues / NUM_SCHED_TO_OUTPUT_QUEUE output queues. The handle of each
 * queue is stored in its context-table entry.
 *
 * Returns early, after printing a message, when 'num_queues' is not a
 * multiple of NUM_EOS.
 *
 * NOTE(review): this function looks heavily truncated - the calls that
 * create the queues, add them to the EOs and set their contexts are missing,
 * several locals (queue_name_sched, queue_name_output, type, prio, ret,
 * queue_conf, output_conf, output_counter) are used without a declaration,
 * and the braces do not balance. Restore the missing lines before relying on
 * it.
 * NOTE(review): the queue names are formatted with PRIu8 although the
 * arguments are int / uint32_t values - confirm that this is intended.
 */
static void
create_and_link_queues(int start_queue, int num_queues)
{
int i, j;
em_queue_t queue;
em_queue_group_t group;
queue_context_t *q_ctx;
uint32_t output_tot_cnt = 0;
const int num_output_queues = num_queues / NUM_SCHED_TO_OUTPUT_QUEUE;
APPL_PRINT("\nCreate new queues - scheduled:%d + output:%d\n",
num_queues, num_output_queues);
/* The loop below works in groups of NUM_EOS queues */
if (num_queues % NUM_EOS != 0) {
APPL_PRINT("%s() 'num_queues'=%d not multiple of NUM_EOS=%d\n",
__func__, num_queues, NUM_EOS);
return;
}
/* Scheduled queues: queue i + j goes with EO j */
for (i = start_queue; i < (start_queue + num_queues); i += NUM_EOS) {
for (j = 0; j < NUM_EOS; j++) {
snprintf(queue_name_sched, sizeof(queue_name_sched),
"Q-sched-%" PRIu8 "", i + j);
queue_name_sched[sizeof(queue_name_sched) - 1] = '\0';
type = QUEUE_TYPE;
NULL);
APPL_PRINT("Max nbr of supported queues: %d\n", i);
return;
}
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"em_eo_add_queue_sync():%" PRI_STAT "\n"
ret, perf_shm->eo[j], queue);
return;
}
/* Remember the queue in its context-table entry */
q_ctx = &perf_shm->sched_queue_context_tbl[i + j];
q_ctx->this_queue = queue;
q_ctx->prio = prio;
q_ctx->type = type;
test_fatal_if(ret !=
EM_OK,
"em_queue_set_context():%" PRI_STAT "\n"
ret, perf_shm->eo[j], queue);
}
}
int last_idx = start_queue + num_queues - 1;
num_queues,
perf_shm->sched_queue_context_tbl[start_queue].this_queue,
perf_shm->sched_queue_context_tbl[last_idx].this_queue);
/* Output queues: the shared counter supplies the next free table index */
for (i = 0; i < (num_output_queues); i++) {
output_func_args_t args;
output_counter = &perf_shm->nbr_output_queues;
output_tot_cnt = env_atomic32_get(output_counter);
snprintf(queue_name_output, sizeof(queue_name_output),
"Q-output-%" PRIu8 "", output_tot_cnt);
queue_name_output[sizeof(queue_name_output) - 1] = '\0';
memset(&queue_conf, 0, sizeof(queue_conf));
memset(&output_conf, 0, sizeof(output_conf));
queue_conf.
conf_len =
sizeof(output_conf);
queue_conf.
conf = &output_conf;
/* The output function receives this queue's context via 'args' */
args.q_ctx = &perf_shm->output_queue_context_tbl[output_tot_cnt];
output_conf.
args_len =
sizeof(queue_context_t *);
&queue_conf);
env_atomic32_inc(output_counter);
perf_shm->output_queues[output_tot_cnt] = queue;
}
APPL_PRINT("Max nbr of Output queues: %d\n",
return;
}
q_ctx = &perf_shm->output_queue_context_tbl[output_tot_cnt];
q_ctx->this_queue = queue;
q_ctx->prio = prio;
q_ctx->type = type;
}
int first_idx = output_tot_cnt + 1 - num_output_queues;
num_output_queues,
perf_shm->output_queue_context_tbl[first_idx].this_queue,
perf_shm->output_queue_context_tbl[output_tot_cnt].this_queue);
}
/**
 * Print one result row for the sample that has just finished.
 *
 * The per-core sample durations are added up and converted to cycles per
 * event and to millions of events per second. When MEASURE_LATENCY is set,
 * the per-core latency sums and maxima are combined as well and the wider
 * latency row is printed.
 *
 * @param test_status   Global test status; its print_count is incremented.
 * @param print_header  Non-zero to print the table header before the row.
 * @param core_stat     Per-core statistics, one entry per participating core.
 */
static void
print_test_statistics(test_status_t *test_status, int print_header,
		      core_stat_t core_stat[])
{
	const uint64_t row_number = test_status->print_count++;
	const double mhz = test_status->cpu_mhz;
	const uint64_t hz = test_status->cpu_hz;
	const int cores = test_status->num_cores;
	const uint64_t sample_events = (uint64_t)cores * EVENTS_PER_SAMPLE;
	env_time_t busy_time = ENV_TIME_NULL;
	double cycles_per_event = 0.0;
	double mevents_per_sec = 0.0;
	int c;

	/* Total time spent by all cores on this sample */
	for (c = 0; c < cores; c++)
		busy_time = env_time_sum(busy_time, core_stat[c].diff_time);

	if (likely(sample_events > 0))
		cycles_per_event = env_time_to_cycles(busy_time, hz) /
				   (double)sample_events;

	if (likely(cycles_per_event > 0))
		mevents_per_sec = mhz * cores / cycles_per_event;

	if (MEASURE_LATENCY) {
		uint64_t lat_samples = 0;
		env_time_t sched_sum = ENV_TIME_NULL;
		env_time_t sched_peak = ENV_TIME_NULL;
		env_time_t output_sum = ENV_TIME_NULL;
		env_time_t output_peak = ENV_TIME_NULL;
		double sched_avg_cycles = 0.0;
		double output_avg_cycles = 0.0;

		/* Combine the per-core sums and keep the largest maxima */
		for (c = 0; c < cores; c++) {
			const core_stat_t *const cs = &core_stat[c];

			lat_samples += cs->latency.events;
			sched_sum = env_time_sum(sched_sum,
						 cs->latency.sched_ave);
			output_sum = env_time_sum(output_sum,
						  cs->latency.output_ave);

			if (env_time_cmp(cs->latency.sched_max,
					 sched_peak) > 0)
				sched_peak = cs->latency.sched_max;

			if (env_time_cmp(cs->latency.output_max,
					 output_peak) > 0)
				output_peak = cs->latency.output_max;
		}

		if (likely(lat_samples > 0)) {
			sched_avg_cycles = env_time_to_cycles(sched_sum, hz) /
					   (double)lat_samples;
			output_avg_cycles = env_time_to_cycles(output_sum, hz) /
					    (double)lat_samples;
		}

		if (print_header)
			APPL_PRINT(RESULT_PRINTF_LATENCY_HDR);

		APPL_PRINT(RESULT_PRINTF_LATENCY_FMT,
			   cycles_per_event, mevents_per_sec, sched_avg_cycles,
			   env_time_to_cycles(sched_peak, hz),
			   output_avg_cycles,
			   env_time_to_cycles(output_peak, hz),
			   mhz, row_number);
	} else {
		if (print_header)
			APPL_PRINT(RESULT_PRINTF_HDR);

		APPL_PRINT(RESULT_PRINTF_FMT,
			   cycles_per_event, mevents_per_sec,
			   mhz, row_number);
	}
}
/*
 * Called from the receive path when ALLOC_FREE_PER_EVENT is set; returns the
 * event the caller should continue with.
 *
 * As written, the function copies the four header fields out of 'perf_event'
 * and writes the same values back, then returns 'event' unchanged.
 *
 * NOTE(review): 'perf_event' is not declared in this function, and there is
 * no allocation or free between the save and the restore - the lines that
 * obtain the payload pointer, allocate the replacement event and free the old
 * one appear to be missing.
 */
static inline em_event_t
alloc_free_per_event(em_event_t event)
{
/* Save the fields that have to survive the event swap */
env_time_t send_time = perf_event->send_time;
int seq = perf_event->seq;
em_queue_t sched_queue = perf_event->sched_queue;
em_queue_t output_queue = perf_event->output_queue;
/* Write the saved fields back */
perf_event->sched_queue = sched_queue;
perf_event->output_queue = output_queue;
perf_event->send_time = send_time;
perf_event->seq = seq;
return event;
}
/*
 * Add the latency of one event (recv_time - perf_event->send_time) to the
 * calling core's latency statistics: the latency is added to a running sum
 * and replaces the stored maximum when it is larger.
 *
 * Nothing is recorded while reset_flag is set, before the core has counted
 * its first event of the sample, or once it has counted EVENTS_PER_SAMPLE.
 *
 * NOTE(review): the 'if' that belongs to the '} else {' below is missing, so
 * the condition that selects between the sched_* and output_* statistics is
 * not visible (presumably it depends on 'q_ctx', which is otherwise unused).
 * 'core' is also used without a visible declaration.
 */
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
env_time_t recv_time)
{
core_stat_t *const cstat = &perf_shm->core_stat[core];
const env_time_t send_time = perf_event->send_time;
env_time_t latency;
/* Only measure inside an active sample */
if (perf_shm->test_status.reset_flag ||
cstat->events == 0 || cstat->events >= EVENTS_PER_SAMPLE)
return;
cstat->latency.events++;
latency = env_time_diff(recv_time, send_time);
/* Scheduled-queue leg: accumulate the sum and track the maximum */
cstat->latency.sched_ave =
env_time_sum(cstat->latency.sched_ave, latency);
if (env_time_cmp(latency, cstat->latency.sched_max) > 0)
cstat->latency.sched_max = latency;
} else {
/* Output-queue leg: accumulate the sum and track the maximum */
cstat->latency.output_ave =
env_time_sum(cstat->latency.output_ave, latency);
if (env_time_cmp(latency, cstat->latency.output_max) > 0)
cstat->latency.output_max = latency;
}
}