#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/* Number of queue-type pair combinations tested (see queue_type_pairs[]). */
#define QUEUE_TYPE_PAIRS 10
/* Total number of EOs; EOs are created in A/B pairs in test_start(). */
#define NUM_EO (8 * QUEUE_TYPE_PAIRS)
/* Upper bound for the number of queues created by the test. */
#define MAX_QUEUES (NUM_EO / QUEUE_TYPE_PAIRS * 30)
/* Data events circulated per queue set; divisible by 3 for atomic groups. */
#define NUM_EVENT (3 * 32)
/* Payload size of a data event in bytes. */
#define DATA_SIZE 64
/* Maximum number of EM cores supported by the statistics arrays. */
#define MAX_CORES 64
/* Number of events a core processes before printing its statistics. */
#define PRINT_COUNT 0x1000000
/* Maximum number of events passed to one em_send_multi() call. */
#define SEND_MULTI_MAX 32
/* Enable (1) / disable (0) spinlock-based atomic-processing verification. */
#define VERIFY_ATOMIC_ACCESS 1
/* Enable (1) / disable (0) EM processing-context verification in receive. */
#define VERIFY_PROCESSING_CONTEXT 1
/* Call em_atomic_processing_end() in the EO-A / EO-B receive functions. */
#define CALL_ATOMIC_PROCESSING_END__A 1
#define CALL_ATOMIC_PROCESSING_END__B 1
/* True when both queue types of a pair preserve event order. */
#define ORDERED_PAIR(q_type_a, q_type_b) ( \
(((q_type_a) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_a) == EM_QUEUE_TYPE_PARALLEL_ORDERED)) && \
(((q_type_b) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_b) == EM_QUEUE_TYPE_PARALLEL_ORDERED)))
/*
 * Absolute difference of two values.
 * NOTE(review): both arguments are evaluated twice - do not pass expressions
 * with side effects.
 */
#define ABS(nbr1, nbr2) (((nbr1) > (nbr2)) ? ((nbr1) - (nbr2)) : \
((nbr2) - (nbr1)))
/* printf format string for per-core statistics (see print_core_stats()). */
#define PRINT_CORE_STAT_FMT \
"Stat Core-%02i: Count/PairType\t" \
"A-A:%6" PRIu64 " P-P:%6" PRIu64 " PO-PO:%6" PRIu64 "\t" \
"P-A:%6" PRIu64 " PO-A:%6" PRIu64 " PO-P:%6" PRIu64 "\t" \
"AG-AG:%6" PRIu64 " AG-A:%6" PRIu64 " AG-P:%6" PRIu64 " AG-PO:%6" PRIu64 "\t" \
"cycles/event:%.0f @%.0fMHz %" PRIu64 "\n"
/* Queue is not (NO_AG) / is (IN_AG) part of an EM atomic group. */
#define NO_AG (0)
#define IN_AG (1)
/*
 * Descriptor for one tested queue pair: flags whether each of the two queues
 * belongs to an EM atomic group (NO_AG / IN_AG).
 * NOTE(review): later code reads a '.q_type[0/1]' member (the EM queue type
 * of each queue) that is missing from this struct - source lines appear to
 * have been lost; confirm against the original em-odp queue_types test.
 */
typedef struct queue_type_pairs_ {
int in_atomic_group[2];
} queue_type_pair_t;
/*
 * The queue-type pair combinations to test.
 * NOTE(review): this initializer is garbled - each element should open with
 * '{' and list the two EM queue types before the in_atomic_group pair, and
 * QUEUE_TYPE_PAIRS (10) elements are expected but only 4 fragments remain.
 * The COMPILE_TIME_ASSERT below would catch the resulting size mismatch.
 */
queue_type_pair_t queue_type_pairs[QUEUE_TYPE_PAIRS] = {
{NO_AG, NO_AG} },
{NO_AG, NO_AG} },
{NO_AG, NO_AG} },
{IN_AG, NO_AG} },
};
COMPILE_TIME_ASSERT(sizeof(queue_type_pairs) ==
(QUEUE_TYPE_PAIRS * sizeof(queue_type_pair_t)),
QUEUE_TYPE_PAIRS_SIZE_ERROR);
/*
 * Identifies the queue-type combination of an EO pair; used to index the
 * per-core pt_count[] statistics.
 * Legend: A = atomic, P = parallel, PO = parallel-ordered, AG = atomic group.
 */
typedef enum {
PT_ATOMIC_ATOMIC = 0,
PT_PARALLEL_PARALLEL = 1,
PT_PARALORD_PARALORD = 2,
PT_PARALLEL_ATOMIC = 3,
PT_PARALORD_ATOMIC = 4,
PT_PARALORD_PARALLEL = 5,
PT_AG_AG = 6,
PT_AG_ATOMIC = 7,
PT_AG_PARALLEL = 8,
PT_AG_PARALORD = 9,
PT_UNDEFINED
} pair_type_t;
/*
 * Per-core runtime statistics, read/written only by the owning core in the
 * receive functions and printed by print_core_stats().
 * NOTE(review): the union likely lost a cache-line padding member, and the
 * dangling 'CORE_STAT_T__SIZE_ERROR);' line below is the tail of a
 * COMPILE_TIME_ASSERT whose leading lines are missing from this copy.
 */
typedef union {
struct {
uint64_t events; /* events processed since the last statistics print */
uint64_t begin_cycles; /* cycle counter at measurement start */
uint64_t end_cycles; /* cycle counter at measurement end */
uint64_t print_count; /* number of statistics printouts so far */
uint64_t pt_count[QUEUE_TYPE_PAIRS]; /* event count per pair type */
};
} core_stat_t;
CORE_STAT_T__SIZE_ERROR);
/*
 * Context data for one test EO: identity, pairing info and the lock used for
 * atomic-access verification.
 * NOTE(review): a 'q_type' member is assigned elsewhere (eo_ctx->q_type = ...)
 * but is missing here, and the dangling 'EO_CTX_T__SIZE_ERROR);' below is the
 * tail of a COMPILE_TIME_ASSERT whose leading lines are missing.
 */
typedef struct {
em_eo_t eo_hdl; /* this EO's handle */
int ordered_pair; /* both queue types of the pair preserve order */
pair_type_t pair_type; /* queue-type combination of this EO pair */
int owns_ag_queues; /* this EO's queues belong to an atomic group */
em_atomic_group_t agrp_hdl; /* atomic group handle, if owns_ag_queues */
int peer_owns_ag_queues; /* peer EO's queues belong to an atomic group */
env_spinlock_t verify_atomic_access; /* lock for atomic-access checks */
} eo_context_t;
EO_CTX_T__SIZE_ERROR);
/*
 * Context data for one test queue.
 * NOTE(review): the receive functions update 'q_ctx->num_events' and
 * verify_all_queues_get_events() atomically loads it, and a 'q_type' member
 * is assigned in test_start() - both members are missing here; source lines
 * appear to have been lost. The dangling 'Q_CTX_T__SIZE_ERROR);' below is
 * the tail of a lost COMPILE_TIME_ASSERT.
 */
typedef struct {
em_queue_t q_hdl; /* this queue's handle */
int in_atomic_group; /* queue belongs to an atomic group */
unsigned int idx; /* index into qtypes_shm->queue_context[] */
uint64_t seqno; /* expected sequence number (ordered pairs only) */
uint64_t prev_events; /* event count at the previous statistics check */
} queue_context_t;
Q_CTX_T__SIZE_ERROR);
/* Event user-area IDs distinguishing start events from data events. */
#define EV_ID_START_EVENT 1
#define EV_ID_DATA_EVENT 2
/*
 * Start event user area: tells the receiving EO which source queues to fill
 * with data events and which destination queues those events target.
 */
typedef struct {
int in_atomic_group_a; /* EO-A's queues belong to an atomic group */
int src_q_cnt; /* number of valid entries in src_queues[] */
em_queue_t src_queues[3]; /* EO-A input queues */
int in_atomic_group_b; /* EO-B's queues belong to an atomic group */
int dst_q_cnt; /* number of valid entries in dst_queues[] */
em_queue_t dst_queues[3]; /* EO-B input queues */
} start_event_uarea_t;
/*
 * Data event user area: routing info and sequence number of an event that is
 * ping-ponged between a queue pair.
 */
typedef struct {
em_queue_t dest; /* next queue to send this event to */
em_queue_t src; /* queue this event was received from */
uint64_t seqno; /* sequence number, verified for ordered pairs */
} data_event_uarea_t;
/* Union of the two user-area layouts; discriminated by the user-area ID. */
typedef union {
start_event_uarea_t start;
data_event_uarea_t data;
} test_event_uarea_t;
/* Data event payload: raw bytes only, all metadata lives in the user area. */
typedef struct {
uint8_t data[DATA_SIZE];
} data_event_t;
/*
 * Start event payload: empty.
 * NOTE(review): 'u8[0]' is the GCC zero-length-array extension; it is kept
 * here (rather than a C99 flexible array member) because start_event_t is
 * embedded in the test_event_t union below.
 */
typedef struct {
uint8_t u8[0];
} start_event_t;
/* Union of the two event payload layouts. */
typedef union {
start_event_t start;
data_event_t data;
} test_event_t;
/*
 * Queue-types test shared memory, reserved once by core 0 in test_init().
 * NOTE(review): several members referenced elsewhere are missing from this
 * copy (num_queues, eo_context[], queue_context[], core_stat[]) and the
 * dangling 'QTYPES_SHM_T__SIZE_ERROR);' below is the tail of a lost
 * COMPILE_TIME_ASSERT - confirm against the original em-odp source.
 */
typedef struct {
em_pool_t pool; /* event pool used by the test */
int teardown_in_progress; /* set in test_stop() to relax verification */
} qtypes_shm_t;
QTYPES_SHM_T__SIZE_ERROR);
/*
 * Local function prototypes.
 * NOTE(review): this prototype list is damaged - the return types of start()
 * and stop() (em_status_t in the original test) are missing, and the
 * declarator lines of the two EO receive functions (receive_a/receive_b,
 * matching the em_eo_create() calls in test_start()) have been lost,
 * leaving only their trailing parameter lines.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
stop(void *eo_context, em_eo_t eo);
static void
initialize_events(const start_event_uarea_t *start);
static void
em_queue_t queue, void *q_ctx);
static void
em_queue_t queue, void *q_ctx);
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair);
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno);
static void
verify_all_queues_get_events(void);
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx);
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx);
static inline void
verify_processing_context(eo_context_t *const eo_ctx, em_queue_t queue);
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events);
static void
print_event_msg_string(void);
static void
print_test_info(void);
/*
 * Program entry point: delegates all EM/ODP setup, core launch and the
 * dispatch loop to the common cm_setup() test framework (cm_setup.h).
 */
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
/*
 * Init on each EM core at startup: core 0 reserves the shared memory block,
 * all other cores look it up; core 0 then zero-initializes it.
 * NOTE(review): the declarations of 'core' (presumably em_core_id()) and of
 * the file-global 'qtypes_shm' pointer, plus the APPL_EXIT_FAILURE call that
 * the orphaned format string at the NULL check belongs to, are missing from
 * this copy.
 */
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
qtypes_shm = env_shared_reserve("QueueTypesSharedMem",
sizeof(qtypes_shm_t));
} else {
qtypes_shm = env_shared_lookup("QueueTypesSharedMem");
}
if (qtypes_shm == NULL) {
"Queue Types test init failed on EM-core: %u\n",
} else if (core == 0) {
memset(qtypes_shm, 0, sizeof(qtypes_shm_t));
}
}
/*
 * Startup on core 0: creates NUM_EO EOs in A/B pairs. For each pair it picks
 * a queue-type combination from queue_type_pairs[], creates either one plain
 * queue or three atomic-group queues per EO, wires up queue contexts, starts
 * both EOs and finally sends a start event (EV_ID_START_EVENT) into EO-A's
 * first queue to kick off event circulation.
 *
 * NOTE(review): this function is heavily truncated - missing pieces include
 * local declarations (q_type_a/b, ret, start_ret, eo_name/ag_name/q_name
 * buffers), the em_pool lookup, all em_queue_create()/em_queue_create_ag()
 * and em_eo_add_queue_sync() calls, em_queue_set_context() calls, the
 * em_atomic_group_create() calls, the em_eo_start_sync() calls, the
 * em_event_uarea_get()/em_event_uarea_id_set() calls for the start event,
 * and several if/else braces. The remaining fragments are kept byte-exact.
 */
void test_start(const appl_conf_t *appl_conf)
{
em_atomic_group_t atomic_group;
em_eo_t eo;
em_queue_t queue_a, queue_b;
em_queue_t queue_ag_a1, queue_ag_a2, queue_ag_a3;
em_queue_t queue_ag_b1, queue_ag_b2, queue_ag_b3;
eo_context_t *eo_ctx;
queue_context_t *q_ctx;
pair_type_t pair_type;
unsigned int qcnt = 0;
int in_atomic_group_a, in_atomic_group_b;
int ordered_pair;
int i;
uint8_t eo_idx = 0, q_idx = 0, agrp_idx = 0;
qtypes_shm->pool = pool;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
"  %s: %s() - EM-core:%d\n"
"  Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
qtypes_shm->pool);
"Undefined application event pool!");
qtypes_shm->num_queues = 0;
qtypes_shm->teardown_in_progress =
EM_FALSE;
/* Create and start NUM_EO/2 EO pairs (EO-A and EO-B per iteration). */
for (i = 0; i < (NUM_EO / 2); i++) {
q_type_a = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[0];
in_atomic_group_a =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[0];
q_type_b = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[1];
in_atomic_group_b =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[1];
ordered_pair = ORDERED_PAIR(q_type_a, q_type_b);
pair_type =
get_pair_type(&queue_type_pairs[i % QUEUE_TYPE_PAIRS]);
test_fatal_if(pair_type == PT_UNDEFINED,
"Queue Pair Type UNDEFINED! (%u, %u)",
q_type_a, q_type_b);
/* --- EO-A of the pair --- */
eo_ctx = &qtypes_shm->eo_context[2 * i];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_a;
eo_ctx->owns_ag_queues = in_atomic_group_a;
eo_ctx->peer_owns_ag_queues = in_atomic_group_b;
snprintf(eo_name, sizeof(eo_name), "EO-A%" PRIu8 "", ++eo_idx);
eo_name[sizeof(eo_name) - 1] = '\0';
eo =
em_eo_create(eo_name, start, NULL, stop, NULL, receive_a,
eo_ctx);
/* NOTE(review): the 'if (in_atomic_group_a ...)' branch opener and the
 * em_atomic_group_create()/em_queue_create_ag() calls are missing below.
 */
snprintf(ag_name, sizeof(ag_name), "AG-A%" PRIu8 "",
++agrp_idx);
ag_name[sizeof(ag_name) - 1] = '\0';
atomic_group =
"Atomic group creation failed!");
eo_ctx->agrp_hdl = atomic_group;
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
/* Set up one queue context per created atomic-group queue. */
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a1;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a2;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a3;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
} else {
/* Not in an atomic group: one plain queue for EO-A. */
snprintf(q_name, sizeof(q_name), "Q-A%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_a;
q_ctx->q_type = q_type_a;
q_ctx->in_atomic_group = in_atomic_group_a;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
}
qtypes_shm->num_queues = qcnt;
"EO-A setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/* --- EO-B of the pair (mirror of EO-A setup) --- */
eo_ctx = &qtypes_shm->eo_context[2 * i + 1];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_b;
eo_ctx->owns_ag_queues = in_atomic_group_b;
eo_ctx->peer_owns_ag_queues = in_atomic_group_a;
snprintf(eo_name, sizeof(eo_name), "EO-B%" PRIu8 "", ++eo_idx);
eo_name[sizeof(eo_name) - 1] = '\0';
eo =
em_eo_create(eo_name, start, NULL, stop, NULL, receive_b,
eo_ctx);
snprintf(ag_name, sizeof(ag_name), "AG-B%" PRIu8 "",
++agrp_idx);
ag_name[sizeof(ag_name) - 1] = '\0';
atomic_group =
"Atomic group creation failed!");
eo_ctx->agrp_hdl = atomic_group;
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
snprintf(q_name, sizeof(q_name), "AG:Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
atomic_group, NULL);
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b1;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b2;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b3;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
} else {
/* Not in an atomic group: one plain queue for EO-B. */
snprintf(q_name, sizeof(q_name), "Q-B%" PRIu8 "",
++q_idx);
q_name[sizeof(q_name) - 1] = '\0';
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_b;
q_ctx->q_type = q_type_b;
q_ctx->in_atomic_group = in_atomic_group_b;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
}
qtypes_shm->num_queues = qcnt;
"EO-B setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/*
 * Allocate the start event that triggers initialize_events() in EO-A's
 * receive function.
 */
em_event_t
event =
em_alloc(
sizeof(test_event_t),
qtypes_shm->pool);
size_t uarea_size;
test_event_uarea_t *test_uarea;
/*
 * NOTE(review): '&&' looks wrong here - the equivalent check in
 * initialize_events() uses '||' so that either a NULL uarea pointer or a
 * too-small uarea triggers the fatal error; confirm against the original.
 */
test_fatal_if(!test_uarea && uarea_size < sizeof(test_event_uarea_t),
"Event User Area error: ptr:%p sz:%zu < %zu",
test_uarea, uarea_size, sizeof(test_event_uarea_t));
test_fatal_if(ret !=
EM_OK,
"Error setting uarea id, err:%" PRI_STAT "");
/* Fill in the source (EO-A) and destination (EO-B) queue lists. */
test_uarea->start.in_atomic_group_a = in_atomic_group_a;
if (in_atomic_group_a) {
test_uarea->start.src_q_cnt = 3;
test_uarea->start.src_queues[0] = queue_ag_a1;
test_uarea->start.src_queues[1] = queue_ag_a2;
test_uarea->start.src_queues[2] = queue_ag_a3;
} else {
test_uarea->start.src_q_cnt = 1;
test_uarea->start.src_queues[0] = queue_a;
}
test_uarea->start.in_atomic_group_b = in_atomic_group_b;
if (in_atomic_group_b) {
test_uarea->start.dst_q_cnt = 3;
test_uarea->start.dst_queues[0] = queue_ag_b1;
test_uarea->start.dst_queues[1] = queue_ag_b2;
test_uarea->start.dst_queues[2] = queue_ag_b3;
} else {
test_uarea->start.dst_q_cnt = 1;
test_uarea->start.dst_queues[0] = queue_b;
}
ret =
em_send(event, test_uarea->start.src_queues[0]);
test_fatal_if(ret !=
EM_OK,
"Event send:%" PRI_STAT
"", ret);
}
APPL_PRINT("\n\nqctx:%i MAX:%i\n\n", qcnt, MAX_QUEUES);
test_fatal_if(qcnt > MAX_QUEUES || qtypes_shm->num_queues != qcnt,
"Queue context number too high!");
print_test_info();
}
/*
 * Stop on one core: flags teardown (relaxing sequence verification) and
 * stops every EO.
 * NOTE(review): the 'core' variable declaration and the em_eo_stop() call
 * that produces 'ret' are missing from this copy.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
eo_context_t *eo_ctx;
int i;
(void)appl_conf;
/* Tell the receive/verify paths that events may now legally disappear. */
qtypes_shm->teardown_in_progress =
EM_TRUE;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
for (i = 0; i < NUM_EO; i++) {
eo_ctx = &qtypes_shm->eo_context[i];
eo = eo_ctx->eo_hdl;
test_fatal_if(ret !=
EM_OK,
"EO stop:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
}
/*
 * Termination: deletes the event pool and, on core 0, frees the shared
 * memory reserved in test_init().
 * NOTE(review): the 'core' declaration and the em_pool_delete() call that
 * produces 'ret' are missing from this copy.
 */
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
test_fatal_if(ret !=
EM_OK,
"em_pool_delete(%" PRI_POOL "):%" PRI_STAT
"",
qtypes_shm->pool, ret);
if (core == 0) {
env_shared_free(qtypes_shm);
}
}
/*
 * EO start function (called once per EO by em_eo_start_sync()): records the
 * EO handle in the context and initializes the atomic-access spinlock.
 * NOTE(review): the return type (em_status_t in the original), the final
 * 'return EM_OK;', and the test_fatal_if() calls that the two orphaned
 * "Invalid current ..." strings belong to are missing from this copy.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting.\n", eo);
eo_ctx->eo_hdl = eo;
if (VERIFY_ATOMIC_ACCESS)
env_spinlock_init(&eo_ctx->verify_atomic_access);
"Invalid current EO context");
"Invalid current queue");
}
/*
 * EO stop function: removes all queues from the EO, deletes the atomic
 * group (if any) and deletes the EO itself.
 * NOTE(review): this function is damaged - the return type (em_status_t in
 * the original), the em_eo_remove_queue_all_sync()/em_atomic_group_delete()/
 * em_eo_delete() calls producing 'ret', the branch around the atomic-group
 * deletion, and the final 'return EM_OK;' are missing, leaving an orphaned
 * trailing fragment after the first closing brace.
 */
stop(void *eo_context, em_eo_t eo)
{
eo_context_t *const eo_ctx = (eo_context_t *)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"AGrp delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
test_fatal_if(ret !=
EM_OK,
"EO delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
/*
 * Allocates and initializes all data events for one queue set (triggered by
 * the start event) and sends them into the source queues in bursts of
 * SEND_MULTI_MAX using em_send_multi().
 * Events are distributed round-robin over the source queues; each event's
 * user area carries its source queue, destination queue and sequence number.
 * NOTE(review): several lines are missing - the test_fatal_if() that the
 * orphaned "Event alloc fails" string belongs to, the em_event_uarea_get()
 * call initializing 'test_uarea' and the payload pointer 'test_event', the
 * em_event_uarea_id_set() call producing 'ret', the em_send_multi() calls
 * accumulating 'num_sent', and part of the final error format string.
 */
static void
initialize_events(const start_event_uarea_t *start)
{
const int max_q_cnt = start->src_q_cnt > start->dst_q_cnt ?
start->src_q_cnt : start->dst_q_cnt;
/* Allocated events, collected per source queue before sending. */
em_event_t all_events[max_q_cnt][NUM_EVENT];
int ev_cnt[max_q_cnt];
uint64_t seqno = 0;
int j, x, y;
for (x = 0; x < max_q_cnt; x++)
ev_cnt[x] = 0;
for (j = 0; j < NUM_EVENT;) {
for (x = 0, y = 0; x < max_q_cnt; x++, y++, j++) {
em_event_t
event =
em_alloc(
sizeof(test_event_t),
qtypes_shm->pool);
"Event alloc fails");
size_t uarea_size = 0;
test_event_uarea_t *test_uarea =
test_fatal_if(!test_event || !test_uarea ||
uarea_size != sizeof(test_event_uarea_t),
"Event payload/uarea error");
memset(test_event, 0, sizeof(test_event_t));
memset(test_uarea, 0, uarea_size);
test_fatal_if(ret !=
EM_OK,
"Error setting uarea id, err:%" PRI_STAT "");
/* Pick the destination queue: per-index for AGs, single otherwise. */
if (start->in_atomic_group_b)
test_uarea->data.dest = start->dst_queues[y];
else
test_uarea->data.dest = start->dst_queues[0];
test_uarea->data.src = start->src_queues[x];
/* Sequence numbers are only verified for symmetric AG configs. */
if (start->in_atomic_group_a ==
start->in_atomic_group_b) {
test_uarea->data.seqno = seqno;
}
all_events[x][ev_cnt[x]] = event;
ev_cnt[x] += 1;
}
seqno += 1;
}
/* Send each queue's events in bursts, then the left-over remainder. */
for (x = 0; x < max_q_cnt; x++) {
int n, m;
int num_sent = 0;
const int send_rounds = ev_cnt[x] / SEND_MULTI_MAX;
const int left_over = ev_cnt[x] % SEND_MULTI_MAX;
for (n = 0, m = 0; n < send_rounds;
n++, m += SEND_MULTI_MAX) {
SEND_MULTI_MAX,
start->src_queues[x]);
}
if (left_over) {
start->src_queues[x]);
}
test_fatal_if(num_sent != ev_cnt[x],
"Event send multi failed:%d (%d)\n"
num_sent, ev_cnt[x], start->src_queues[x]);
}
}
/*
 * EO-A receive function: handles the start event (by calling
 * initialize_events()), then ping-pongs data events to the peer queue while
 * verifying sequence numbers, atomic access and processing context, and
 * maintaining per-core statistics (printed every PRINT_COUNT events).
 * NOTE(review): this function is damaged - the declarator line naming it
 * (receive_a, per the em_eo_create() call in test_start()) and its leading
 * parameters, the 'core'/'ret'/'uarea_info' declarations, the
 * em_event_uarea_info() call, the em_send() to dest_queue, the atomic/
 * non-atomic branch around the num_events update, and the
 * em_atomic_processing_end() call are all missing.
 */
static void
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
test_event_uarea_t *test_uarea;
em_queue_t dest_queue;
uint64_t queue_events;
uint64_t seqno;
core_stat_t *cstat = &qtypes_shm->core_stat[core];
(void)type;
if (unlikely(appl_shm->exit_flag)) {
return;
}
test_fatal_if(ret !=
EM_OK,
"em_event_uarea_info() fails:%" PRI_STAT "", ret);
test_uarea = uarea_info.
uarea;
/* A start event carries the queue lists - initialize and return. */
if (unlikely(uarea_info.
id.
value == EV_ID_START_EVENT)) {
initialize_events(&test_uarea->start);
return;
}
test_fatal_if(uarea_info.
id.
value != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", uarea_info.
id.
value);
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
if (VERIFY_PROCESSING_CONTEXT)
verify_processing_context(eo_ctx, queue);
seqno = test_uarea->data.seqno;
queue_events = q_ctx->num_events++;
else
queue_events = __atomic_add_fetch(&q_ctx->num_events, 1,
__ATOMIC_RELAXED);
test_fatal_if(test_uarea->data.src != queue,
test_uarea->data.src, queue);
verify_seqno(eo_ctx, q_ctx, seqno);
}
/* Swap src/dest and forward the event to the peer queue. */
dest_queue = test_uarea->data.dest;
test_uarea->data.src = test_uarea->data.dest;
test_uarea->data.dest = queue;
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag, "EO-A em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
if (CALL_ATOMIC_PROCESSING_END__A) {
(queue_events % qtypes_shm->num_queues == q_ctx->idx))
}
/* Per-core statistics: measure cycles over PRINT_COUNT events. */
uint64_t core_events = cstat->events;
uint64_t print_events = 0;
if (unlikely(core_events == 0)) {
cstat->begin_cycles = env_get_cycle();
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
} else if (unlikely(core_events > PRINT_COUNT)) {
cstat->end_cycles = env_get_cycle();
print_events = core_events;
core_events = 0;
} else {
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
}
cstat->events = core_events;
if (unlikely(print_events)) {
/* Core 0 additionally checks that every queue still receives events. */
if (core == 0)
verify_all_queues_get_events();
print_core_stats(cstat, print_events);
for (int i = 0; i < QUEUE_TYPE_PAIRS; i++)
cstat->pt_count[i] = 0;
cstat->begin_cycles = env_get_cycle();
}
}
/*
 * EO-B receive function: mirror of EO-A's receive for data events only (the
 * start event is handled by EO-A). Verifies the event, swaps src/dest,
 * forwards it back to the peer queue and updates simple per-core statistics.
 * NOTE(review): damaged like receive_a - the declarator line naming it
 * (receive_b) and its leading parameters, 'core'/'ret'/'uarea_info'
 * declarations, the em_event_uarea_info() call, the em_send() to dest_queue,
 * the branch around the num_events update, and the
 * em_atomic_processing_end() call are missing.
 */
static void
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
em_queue_t dest_queue;
test_event_uarea_t *test_uarea;
uint64_t queue_events;
core_stat_t *cstat = &qtypes_shm->core_stat[core];
(void)type;
if (unlikely(appl_shm->exit_flag)) {
return;
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
if (VERIFY_PROCESSING_CONTEXT)
verify_processing_context(eo_ctx, queue);
test_fatal_if(ret !=
EM_OK,
"em_event_uarea_info() fails:%" PRI_STAT "", ret);
test_fatal_if(uarea_info.
id.
value != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", uarea_info.
id.
value);
queue_events = q_ctx->num_events++;
else
queue_events = __atomic_add_fetch(&q_ctx->num_events, 1,
__ATOMIC_RELAXED);
test_uarea = uarea_info.
uarea;
test_fatal_if(test_uarea->data.src != queue,
test_uarea->data.src, queue);
verify_seqno(eo_ctx, q_ctx, test_uarea->data.seqno);
}
/* Swap src/dest and forward the event back to the peer queue. */
dest_queue = test_uarea->data.dest;
test_uarea->data.src = test_uarea->data.dest;
test_uarea->data.dest = queue;
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag, "EO-B em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
if (CALL_ATOMIC_PROCESSING_END__B) {
(queue_events % qtypes_shm->num_queues == q_ctx->idx))
}
/* EO-B keeps simpler statistics: no periodic print from this side. */
if (unlikely(cstat->events == 0))
cstat->begin_cycles = env_get_cycle();
cstat->events++;
cstat->pt_count[eo_ctx->pair_type] += 1;
}
/*
 * Maps a queue-type pair (two EM queue types plus atomic-group membership)
 * to its pair_type_t statistics bucket; PT_UNDEFINED for any unknown combo.
 * NOTE(review): this function is damaged - the declarations of 'qt1'/'qt2'
 * (presumably read from queue_type_pair->q_type[0/1]) and every 'case'
 * label (EM_QUEUE_TYPE_ATOMIC / _PARALLEL / _PARALLEL_ORDERED) in the nested
 * switches are missing, so the branch structure below cannot compile as-is.
 */
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair)
{
int in_ag1 = queue_type_pair->in_atomic_group[0];
int in_ag2 = queue_type_pair->in_atomic_group[1];
switch (qt1) {
/* qt1 == atomic: */
switch (qt2) {
if (in_ag1 && in_ag2)
return PT_AG_AG;
else if (in_ag1 || in_ag2)
return PT_AG_ATOMIC;
else
return PT_ATOMIC_ATOMIC;
if (in_ag1)
return PT_AG_PARALLEL;
else
return PT_PARALLEL_ATOMIC;
if (in_ag1)
return PT_AG_PARALORD;
else
return PT_PARALORD_ATOMIC;
}
break;
/* qt1 == parallel: */
switch (qt2) {
if (in_ag2)
return PT_AG_PARALLEL;
else
return PT_PARALLEL_ATOMIC;
return PT_PARALLEL_PARALLEL;
return PT_PARALORD_PARALLEL;
}
break;
/* qt1 == parallel-ordered: */
switch (qt2) {
if (in_ag2)
return PT_AG_PARALORD;
else
return PT_PARALORD_ATOMIC;
return PT_PARALORD_PARALLEL;
return PT_PARALORD_PARALORD;
}
break;
}
return PT_UNDEFINED;
}
/*
 * Verifies that an event arrives with the expected per-queue sequence number
 * (only meaningful when both EOs of the pair have the same atomic-group
 * configuration) and advances the expectation, wrapping at max_seqno.
 * Skipped entirely during teardown, when events legally disappear.
 * NOTE(review): the error-print call that the orphaned "SEQUENCE ERROR A:"
 * format fragment belongs to (APPL_PRINT/APPL_ERROR in the original) is
 * missing from this copy.
 */
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno)
{
if (unlikely(qtypes_shm->teardown_in_progress))
return;
if (eo_ctx->owns_ag_queues == eo_ctx->peer_owns_ag_queues) {
/* AG queues share NUM_EVENT events over 3 queues. */
const uint64_t max_seqno = (eo_ctx->owns_ag_queues) ?
NUM_EVENT / 3 - 1 : NUM_EVENT - 1;
if (q_ctx->seqno != seqno) {
"SEQUENCE ERROR A:\t"
"Event-seqno=%" PRIu64 " PT:%i",
q_ctx->q_hdl, q_ctx->seqno, seqno,
eo_ctx->pair_type);
exit(EXIT_FAILURE);
}
if (q_ctx->seqno < max_seqno)
q_ctx->seqno++;
else
q_ctx->seqno = 0;
}
}
/*
 * Called periodically on core 0: checks that every queue has received at
 * least a minimum number of events since the previous check and prints a
 * report line for each under-served queue.
 * NOTE(review): the 'case EM_QUEUE_TYPE_*' labels in the switch below are
 * missing from this copy, so the q_type_str selection cannot compile as-is.
 */
static void
verify_all_queues_get_events(void)
{
const unsigned int num_queues = qtypes_shm->num_queues;
unsigned int i, first = 1, q_evcnt_low = 0;
uint64_t curr, prev, diff;
for (i = 0; i < num_queues; i++) {
queue_context_t *const tmp_qctx =
&qtypes_shm->queue_context[i];
/* AG queues share the event set 3 ways, so expect fewer each. */
const uint64_t min_events = (tmp_qctx->in_atomic_group) ?
NUM_EVENT / 3 : NUM_EVENT;
const char *q_type_str;
curr = __atomic_load_n(&tmp_qctx->num_events, __ATOMIC_RELAXED);
prev = tmp_qctx->prev_events;
/* Wrap-safe difference since the previous check. */
diff = (curr >= prev) ?
curr - prev : UINT64_MAX - prev + curr + 1;
tmp_qctx->prev_events = curr;
if (unlikely(diff < min_events)) {
q_evcnt_low++;
if (first) {
first = 0;
print_event_msg_string();
}
switch (tmp_qctx->q_type) {
if (tmp_qctx->in_atomic_group)
q_type_str = "AG";
else
q_type_str = "A ";
break;
q_type_str = "P ";
break;
q_type_str = "PO";
break;
default:
q_type_str = "??";
break;
}
APPL_PRINT(
"Q=%3" PRI_QUEUE "(%s cnt:%" PRIu64
") %c",
tmp_qctx->q_hdl, q_type_str, diff,
(q_evcnt_low % 8 == 0) ? '\n' : ' ');
}
}
if (!first)
APPL_PRINT("\nQueue count with too few events:%u\n\n",
q_evcnt_low);
}
/*
 * Verifies exclusive (atomic) processing by taking the EO's spinlock with a
 * trylock: if the lock is already held, another core is concurrently inside
 * the EO and the atomic context has been violated.
 * NOTE(review): the surrounding 'test_fatal_if(eo_ctx->q_type == ... &&'
 * line(s) that this trylock condition and error string belong to are
 * missing from this copy.
 */
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx)
{
unlikely(!env_spinlock_trylock(&eo_ctx->verify_atomic_access)))
"EO Atomic context lost!");
}
/*
 * Releases the spinlock taken in verify_atomic_access__begin(), ending the
 * atomic-access verification window for this event.
 */
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx)
{
env_spinlock_unlock(&eo_ctx->verify_atomic_access);
}
/*
 * Verifies the EM processing context inside receive: current EO, current
 * queue, queue type and scheduling context type must all match expectations.
 * NOTE(review): damaged - the em_eo_current()/em_queue_current()/
 * em_queue_get_type()/em_sched_context_type_current() calls and the
 * test_fatal_if() lines the orphaned "Invalid ..." strings belong to are
 * missing, and there is one unmatched closing brace at the end.
 */
static inline void
verify_processing_context(eo_context_t *const eo_ctx, em_queue_t queue)
{
const em_eo_t eo = eo_ctx->eo_hdl;
em_queue_t tmp_queue;
"Invalid current EO context");
test_fatal_if(tmp_queue != queue, "Invalid queue");
test_fatal_if(queue_type != eo_ctx->q_type, "Q-type mismatch");
"Invalid sched context type");
"Invalid sched context type");
"Invalid sched context type");
}
}
/*
 * Prints one PRINT_CORE_STAT_FMT line for the calling core: event counts per
 * pair type plus average cycles/event over the last measurement window.
 * NOTE(review): the APPL_PRINT(PRINT_CORE_STAT_FMT, core, ...) call that the
 * trailing argument list belongs to is missing from this copy.
 */
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events)
{
uint64_t diff;
uint32_t hz;
double mhz;
double cycles_per_event;
uint64_t print_count;
diff = env_cycles_diff(cstat->end_cycles, cstat->begin_cycles);
print_count = cstat->print_count++;
cycles_per_event = (double)diff / (double)print_events;
hz = env_core_hz();
mhz = ((double)hz) / 1000000.0;
cstat->pt_count[0], cstat->pt_count[1], cstat->pt_count[2],
cstat->pt_count[3], cstat->pt_count[4], cstat->pt_count[5],
cstat->pt_count[6], cstat->pt_count[7], cstat->pt_count[8],
cstat->pt_count[9], cycles_per_event, mhz, print_count);
}
/*
 * Prints the header line preceding the per-queue low-event-count report in
 * verify_all_queues_get_events().
 */
static void
print_event_msg_string(void)
{
APPL_PRINT("\nToo few events detected for the following queues:\n");
}
/*
 * Prints a summary of the created test resources: all EOs with their state,
 * all queues with their type (cross-checked via a queue lookup), and all
 * atomic groups with their member queues.
 * NOTE(review): heavily damaged - the EO/queue/atomic-group iterator calls
 * (em_eo_get_first/next(), em_queue_get_first/next(),
 * em_atomic_group_get_first/next() and the queue-in-AG iterators), the
 * 'state'/'type'/'eo'/'q'/'ag'/'buf' declarations, every 'case' label in
 * both switches, and parts of the mismatch error format strings are missing
 * from this copy.
 */
static void
print_test_info(void)
{
unsigned int num;
/* --- EOs --- */
APPL_PRINT("%d EOs:\n", num);
const char *state_str;
em_queue_t q;
switch (state) {
state_str = "UNDEF";
break;
state_str = "CREATED";
break;
state_str = "STARTING";
break;
state_str = "RUNNING";
break;
state_str = "STOPPING";
break;
state_str = "ERROR";
break;
default:
state_str = "UNKNOWN";
break;
}
APPL_PRINT(
"  EO:%" PRI_EO ":'%s' state:%s\n",
eo, buf, state_str);
}
}
APPL_PRINT("\n");
/* --- Queues --- */
APPL_PRINT("%d queues:\n", num);
const char *type_str;
em_queue_t q_check;
switch (type) {
type_str = "UNDEF";
break;
type_str = "ATOMIC";
break;
type_str = "PARALLEL";
break;
type_str = "ORDERED";
break;
type_str = "UNSCHEDULED";
break;
type_str = "LOCAL";
break;
type_str = "OUTPUT";
break;
default:
type_str = "UNKNOWN";
break;
}
APPL_PRINT(
"  Q:%" PRI_QUEUE ":'%s'\ttype:%s\n",
q, buf, type_str);
test_fatal_if(q_check != q, "Queue mismatch:\n"
q_check, q);
}
APPL_PRINT("\n");
/* --- Atomic groups and their queues --- */
APPL_PRINT("%d Atomic-Groups:\n", num);
em_queue_t ag_queue;
em_atomic_group_t ag_check;
APPL_PRINT(
"  AG:%" PRI_AGRP ":'%s'\n", ag, ag_name);
test_fatal_if(ag_check != ag, "AG mismatch:\n"
ag_check, ag);
ag_queue);
}
}
}
APPL_PRINT("\n");
}