queue_types_ag.c
/*
* Copyright (c) 2014, Nokia Solutions and Networks
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file
*
* Event Machine Queue Types test example with included atomic groups.
*
* The test creates several EO-pairs and sends events between the queues in
* the pair. Each EO has an input queue (of type atomic, parallel or
* parallel-ordered) or, in the case of atomic groups, three(3) input atomic
* queues that belong to the same atomic group but have different priority.
* The events sent between the queues of the EO-pair are counted and
* statistics for each pair type are printed. If the queues in the EO-pair
* retain order, this is also verified.
*/
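/*
 * Event flow of one EO pair (sketch):
 *
 *   start event --> EO-A: initialize_events() allocates NUM_EVENT data
 *   events and sends them to EO-A's own queue(s), after which the data
 *   events ping-pong between the queues of EO-A and EO-B.
 */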
#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <event_machine.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/* Number of queue type pairs (constant, don't change) */
#define QUEUE_TYPE_PAIRS 10
/*
* Number of test EOs and queues. Must be an even number.
 * The test has NUM_EO/2 EO pairs that send ping-pong events.
* Depending on test dynamics (e.g. single burst in atomic
* queue) only one EO of a pair might be active at a time.
*/
#define NUM_EO (8 * QUEUE_TYPE_PAIRS)
/* Max number of queues supported by the test */
#define MAX_QUEUES (NUM_EO / QUEUE_TYPE_PAIRS * 30)
/* Number of ping-pong events per EO pair */
#define NUM_EVENT (3 * 32)
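/* 3 * 32 = 96: divisible by the 3 queues of an atomic group (NUM_EVENT / 3)
 * and by the SEND_MULTI_MAX burst size */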
/* Number of data bytes in the event */
#define DATA_SIZE 64
/* Max number of cores supported by the test */
#define MAX_CORES 64
/* Print stats when the number of received events reaches this value on a core */
#define PRINT_COUNT 0x1000000
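/* 0x1000000 = ~16.8M events */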
/** Define how many events are sent per em_send_multi() call */
#define SEND_MULTI_MAX 32
/*
* Enable atomic access checks.
 * If enabled, the application will crash if the atomic-processing context
 * is violated, i.e. it verifies that events from an atomic queue are
 * processed one-by-one.
*/
#define VERIFY_ATOMIC_ACCESS 1 /* 0=False or 1=True */
/*
* Verify that the receive func processing context works as expected
*/
#define VERIFY_PROCESSING_CONTEXT 1 /* 0=False or 1=True */
/* Call em_atomic_processing_end every once in a while in EO-A */
#define CALL_ATOMIC_PROCESSING_END__A 1 /* 0=False or 1=True */
/* Call em_atomic_processing_end every once in a while in EO-B */
#define CALL_ATOMIC_PROCESSING_END__B 1 /* 0=False or 1=True */
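/*
 * em_atomic_processing_end() signals an early end of the atomic context,
 * allowing the scheduler to give events from the same atomic queue to
 * other cores before the receive function returns.
 */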
/* Return 'TRUE' if the queue pair retains event order */
#define ORDERED_PAIR(q_type_a, q_type_b) ( \
(((q_type_a) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_a) == EM_QUEUE_TYPE_PARALLEL_ORDERED)) && \
(((q_type_b) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_b) == EM_QUEUE_TYPE_PARALLEL_ORDERED)))
#define ABS(nbr1, nbr2) (((nbr1) > (nbr2)) ? ((nbr1) - (nbr2)) : \
((nbr2) - (nbr1)))
#define PRINT_CORE_STAT_FMT \
"Stat Core-%02i: Count/PairType\t" \
"A-A:%6" PRIu64 " P-P:%6" PRIu64 " PO-PO:%6" PRIu64 "\t" \
"P-A:%6" PRIu64 " PO-A:%6" PRIu64 " PO-P:%6" PRIu64 "\t" \
"AG-AG:%6" PRIu64 " AG-A:%6" PRIu64 " AG-P:%6" PRIu64 " AG-PO:%6" PRIu64 "\t" \
"cycles/event:%.0f @%.0fMHz %" PRIu64 "\n"
/**
* Combinations of Queue Type pairs
*/
#define NO_AG (0)
#define IN_AG (1)
typedef struct queue_type_pairs_ {
em_queue_type_t q_type[2];
int in_atomic_group[2];
} queue_type_pair_t;
queue_type_pair_t queue_type_pairs[QUEUE_TYPE_PAIRS] = {
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_ATOMIC, EM_QUEUE_TYPE_ATOMIC}, {NO_AG, NO_AG} },
	{ {EM_QUEUE_TYPE_PARALLEL, EM_QUEUE_TYPE_PARALLEL}, {NO_AG, NO_AG} },
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_PARALLEL_ORDERED, EM_QUEUE_TYPE_PARALLEL_ORDERED},
	  {NO_AG, NO_AG} },
	{ {EM_QUEUE_TYPE_PARALLEL, EM_QUEUE_TYPE_ATOMIC}, {NO_AG, NO_AG} },
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_PARALLEL_ORDERED, EM_QUEUE_TYPE_ATOMIC},
	  {NO_AG, NO_AG} },
	{ {EM_QUEUE_TYPE_PARALLEL_ORDERED, EM_QUEUE_TYPE_PARALLEL},
	  {NO_AG, NO_AG} },
	/* With Atomic Groups for atomic queues: */
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_ATOMIC, EM_QUEUE_TYPE_ATOMIC}, {IN_AG, IN_AG} },
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_ATOMIC, EM_QUEUE_TYPE_ATOMIC}, {IN_AG, NO_AG} },
	{ {EM_QUEUE_TYPE_ATOMIC, EM_QUEUE_TYPE_PARALLEL}, {IN_AG, NO_AG} },
	/* Ordered Pair */
	{ {EM_QUEUE_TYPE_ATOMIC, EM_QUEUE_TYPE_PARALLEL_ORDERED},
	  {IN_AG, NO_AG} },
};
COMPILE_TIME_ASSERT(sizeof(queue_type_pairs) ==
(QUEUE_TYPE_PAIRS * sizeof(queue_type_pair_t)),
QUEUE_TYPE_PAIRS_SIZE_ERROR);
typedef enum {
PT_ATOMIC_ATOMIC = 0,
PT_PARALLEL_PARALLEL = 1,
PT_PARALORD_PARALORD = 2,
PT_PARALLEL_ATOMIC = 3,
PT_PARALORD_ATOMIC = 4,
PT_PARALORD_PARALLEL = 5,
/* With Atomic Groups (AG) for atomic queues: */
PT_AG_AG = 6,
PT_AG_ATOMIC = 7,
PT_AG_PARALLEL = 8,
PT_AG_PARALORD = 9,
PT_UNDEFINED
} pair_type_t;
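/* The pair type is used as the index into the per-core pt_count[] stats */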
/**
* Test statistics (per core)
*/
typedef union {
	/* pad the per-core stats to a multiple of the cache-line size */
	uint8_t u8[2 * ENV_CACHE_LINE_SIZE];
	struct {
uint64_t events;
uint64_t begin_cycles;
uint64_t end_cycles;
uint64_t print_count;
/*
* Pair-Type count, i.e. the number of events belonging to
* a certain pair-type on this core
*/
uint64_t pt_count[QUEUE_TYPE_PAIRS];
};
} core_stat_t;
COMPILE_TIME_ASSERT(sizeof(core_stat_t) % ENV_CACHE_LINE_SIZE == 0,
CORE_STAT_T__SIZE_ERROR);
/**
* Test EO context
*/
typedef struct {
em_eo_t eo_hdl;
/* EO pair retains order? 0/1 */
int ordered_pair;
	pair_type_t pair_type;
	/* Queue type of this EO's queue(s).
	 * Atomic group queues are also of queue type atomic. */
	em_queue_type_t q_type;
	int owns_ag_queues;
	em_atomic_group_t agrp_hdl;
	int peer_owns_ag_queues;
	env_spinlock_t verify_atomic_access;
void *end[0] ENV_CACHE_LINE_ALIGNED;
} eo_context_t;
COMPILE_TIME_ASSERT(sizeof(eo_context_t) % ENV_CACHE_LINE_SIZE == 0,
EO_CTX_T__SIZE_ERROR);
/**
* Test Queue context
*/
typedef struct {
	em_queue_t q_hdl;
	em_queue_type_t q_type;
	int in_atomic_group;
unsigned int idx;
uint64_t seqno;
/* Number of events at the previous check-point */
uint64_t prev_events;
/*
* Total number of events handled from the queue.
* Atomically incremented, either by __atomic_add_fetch() or
* protected by atomic context (set by queue type).
*/
uint64_t num_events ENV_CACHE_LINE_ALIGNED;
void *end[0] ENV_CACHE_LINE_ALIGNED;
} queue_context_t;
COMPILE_TIME_ASSERT(sizeof(queue_context_t) % ENV_CACHE_LINE_SIZE == 0,
Q_CTX_T__SIZE_ERROR);
/* IDs stored in the event user area ID field */
#define EV_ID_START_EVENT 1
#define EV_ID_DATA_EVENT 2
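/*
 * The IDs are stored with em_event_uarea_id_set() and read back via
 * em_event_uarea_info() to tell the one-time start event apart from
 * the ping-pong data events in the receive functions.
 */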
typedef struct {
int in_atomic_group_a;
int src_q_cnt;
em_queue_t src_queues[3];
int in_atomic_group_b;
int dst_q_cnt;
em_queue_t dst_queues[3];
} start_event_uarea_t;
typedef struct {
/* Next destination queue */
em_queue_t dest;
em_queue_t src;
/* Sequence number */
uint64_t seqno;
} data_event_uarea_t;
typedef union {
start_event_uarea_t start;
data_event_uarea_t data;
} test_event_uarea_t;
/** Data event content */
typedef struct {
/* Test data */
uint8_t data[DATA_SIZE];
} data_event_t;
typedef struct {
uint8_t u8[0]; /* no payload */
} start_event_t;
/**
* Test event, content identified by 'ev_id'
*/
typedef union {
start_event_t start;
data_event_t data;
} test_event_t;
/**
* Queue Types test shared memory
*/
typedef struct {
core_stat_t core_stat[MAX_CORES] ENV_CACHE_LINE_ALIGNED;
eo_context_t eo_context[NUM_EO] ENV_CACHE_LINE_ALIGNED;
queue_context_t queue_context[MAX_QUEUES] ENV_CACHE_LINE_ALIGNED;
unsigned num_queues ENV_CACHE_LINE_ALIGNED;
em_pool_t pool;
int teardown_in_progress;
} qtypes_shm_t;
COMPILE_TIME_ASSERT(sizeof(qtypes_shm_t) % ENV_CACHE_LINE_SIZE == 0,
QTYPES_SHM_T__SIZE_ERROR);
/* EM-core local pointer to shared memory */
static ENV_LOCAL qtypes_shm_t *qtypes_shm;
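/* Core 0 reserves the block in test_init(); all other cores look it up */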
/**
* Local Function Prototypes
*/
static em_status_t
start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf);
static em_status_t
stop(void *eo_context, em_eo_t eo);
static void
initialize_events(const start_event_uarea_t *start);
static void
receive_a(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *q_ctx);
static void
receive_b(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *q_ctx);
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair);
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno);
static void
verify_all_queues_get_events(void);
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx);
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx);
static inline void
verify_processing_context(eo_context_t *const eo_ctx, em_queue_t queue);
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events);
static void
print_event_msg_string(void);
static void
print_test_info(void);
/**
* Main function
*
* Call cm_setup() to perform test & EM setup common for all the
* test applications.
*
* cm_setup() will call test_init() and test_start() and launch
* the EM dispatch loop on every EM-core.
*/
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
/**
* Init of the Queue Types test application.
*
* @attention Run on all cores.
*
* @see cm_setup() for setup and dispatch.
*/
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
int core = em_core_id();
if (core == 0) {
qtypes_shm = env_shared_reserve("QueueTypesSharedMem",
sizeof(qtypes_shm_t));
em_register_error_handler(test_error_handler);
} else {
qtypes_shm = env_shared_lookup("QueueTypesSharedMem");
}
if (qtypes_shm == NULL) {
		test_error(EM_ERROR_SET_FATAL(__LINE__), 0xdead,
			   "Queue Types test init failed on EM-core: %u\n",
			   em_core_id());
	} else if (core == 0) {
memset(qtypes_shm, 0, sizeof(qtypes_shm_t));
}
}
/**
* Startup of the Queue Types test application.
*
* @attention Run only on EM core 0.
*
* @param appl_conf Application configuration
*
* @see cm_setup() for setup and dispatch.
*/
void test_start(const appl_conf_t *appl_conf)
{
em_atomic_group_t atomic_group;
em_eo_t eo;
em_queue_t queue_a, queue_b;
em_queue_t queue_ag_a1, queue_ag_a2, queue_ag_a3;
em_queue_t queue_ag_b1, queue_ag_b2, queue_ag_b3;
em_queue_type_t q_type_a, q_type_b;
em_status_t ret, start_ret = EM_ERROR;
eo_context_t *eo_ctx;
queue_context_t *q_ctx;
pair_type_t pair_type;
unsigned int qcnt = 0; /* queue context index */
int in_atomic_group_a, in_atomic_group_b;
int ordered_pair;
char eo_name[EM_EO_NAME_LEN];
char q_name[EM_QUEUE_NAME_LEN];
char ag_name[EM_ATOMIC_GROUP_NAME_LEN];
int i;
uint8_t eo_idx = 0, q_idx = 0, agrp_idx = 0;
queue_a = EM_QUEUE_UNDEF;
queue_b = EM_QUEUE_UNDEF;
queue_ag_a1 = EM_QUEUE_UNDEF;
queue_ag_a2 = EM_QUEUE_UNDEF;
queue_ag_a3 = EM_QUEUE_UNDEF;
queue_ag_b1 = EM_QUEUE_UNDEF;
queue_ag_b2 = EM_QUEUE_UNDEF;
queue_ag_b3 = EM_QUEUE_UNDEF;
/*
* Create own pool with events containing user area.
*/
em_pool_cfg_t pool_cfg;
em_pool_cfg_init(&pool_cfg);
pool_cfg.user_area.in_use = true;
pool_cfg.user_area.size = sizeof(test_event_uarea_t);
pool_cfg.num_subpools = 1;
pool_cfg.subpool[0].size = sizeof(test_event_t);
pool_cfg.subpool[0].num = NUM_EVENT * NUM_EO;
/* no cache needed, everything allocated at start-up: */
pool_cfg.subpool[0].cache_size = 0;
em_pool_t pool = em_pool_create("pool:Qtypes-AG",
EM_POOL_UNDEF, &pool_cfg);
test_fatal_if(pool == EM_POOL_UNDEF, "pool create failed");
qtypes_shm->pool = pool;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
" using event pool:%" PRI_POOL "\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__, em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
qtypes_shm->pool);
test_fatal_if(qtypes_shm->pool == EM_POOL_UNDEF,
"Undefined application event pool!");
qtypes_shm->num_queues = 0;
qtypes_shm->teardown_in_progress = EM_FALSE;
/* Create and start application pairs. Send initial test events */
for (i = 0; i < (NUM_EO / 2); i++) {
q_type_a = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[0];
in_atomic_group_a =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[0];
q_type_b = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[1];
in_atomic_group_b =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[1];
ordered_pair = ORDERED_PAIR(q_type_a, q_type_b);
pair_type =
get_pair_type(&queue_type_pairs[i % QUEUE_TYPE_PAIRS]);
test_fatal_if(pair_type == PT_UNDEFINED,
"Queue Pair Type UNDEFINED! (%u, %u)",
q_type_a, q_type_b);
/* Create EO "A" */
ret = EM_OK;
eo_ctx = &qtypes_shm->eo_context[2 * i];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_a;
eo_ctx->owns_ag_queues = in_atomic_group_a;
eo_ctx->agrp_hdl = EM_ATOMIC_GROUP_UNDEF;
eo_ctx->peer_owns_ag_queues = in_atomic_group_b;
snprintf(eo_name, sizeof(eo_name), "EO-A%" PRIu8 "", ++eo_idx);
eo_name[sizeof(eo_name) - 1] = '\0';
eo = em_eo_create(eo_name, start, NULL, stop, NULL, receive_a,
eo_ctx);
if (in_atomic_group_a && q_type_a == EM_QUEUE_TYPE_ATOMIC) {
snprintf(ag_name, sizeof(ag_name), "AG-A%" PRIu8 "",
++agrp_idx);
ag_name[sizeof(ag_name) - 1] = '\0';
			atomic_group =
			em_atomic_group_create(ag_name,
					       EM_QUEUE_GROUP_DEFAULT);
test_fatal_if(atomic_group == EM_ATOMIC_GROUP_UNDEF,
"Atomic group creation failed!");
eo_ctx->agrp_hdl = atomic_group;
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_a1 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_NORMAL,
							 atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_a2 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_HIGH,
							 atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_a3 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_HIGHEST,
							 atomic_group, NULL);
ret = em_eo_add_queue_sync(eo, queue_ag_a1);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
ret = em_eo_add_queue_sync(eo, queue_ag_a2);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
ret = em_eo_add_queue_sync(eo, queue_ag_a3);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a1;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_a1, q_ctx);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a2;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_a2, q_ctx);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a3;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_a3, q_ctx);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
} else {
snprintf(q_name, sizeof(q_name), "Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_a = em_queue_create(q_name, q_type_a,
						  EM_QUEUE_PRIO_NORMAL,
						  EM_QUEUE_GROUP_DEFAULT,
						  NULL);
ret = em_eo_add_queue_sync(eo, queue_a);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_a;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_a, q_ctx);
test_fatal_if(ret != EM_OK, "EO-A setup failed!");
}
/* update qcnt each round to avoid == 0 in recv-func */
qtypes_shm->num_queues = qcnt;
/* Start EO-A */
ret = em_eo_start_sync(eo, &start_ret, NULL);
test_fatal_if(ret != EM_OK || start_ret != EM_OK,
"EO-A setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/* Create EO "B" */
ret = EM_OK;
eo_ctx = &qtypes_shm->eo_context[2 * i + 1];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_b;
eo_ctx->owns_ag_queues = in_atomic_group_b;
eo_ctx->agrp_hdl = EM_ATOMIC_GROUP_UNDEF;
eo_ctx->peer_owns_ag_queues = in_atomic_group_a;
snprintf(eo_name, sizeof(eo_name), "EO-B%" PRIu8 "", ++eo_idx);
eo_name[sizeof(eo_name) - 1] = '\0';
eo = em_eo_create(eo_name, start, NULL, stop, NULL, receive_b,
eo_ctx);
if (in_atomic_group_b && q_type_b == EM_QUEUE_TYPE_ATOMIC) {
snprintf(ag_name, sizeof(ag_name), "AG-B%" PRIu8 "",
++agrp_idx);
ag_name[sizeof(ag_name) - 1] = '\0';
			atomic_group =
			em_atomic_group_create(ag_name,
					       EM_QUEUE_GROUP_DEFAULT);
test_fatal_if(atomic_group == EM_ATOMIC_GROUP_UNDEF,
"Atomic group creation failed!");
eo_ctx->agrp_hdl = atomic_group;
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_b1 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_NORMAL,
							 atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_b2 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_HIGH,
							 atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_ag_b3 = em_queue_create_ag(q_name,
							 EM_QUEUE_PRIO_HIGHEST,
							 atomic_group, NULL);
ret = em_eo_add_queue_sync(eo, queue_ag_b1);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
ret = em_eo_add_queue_sync(eo, queue_ag_b2);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
ret = em_eo_add_queue_sync(eo, queue_ag_b3);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b1;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_b1, q_ctx);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b2;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_b2, q_ctx);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b3;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_ag_b3, q_ctx);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
} else {
snprintf(q_name, sizeof(q_name), "Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
			queue_b = em_queue_create(q_name, q_type_b,
						  EM_QUEUE_PRIO_NORMAL,
						  EM_QUEUE_GROUP_DEFAULT,
						  NULL);
ret = em_eo_add_queue_sync(eo, queue_b);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_b;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
ret = em_queue_set_context(queue_b, q_ctx);
test_fatal_if(ret != EM_OK, "EO-B setup failed!");
}
/* update qcnt each round to avoid == 0 in recv-func */
qtypes_shm->num_queues = qcnt;
/* Start EO-B */
ret = em_eo_start_sync(eo, &start_ret, NULL);
test_fatal_if(ret != EM_OK || start_ret != EM_OK,
"EO-B setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/*
* Allocate and send the startup event to the first EO of the
* pair of this round.
*/
		em_event_t event = em_alloc(sizeof(test_event_t),
					    EM_EVENT_TYPE_SW,
					    qtypes_shm->pool);
test_fatal_if(event == EM_EVENT_UNDEF, "Event alloc fails");
size_t uarea_size;
test_event_uarea_t *test_uarea;
test_uarea = em_event_uarea_get(event, &uarea_size);
		test_fatal_if(!test_uarea || uarea_size < sizeof(test_event_uarea_t),
"Event User Area error: ptr:%p sz:%zu < %zu",
test_uarea, uarea_size, sizeof(test_event_uarea_t));
ret = em_event_uarea_id_set(event, EV_ID_START_EVENT);
		test_fatal_if(ret != EM_OK,
			      "Error setting uarea id, err:%" PRI_STAT "", ret);
test_uarea->start.in_atomic_group_a = in_atomic_group_a;
if (in_atomic_group_a) {
test_uarea->start.src_q_cnt = 3;
test_uarea->start.src_queues[0] = queue_ag_a1;
test_uarea->start.src_queues[1] = queue_ag_a2;
test_uarea->start.src_queues[2] = queue_ag_a3;
} else {
test_uarea->start.src_q_cnt = 1;
test_uarea->start.src_queues[0] = queue_a;
}
test_uarea->start.in_atomic_group_b = in_atomic_group_b;
if (in_atomic_group_b) {
test_uarea->start.dst_q_cnt = 3;
test_uarea->start.dst_queues[0] = queue_ag_b1;
test_uarea->start.dst_queues[1] = queue_ag_b2;
test_uarea->start.dst_queues[2] = queue_ag_b3;
} else {
test_uarea->start.dst_q_cnt = 1;
test_uarea->start.dst_queues[0] = queue_b;
}
ret = em_send(event, test_uarea->start.src_queues[0]);
test_fatal_if(ret != EM_OK, "Event send:%" PRI_STAT "", ret);
}
APPL_PRINT("\n\nqctx:%i MAX:%i\n\n", qcnt, MAX_QUEUES);
test_fatal_if(qcnt > MAX_QUEUES || qtypes_shm->num_queues != qcnt,
"Queue context number too high!");
print_test_info();
}
/**
* Test stop function
*
* @attention Run only on one EM core
*
* @param appl_conf Application configuration
*
* @see cm_setup() for setup and teardown.
*/
void test_stop(const appl_conf_t *appl_conf)
{
const int core = em_core_id();
em_eo_t eo;
eo_context_t *eo_ctx;
	em_status_t ret;
	int i;
(void)appl_conf;
	/* mark 'teardown in progress' to avoid seq.nbr check errors */
qtypes_shm->teardown_in_progress = EM_TRUE;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
/* stop all EOs */
for (i = 0; i < NUM_EO; i++) {
eo_ctx = &qtypes_shm->eo_context[i];
eo = eo_ctx->eo_hdl;
ret = em_eo_stop_sync(eo);
test_fatal_if(ret != EM_OK,
"EO stop:%" PRI_STAT " EO:%" PRI_EO "",
ret, eo);
}
}
/**
* Termination of the 'Queue Types AG' test application.
*
* @attention Run on one EM core only
*
* @see cm_setup() for setup and teardown.
*/
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
int core = em_core_id();
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
em_status_t ret = em_pool_delete(qtypes_shm->pool);
test_fatal_if(ret != EM_OK,
"em_pool_delete(%" PRI_POOL "):%" PRI_STAT "",
qtypes_shm->pool, ret);
	if (core == 0) {
		env_shared_free(qtypes_shm);
		em_unregister_error_handler();
	}
}
/**
* @private
*
* EO start function.
*/
static em_status_t
start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT("EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_hdl = eo;
if (VERIFY_ATOMIC_ACCESS)
env_spinlock_init(&eo_ctx->verify_atomic_access);
/*
* Test: Verify that EO & queue _current() and
* _get_context() APIs work as expected.
*/
test_fatal_if(em_eo_current() != eo, "Invalid current EO");
test_fatal_if(em_eo_get_context(eo) != eo_context,
"Invalid current EO context");
test_fatal_if(em_queue_current() != EM_QUEUE_UNDEF,
"Invalid current queue");
return EM_OK;
}
/**
* @private
*
* EO stop function.
*/
static em_status_t
stop(void *eo_context, em_eo_t eo)
{
	eo_context_t *const eo_ctx = (eo_context_t *)eo_context;
	em_status_t ret;

	APPL_PRINT("EO %" PRI_EO " stopping.\n", eo);

	/* remove and delete all of the EO's queues */
	ret = em_eo_remove_queue_all_sync(eo, EM_TRUE);
	test_fatal_if(ret != EM_OK,
"EO remove queue all:%" PRI_STAT " EO:%" PRI_EO "",
ret, eo);
if (eo_ctx->agrp_hdl != EM_ATOMIC_GROUP_UNDEF) {
ret = em_atomic_group_delete(eo_ctx->agrp_hdl);
test_fatal_if(ret != EM_OK,
"AGrp delete:%" PRI_STAT " EO:%" PRI_EO "",
ret, eo);
}
/* delete the EO at the end of the stop-function */
ret = em_eo_delete(eo);
test_fatal_if(ret != EM_OK,
"EO delete:%" PRI_STAT " EO:%" PRI_EO "",
ret, eo);
return EM_OK;
}
static void
initialize_events(const start_event_uarea_t *start)
{
/*
* Allocate and send test events to the EO-pair of this round
*/
const int max_q_cnt = start->src_q_cnt > start->dst_q_cnt ?
start->src_q_cnt : start->dst_q_cnt;
/* tmp storage for all events to send this round */
em_event_t all_events[max_q_cnt][NUM_EVENT];
/* number of events for a queue in all_events[Q][events] */
int ev_cnt[max_q_cnt];
uint64_t seqno = 0;
int j, x, y;
for (x = 0; x < max_q_cnt; x++)
ev_cnt[x] = 0;
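	/*
	 * Allocate the events round-robin over the queues: all events of
	 * the same round 'j' share the same sequence number.
	 */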
for (j = 0; j < NUM_EVENT;) {
for (x = 0, y = 0; x < max_q_cnt; x++, y++, j++) {
			em_event_t event = em_alloc(sizeof(test_event_t),
						    EM_EVENT_TYPE_SW,
						    qtypes_shm->pool);
test_fatal_if(event == EM_EVENT_UNDEF,
"Event alloc fails");
test_event_t *test_event = em_event_pointer(event);
size_t uarea_size = 0;
test_event_uarea_t *test_uarea =
em_event_uarea_get(event, &uarea_size);
test_fatal_if(!test_event || !test_uarea ||
uarea_size != sizeof(test_event_uarea_t),
"Event payload/uarea error");
memset(test_event, 0, sizeof(test_event_t));
memset(test_uarea, 0, uarea_size);
			em_status_t ret =
				em_event_uarea_id_set(event, EV_ID_DATA_EVENT);
			test_fatal_if(ret != EM_OK,
				      "Error setting uarea id, err:%" PRI_STAT "",
				      ret);
if (start->in_atomic_group_b)
test_uarea->data.dest = start->dst_queues[y];
else
test_uarea->data.dest = start->dst_queues[0];
test_uarea->data.src = start->src_queues[x];
if (start->in_atomic_group_a ==
start->in_atomic_group_b) {
/* verify seqno (symmetric EO-pairs)*/
test_uarea->data.seqno = seqno;
}
all_events[x][ev_cnt[x]] = event;
ev_cnt[x] += 1;
}
seqno += 1;
}
/* Send events to EO A */
for (x = 0; x < max_q_cnt; x++) {
int n, m;
int num_sent = 0;
/* Send in bursts of 'SEND_MULTI_MAX' events */
const int send_rounds = ev_cnt[x] / SEND_MULTI_MAX;
const int left_over = ev_cnt[x] % SEND_MULTI_MAX;
for (n = 0, m = 0; n < send_rounds;
n++, m += SEND_MULTI_MAX) {
num_sent += em_send_multi(&all_events[x][m],
SEND_MULTI_MAX,
start->src_queues[x]);
}
if (left_over) {
num_sent += em_send_multi(&all_events[x][m], left_over,
start->src_queues[x]);
}
test_fatal_if(num_sent != ev_cnt[x],
"Event send multi failed:%d (%d)\n"
"Q:%" PRI_QUEUE "",
num_sent, ev_cnt[x], start->src_queues[x]);
}
}
/**
* @private
*
* EO receive function for EO A.
*
* Forwards events to the next processing stage (EO)
* and calculates the event rate.
*/
static void
receive_a(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
test_event_uarea_t *test_uarea;
em_queue_t dest_queue;
uint64_t queue_events;
	uint64_t seqno;
	em_event_uarea_info_t uarea_info;
	em_status_t ret;
const int core = em_core_id();
core_stat_t *cstat = &qtypes_shm->core_stat[core];
(void)type;
if (unlikely(appl_shm->exit_flag)) {
em_free(event);
return;
}
ret = em_event_uarea_info(event, &uarea_info);
test_fatal_if(ret != EM_OK,
"em_event_uarea_info() fails:%" PRI_STAT "", ret);
test_uarea = uarea_info.uarea;
if (unlikely(uarea_info.id.value == EV_ID_START_EVENT)) {
/*
* Start-up only, one time: initialize the test event sending.
* Called from EO-receive to avoid mixing up events & sequence
* numbers in start-up for ordered EO-pairs (sending from the
* start functions could mess up the seqno:s since all the
* cores are already in the dispatch loop).
*/
initialize_events(&test_uarea->start);
em_free(event);
return;
}
test_fatal_if(uarea_info.id.value != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", uarea_info.id.value);
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
if (VERIFY_PROCESSING_CONTEXT)
verify_processing_context(eo_ctx, queue);
seqno = test_uarea->data.seqno;
	/* Increment Q specific event counter (parallel Qs req atomic inc:s) */
if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC)
queue_events = q_ctx->num_events++;
else
queue_events = __atomic_add_fetch(&q_ctx->num_events, 1,
__ATOMIC_RELAXED);
test_fatal_if(test_uarea->data.src != queue,
"EO-A queue mismatch:%" PRI_QUEUE "!=%" PRI_QUEUE "",
test_uarea->data.src, queue);
if (eo_ctx->ordered_pair && eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC) {
		/* Verify the seq nbr to make sure event order is maintained */
verify_seqno(eo_ctx, q_ctx, seqno);
}
dest_queue = test_uarea->data.dest;
test_uarea->data.src = test_uarea->data.dest;
test_uarea->data.dest = queue;
ret = em_send(event, dest_queue);
if (unlikely(ret != EM_OK)) {
em_free(event);
test_fatal_if(!appl_shm->exit_flag, "EO-A em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
if (CALL_ATOMIC_PROCESSING_END__A) {
/* Call em_atomic_processing_end() every once in a while */
		if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC &&
		    (queue_events % qtypes_shm->num_queues == q_ctx->idx))
			em_atomic_processing_end();
	}
/*
* Update _core_ statistics after potentially releasing the
* atomic context.
*/
uint64_t core_events = cstat->events;
uint64_t print_events = 0;
if (unlikely(core_events == 0)) {
cstat->begin_cycles = env_get_cycle();
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
} else if (unlikely(core_events > PRINT_COUNT)) {
cstat->end_cycles = env_get_cycle();
/* indicate that statistics should be printed this round: */
print_events = core_events;
core_events = 0;
} else {
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
}
cstat->events = core_events;
/* Print core specific statistics */
if (unlikely(print_events)) {
		if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC)
			em_atomic_processing_end();
		if (core == 0)
			verify_all_queues_get_events();
print_core_stats(cstat, print_events);
for (int i = 0; i < QUEUE_TYPE_PAIRS; i++)
cstat->pt_count[i] = 0;
cstat->begin_cycles = env_get_cycle();
}
}
/**
* @private
*
* EO receive function for EO B.
*
* Forwards events to the next processing stage (EO).
*/
static void
receive_b(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
em_queue_t dest_queue;
	test_event_uarea_t *test_uarea;
	uint64_t queue_events;
	em_event_uarea_info_t uarea_info;
	em_status_t ret;
const int core = em_core_id();
core_stat_t *cstat = &qtypes_shm->core_stat[core];
(void)type;
if (unlikely(appl_shm->exit_flag)) {
em_free(event);
return;
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
if (VERIFY_PROCESSING_CONTEXT)
verify_processing_context(eo_ctx, queue);
ret = em_event_uarea_info(event, &uarea_info);
test_fatal_if(ret != EM_OK,
"em_event_uarea_info() fails:%" PRI_STAT "", ret);
test_fatal_if(uarea_info.id.value != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", uarea_info.id.value);
/* Increment Q specific event counter (parallel Qs req atomic inc:s) */
if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC)
queue_events = q_ctx->num_events++;
else
queue_events = __atomic_add_fetch(&q_ctx->num_events, 1,
__ATOMIC_RELAXED);
test_uarea = uarea_info.uarea;
test_fatal_if(test_uarea->data.src != queue,
"EO-B queue mismatch:%" PRI_QUEUE "!=%" PRI_QUEUE "",
test_uarea->data.src, queue);
if (eo_ctx->ordered_pair && eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC) {
		/* Verify the seq nbr to make sure event order is maintained */
verify_seqno(eo_ctx, q_ctx, test_uarea->data.seqno);
}
dest_queue = test_uarea->data.dest;
test_uarea->data.src = test_uarea->data.dest;
test_uarea->data.dest = queue;
ret = em_send(event, dest_queue);
if (unlikely(ret != EM_OK)) {
em_free(event);
test_fatal_if(!appl_shm->exit_flag, "EO-B em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
if (CALL_ATOMIC_PROCESSING_END__B) {
/* Call em_atomic_processing_end() every once in a while */
		if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC &&
		    (queue_events % qtypes_shm->num_queues == q_ctx->idx))
			em_atomic_processing_end();
	}
/*
* Update _core_ statistics after potentially releasing the
* atomic context.
*/
if (unlikely(cstat->events == 0))
cstat->begin_cycles = env_get_cycle();
cstat->events++;
cstat->pt_count[eo_ctx->pair_type] += 1;
}
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair)
{
em_queue_type_t qt1 = queue_type_pair->q_type[0];
em_queue_type_t qt2 = queue_type_pair->q_type[1];
int in_ag1 = queue_type_pair->in_atomic_group[0];
int in_ag2 = queue_type_pair->in_atomic_group[1];
	switch (qt1) {
	case EM_QUEUE_TYPE_ATOMIC:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag1 && in_ag2)
				return PT_AG_AG;
			else if (in_ag1 || in_ag2)
				return PT_AG_ATOMIC;
			else
				return PT_ATOMIC_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			if (in_ag1)
				return PT_AG_PARALLEL;
			else
				return PT_PARALLEL_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			if (in_ag1)
				return PT_AG_PARALORD;
			else
				return PT_PARALORD_ATOMIC;
		}
		break;
	case EM_QUEUE_TYPE_PARALLEL:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag2)
				return PT_AG_PARALLEL;
			else
				return PT_PARALLEL_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			return PT_PARALLEL_PARALLEL;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			return PT_PARALORD_PARALLEL;
		}
		break;
	case EM_QUEUE_TYPE_PARALLEL_ORDERED:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag2)
				return PT_AG_PARALORD;
			else
				return PT_PARALORD_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			return PT_PARALORD_PARALLEL;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			return PT_PARALORD_PARALORD;
		}
		break;
	}
return PT_UNDEFINED;
}
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno)
{
if (unlikely(qtypes_shm->teardown_in_progress))
return;
if (eo_ctx->owns_ag_queues == eo_ctx->peer_owns_ag_queues) {
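		/*
		 * seqno is assigned only for symmetric pairs (see
		 * initialize_events()). An AG-side EO spreads its events
		 * over three queues, hence the smaller per-queue max_seqno.
		 */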
const uint64_t max_seqno = (eo_ctx->owns_ag_queues) ?
NUM_EVENT / 3 - 1 : NUM_EVENT - 1;
if (q_ctx->seqno != seqno) {
test_error((em_status_t)__LINE__, 0xdead,
"SEQUENCE ERROR A:\t"
"queue=%" PRI_QUEUE " Q-seqno=%" PRIu64 "\t"
"Event-seqno=%" PRIu64 " PT:%i",
q_ctx->q_hdl, q_ctx->seqno, seqno,
eo_ctx->pair_type);
exit(EXIT_FAILURE);
}
if (q_ctx->seqno < max_seqno)
q_ctx->seqno++;
else
q_ctx->seqno = 0;
}
}
/**
* Verifies that each queue processes all its events at least once per
* statistics round.
*/
static void
verify_all_queues_get_events(void)
{
const unsigned int num_queues = qtypes_shm->num_queues;
unsigned int i, first = 1, q_evcnt_low = 0;
uint64_t curr, prev, diff;
for (i = 0; i < num_queues; i++) {
queue_context_t *const tmp_qctx =
&qtypes_shm->queue_context[i];
const uint64_t min_events = (tmp_qctx->in_atomic_group) ?
NUM_EVENT / 3 : NUM_EVENT;
const char *q_type_str;
curr = __atomic_load_n(&tmp_qctx->num_events, __ATOMIC_RELAXED);
prev = tmp_qctx->prev_events;
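		/* diff copes with a (theoretical) wrap of the u64 counter */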
diff = (curr >= prev) ?
curr - prev : UINT64_MAX - prev + curr + 1;
tmp_qctx->prev_events = curr;
if (unlikely(diff < min_events)) {
q_evcnt_low++;
if (first) {
first = 0;
print_event_msg_string();
}
			switch (tmp_qctx->q_type) {
			case EM_QUEUE_TYPE_ATOMIC:
				if (tmp_qctx->in_atomic_group)
					q_type_str = "AG";
				else
					q_type_str = "A ";
				break;
			case EM_QUEUE_TYPE_PARALLEL:
				q_type_str = "P ";
				break;
			case EM_QUEUE_TYPE_PARALLEL_ORDERED:
				q_type_str = "PO";
				break;
			default:
				q_type_str = "??";
				break;
			}
APPL_PRINT("Q=%3" PRI_QUEUE "(%s cnt:%" PRIu64 ") %c",
tmp_qctx->q_hdl, q_type_str, diff,
(q_evcnt_low % 8 == 0) ? '\n' : ' ');
}
}
if (!first)
APPL_PRINT("\nQueue count with too few events:%u\n\n",
q_evcnt_low);
}
/**
* Try to take a spinlock and if it fails we know that another core is
* processing an event from the same atomic queue or atomic group, which
* should never happen => fatal error! The lock is for verification only,
* no sync purpose whatsoever.
*/
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx)
{
if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC &&
unlikely(!env_spinlock_trylock(&eo_ctx->verify_atomic_access)))
test_error(EM_ERROR_SET_FATAL(__LINE__), 0xdead,
"EO Atomic context lost!");
}
/**
* Release the verification lock
*/
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx)
{
if (eo_ctx->q_type == EM_QUEUE_TYPE_ATOMIC)
env_spinlock_unlock(&eo_ctx->verify_atomic_access);
}
/**
* Verify that the receive func processing context works as expected
*/
static inline void
verify_processing_context(eo_context_t *const eo_ctx, em_queue_t queue)
{
const em_eo_t eo = eo_ctx->eo_hdl;
	em_queue_t tmp_queue;
	em_queue_type_t queue_type;
	em_sched_context_type_t sched_type;
/*
* Test: Verify that EO & queue _current() and
* _get_context() APIs work as expected.
*/
test_fatal_if(em_eo_current() != eo, "Invalid current EO");
test_fatal_if(em_eo_get_context(eo) != eo_ctx,
"Invalid current EO context");
test_fatal_if(em_queue_current() != queue, "Invalid current queue");
queue_type = em_queue_get_type(queue);
sched_type = em_sched_context_type_current(&tmp_queue);
test_fatal_if(tmp_queue != queue, "Invalid queue");
test_fatal_if(queue_type != eo_ctx->q_type, "Q-type mismatch");
if (queue_type == EM_QUEUE_TYPE_ATOMIC) {
test_fatal_if(sched_type != EM_SCHED_CONTEXT_TYPE_ATOMIC,
"Invalid sched context type");
} else if (queue_type == EM_QUEUE_TYPE_PARALLEL_ORDERED) {
test_fatal_if(sched_type != EM_SCHED_CONTEXT_TYPE_ORDERED,
"Invalid sched context type");
} else if (queue_type == EM_QUEUE_TYPE_PARALLEL) {
test_fatal_if(sched_type != EM_SCHED_CONTEXT_TYPE_NONE,
"Invalid sched context type");
}
}
/**
* Print core specific statistics
*/
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events)
{
uint64_t diff;
uint32_t hz;
double mhz;
double cycles_per_event;
uint64_t print_count;
diff = env_cycles_diff(cstat->end_cycles, cstat->begin_cycles);
print_count = cstat->print_count++;
cycles_per_event = (double)diff / (double)print_events;
hz = env_core_hz();
mhz = ((double)hz) / 1000000.0;
APPL_PRINT(PRINT_CORE_STAT_FMT, em_core_id(),
cstat->pt_count[0], cstat->pt_count[1], cstat->pt_count[2],
cstat->pt_count[3], cstat->pt_count[4], cstat->pt_count[5],
cstat->pt_count[6], cstat->pt_count[7], cstat->pt_count[8],
cstat->pt_count[9], cycles_per_event, mhz, print_count);
}
static void
print_event_msg_string(void)
{
APPL_PRINT("\nToo few events detected for the following queues:\n");
}
static void
print_test_info(void)
{
unsigned int num;
/* Print the EO list */
em_eo_t eo = em_eo_get_first(&num);
APPL_PRINT("%d EOs:\n", num);
	while (eo != EM_EO_UNDEF) {
		em_eo_state_t state;
		const char *state_str;
		char buf[EM_EO_NAME_LEN];
		em_queue_t q;

		state = em_eo_get_state(eo);
		switch (state) {
		case EM_EO_STATE_UNDEF:
			state_str = "UNDEF";
			break;
		case EM_EO_STATE_CREATED:
			state_str = "CREATED";
			break;
		case EM_EO_STATE_STARTING:
			state_str = "STARTING";
			break;
		case EM_EO_STATE_RUNNING:
			state_str = "RUNNING";
			break;
		case EM_EO_STATE_STOPPING:
			state_str = "STOPPING";
			break;
		case EM_EO_STATE_ERROR:
			state_str = "ERROR";
			break;
		default:
			state_str = "UNKNOWN";
			break;
		}

		em_eo_get_name(eo, buf, EM_EO_NAME_LEN);
		APPL_PRINT("  EO:%" PRI_EO ":'%s' state:%s\n",
			   eo, buf, state_str);

		q = em_eo_queue_get_first(&num, eo);
		while (q != EM_QUEUE_UNDEF) {
			APPL_PRINT("  - Q:%" PRI_QUEUE "\n", q);
			q = em_eo_queue_get_next();
		}
		eo = em_eo_get_next();
	}
APPL_PRINT("\n");
/* Print the queue list */
em_queue_t q = em_queue_get_first(&num);
APPL_PRINT("%d queues:\n", num);
	while (q != EM_QUEUE_UNDEF) {
		em_queue_type_t type;
		const char *type_str;
		em_queue_t q_check;
		char buf[EM_QUEUE_NAME_LEN];

		em_queue_get_name(q, buf, EM_QUEUE_NAME_LEN);

		type = em_queue_get_type(q);
		switch (type) {
		case EM_QUEUE_TYPE_UNDEF:
			type_str = "UNDEF";
			break;
		case EM_QUEUE_TYPE_ATOMIC:
			type_str = "ATOMIC";
			break;
		case EM_QUEUE_TYPE_PARALLEL:
			type_str = "PARALLEL";
			break;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			type_str = "ORDERED";
			break;
		case EM_QUEUE_TYPE_UNSCHEDULED:
			type_str = "UNSCHEDULED";
			break;
		case EM_QUEUE_TYPE_LOCAL:
			type_str = "LOCAL";
			break;
		case EM_QUEUE_TYPE_OUTPUT:
			type_str = "OUTPUT";
			break;
		default:
			type_str = "UNKNOWN";
			break;
		}

		APPL_PRINT("  Q:%" PRI_QUEUE ":'%s'\ttype:%s\n",
			   q, buf, type_str);
		q_check = em_queue_find(buf);
		test_fatal_if(q_check != q, "Queue mismatch:\n"
			      "%" PRI_QUEUE " != %" PRI_QUEUE "",
			      q_check, q);
		q = em_queue_get_next();
	}
APPL_PRINT("\n");
/* Print the atomic group list */
em_atomic_group_t ag = em_atomic_group_get_first(&num);
char ag_name[EM_ATOMIC_GROUP_NAME_LEN];
APPL_PRINT("%d Atomic-Groups:\n", num);
	while (ag != EM_ATOMIC_GROUP_UNDEF) {
		em_queue_t ag_queue;
		em_atomic_group_t ag_check;

		em_atomic_group_get_name(ag, ag_name, sizeof(ag_name));
		APPL_PRINT("  AG:%" PRI_AGRP ":'%s'\n", ag, ag_name);

		ag_check = em_atomic_group_find(ag_name);
		test_fatal_if(ag_check != ag, "AG mismatch:\n"
			      "%" PRI_AGRP " != %" PRI_AGRP "",
			      ag_check, ag);

		ag_queue = em_atomic_group_queue_get_first(&num, ag);
		while (ag_queue != EM_QUEUE_UNDEF) {
			APPL_PRINT("    - Q:%" PRI_QUEUE "\n", ag_queue);
			ag_queue = em_atomic_group_queue_get_next();
		}
		ag = em_atomic_group_get_next();
	}
APPL_PRINT("\n");
}