EM-ODP  3.7.0
Event Machine on ODP
queues_output.c
/*
* Copyright (c) 2012, Nokia Siemens Networks
* Copyright (c) 2024, Nokia Solutions and Networks
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file
*
* Event Machine performance test for output queues.
*
* Measures the average cycles consumed during an event send_scheduled_queue->
* schedule->eo_receive->send_output_queue->output_fn -loop. One or more
* Scheduled queues are linked with an output queue. NUM_SCHED_TO_OUTPUT_QUEUE
* defines how many scheduled queues are created and linked with each output
* queue. Each event is circulated within the linked scheduled queue and output
* queue. The test increases the number of queues[+events] for each measurement
* round and prints the results. The test will stop if the maximum number of
* supported queues is reached.
*
* Test derived from the programs/performance/queues.c test but additionally
* uses output queues between the processing EO's and scheduled queues.
*
*/
#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include <event_machine.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/*
 * Test options:
 */
/*
 * Alloc and free per event: when enabled, receive_func() replaces every
 * received event with a newly allocated one via alloc_free_per_event().
 */
#define ALLOC_FREE_PER_EVENT 0 /* false=0 or true=1 */
/*
 * Create all EM queues at startup or create the queues during
 * the test in steps.
 */
#define CREATE_ALL_QUEUES_AT_STARTUP 0 /* false=0 or true=1 */
/*
 * Measure the send-enqueue-schedule-receive latency and send-enqueue-output-fn
 * latency.
 */
#define MEASURE_LATENCY 1 /* false=0 or true=1 */
/*
 * Keep the number of events constant while increasing the number of queues.
 * Should be divisible by, or a factor of, each queue step so that the events
 * spread evenly over the queues.
 */
#define CONST_NUM_EVENTS 4096 /* true>0 or false=0 */
/*
 * Test configuration:
 */
/* Size of the per-core statistics table 'core_stat[]' in perf_shm_t */
#define MAX_CORES 64
/* Number of EO's, the queues are evenly attached to EOs */
#define NUM_EOS 4
/* Number of events per queue */
#define NUM_EVENTS 4
#if CONST_NUM_EVENTS > 0
/*
 * Total number of scheduled queues when using a constant number of events.
 * Make sure that all queues get 'NUM_EVENTS' events per queue.
 */
#define NUM_QUEUES (CONST_NUM_EVENTS / NUM_EVENTS)
#else
/*
 * Total number of queues when increasing the total event count for each queue
 * step.
 */
#define NUM_QUEUES (NUM_EOS * 16 * 1024)
#endif
/* Number of data bytes in an event */
#define DATA_SIZE 128
/* Samples before adding more queues */
#define NUM_SAMPLES (1 + 8) /* setup(1) + measure(N) */
/* Num events a core processes between samples */
#define EVENTS_PER_SAMPLE 0x400000
/* Scheduled queue type */
#define QUEUE_TYPE EM_QUEUE_TYPE_ATOMIC
/* Core states during test, stored in the core-local 'core_state' */
#define CORE_STATE_MEASURE 0
#define CORE_STATE_IDLE 1
/* Result APPL_PRINT() format string, used when MEASURE_LATENCY is disabled */
#define RESULT_PRINTF_HDR "Cycles/Event Events/s cpu-freq\n"
#define RESULT_PRINTF_FMT "%12.0f %7.0f M %5.0f MHz %" PRIu64 "\n"
/* Result APPL_PRINT() format string when MEASURE_LATENCY is used */
#define RESULT_PRINTF_LATENCY_HDR \
"Cycles/ Events/ Latency:\n" \
" Event Sec sched-ave sched-max output-ave output-max cpu-freq\n"
#define RESULT_PRINTF_LATENCY_FMT \
"%6.0f %7.2f M %11.0f %10" PRIu64 " %10.0f %10" PRIu64 " %5.0f MHz %" PRIu64 "\n"
/*
 * Number of scheduled queues for each output queue.
 * With the default queue steps this can be adjusted in the range of 1-8:
 * the smallest queue step is 8 and queue_step() requires at least one output
 * queue per step.
 * Value 1 means that for each scheduled queue one output queue is created.
 */
#define NUM_SCHED_TO_OUTPUT_QUEUE 8
/*
 * The number of scheduled queues to use in each test step.
 * The table is indexed by 'test_status_t::step'.
 *
 * NOTE: The max queue step is always 'NUM_QUEUES', even if the value of
 * 'NUM_QUEUES' would be smaller than a listed queue step (then just stop
 * before reaching the end of the list).
 */
static const int queue_steps[] = {8, 16, 32, 64, 128, 256, 512, 1024, 2048,
4096, 8192, 16384, 32768, 65536, NUM_QUEUES};
/**
 * Test state, shared by all cores.
 * Cache line alignment and padding handled in 'perf_shm_t'
 */
typedef struct {
/* Number of scheduled queues used by the current step, set in queue_step() */
int queues;
/* Index into queue_steps[] for the next step, incremented in queue_step() */
int step;
/* Number of sample rounds completed in the current step */
int samples;
/* Number of EM-cores running the test, taken from appl_conf->core_count */
int num_cores;
/*
 * Set by the last core to finish a sample round; while set, each core
 * zeroes its event count and goes idle (see update_test_state())
 */
int reset_flag;
/* Core frequency from env_core_hz(), in MHz and in Hz */
double cpu_mhz;
uint64_t cpu_hz;
/* Running number of the printed result line */
uint64_t print_count;
/* Number of cores that have reached the current synchronization point */
env_atomic64_t ready_count;
/* if using CONST_NUM_EVENTS:*/
/* Set when all circulating events must be freed before the next step */
int free_flag;
/* Number of events freed so far while 'free_flag' is set */
env_atomic64_t freed_count;
} test_status_t;
/**
 * Performance test statistics (per core)
 */
typedef struct {
/* Events received on this core during the current sample round */
uint64_t events;
/* Time of the first and last event of the sample round, and their difference */
env_time_t begin_time;
env_time_t end_time;
env_time_t diff_time;
struct {
/* Number of latency samples accumulated by measure_latency() */
uint64_t events;
/*
 * Scheduled-queue latency: '_ave' holds the running SUM of the samples
 * (divided by 'events' only when printing), '_max' the largest sample
 */
env_time_t sched_ave;
env_time_t sched_max;
/* Output-queue latency, same representation as above */
env_time_t output_ave;
env_time_t output_max;
} latency;
/* Pad size to a multiple of cache line size */
void *end[0] ENV_CACHE_LINE_ALIGNED;
} core_stat_t;
/* Build-time check that the padding above works as intended */
COMPILE_TIME_ASSERT(sizeof(core_stat_t) % ENV_CACHE_LINE_SIZE == 0,
CORE_STAT_SIZE_ERROR);
/**
 * EO context data
 */
typedef struct {
/* EO handle, stored by the EO start function */
em_eo_t eo_id;
/* Pad size to a multiple of cache line size */
void *end[0] ENV_CACHE_LINE_ALIGNED;
} eo_context_t;
/* Build-time check that the padding above works as intended */
COMPILE_TIME_ASSERT(sizeof(eo_context_t) % ENV_CACHE_LINE_SIZE == 0,
EO_CONTEXT_T__SIZE_ERROR);
/**
* Queue context data
*/
typedef struct {
/* This queue */
em_queue_t this_queue;
/* Priority of 'this_queue' */
/* Type of 'this_queue' */
/* Pad size to a multiple of cache line size */
void *end[0] ENV_CACHE_LINE_ALIGNED;
} queue_context_t;
/**
 * Output queue function arguments, pointer to queue context.
 * Handed to em_queue_create() through 'em_output_queue_conf_t::output_fn_args'
 * and received back in output_fn() as 'output_fn_args'.
 */
typedef struct {
queue_context_t *q_ctx;
} output_func_args_t;
/* Build-time check that queue_context_t is padded to a cache line multiple */
COMPILE_TIME_ASSERT(sizeof(queue_context_t) % ENV_CACHE_LINE_SIZE == 0,
QUEUE_CONTEXT_SIZE_ERROR);
/**
 * Performance test event
 */
typedef struct {
/* Send time stamp, refreshed right before each em_send() of the event */
env_time_t send_time;
/* Sequence number, assigned once in queue_step() */
int seq;
/* Scheduled queue this event circulates through */
em_queue_t sched_queue;
/* Output queue this event circulates through */
em_queue_t output_queue;
/* Test data */
uint8_t data[DATA_SIZE];
} perf_event_t;
/**
 * Test shared memory
 *
 * Reserved by EM-core 0 in test_init() and looked up by the other cores.
 */
typedef struct {
	/* Event pool used by this application */
	em_pool_t pool;
	/* Test state shared by all cores */
	test_status_t test_status ENV_CACHE_LINE_ALIGNED;
	/* Per-core statistics, indexed by em_core_id() */
	core_stat_t core_stat[MAX_CORES] ENV_CACHE_LINE_ALIGNED;
	/* EO contexts, one per EO */
	eo_context_t eo_context_tbl[NUM_EOS] ENV_CACHE_LINE_ALIGNED;
	/* Queue contexts for the scheduled queues and for the output queues */
	queue_context_t sched_queue_context_tbl[NUM_QUEUES] ENV_CACHE_LINE_ALIGNED;
	queue_context_t output_queue_context_tbl[NUM_QUEUES] ENV_CACHE_LINE_ALIGNED;
	/* Handles of the created output queues, deleted in test_stop() */
	em_queue_t output_queues[EM_MAX_OUTPUT_QUEUES] ENV_CACHE_LINE_ALIGNED;
	/* Number of valid entries in 'output_queues[]' */
	env_atomic32_t nbr_output_queues ENV_CACHE_LINE_ALIGNED;
	/* EO ID's */
	em_eo_t eo[NUM_EOS] ENV_CACHE_LINE_ALIGNED;
} perf_shm_t;
/* Every member is cache line aligned, so the size must be a multiple too */
COMPILE_TIME_ASSERT(sizeof(perf_shm_t) % ENV_CACHE_LINE_SIZE == 0,
		    PERF_SHM_T__SIZE_ERROR);
/*
 * EM-core local pointer to shared memory, set in test_init() from
 * env_shared_reserve() on core 0 and env_shared_lookup() on the other cores.
 */
static ENV_LOCAL perf_shm_t *perf_shm;
/*
 * EM-core local state: CORE_STATE_MEASURE or CORE_STATE_IDLE,
 * switched in update_test_state().
 */
static ENV_LOCAL int core_state = CORE_STATE_MEASURE;
/*
 * Local function prototypes
 */

/* Application error handler registered with EM in test_init() */
static em_status_t
error_handler(em_eo_t eo, em_status_t error, em_escope_t escope, va_list args);

/* Allocate and send the events for the next test step */
static void
queue_step(void);

/* EO start function */
static em_status_t
start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf);

/* EO stop function */
static em_status_t
stop(void *eo_context, em_eo_t eo);

/* EO receive function */
static void
receive_func(void *eo_context, em_event_t event, em_event_type_t type,
	     em_queue_t queue, void *q_context);

/* Receive helper: returns 1 when the caller must return immediately */
static int
update_test_state(em_event_t event);

/* Create scheduled queues and their output queues */
static void
create_and_link_queues(int start_queue, int num_queues);

/* Print one result line (optionally preceded by the header) */
static void
print_test_statistics(test_status_t *test_status, int print_header,
		      core_stat_t core_stat[]);

/* Free the given event and return a newly allocated replacement */
static inline em_event_t
alloc_free_per_event(em_event_t event);

/* Accumulate one latency sample into the per-core statistics */
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
		env_time_t recv_time);
/* Output queue callback function */
static int output_fn(const em_event_t events[], const unsigned int num,
const em_queue_t output_queue, void *output_fn_args)
{
env_time_t recv_time;
perf_event_t *perf_event;
em_queue_t dst_queue;
output_func_args_t *fn_args = output_fn_args;
queue_context_t *q_ctx = fn_args->q_ctx;
(void)output_queue;
if (MEASURE_LATENCY)
recv_time = env_time_global();
for (unsigned int i = 0; i < num; i++) {
perf_event = em_event_pointer(events[i]);
dst_queue = perf_event->sched_queue;
if (MEASURE_LATENCY) {
measure_latency(perf_event, q_ctx, recv_time);
perf_event->send_time = env_time_global();
}
ret = em_send(events[i], dst_queue);
if (unlikely(ret != EM_OK)) {
em_free(events[i]);
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT " Queue:%" PRI_QUEUE "",
ret, dst_queue);
}
}
return (int)num;
}
/**
 * Program entry point.
 *
 * All test and EM setup common to the test applications is delegated to
 * cm_setup(), whose result becomes the exit status.
 *
 * cm_setup() will call test_init() and test_start() and launch
 * the EM dispatch loop on every EM-core.
 */
int main(int argc, char *argv[])
{
	const int status = cm_setup(argc, argv);

	return status;
}
/**
 * Test error handler
 *
 * A non-fatal queue-create error is treated as "no more queues available":
 * the test is ended by raising SIGINT. During tear-down a non-fatal
 * em_eo_add_queue_sync() error is suppressed. Everything else is forwarded
 * to the common test_error_handler().
 *
 * @param eo      Execution object id
 * @param error   The error code
 * @param escope  Error scope
 * @param args    List of arguments (__FILE__, __func__, __LINE__,
 *                (format), ## __VA_ARGS__)
 *
 * @return The original error code.
 */
static em_status_t
error_handler(em_eo_t eo, em_status_t error, em_escope_t escope, va_list args)
{
	if (escope == EM_ESCOPE_QUEUE_CREATE && !EM_ERROR_IS_FATAL(error)) {
		APPL_PRINT("\nUnable to create more queues\n\n"
			   "Test finished\n");
		raise(SIGINT);
		return error;
	}

	if (appl_shm->exit_flag && EM_ESCOPE(escope) &&
	    !EM_ERROR_IS_FATAL(error)) {
		/* Suppress non-fatal EM-error logs during tear-down */
		if (escope == EM_ESCOPE_EO_ADD_QUEUE_SYNC) {
			APPL_PRINT("\nExit: suppress queue setup error\n\n");
			return error;
		}
	}

	return test_error_handler(eo, error, escope, args);
}
/**
 * Init of the Queues performance test application.
 *
 * EM-core 0 reserves the shared memory, registers the error handler and
 * zeroes the memory; every other core looks the shared memory up by name.
 *
 * @attention Run on all cores.
 *
 * @param appl_conf  Application configuration (unused)
 *
 * @see cm_setup() for setup and dispatch.
 */
void test_init(const appl_conf_t *appl_conf)
{
	(void)appl_conf;
	int core = em_core_id();

	if (core == 0) {
		perf_shm = env_shared_reserve("PerfQueuesSharedMem",
					      sizeof(perf_shm_t));
		em_register_error_handler(error_handler);
	} else {
		perf_shm = env_shared_lookup("PerfQueuesSharedMem");
	}

	if (perf_shm == NULL)
		test_error(EM_ERROR_SET_FATAL(0xec0de), 0xdead,
			   "Perf test queues init failed on EM-core: %u\n",
			   em_core_id());
	else if (core == 0)
		memset(perf_shm, 0, sizeof(perf_shm_t));
}
/**
 * Startup of the Queues output performance test application.
 *
 * Selects the event pool, initializes the shared test state, creates the
 * EOs and the queues for the first step (or all queues), starts the EOs and
 * sends the first set of events with queue_step().
 *
 * @attention Run only on EM core 0.
 *
 * @param appl_conf Application configuration
 *
 * @see cm_setup() for setup and dispatch.
 */
void test_start(const appl_conf_t *appl_conf)
{
	eo_context_t *eo_ctx;
	em_status_t ret, start_ret = EM_ERROR;
	const int sched_q_ctx_size = sizeof(perf_shm->sched_queue_context_tbl);
	const int output_q_ctx_size = sizeof(perf_shm->output_queue_context_tbl);
	int i;

	/*
	 * Store the event pool to use, use the EM default pool if no other
	 * pool is provided through the appl_conf.
	 */
	if (appl_conf->num_pools >= 1)
		perf_shm->pool = appl_conf->pools[0];
	else
		perf_shm->pool = EM_POOL_DEFAULT;

	APPL_PRINT("\n"
		   "***********************************************************\n"
		   "EM APPLICATION: '%s' initializing:\n"
		   " %s: %s() - EM-core:%d\n"
		   " Application running on %u EM-cores (procs:%u, threads:%u)\n"
		   " using event pool:%" PRI_POOL "\n"
		   " Max. NUM_QUEUES: %i\n"
		   " sizeof sched_queue_context_tbl: %i kB\n"
		   " sizeof output_queue_context_tbl: %i kB\n"
		   "***********************************************************\n"
		   "\n",
		   appl_conf->name, NO_PATH(__FILE__), __func__, em_core_id(),
		   appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
		   perf_shm->pool, NUM_QUEUES, sched_q_ctx_size / 1024,
		   output_q_ctx_size / 1024);

	test_fatal_if(perf_shm->pool == EM_POOL_UNDEF,
		      "Undefined application event pool!");

	perf_shm->test_status.cpu_hz = env_core_hz();
	perf_shm->test_status.cpu_mhz = (double)perf_shm->test_status.cpu_hz /
					1000000.0;
	perf_shm->test_status.num_cores = appl_conf->core_count;
	perf_shm->test_status.free_flag = 0;

	env_atomic64_init(&perf_shm->test_status.ready_count);
	env_atomic64_init(&perf_shm->test_status.freed_count);
	env_atomic32_init(&perf_shm->nbr_output_queues);

	/* Create EOs */
	for (i = 0; i < NUM_EOS; i++) {
		eo_ctx = &perf_shm->eo_context_tbl[i];
		perf_shm->eo[i] = em_eo_create("perf test eo", start, NULL,
					       stop, NULL, receive_func,
					       eo_ctx);
		/* The format takes exactly one argument: the failing EO index */
		test_fatal_if(perf_shm->eo[i] == EM_EO_UNDEF,
			      "EO create failed:%d", i);
	}

	APPL_PRINT(" EOs created\n");

	/*
	 * Create and link queues
	 */
	if (CREATE_ALL_QUEUES_AT_STARTUP) /* Create ALL queues at once */
		create_and_link_queues(0, NUM_QUEUES);
	else /* Create queues for the first step, then more before each step */
		create_and_link_queues(0, queue_steps[0]);

	/* Start EOs */
	for (i = 0; i < NUM_EOS; i++) {
		ret = em_eo_start_sync(perf_shm->eo[i], &start_ret, NULL);
		test_fatal_if(ret != EM_OK || start_ret != EM_OK,
			      "EO start(%d):%" PRI_STAT " %" PRI_STAT "",
			      i, ret, start_ret);
	}

	/* Allocate and send the events for the first step */
	queue_step();
}
/**
* Stop the test, only run on one core
*/
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
em_queue_t queue;
int i;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, em_core_id());
/* Stop EOs */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
ret = em_eo_stop_sync(eo);
test_fatal_if(ret != EM_OK,
"EO:%" PRI_EO " stop:%" PRI_STAT "",
eo, ret);
}
/* Remove and delete all of the EO's queues, then delete the EO */
for (i = 0; i < NUM_EOS; i++) {
eo = perf_shm->eo[i];
ret = em_eo_remove_queue_all_sync(eo, EM_TRUE/*delete Qs*/);
test_fatal_if(ret != EM_OK,
"EO remove queue all:%" PRI_STAT " EO:%" PRI_EO "",
ret, eo);
ret = em_eo_delete(eo);
test_fatal_if(ret != EM_OK,
"EO:%" PRI_EO " delete:%" PRI_STAT "",
eo, ret);
}
int nrb_queues = env_atomic32_get(&perf_shm->nbr_output_queues);
for (i = 0; i < nrb_queues; i++) {
queue = perf_shm->output_queues[i];
ret = em_queue_delete(queue);
test_fatal_if(ret != EM_OK,
"Queue:%" PRI_QUEUE " delete:%" PRI_STAT "",
queue, ret);
}
}
/**
 * Terminate the test, only run on one core
 *
 * Frees the test's shared memory and unregisters the application error
 * handler that test_init() registered, so that teardown mirrors setup.
 *
 * @param appl_conf  Application configuration (unused)
 */
void test_term(const appl_conf_t *appl_conf)
{
	(void)appl_conf;
	int core = em_core_id();

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	env_shared_free(perf_shm);
	/* error_handler() does not touch 'perf_shm', so this order is safe */
	em_unregister_error_handler();
}
/**
* Allocate, initialize and send test step events.
*/
static void
queue_step(void)
{
queue_context_t *q_ctx_s;
queue_context_t *q_ctx_o;
em_event_t event;
perf_event_t *perf_event;
const int first = perf_shm->test_status.queues;
const int step = perf_shm->test_status.step;
const int queue_count = queue_steps[step];
int i, j;
const int output_queue_count = queue_count / NUM_SCHED_TO_OUTPUT_QUEUE;
test_fatal_if(output_queue_count < 1, "Need to have at least 1 output Queue");
/* Allocate and send test events for the queues in the first step */
if (CONST_NUM_EVENTS) {
for (i = 0; i < CONST_NUM_EVENTS; i++) {
event = em_alloc(sizeof(perf_event_t),
EM_EVENT_TYPE_SW, perf_shm->pool);
test_fatal_if(event == EM_EVENT_UNDEF,
"EM alloc failed (%i)", i);
perf_event = em_event_pointer(event);
perf_event->seq = i;
perf_event->send_time = env_time_global();
/* Allocate events evenly to the queues */
q_ctx_s = &perf_shm->sched_queue_context_tbl[i % queue_count];
q_ctx_o = &perf_shm->output_queue_context_tbl[i % output_queue_count];
perf_event->sched_queue = q_ctx_s->this_queue;
perf_event->output_queue = q_ctx_o->this_queue;
ret = em_send(event, q_ctx_s->this_queue);
if (unlikely(ret != EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
"Queue:%" PRI_QUEUE "",
ret, q_ctx_s->this_queue);
em_free(event);
return;
}
}
} else {
for (i = first; i < queue_count; i++) {
int output_idx = i / NUM_SCHED_TO_OUTPUT_QUEUE;
q_ctx_s = &perf_shm->sched_queue_context_tbl[i];
q_ctx_o = &perf_shm->output_queue_context_tbl[output_idx];
for (j = 0; j < NUM_EVENTS; j++) {
event = em_alloc(sizeof(perf_event_t),
perf_shm->pool);
test_fatal_if(event == EM_EVENT_UNDEF,
"EM alloc failed (%i)", i);
perf_event = em_event_pointer(event);
perf_event->seq = i * NUM_EVENTS + j;
perf_event->send_time = env_time_global();
perf_event->sched_queue = q_ctx_s->this_queue;
perf_event->output_queue = q_ctx_o->this_queue;
ret = em_send(event, q_ctx_s->this_queue);
if (unlikely(ret != EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"EM send:%" PRI_STAT "\n"
"Queue:%" PRI_QUEUE "",
ret, q_ctx_s->this_queue);
em_free(event);
return;
}
}
}
}
perf_shm->test_status.queues = queue_count;
perf_shm->test_status.step++;
APPL_PRINT("\n"
"Number of queues: %d - scheduled:%d + output:%d\n",
queue_count + output_queue_count, queue_count, output_queue_count);
if (CONST_NUM_EVENTS)
APPL_PRINT("Number of events: %d\n", CONST_NUM_EVENTS);
else
APPL_PRINT("Number of events: %d\n",
perf_shm->test_status.queues * NUM_EVENTS);
}
/**
* @private
*
* EO start function.
*
*/
start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT("EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_id = eo;
return EM_OK;
}
/**
* @private
*
* EO stop function.
*/
stop(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT("EO %" PRI_EO " stopping.\n", eo);
return EM_OK;
}
/**
 * @private
 *
 * EO receive function.
 *
 * Loops back events and calculates the event rate: each event received from
 * a scheduled queue is forwarded to the output queue stored in the event.
 *
 * @param eo_context  EO context (unused)
 * @param event       The received event
 * @param type        Event type (unused)
 * @param queue       Queue the event was received from (unused)
 * @param q_context   Queue context of the scheduled queue (queue_context_t)
 */
static void
receive_func(void *eo_context, em_event_t event, em_event_type_t type,
	     em_queue_t queue, void *q_context)
{
	env_time_t recv_time;

	/* Take the time stamp first so it is as close to dispatch as possible */
	if (MEASURE_LATENCY)
		recv_time = env_time_global();

	perf_event_t *perf_event = em_event_pointer(event);
	queue_context_t *q_ctx;
	em_queue_t dst_queue;
	em_status_t ret;
	int do_return;

	(void)eo_context;
	(void)type;
	(void)queue;

	if (unlikely(appl_shm->exit_flag)) {
		em_free(event);
		return;
	}

	q_ctx = q_context;
	dst_queue = perf_event->output_queue;

	/*
	 * Helper: Update the test state, count recv events,
	 * calc & print stats, prepare for next step
	 */
	do_return = update_test_state(event);
	if (unlikely(do_return))
		return;

	if (ALLOC_FREE_PER_EVENT) {
		event = alloc_free_per_event(event);
		/*
		 * The old event has been freed: refresh the payload pointer
		 * so that nothing below touches the freed event.
		 */
		perf_event = em_event_pointer(event);
	}

	if (MEASURE_LATENCY) {
		measure_latency(perf_event, q_ctx, recv_time);
		perf_event->send_time = env_time_global();
	}

	/* Send the event to the next queue */
	ret = em_send(event, dst_queue);
	if (unlikely(ret != EM_OK)) {
		/* Send failures are only tolerated during tear-down */
		em_free(event);
		test_fatal_if(!appl_shm->exit_flag,
			      "EM send:%" PRI_STAT " Queue:%" PRI_QUEUE "",
			      ret, dst_queue);
	}
}
/**
 * Receive function helper: Update the test state
 *
 * Calculates the number of received events, maintains & prints test statistics
 * and restarts/reconfigures the test for the next queue/event-setup
 *
 * Per received event one of three things happens:
 *  - 'reset_flag' is set: the core zeroes its event count and goes idle; with
 *    CONST_NUM_EVENTS and 'free_flag' set the event is freed instead of being
 *    circulated, and the core freeing the last event starts the next step.
 *  - first event of a sample round: the per-core statistics are re-armed.
 *  - event number EVENTS_PER_SAMPLE: the core stores its elapsed time; the
 *    last core to get here sets 'reset_flag', prints the results and, after
 *    NUM_SAMPLES rounds, sets up the queues for the next step.
 *
 * @param event  The received event; freed here only when '1' is returned
 *
 * @return '1' if the caller receive function should immediately return,
 * '0' otherwise
 */
static inline int
update_test_state(em_event_t event)
{
uint64_t events;
uint64_t freed_count;
uint64_t ready_count;
const int core = em_core_id();
test_status_t *const tstat = &perf_shm->test_status;
core_stat_t *const cstat = &perf_shm->core_stat[core];
/* Work on a local copy of the counter, written back at the end */
events = cstat->events;
events++;
if (unlikely(tstat->reset_flag)) {
/* Between sample rounds: nothing is counted */
events = 0;
if (CONST_NUM_EVENTS) {
/* Free all old events before allocating new ones. */
if (unlikely(tstat->free_flag)) {
em_free(event);
freed_count =
env_atomic64_add_return(&tstat->freed_count, 1);
if (freed_count == CONST_NUM_EVENTS) {
/* Last event */
env_atomic64_set(&tstat->freed_count,
0);
tstat->reset_flag = 0;
tstat->free_flag = 0;
/* All events are gone: send a fresh set for the next step */
queue_step();
}
/* Req caller receive-func to return */
return 1;
}
}
/* Each core reports itself idle exactly once per reset */
if (unlikely(core_state != CORE_STATE_IDLE)) {
core_state = CORE_STATE_IDLE;
cstat->begin_time = ENV_TIME_NULL;
ready_count =
env_atomic64_add_return(&tstat->ready_count, 1);
/* The last core to go idle decides how the test continues */
if (ready_count == (uint64_t)tstat->num_cores) {
env_atomic64_set(&tstat->ready_count, 0);
if (CONST_NUM_EVENTS) {
int sample = tstat->samples;
int queues = tstat->queues;
/*
 * 'samples' was reset to 0 when a new step was set up: free the
 * old events first. Otherwise resume measuring right away.
 */
if (sample == 0 && queues < NUM_QUEUES)
tstat->free_flag = 1;
else
tstat->reset_flag = 0;
} else {
tstat->reset_flag = 0;
}
}
}
} else if (unlikely(events == 1)) {
/* First event of a new sample round on this core */
cstat->begin_time = env_time_global();
cstat->latency.events = 0;
cstat->latency.sched_ave = ENV_TIME_NULL;
cstat->latency.sched_max = ENV_TIME_NULL;
cstat->latency.output_ave = ENV_TIME_NULL;
cstat->latency.output_max = ENV_TIME_NULL;
core_state = CORE_STATE_MEASURE;
} else if (unlikely(events == EVENTS_PER_SAMPLE)) {
/*
 * Measurements done for this step. Store results and continue
 * receiving events until all cores are done.
 */
env_time_t begin_time, end_time;
cstat->end_time = env_time_global();
end_time = cstat->end_time;
begin_time = cstat->begin_time;
cstat->diff_time = env_time_diff(end_time, begin_time);
ready_count = env_atomic64_add_return(&tstat->ready_count, 1);
/*
 * Check whether all cores are done with the step,
 * and if done proceed to the next step
 */
if (unlikely((int)ready_count == tstat->num_cores)) {
/* No real need for atomicity here, ran on last core*/
env_atomic64_set(&tstat->ready_count, 0);
tstat->reset_flag = 1;
tstat->samples++;
/*
 * Print statistics.
 * Omit prints for the first sample round to allow the
 * test to stabilize after setups and teardowns.
 */
if (tstat->samples > 1) {
int print_header = tstat->samples == 2 ? 1 : 0;
print_test_statistics(tstat, print_header,
perf_shm->core_stat);
}
/*
 * Start next test step - setup new queues
 */
if (tstat->samples == NUM_SAMPLES &&
tstat->queues < NUM_QUEUES) {
if (!CREATE_ALL_QUEUES_AT_STARTUP) {
/* Only the queues added by the new step need to be created */
int step = tstat->step;
int first_q = tstat->queues;
int num_qs = queue_steps[step] -
queue_steps[step - 1];
create_and_link_queues(first_q, num_qs);
}
/*
 * With CONST_NUM_EVENTS the events are re-sent later, once all
 * old events have been freed (see the 'free_flag' handling above)
 */
if (!CONST_NUM_EVENTS)
queue_step();
tstat->samples = 0;
}
}
}
cstat->events = events;
return 0;
}
/**
* Creates a number of EM queues, associates them with EOs, and links them.
*/
static void
create_and_link_queues(int start_queue, int num_queues)
{
int i, j;
em_queue_t queue;
em_queue_group_t group;
queue_context_t *q_ctx;
em_queue_conf_t queue_conf;
uint32_t output_tot_cnt = 0;
env_atomic32_t *output_counter;
const int num_output_queues = num_queues / NUM_SCHED_TO_OUTPUT_QUEUE;
char queue_name_sched[EM_QUEUE_NAME_LEN];
char queue_name_output[EM_QUEUE_NAME_LEN];
APPL_PRINT("\nCreate new queues - scheduled:%d + output:%d\n",
num_queues, num_output_queues);
if (num_queues % NUM_EOS != 0) {
APPL_PRINT("%s() 'num_queues'=%d not multiple of NUM_EOS=%d\n",
__func__, num_queues, NUM_EOS);
return;
}
/* create scheduled queues */
for (i = start_queue; i < (start_queue + num_queues); i += NUM_EOS) {
for (j = 0; j < NUM_EOS; j++) {
snprintf(queue_name_sched, sizeof(queue_name_sched),
"Q-sched-%" PRIu8 "", i + j);
queue_name_sched[sizeof(queue_name_sched) - 1] = '\0';
type = QUEUE_TYPE;
queue = em_queue_create(queue_name_sched, type, prio, group,
NULL);
if (queue == EM_QUEUE_UNDEF) {
APPL_PRINT("Max nbr of supported queues: %d\n", i);
return;
}
ret = em_eo_add_queue_sync(perf_shm->eo[j], queue);
if (unlikely(ret != EM_OK)) {
test_fatal_if(!appl_shm->exit_flag,
"em_eo_add_queue_sync():%" PRI_STAT "\n"
"EO:%" PRI_EO " Q:%" PRI_QUEUE "",
ret, perf_shm->eo[j], queue);
return;
}
q_ctx = &perf_shm->sched_queue_context_tbl[i + j];
/* Link queues */
q_ctx->this_queue = queue;
q_ctx->prio = prio;
q_ctx->type = type;
ret = em_queue_set_context(queue, q_ctx);
test_fatal_if(ret != EM_OK,
"em_queue_set_context():%" PRI_STAT "\n"
"EO:%" PRI_EO " Q:%" PRI_QUEUE "",
ret, perf_shm->eo[j], queue);
}
}
int last_idx = start_queue + num_queues - 1;
APPL_PRINT("New Scheduled Qs created:%d First:%" PRI_QUEUE " Last:%" PRI_QUEUE "\n",
num_queues,
perf_shm->sched_queue_context_tbl[start_queue].this_queue,
perf_shm->sched_queue_context_tbl[last_idx].this_queue);
/* create output queues */
for (i = 0; i < (num_output_queues); i++) {
output_func_args_t args;
output_counter = &perf_shm->nbr_output_queues;
output_tot_cnt = env_atomic32_get(output_counter);
snprintf(queue_name_output, sizeof(queue_name_output),
"Q-output-%" PRIu8 "", output_tot_cnt);
queue_name_output[sizeof(queue_name_output) - 1] = '\0';
memset(&queue_conf, 0, sizeof(queue_conf));
memset(&output_conf, 0, sizeof(output_conf));
queue_conf.min_events = 0; /* system default */
queue_conf.conf_len = sizeof(output_conf);
queue_conf.conf = &output_conf;
output_conf.output_fn = output_fn;
args.q_ctx = &perf_shm->output_queue_context_tbl[output_tot_cnt];
output_conf.output_fn_args = &args;
output_conf.args_len = sizeof(queue_context_t *);
queue = em_queue_create(queue_name_output, type, prio, group,
&queue_conf);
if (queue != EM_QUEUE_UNDEF) {
env_atomic32_inc(output_counter);
perf_shm->output_queues[output_tot_cnt] = queue;
}
if (queue == EM_QUEUE_UNDEF) {
APPL_PRINT("Max nbr of Output queues: %d\n",
return;
}
q_ctx = &perf_shm->output_queue_context_tbl[output_tot_cnt];
/* Link queues */
q_ctx->this_queue = queue;
q_ctx->prio = prio;
q_ctx->type = type;
}
int first_idx = output_tot_cnt + 1 - num_output_queues;
APPL_PRINT("New Output Qs created:%d First:%" PRI_QUEUE " Last:%" PRI_QUEUE "\n",
num_output_queues,
perf_shm->output_queue_context_tbl[first_idx].this_queue,
perf_shm->output_queue_context_tbl[output_tot_cnt].this_queue);
}
/**
 * Print test statistics
 *
 * Sums the per-core sample times into an average cycles/event figure and an
 * event rate and prints one result line. With MEASURE_LATENCY the line also
 * carries the average and maximum latencies gathered by measure_latency().
 *
 * @param test_status   Shared test state; 'print_count' is incremented
 * @param print_header  Non-zero to print the column header before the line
 * @param core_stat     Per-core statistics, one entry per running core
 */
static void
print_test_statistics(test_status_t *test_status, int print_header,
		      core_stat_t core_stat[])
{
	const int core_cnt = test_status->num_cores;
	const uint64_t hz = test_status->cpu_hz;
	const double mhz = test_status->cpu_mhz;
	const uint64_t events_total = (uint64_t)core_cnt * EVENTS_PER_SAMPLE;
	const uint64_t line_nbr = test_status->print_count++;
	env_time_t time_sum = ENV_TIME_NULL;
	double cycles_per_event = 0.0;
	double events_per_sec = 0.0;
	int core;

	/* Total time spent by all cores on this sample round */
	for (core = 0; core < core_cnt; core++)
		time_sum = env_time_sum(time_sum, core_stat[core].diff_time);

	if (likely(events_total > 0))
		cycles_per_event = env_time_to_cycles(time_sum, hz) /
				   (double)events_total;

	/* Million events/s: */
	if (likely(cycles_per_event > 0))
		events_per_sec = mhz * core_cnt / cycles_per_event;

	/* Plain result line, no latency columns */
	if (!MEASURE_LATENCY) {
		if (print_header)
			APPL_PRINT(RESULT_PRINTF_HDR);

		APPL_PRINT(RESULT_PRINTF_FMT,
			   cycles_per_event, events_per_sec,
			   mhz, line_nbr);
		return;
	}

	/* Result line including the latency columns */
	uint64_t samples = 0;
	env_time_t sched_sum = ENV_TIME_NULL;
	env_time_t sched_peak = ENV_TIME_NULL;
	env_time_t output_sum = ENV_TIME_NULL;
	env_time_t output_peak = ENV_TIME_NULL;

	for (core = 0; core < core_cnt; core++) {
		const core_stat_t *const cs = &core_stat[core];

		samples += cs->latency.events;
		sched_sum = env_time_sum(sched_sum, cs->latency.sched_ave);
		output_sum = env_time_sum(output_sum, cs->latency.output_ave);

		if (env_time_cmp(cs->latency.sched_max, sched_peak) > 0)
			sched_peak = cs->latency.sched_max;

		if (env_time_cmp(cs->latency.output_max, output_peak) > 0)
			output_peak = cs->latency.output_max;
	}

	double sched_ave_cycles = 0.0;
	double output_ave_cycles = 0.0;

	if (likely(samples > 0)) {
		sched_ave_cycles = env_time_to_cycles(sched_sum, hz) /
				   (double)samples;
		output_ave_cycles = env_time_to_cycles(output_sum, hz) /
				    (double)samples;
	}

	if (print_header)
		APPL_PRINT(RESULT_PRINTF_LATENCY_HDR);

	APPL_PRINT(RESULT_PRINTF_LATENCY_FMT,
		   cycles_per_event, events_per_sec, sched_ave_cycles,
		   env_time_to_cycles(sched_peak, hz),
		   output_ave_cycles,
		   env_time_to_cycles(output_peak, hz),
		   mhz, line_nbr);
}
/**
 * Free the input event and allocate a new one instead
 *
 * The test fields (time stamp, sequence number and the two queue handles)
 * are carried over to the new event; the data area is not copied.
 *
 * @param event  Event to replace; freed by this function
 *
 * @return The newly allocated event. A failed allocation is treated as a
 *         fatal test error.
 */
static inline em_event_t
alloc_free_per_event(em_event_t event)
{
	perf_event_t *perf_event = em_event_pointer(event);
	env_time_t send_time = perf_event->send_time;
	int seq = perf_event->seq;
	em_queue_t sched_queue = perf_event->sched_queue;
	em_queue_t output_queue = perf_event->output_queue;
	uint32_t event_size = em_event_get_size(event);

	em_free(event);

	event = em_alloc(event_size, EM_EVENT_TYPE_SW, perf_shm->pool);
	/* Never dereference the payload of an event that was not allocated */
	test_fatal_if(event == EM_EVENT_UNDEF,
		      "EM alloc failed (size:%" PRIu32 ")", event_size);

	perf_event = em_event_pointer(event);
	perf_event->sched_queue = sched_queue;
	perf_event->output_queue = output_queue;
	perf_event->send_time = send_time;
	perf_event->seq = seq;

	return event;
}
/**
 * Measure the scheduling latency per event
 *
 * Adds one sample (receive time minus the event's send time) to the calling
 * core's statistics. Samples are only taken while a measurement round is
 * running on the core. The queue type in 'q_ctx' selects whether the sample
 * is booked as output-queue latency or as scheduled-queue latency.
 *
 * @param perf_event  Event payload carrying the send time stamp
 * @param q_ctx       Context of the queue the event was received from
 * @param recv_time   Time at which the event was received
 */
static inline void
measure_latency(perf_event_t *const perf_event, queue_context_t *const q_ctx,
		env_time_t recv_time)
{
	core_stat_t *const stats = &perf_shm->core_stat[em_core_id()];
	const env_time_t sent = perf_event->send_time;
	env_time_t elapsed;

	/* No sampling between rounds ... */
	if (perf_shm->test_status.reset_flag)
		return;
	/* ... nor before the first or after the last event of a round */
	if (stats->events == 0 || stats->events >= EVENTS_PER_SAMPLE)
		return;

	stats->latency.events++;
	elapsed = env_time_diff(recv_time, sent);

	if (q_ctx->type == EM_QUEUE_TYPE_OUTPUT) {
		stats->latency.output_ave =
			env_time_sum(stats->latency.output_ave, elapsed);
		if (env_time_cmp(elapsed, stats->latency.output_max) > 0)
			stats->latency.output_max = elapsed;
		return;
	}

	stats->latency.sched_ave =
		env_time_sum(stats->latency.sched_ave, elapsed);
	if (env_time_cmp(elapsed, stats->latency.sched_max) > 0)
		stats->latency.sched_max = elapsed;
}
EM_QUEUE_GROUP_UNDEF
#define EM_QUEUE_GROUP_UNDEF
Definition: event_machine_types.h:127
EM_OK
#define EM_OK
Definition: event_machine_types.h:329
EM_ERROR_IS_FATAL
#define EM_ERROR_IS_FATAL(error)
Definition: event_machine_hw_types.h:423
_env_atomic64
Definition: env_atomic.h:48
EM_EVENT_TYPE_SW
@ EM_EVENT_TYPE_SW
Definition: event_machine_hw_types.h:72
EM_QUEUE_PRIO_NORMAL
@ EM_QUEUE_PRIO_NORMAL
Definition: event_machine_hw_types.h:153
em_queue_conf_t
Definition: event_machine_types.h:212
EM_EVENT_UNDEF
#define EM_EVENT_UNDEF
Definition: event_machine_types.h:62
ENV_CACHE_LINE_SIZE
#define ENV_CACHE_LINE_SIZE
Definition: environment.h:62
EM_QUEUE_GROUP_DEFAULT
#define EM_QUEUE_GROUP_DEFAULT
Definition: event_machine_hw_config.h:147
EM_POOL_DEFAULT
#define EM_POOL_DEFAULT
Definition: event_machine_hw_config.h:191
em_output_queue_conf_t::args_len
size_t args_len
Definition: event_machine_hw_types.h:411
em_output_queue_conf_t::output_fn
em_output_func_t output_fn
Definition: event_machine_hw_types.h:402
PRI_POOL
#define PRI_POOL
Definition: event_machine_hw_types.h:62
PRI_EO
#define PRI_EO
Definition: event_machine_types.h:97
em_free
void em_free(em_event_t event)
Definition: event_machine_event.c:261
em_output_queue_conf_t::output_fn_args
void * output_fn_args
Definition: event_machine_hw_types.h:406
em_send
em_status_t em_send(em_event_t event, em_queue_t queue)
Definition: event_machine_event.c:661
EM_EO_UNDEF
#define EM_EO_UNDEF
Definition: event_machine_types.h:95
em_eo_add_queue_sync
em_status_t em_eo_add_queue_sync(em_eo_t eo, em_queue_t queue)
Definition: event_machine_eo.c:344
em_queue_type_t
uint32_t em_queue_type_t
Definition: event_machine_types.h:168
event_machine.h
em_eo_remove_queue_all_sync
em_status_t em_eo_remove_queue_all_sync(em_eo_t eo, int delete_queues)
Definition: event_machine_eo.c:517
em_queue_create
em_queue_t em_queue_create(const char *name, em_queue_type_t type, em_queue_prio_t prio, em_queue_group_t group, const em_queue_conf_t *conf)
Definition: event_machine_queue.c:41
EM_QUEUE_NAME_LEN
#define EM_QUEUE_NAME_LEN
Definition: event_machine_config.h:125
em_output_queue_conf_t
Definition: event_machine_hw_types.h:396
EM_TRUE
#define EM_TRUE
Definition: event_machine_types.h:53
EM_ESCOPE
#define EM_ESCOPE(escope)
Definition: event_machine_types.h:365
em_queue_delete
em_status_t em_queue_delete(em_queue_t queue)
Definition: event_machine_queue.c:95
em_queue_conf_t::flags
em_queue_flag_t flags
Definition: event_machine_types.h:218
ENV_CACHE_LINE_ALIGNED
#define ENV_CACHE_LINE_ALIGNED
Definition: environment.h:76
em_eo_delete
em_status_t em_eo_delete(em_eo_t eo)
Definition: event_machine_eo.c:205
em_alloc
em_event_t em_alloc(uint32_t size, em_event_type_t type, em_pool_t pool)
Definition: event_machine_event.c:33
em_queue_conf_t::conf_len
size_t conf_len
Definition: event_machine_types.h:229
em_queue_conf_t::conf
void * conf
Definition: event_machine_types.h:234
em_escope_t
uint32_t em_escope_t
Definition: event_machine_types.h:348
em_eo_start_sync
em_status_t em_eo_start_sync(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf)
Definition: event_machine_eo.c:725
em_status_t
uint32_t em_status_t
Definition: event_machine_types.h:321
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
EM_ERROR_SET_FATAL
#define EM_ERROR_SET_FATAL(error)
Definition: event_machine_hw_types.h:428
em_unregister_error_handler
em_status_t em_unregister_error_handler(void)
Definition: event_machine_error.c:50
_env_atomic32
Definition: env_atomic.h:44
em_event_type_t
uint32_t em_event_type_t
Definition: event_machine_types.h:85
em_event_get_size
uint32_t em_event_get_size(em_event_t event)
Definition: event_machine_event.c:818
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
em_queue_set_context
em_status_t em_queue_set_context(em_queue_t queue, const void *context)
Definition: event_machine_queue.c:112
EM_QUEUE_TYPE_OUTPUT
@ EM_QUEUE_TYPE_OUTPUT
Definition: event_machine_hw_types.h:137
em_core_id
int em_core_id(void)
Definition: event_machine_core.c:34
EM_POOL_UNDEF
#define EM_POOL_UNDEF
Definition: event_machine_hw_types.h:60
em_queue_conf_t::min_events
unsigned int min_events
Definition: event_machine_types.h:224
environment.h
em_queue_prio_t
uint32_t em_queue_prio_t
Definition: event_machine_types.h:186
em_eo_create
em_eo_t em_eo_create(const char *name, em_start_func_t start, em_start_local_func_t local_start, em_stop_func_t stop, em_stop_local_func_t local_stop, em_receive_func_t receive, const void *eo_ctx)
Definition: event_machine_eo.c:40
em_register_error_handler
em_status_t em_register_error_handler(em_error_handler_t handler)
Definition: event_machine_error.c:34
em_eo_stop_sync
em_status_t em_eo_stop_sync(em_eo_t eo)
Definition: event_machine_eo.c:897
ENV_LOCAL
#define ENV_LOCAL
Definition: environment.h:57
em_eo_conf_t
Definition: event_machine_types.h:242
EM_MAX_OUTPUT_QUEUES
#define EM_MAX_OUTPUT_QUEUES
Definition: event_machine_config.h:131
em_event_pointer
void * em_event_pointer(em_event_t event)
Definition: event_machine_event.c:750
EM_ERROR
#define EM_ERROR
Definition: event_machine_types.h:337
EM_QUEUE_FLAG_DEFAULT
#define EM_QUEUE_FLAG_DEFAULT
Definition: event_machine_hw_types.h:171