#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
/* Number of scheduled-queue-type pairs tested (see queue_type_pairs[]) */
#define QUEUE_TYPE_PAIRS 10
/* Total number of EOs created; a multiple of 4 (two sched + two local-queue EOs per pair) */
#define NUM_EO (2 * 8 * QUEUE_TYPE_PAIRS)
/* Upper bound for queue contexts stored in shared memory */
#define MAX_QUEUES (NUM_EO / QUEUE_TYPE_PAIRS * 30)
/* Events allocated per EO-pair at startup (divisible by 3 for atomic-group splits) */
#define NUM_EVENT (3 * 32)
/* Payload bytes carried in each data event */
#define DATA_SIZE 64
/* Max EM cores supported by the per-core statistics array */
#define MAX_CORES 64
/* Events handled per core before printing statistics */
#define PRINT_COUNT 0x1000000
/* Max burst size for em_send_multi() */
#define SEND_MULTI_MAX 32
/* Set to 1 to verify atomic access with a spinlock (see verify_atomic_access__begin/end) */
#define VERIFY_ATOMIC_ACCESS 1
/* Set to 1 to call em_atomic_processing_end() from the local-queue receive fn */
#define CALL_ATOMIC_PROCESSING_END 1
/* True when both queue types of a pair preserve event order (atomic or parallel-ordered) */
#define ORDERED_PAIR(q_type_a, q_type_b) ( \
(((q_type_a) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_a) == EM_QUEUE_TYPE_PARALLEL_ORDERED)) && \
(((q_type_b) == EM_QUEUE_TYPE_ATOMIC) || \
((q_type_b) == EM_QUEUE_TYPE_PARALLEL_ORDERED)))
/* Absolute difference of two unsigned values without underflow */
#define ABS(nbr1, nbr2) (((nbr1) > (nbr2)) ? ((nbr1) - (nbr2)) : \
((nbr2) - (nbr1)))
/* Per-core statistics printout format: one counter per pair type (see pair_type_t) */
#define PRINT_CORE_STAT_FMT \
"Core-%02i:\t" \
"A-L-A-L:%6" PRIu64 " P-L-P-L:%6" PRIu64 " PO-L-PO-L:%6" PRIu64 "\t" \
"P-L-A-L:%6" PRIu64 " PO-L-A-L:%6" PRIu64 " PO-L-P-L:%6" PRIu64 "\t" \
"AG-L-AG-L:%6" PRIu64 " AG-L-A-L:%6" PRIu64 "\t" \
"AG-L-P-L:%6" PRIu64 " AG-L-PO-L:%6" PRIu64 "\t" \
"cycles/event:%.0f @%.0fMHz %" PRIu64 "\n"
/* Markers: queue does not / does belong to an EM atomic group */
#define NO_AG (0)
#define IN_AG (1)
typedef struct queue_type_pairs_ {
int in_atomic_group[2];
} queue_type_pair_t;
queue_type_pair_t queue_type_pairs[QUEUE_TYPE_PAIRS] = {
{NO_AG, NO_AG} },
{NO_AG, NO_AG} },
{NO_AG, NO_AG} },
{IN_AG, NO_AG} },
};
COMPILE_TIME_ASSERT(sizeof(queue_type_pairs) ==
(QUEUE_TYPE_PAIRS * sizeof(queue_type_pair_t)),
QUEUE_TYPE_PAIRS_SIZE_ERROR);
/*
 * Queue-pair type identifiers, one per row of queue_type_pairs[].
 * Naming: A=atomic, P=parallel, PO=parallel-ordered, AG=atomic group.
 * Used to index core_stat_t::pt_count[] (see PRINT_CORE_STAT_FMT).
 */
typedef enum {
PT_ATOMIC_ATOMIC = 0,
PT_PARALLEL_PARALLEL = 1,
PT_PARALORD_PARALORD = 2,
PT_PARALLEL_ATOMIC = 3,
PT_PARALORD_ATOMIC = 4,
PT_PARALORD_PARALLEL = 5,
PT_AG_AG = 6,
PT_AG_ATOMIC = 7,
PT_AG_PARALLEL = 8,
PT_AG_PARALORD = 9,
/* Sentinel: the pair did not match any known combination */
PT_UNDEFINED
} pair_type_t;
typedef union {
struct {
uint64_t events;
uint64_t begin_cycles;
uint64_t end_cycles;
uint64_t print_count;
uint64_t pt_count[QUEUE_TYPE_PAIRS];
};
} core_stat_t;
CORE_STAT_T__SIZE_ERROR);
typedef struct {
em_eo_t eo_hdl;
int ordered_pair;
pair_type_t pair_type;
int owns_ag_queues;
em_atomic_group_t agrp_hdl;
int peer_owns_ag_queues;
env_spinlock_t verify_atomic_access;
} eo_context_t;
EO_CTX_T__SIZE_ERROR);
typedef struct {
em_queue_t q_hdl;
unsigned int idx;
union {
struct {
em_queue_t local_q_hdl;
int in_atomic_group;
uint64_t seqno;
uint64_t prev_events;
} sched;
struct {
uint64_t num_events;
uint64_t prev_events;
} local;
};
} queue_context_t;
Q_CTX_T__SIZE_ERROR);
/* Event-type discriminators stored in the first int of every test event */
#define EV_ID_DATA_EVENT 1
#define EV_ID_START_EVENT 2
/* Data event ping-ponged between the EO-A and EO-B queue sets */
typedef struct {
int ev_id;        /* always EV_ID_DATA_EVENT */
em_queue_t dest;  /* queue to forward this event to next */
em_queue_t src;   /* queue this event was sent to (verified on receive) */
uint64_t seqno;   /* per-queue sequence number (order verification) */
uint8_t data[DATA_SIZE]; /* payload padding */
} data_event_t;
/*
 * Start event sent once per EO-pair; carries the source and destination
 * queue sets so initialize_events() can allocate and seed the data events.
 */
typedef struct {
int ev_id;              /* always EV_ID_START_EVENT */
int in_atomic_group_a;  /* side A queues belong to an atomic group */
int src_q_cnt;          /* number of valid entries in src_queues[] (1 or 3) */
em_queue_t src_queues[3];
int in_atomic_group_b;  /* side B queues belong to an atomic group */
int dst_q_cnt;          /* number of valid entries in dst_queues[] (1 or 3) */
em_queue_t dst_queues[3];
} start_event_t;
/* Union of all test events; ev_id (first member of each) selects the variant */
typedef union {
int ev_id;           /* EV_ID_DATA_EVENT or EV_ID_START_EVENT */
data_event_t data;
start_event_t start;
} test_event_t;
typedef struct {
em_pool_t pool;
int teardown_in_progress;
} qtypes_shm_t;
QTYPES_SHM_T__SIZE_ERROR);
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
start_locq(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf);
start_local(void *eo_ctx, em_eo_t eo);
start_local_locq(void *eo_ctx, em_eo_t eo);
stop(void *eo_context, em_eo_t eo);
stop_locq(void *eo_context, em_eo_t eo);
stop_local(void *eo_ctx, em_eo_t eo);
stop_local_locq(void *eo_ctx, em_eo_t eo);
static void
em_queue_t queue, void *q_ctx);
static void
em_queue_t queue, void *q_ctx);
static void
em_queue_t queue, void *q_ctx);
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair);
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno);
static void
verify_all_queues_get_events(void);
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx);
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx);
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events);
static void
print_event_msg_string(void);
/**
 * Program entry point.
 *
 * All EM/ODP initialization, core launching and the dispatch loop are
 * handled by the common test framework; this merely hands over control.
 */
int main(int argc, char *argv[])
{
	return cm_setup(argc, argv);
}
/**
 * Per-core global init, run on every EM core before test_start().
 * Core 0 reserves the shared memory region and zeroes it; the other
 * cores look the region up by name.
 *
 * NOTE(review): lines appear to be missing here — the declaration of
 * 'core' (presumably int core = em_core_id();) and the head of the
 * fatal-error call that owns the "init failed" message string. Restore
 * from the upstream source.
 */
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
if (core == 0) {
qtypes_shm = env_shared_reserve("QueueTypesSharedMem",
sizeof(qtypes_shm_t));
} else {
/* Non-zero cores attach to the region reserved by core 0 */
qtypes_shm = env_shared_lookup("QueueTypesSharedMem");
}
if (qtypes_shm == NULL) {
/* NOTE(review): error-handler call head missing before this string */
"Queue Types test init failed on EM-core: %u\n",
} else if (core == 0) {
memset(qtypes_shm, 0, sizeof(qtypes_shm_t));
}
}
/**
 * Startup, run once on one core after test_init() on all cores.
 * Creates NUM_EO/4 EO-pairs: for each pair an EO-A and EO-B with either a
 * single scheduled queue or three atomic-group queues each, plus one
 * EO-local-queue EO per side; then sends one start event per pair to kick
 * off the test (see initialize_events()).
 *
 * NOTE(review): this function is heavily truncated — declarations of
 * q_type_a/q_type_b/ret/start_ret/ag_name, the em_queue_create()/
 * em_eo_create() (for locq EOs)/em_eo_add_queue_sync()/
 * em_queue_set_context()/em_eo_start_sync() calls, the
 * em_atomic_group_create() call and several test_fatal_if() heads are all
 * missing. Restore from the upstream source before building.
 */
void test_start(const appl_conf_t *appl_conf)
{
em_atomic_group_t atomic_group;
em_eo_t eo, eo_locq;
em_queue_t queue_a, queue_b;
em_queue_t queue_ag_a1, queue_ag_a2, queue_ag_a3;
em_queue_t queue_ag_b1, queue_ag_b2, queue_ag_b3;
em_queue_t queue_local_a, queue_local_b;
eo_context_t *eo_ctx;
queue_context_t *q_ctx;
pair_type_t pair_type;
unsigned int qcnt = 0;
unsigned int eocnt = 0;
int in_atomic_group_a, in_atomic_group_b;
int ordered_pair;
int i;
if (appl_conf->num_pools >= 1)
qtypes_shm->pool = appl_conf->pools[0];
else
/* NOTE(review): the else-branch statement is missing; the banner below
 * appears to belong to a separate APPL_PRINT() call. */
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__,
em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
qtypes_shm->pool);
/* NOTE(review): test_fatal_if() heads missing for the next three strings */
"Undefined application event pool!");
"Invalid current EO");
"Invalid current queue");
qtypes_shm->num_queues = 0;
qtypes_shm->teardown_in_progress =
EM_FALSE;
/* One iteration per EO-pair; pair types cycle through queue_type_pairs[] */
for (i = 0; i < (NUM_EO / 4); i++) {
q_type_a = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[0];
in_atomic_group_a =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[0];
q_type_b = queue_type_pairs[i % QUEUE_TYPE_PAIRS].q_type[1];
in_atomic_group_b =
queue_type_pairs[i % QUEUE_TYPE_PAIRS].in_atomic_group[1];
ordered_pair = ORDERED_PAIR(q_type_a, q_type_b);
pair_type =
get_pair_type(&queue_type_pairs[i % QUEUE_TYPE_PAIRS]);
test_fatal_if(pair_type == PT_UNDEFINED,
"Queue Pair Type UNDEFINED! (%u, %u)",
q_type_a, q_type_b);
/* EO-local-A: EO with a local queue feeding back to the scheduled side */
eo_ctx = &qtypes_shm->eo_context[eocnt++];
/* NOTE(review): em_eo_create() head missing before these arguments */
start_locq, start_local_locq,
stop_locq, stop_local_locq,
receive_locq, eo_ctx);
test_fatal_if(ret !=
EM_OK,
"EO-local-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_local_a;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-local-A setup failed!");
/* NOTE(review): test_fatal_if()/APPL_PRINT head missing here */
"EO-local-A setup:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/* EO-A: scheduled-queue EO (single queue or 3 atomic-group queues) */
eo_ctx = &qtypes_shm->eo_context[eocnt++];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_a;
eo_ctx->owns_ag_queues = in_atomic_group_a;
eo_ctx->peer_owns_ag_queues = in_atomic_group_b;
eo =
em_eo_create(
"EO-A", start, NULL, stop, NULL, receive_a,
eo_ctx);
/* NOTE(review): if (in_atomic_group_a) { ... em_atomic_group_create()
 * and the three em_queue_create() calls are missing below */
atomic_group =
"Atomic group creation failed!");
APPL_PRINT("New atomic group:%s for EO:\t"
"%" PRI_EO "\n", ag_name, eo);
eo_ctx->agrp_hdl = atomic_group;
atomic_group, NULL);
atomic_group, NULL);
atomic_group, NULL);
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
/* Queue contexts for the three atomic-group queues of side A */
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a1;
q_ctx->q_type = q_type_a;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_a;
q_ctx->sched.in_atomic_group = in_atomic_group_a;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a2;
q_ctx->q_type = q_type_a;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_a;
q_ctx->sched.in_atomic_group = in_atomic_group_a;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_a3;
q_ctx->q_type = q_type_a;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_a;
q_ctx->sched.in_atomic_group = in_atomic_group_a;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
} else {
/* Side A has a single scheduled queue (no atomic group) */
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_a;
q_ctx->q_type = q_type_a;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_a;
q_ctx->sched.in_atomic_group = in_atomic_group_a;
test_fatal_if(ret !=
EM_OK,
"EO-A setup failed!");
}
/* NOTE(review): em_eo_start() + test_fatal_if head missing here */
"EO-A setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
/* EO-local-B: local-queue EO for side B */
eo_ctx = &qtypes_shm->eo_context[eocnt++];
stop_locq, NULL, receive_locq, eo_ctx);
test_fatal_if(ret !=
EM_OK,
"EO-local-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_local_b;
q_ctx->idx = qcnt++;
test_fatal_if(ret !=
EM_OK,
"EO-local-B setup failed!");
"EO-local-B setup:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
test_fatal_if(ret !=
EM_OK,
"EO-local-B setup failed!");
/* EO-B: scheduled-queue EO (single queue or 3 atomic-group queues) */
eo_ctx = &qtypes_shm->eo_context[eocnt++];
eo_ctx->ordered_pair = ordered_pair;
eo_ctx->pair_type = pair_type;
eo_ctx->q_type = q_type_b;
eo_ctx->owns_ag_queues = in_atomic_group_b;
eo_ctx->peer_owns_ag_queues = in_atomic_group_a;
eo =
em_eo_create(
"EO-B", start, start_local, stop, stop_local,
receive_b, eo_ctx);
/* NOTE(review): if (in_atomic_group_b) { ... and creation calls missing */
atomic_group =
"Atomic group creation failed!");
APPL_PRINT("New atomic group:%s for EO:\t"
"%" PRI_EO "\n", ag_name, eo);
eo_ctx->agrp_hdl = atomic_group;
atomic_group, NULL);
atomic_group, NULL);
atomic_group, NULL);
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
/* Queue contexts for the three atomic-group queues of side B */
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b1;
q_ctx->q_type = q_type_b;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_b;
q_ctx->sched.in_atomic_group = in_atomic_group_b;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b2;
q_ctx->q_type = q_type_b;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_b;
q_ctx->sched.in_atomic_group = in_atomic_group_b;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_ag_b3;
q_ctx->q_type = q_type_b;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_b;
q_ctx->sched.in_atomic_group = in_atomic_group_b;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
} else {
/* Side B has a single scheduled queue (no atomic group) */
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
q_ctx = &qtypes_shm->queue_context[qcnt];
q_ctx->q_hdl = queue_b;
q_ctx->q_type = q_type_b;
q_ctx->idx = qcnt++;
q_ctx->sched.local_q_hdl = queue_local_b;
q_ctx->sched.in_atomic_group = in_atomic_group_b;
test_fatal_if(ret !=
EM_OK,
"EO-B setup failed!");
}
"EO-B setup failed:%" PRI_STAT " %" PRI_STAT "",
ret, start_ret);
qtypes_shm->num_queues = qcnt;
APPL_PRINT("\n");
/* Kick off this pair: allocate and send the start event to side A */
em_event_t
event =
em_alloc(
sizeof(start_event_t),
qtypes_shm->pool);
/* NOTE(review): em_event_pointer() assignment to start_event missing */
start_event->ev_id = EV_ID_START_EVENT;
start_event->in_atomic_group_a = in_atomic_group_a;
if (in_atomic_group_a) {
start_event->src_q_cnt = 3;
start_event->src_queues[0] = queue_ag_a1;
start_event->src_queues[1] = queue_ag_a2;
start_event->src_queues[2] = queue_ag_a3;
} else {
start_event->src_q_cnt = 1;
start_event->src_queues[0] = queue_a;
}
start_event->in_atomic_group_b = in_atomic_group_b;
if (in_atomic_group_b) {
start_event->dst_q_cnt = 3;
start_event->dst_queues[0] = queue_ag_b1;
start_event->dst_queues[1] = queue_ag_b2;
start_event->dst_queues[2] = queue_ag_b3;
} else {
start_event->dst_q_cnt = 1;
start_event->dst_queues[0] = queue_b;
}
ret =
em_send(event, start_event->src_queues[0]);
test_fatal_if(ret !=
EM_OK,
"Event send:%" PRI_STAT
"", ret);
}
APPL_PRINT("\n\nQs:%i MAX:%i\n", qcnt, MAX_QUEUES);
APPL_PRINT("EOs:%i MAX:%i\n\n", eocnt, NUM_EO);
qtypes_shm->num_queues = qcnt;
test_fatal_if(qcnt > MAX_QUEUES, "Queue context number too high!");
}
/**
 * Stop, run once on one core: flags teardown (relaxes sequence/count
 * verification) and stops every EO created in test_start().
 *
 * NOTE(review): the declarations of 'core' and 'ret' and the em_eo_stop()
 * call inside the loop are missing from this file — restore from the
 * upstream source.
 */
void test_stop(const appl_conf_t *appl_conf)
{
em_eo_t eo;
eo_context_t *eo_ctx;
int i;
(void)appl_conf;
/* Tell the receive/verify paths that events may now legally stop flowing */
qtypes_shm->teardown_in_progress =
EM_TRUE;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
for (i = 0; i < NUM_EO; i++) {
eo_ctx = &qtypes_shm->eo_context[i];
eo = eo_ctx->eo_hdl;
test_fatal_if(ret !=
EM_OK,
"EO stop:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
}
/**
 * Per-core termination, run on every EM core after the test is stopped.
 * Core 0 frees the shared memory region reserved in test_init().
 */
void test_term(const appl_conf_t *appl_conf)
{
	/* NOTE(review): 'core' had no visible declaration in the original
	 * file; restored as em_core_id(), consistent with its use here and
	 * in the other test_* functions.
	 */
	int core = em_core_id();

	(void)appl_conf;

	APPL_PRINT("%s() on EM-core %d\n", __func__, core);

	if (core == 0) {
		env_shared_free(qtypes_shm);
	}
}
/*
 * EO-A/EO-B global start function. Stores the EO handle in the context and
 * initializes the atomic-access verification lock.
 *
 * NOTE(review): the return-type line (static em_status_t), the
 * test_fatal_if() heads owning the two "Invalid current ..." strings and
 * the final 'return EM_OK;' are missing — restore from upstream source.
 */
start(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO %" PRI_EO " starting ", eo);
eo_ctx->eo_hdl = eo;
"Invalid current EO context");
"Invalid current queue");
if (VERIFY_ATOMIC_ACCESS)
env_spinlock_init(&eo_ctx->verify_atomic_access);
}
/*
 * EO-B per-core (local) start function.
 * NOTE(review): return type, the test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
start_local(void *eo_context, em_eo_t eo)
{
"Invalid current EO context");
"Invalid current queue");
}
/*
 * Global start function for the local-queue EOs; stores the EO handle.
 * NOTE(review): return type, test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
start_locq(
void *eo_context, em_eo_t eo,
const em_eo_conf_t *conf)
{
eo_context_t *eo_ctx = eo_context;
(void)conf;
APPL_PRINT(
"EO-locq %" PRI_EO " starting ", eo);
eo_ctx->eo_hdl = eo;
"Invalid current EO context");
"Invalid current queue");
}
/*
 * Per-core (local) start function for the local-queue EOs.
 * NOTE(review): return type, test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
start_local_locq(void *eo_context, em_eo_t eo)
{
"Invalid current EO context");
"Invalid current queue");
}
/*
 * EO-A/EO-B global stop function: removes all queues, deletes the atomic
 * group (when owned) and deletes the EO itself.
 *
 * NOTE(review): heavily truncated — the return-type line, the
 * em_eo_remove_queue_all_sync(), the 'if (eo_ctx->agrp_hdl != ...)' head
 * matching the brace at the end of the atomic-group section, the
 * em_atomic_group_delete()/em_eo_delete() calls and the final return are
 * missing — restore from the upstream source.
 */
stop(void *eo_context, em_eo_t eo)
{
eo_context_t *const eo_ctx = (eo_context_t *)eo_context;
APPL_PRINT(
"EO %" PRI_EO " stopping.\n", eo);
"Invalid current EO context");
"Invalid current queue");
test_fatal_if(ret !=
EM_OK,
"EO remove queue all:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
test_fatal_if(ret !=
EM_OK,
"AGrp delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
test_fatal_if(ret !=
EM_OK,
"EO delete:%" PRI_STAT
" EO:%" PRI_EO "",
ret, eo);
}
/*
 * EO-B per-core (local) stop function.
 * NOTE(review): return type, test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
stop_local(void *eo_context, em_eo_t eo)
{
"Invalid current EO context");
"Invalid current queue");
}
/*
 * Global stop function for the local-queue EOs.
 * NOTE(review): return type, test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
stop_locq(void *eo_context, em_eo_t eo)
{
(void)eo_context;
APPL_PRINT(
"EO-locq %" PRI_EO " stopping.\n", eo);
"Invalid current EO context");
"Invalid current queue");
}
/*
 * Per-core (local) stop function for the local-queue EOs.
 * NOTE(review): return type, test_fatal_if() heads and the return
 * statement are missing — restore from the upstream source.
 */
stop_local_locq(void *eo_context, em_eo_t eo)
{
"Invalid current EO context");
"Invalid current queue");
}
/*
 * Allocates and seeds NUM_EVENT data events per EO-pair (triggered by the
 * start event): events are distributed round-robin over the source queues,
 * stamped with per-queue sequence numbers, then burst-sent with
 * em_send_multi() in chunks of SEND_MULTI_MAX.
 *
 * NOTE(review): the test_fatal_if() head for the alloc check, the
 * em_event_pointer() call completing the 'test_event' initializer and the
 * em_send_multi() calls accumulating into num_sent are missing — restore
 * from the upstream source.
 */
static void
initialize_events(start_event_t *const start)
{
const int max_q_cnt = start->src_q_cnt > start->dst_q_cnt ?
start->src_q_cnt : start->dst_q_cnt;
/* Buffer all events first, then send them in bursts per source queue */
em_event_t all_events[max_q_cnt][NUM_EVENT];
int ev_cnt[max_q_cnt];
uint64_t seqno = 0;
int j, x, y;
for (x = 0; x < max_q_cnt; x++)
ev_cnt[x] = 0;
for (j = 0; j < NUM_EVENT;) {
for (x = 0, y = 0; x < max_q_cnt; x++, y++, j++) {
em_event_t
event =
em_alloc(
sizeof(test_event_t),
qtypes_shm->pool);
"Event alloc fails");
test_event_t *const test_event =
memset(test_event, 0, sizeof(test_event_t));
test_event->ev_id = EV_ID_DATA_EVENT;
if (start->in_atomic_group_b)
test_event->data.dest = start->dst_queues[y];
else
test_event->data.dest = start->dst_queues[0];
test_event->data.src = start->src_queues[x];
/* Seqno is only verified when both sides use the same AG setup */
if (start->in_atomic_group_a ==
start->in_atomic_group_b) {
test_event->data.seqno = seqno;
}
all_events[x][ev_cnt[x]] = event;
ev_cnt[x] += 1;
}
seqno += 1;
}
/* Burst-send the buffered events to each source queue */
for (x = 0; x < max_q_cnt; x++) {
int n, m;
int num_sent = 0;
const int send_rounds = ev_cnt[x] / SEND_MULTI_MAX;
const int left_over = ev_cnt[x] % SEND_MULTI_MAX;
for (n = 0, m = 0; n < send_rounds;
n++, m += SEND_MULTI_MAX) {
SEND_MULTI_MAX,
start->src_queues[x]);
}
if (left_over) {
start->src_queues[x]);
}
test_fatal_if(num_sent != ev_cnt[x],
"Event send multi failed:%d (%d)\n"
num_sent, ev_cnt[x], start->src_queues[x]);
}
}
/*
 * Receive function for EO-A: handles the start event (event seeding),
 * verifies source queue and sequence number of data events, maintains
 * per-core statistics and forwards every data event to the side-A local
 * queue. Prints/resets statistics every PRINT_COUNT events.
 *
 * NOTE(review): truncated — the signature head (presumably
 * receive_a(void *eo_context, em_event_t event, em_event_type_t type, ...),
 * the test_event/ret declarations, the em_core_id() assignment, the
 * 'if (eo_ctx->ordered_pair ...)' head matching the brace after
 * verify_seqno(), the em_send() call and several test_fatal_if() heads are
 * missing — restore from the upstream source.
 */
static void
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
data_event_t *data_event;
core_stat_t *cstat;
em_queue_t dest_queue;
int core;
uint64_t core_events, print_events = 0;
uint64_t seqno;
(void)type;
if (unlikely(appl_shm->exit_flag)) {
/* NOTE(review): em_free(event) presumably belongs here */
return;
}
/* The first event of a pair is the start event: seed the data events */
if (unlikely(test_event->ev_id == EV_ID_START_EVENT)) {
initialize_events(&test_event->start);
return;
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
test_fatal_if(test_event->ev_id != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", test_event->ev_id);
data_event = &test_event->data;
cstat = &qtypes_shm->core_stat[core];
core_events = cstat->events;
seqno = data_event->seqno;
env_atomic64_inc(&q_ctx->sched.num_events);
test_fatal_if(data_event->src != queue,
data_event->src, queue);
if (unlikely(core_events == 0)) {
/* First event on this core: start the cycle measurement */
cstat->begin_cycles = env_get_cycle();
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
} else if (unlikely(core_events > PRINT_COUNT)) {
/* Measurement window full: snapshot and trigger a stats print */
cstat->end_cycles = env_get_cycle();
print_events = core_events;
core_events = 0;
} else {
core_events += 1;
cstat->pt_count[eo_ctx->pair_type] += 1;
}
verify_seqno(eo_ctx, q_ctx, seqno);
}
/* Bounce the event via the local queue back toward side B */
dest_queue = q_ctx->sched.local_q_hdl;
data_event->src = queue;
cstat->events = core_events;
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag, "EO-A em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
if (unlikely(print_events)) {
int i;
if (core == 0)
verify_all_queues_get_events();
print_core_stats(cstat, print_events);
for (i = 0; i < QUEUE_TYPE_PAIRS; i++)
cstat->pt_count[i] = 0;
/* NOTE(review): test_fatal_if() heads missing for the next strings */
"Invalid current EO");
"Invalid current EO context");
"Invalid current queue");
"Invalid current EO context");
/* Restart the measurement window */
cstat->begin_cycles = env_get_cycle();
}
}
/*
 * Receive function for EO-B: verifies data events coming from side A,
 * updates statistics and forwards the event to the side-B local queue.
 *
 * NOTE(review): truncated — the signature head (presumably receive_b(...)),
 * the test_event/ret declarations, the em_core_id() assignment, the
 * 'if (eo_ctx->ordered_pair ...)' head matching the brace after
 * verify_seqno(), and the em_send() call are missing — restore from the
 * upstream source.
 */
static void
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
core_stat_t *cstat;
em_queue_t dest_queue;
test_event_t *test_event;
data_event_t *data_event;
int core;
uint64_t core_events;
(void)type;
if (unlikely(appl_shm->exit_flag)) {
/* NOTE(review): em_free(event) presumably belongs here */
return;
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__begin(eo_ctx);
test_fatal_if(test_event->ev_id != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", test_event->ev_id);
data_event = &test_event->data;
cstat = &qtypes_shm->core_stat[core];
core_events = cstat->events;
env_atomic64_inc(&q_ctx->sched.num_events);
test_fatal_if(data_event->src != queue,
data_event->src, queue);
verify_seqno(eo_ctx, q_ctx, data_event->seqno);
}
/* Bounce the event via the local queue back toward side A */
dest_queue = q_ctx->sched.local_q_hdl;
data_event->src = queue;
if (unlikely(core_events == 0))
cstat->begin_cycles = env_get_cycle();
core_events++;
cstat->events = core_events;
cstat->pt_count[eo_ctx->pair_type] += 1;
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag, "EO-B em_send failure");
}
if (VERIFY_ATOMIC_ACCESS)
verify_atomic_access__end(eo_ctx);
}
/*
 * Receive function for the local-queue EOs: swaps a data event's src/dest
 * and forwards it to the peer side's scheduled queue, periodically calling
 * em_atomic_processing_end() to release the atomic context early.
 *
 * NOTE(review): truncated — the signature head (presumably
 * receive_locq(...)), the test_event/ret declarations, the em_send() call
 * and the em_atomic_processing_end() call inside the final if are missing
 * — restore from the upstream source.
 */
static void
em_queue_t queue, void *queue_context)
{
eo_context_t *const eo_ctx = eo_context;
queue_context_t *const q_ctx = queue_context;
data_event_t *data_event;
em_queue_t dest_queue;
uint64_t queue_events;
(void)type;
(void)queue;
(void)eo_ctx;
if (unlikely(appl_shm->exit_flag)) {
/* NOTE(review): em_free(event) presumably belongs here */
return;
}
test_fatal_if(test_event->ev_id != EV_ID_DATA_EVENT,
"Unexpected ev-id:%d", test_event->ev_id);
data_event = &test_event->data;
queue_events = q_ctx->local.num_events++;
/* Ping-pong: swap source and destination, then forward */
dest_queue = data_event->dest;
data_event->dest = data_event->src;
data_event->src = dest_queue;
if (unlikely(ret !=
EM_OK)) {
test_fatal_if(!appl_shm->exit_flag, "EO-local em_send failure");
}
if (CALL_ATOMIC_PROCESSING_END) {
/* Spread the end-calls over the queues to vary the timing */
if (queue_events % qtypes_shm->num_queues == q_ctx->idx)
}
}
/**
 * Map a queue-type pair (plus atomic-group flags) to its pair_type_t.
 *
 * NOTE(review): the declarations of qt1/qt2 and every 'case' label were
 * missing in this file. They are restored here: the reconstruction is
 * forced by the visible return values (e.g. the branch returning
 * PT_ATOMIC_ATOMIC must be ATOMIC/ATOMIC) — confirm against upstream.
 *
 * @param queue_type_pair  pair description (q_type[2], in_atomic_group[2])
 * @return the matching pair_type_t, or PT_UNDEFINED for an unknown combo
 */
static pair_type_t
get_pair_type(queue_type_pair_t *queue_type_pair)
{
	em_queue_type_t qt1 = queue_type_pair->q_type[0];
	em_queue_type_t qt2 = queue_type_pair->q_type[1];
	int in_ag1 = queue_type_pair->in_atomic_group[0];
	int in_ag2 = queue_type_pair->in_atomic_group[1];

	switch (qt1) {
	case EM_QUEUE_TYPE_ATOMIC:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag1 && in_ag2)
				return PT_AG_AG;
			else if (in_ag1 || in_ag2)
				return PT_AG_ATOMIC;
			else
				return PT_ATOMIC_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			if (in_ag1)
				return PT_AG_PARALLEL;
			else
				return PT_PARALLEL_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			if (in_ag1)
				return PT_AG_PARALORD;
			else
				return PT_PARALORD_ATOMIC;
		}
		break;
	case EM_QUEUE_TYPE_PARALLEL:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag2)
				return PT_AG_PARALLEL;
			else
				return PT_PARALLEL_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			return PT_PARALLEL_PARALLEL;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			return PT_PARALORD_PARALLEL;
		}
		break;
	case EM_QUEUE_TYPE_PARALLEL_ORDERED:
		switch (qt2) {
		case EM_QUEUE_TYPE_ATOMIC:
			if (in_ag2)
				return PT_AG_PARALORD;
			else
				return PT_PARALORD_ATOMIC;
		case EM_QUEUE_TYPE_PARALLEL:
			return PT_PARALORD_PARALLEL;
		case EM_QUEUE_TYPE_PARALLEL_ORDERED:
			return PT_PARALORD_PARALORD;
		}
		break;
	}
	return PT_UNDEFINED;
}
/*
 * Verify that an event arrives in order on its queue: compare the event's
 * seqno against the queue's expected seqno and advance/wrap the expected
 * value. Only meaningful when both pair sides have the same atomic-group
 * setup (skipped during teardown).
 *
 * NOTE(review): the head of the error-report call owning the
 * "SEQUENCE ERROR A:" format (and apparently an extra format line) is
 * missing before the exit() — restore from the upstream source.
 */
static inline void
verify_seqno(eo_context_t *const eo_ctx, queue_context_t *const q_ctx,
uint64_t seqno)
{
if (unlikely(qtypes_shm->teardown_in_progress))
return;
if (eo_ctx->owns_ag_queues == eo_ctx->peer_owns_ag_queues) {
/* AG queues each carry 1/3 of the events, so seqnos wrap earlier */
const uint64_t max_seqno = (eo_ctx->owns_ag_queues) ?
NUM_EVENT / 3 - 1 : NUM_EVENT - 1;
if (q_ctx->sched.seqno != seqno) {
"SEQUENCE ERROR A:\t"
"Event-seqno=%" PRIu64 " PT:%i",
q_ctx->q_hdl, q_ctx->sched.seqno, seqno,
eo_ctx->pair_type);
exit(EXIT_FAILURE);
}
if (q_ctx->sched.seqno < max_seqno)
q_ctx->sched.seqno++;
else
q_ctx->sched.seqno = 0;
}
}
/*
 * Called periodically from core 0: checks that every queue has received at
 * least its expected share of events since the previous check and prints a
 * report for any queue that has not.
 *
 * NOTE(review): truncated — the 'if (...)' head selecting the local-queue
 * branch (matching the '} else {' below) and the 'case EM_QUEUE_TYPE_*'
 * labels in the type-name switch are missing — restore from upstream.
 */
static void
verify_all_queues_get_events(void)
{
const unsigned int num_queues = qtypes_shm->num_queues;
unsigned int i, first = 1, q_evcnt_low = 0;
uint64_t curr, prev, diff;
for (i = 0; i < num_queues; i++) {
queue_context_t *const tmp_qctx =
&qtypes_shm->queue_context[i];
/* AG queues each carry only 1/3 of the events */
const uint64_t min_events = (tmp_qctx->sched.in_atomic_group) ?
NUM_EVENT / 3 : NUM_EVENT;
curr = tmp_qctx->local.num_events;
prev = tmp_qctx->local.prev_events;
tmp_qctx->local.prev_events = curr;
} else {
curr = env_atomic64_get(&tmp_qctx->sched.num_events);
prev = tmp_qctx->sched.prev_events;
tmp_qctx->sched.prev_events = curr;
}
/* Wrap-safe difference since the previous verification round */
diff = (curr >= prev) ?
curr - prev : UINT64_MAX - prev + curr + 1;
if (unlikely(diff < min_events)) {
const char *q_type_str;
q_evcnt_low++;
if (first) {
first = 0;
print_event_msg_string();
}
switch (tmp_qctx->q_type) {
if (tmp_qctx->sched.in_atomic_group)
q_type_str = "AG";
else
q_type_str = "A ";
break;
q_type_str = "P ";
break;
q_type_str = "PO";
break;
q_type_str = "L ";
break;
default:
q_type_str = "??";
break;
}
APPL_PRINT(
"Q=%3" PRI_QUEUE "(%s cnt:%" PRIu64
") %c",
tmp_qctx->q_hdl, q_type_str, diff,
(q_evcnt_low % 8 == 0) ? '\n' : ' ');
}
}
if (!first)
APPL_PRINT("\nQueue count with too few events:%u\n\n",
q_evcnt_low);
}
/*
 * Try to take the EO's verification lock; if another core already holds it
 * the atomic scheduling context was violated.
 *
 * NOTE(review): the 'test_fatal_if(unlikely(' head of the trylock check is
 * missing — restore from the upstream source.
 */
static inline void
verify_atomic_access__begin(eo_context_t *const eo_ctx)
{
!env_spinlock_trylock(&eo_ctx->verify_atomic_access)))
"EO Atomic context lost!");
}
/**
 * Counterpart of verify_atomic_access__begin(): release the per-EO
 * verification spinlock taken at the start of the receive function.
 */
static inline void
verify_atomic_access__end(eo_context_t *const eo_ctx)
{
	env_spinlock_unlock(&eo_ctx->verify_atomic_access);
}
/*
 * Compute and print per-core statistics (cycles/event, per-pair-type event
 * counts) for the measurement window that just ended.
 *
 * NOTE(review): the APPL_PRINT(PRINT_CORE_STAT_FMT, <core id>, ...) call
 * head owning the argument list below is missing — restore from the
 * upstream source.
 */
static void
print_core_stats(core_stat_t *const cstat, uint64_t print_events)
{
uint64_t diff;
uint32_t hz;
double mhz;
double cycles_per_event;
uint64_t print_count;
diff = env_cycles_diff(cstat->end_cycles, cstat->begin_cycles);
print_count = cstat->print_count++;
cycles_per_event = (double)diff / (double)print_events;
hz = env_core_hz();
mhz = ((double)hz) / 1000000.0;
cstat->pt_count[0], cstat->pt_count[1], cstat->pt_count[2],
cstat->pt_count[3], cstat->pt_count[4], cstat->pt_count[5],
cstat->pt_count[6], cstat->pt_count[7], cstat->pt_count[8],
cstat->pt_count[9], cycles_per_event, mhz, print_count);
}
/**
 * Print the header line that precedes the per-queue low-event-count report
 * produced by verify_all_queues_get_events().
 */
static void
print_event_msg_string(void)
{
	APPL_PRINT("\nToo few events detected for the following queues:\n");
}