EM-ODP  3.7.0
Event Machine on ODP
dispatcher_callback.c
/*
* Copyright (c) 2015, Nokia Solutions and Networks
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file
*
* Event Machine dispatcher callback example.
*
* Based on the hello world example. Adds dispatcher enter and exit callback
* functions which are called right before and after the EO receive function.
*
*/
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <event_machine.h>
#include "cm_setup.h"
#include "cm_error_handler.h"
#define SPIN_COUNT 50000000
/**
* Test ping event
*/
typedef struct {
/* Destination queue for the reply event */
em_queue_t dest;
/* Sequence number */
unsigned int seq;
} ping_event_t;
/**
* EO context in the dispatcher callback test
*/
typedef struct {
/* Init before start */
em_eo_t this_eo;
em_eo_t other_eo;
em_queue_t my_queue;
int is_a;
/* Init in start */
char name[16];
} my_eo_context_t;
/**
* Queue context data
*/
typedef struct {
em_queue_t queue;
} my_queue_context_t;
/**
* Test shared memory
*/
typedef struct {
/* Event pool used by this application */
em_pool_t pool;
/* Allocate EO contexts from shared memory region */
my_eo_context_t eo_context_a;
my_eo_context_t eo_context_b;
/* Queue context */
my_queue_context_t queue_context_a;
my_queue_context_t queue_context_b;
/* EO A's queue */
em_queue_t queue_a;
/* Pad size to a multiple of cache line size */
void *end[0] ENV_CACHE_LINE_ALIGNED;
} test_shm_t;
COMPILE_TIME_ASSERT((sizeof(test_shm_t) % ENV_CACHE_LINE_SIZE) == 0,
TEST_SHM_T__SIZE_ERROR);
/* EM-core local pointer to shared memory */
static ENV_LOCAL test_shm_t *test_shm;
/*
* Local function prototypes
*/
ping_start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf);
ping_stop(void *eo_context, em_eo_t eo);
static void
ping_receive(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *queue_ctx);
/* Callback functions */
static void
enter_cb1(em_eo_t eo, void **eo_context, em_event_t events[], int num,
em_queue_t *queue, void **queue_context);
static void
enter_cb2(em_eo_t eo, void **eo_context, em_event_t events[], int num,
em_queue_t *queue, void **queue_context);
static void
exit_cb1(em_eo_t eo);
static void
exit_cb2(em_eo_t eo);
/**
* Main function
*
* Call cm_setup() to perform test & EM setup common for all the
* test applications.
*
* cm_setup() will call test_init() and test_start() and launch
* the EM dispatch loop on every EM-core.
*/
int main(int argc, char *argv[])
{
return cm_setup(argc, argv);
}
/**
* Init of the Dispatcher Callback test application.
*
* @attention Run on all cores.
*
* @see cm_setup() for setup and dispatch.
*/
void test_init(const appl_conf_t *appl_conf)
{
(void)appl_conf;
int core = em_core_id();
if (core == 0) {
test_shm = env_shared_reserve("TestSharedMem",
sizeof(test_shm_t));
em_register_error_handler(test_error_handler);
} else {
test_shm = env_shared_lookup("TestSharedMem");
}
if (test_shm == NULL) {
test_error(EM_ERROR_SET_FATAL(0xec0de), 0xdead,
"Dispatcher callback init failed on EM-core: %u\n",
} else if (core == 0) {
memset(test_shm, 0, sizeof(test_shm_t));
}
}
/**
* Startup of the Dispatcher Callback test application.
*
* @attention Run only on EM core 0.
*
* @param appl_conf Application configuration
*
* @see cm_setup() for setup and dispatch.
*/
void test_start(const appl_conf_t *appl_conf)
{
em_eo_t eo_a, eo_b;
em_status_t ret, eo_start_ret = EM_ERROR;
/*
* Store the event pool to use, use the EM default pool if no other
* pool is provided through the appl_conf.
*/
if (appl_conf->num_pools >= 1)
test_shm->pool = appl_conf->pools[0];
else
test_shm->pool = EM_POOL_DEFAULT;
APPL_PRINT("\n"
"***********************************************************\n"
"EM APPLICATION: '%s' initializing:\n"
" %s: %s() - EM-core:%d\n"
" Application running on %u EM-cores (procs:%u, threads:%u)\n"
" using event pool:%" PRI_POOL "\n"
"***********************************************************\n"
"\n",
appl_conf->name, NO_PATH(__FILE__), __func__, em_core_id(),
appl_conf->core_count, appl_conf->num_procs, appl_conf->num_threads,
test_shm->pool);
test_fatal_if(test_shm->pool == EM_POOL_UNDEF,
"Undefined application event pool!");
/* Create both EOs */
eo_a = em_eo_create("EO A", ping_start, NULL, ping_stop, NULL,
ping_receive, &test_shm->eo_context_a);
test_fatal_if(eo_a == EM_EO_UNDEF, "EO A creation failed!");
eo_b = em_eo_create("EO B", ping_start, NULL, ping_stop, NULL,
ping_receive, &test_shm->eo_context_b);
test_fatal_if(eo_b == EM_EO_UNDEF, "EO B creation failed!");
/* Init EO contexts */
test_shm->eo_context_a.this_eo = eo_a;
test_shm->eo_context_a.other_eo = eo_b;
test_shm->eo_context_a.is_a = 1;
test_shm->eo_context_b.this_eo = eo_b;
test_shm->eo_context_b.other_eo = eo_a;
test_shm->eo_context_b.is_a = 0;
/* Register/unregister dispatcher callback functions.
*
* Callback functions may be registered multiple times and unregister
* function removes only the first matching callback.
*/
test_fatal_if(ret != EM_OK, "enter_cb2() registering failed!");
test_fatal_if(ret != EM_OK, "enter_cb1() registering failed!");
test_fatal_if(ret != EM_OK, "enter_cb2() registering failed!");
test_fatal_if(ret != EM_OK, "enter_cb2() unregistering failed!");
test_fatal_if(ret != EM_OK, "exit_cb2() registering failed!");
test_fatal_if(ret != EM_OK, "exit_cb1() registering failed!");
test_fatal_if(ret != EM_OK, "exit_cb2() registering failed!");
test_fatal_if(ret != EM_OK, "exit_cb2() unregistering failed!");
/* Start EO A */
ret = em_eo_start_sync(eo_a, &eo_start_ret, NULL);
test_fatal_if(ret != EM_OK || eo_start_ret != EM_OK,
"em_eo_start(EO A) failed! EO:%" PRI_EO "\n"
"ret:%" PRI_STAT ", EO-start-ret:%" PRI_STAT "",
eo_a, ret, eo_start_ret);
/* Start EO B */
ret = em_eo_start_sync(eo_b, &eo_start_ret, NULL);
test_fatal_if(ret != EM_OK || eo_start_ret != EM_OK,
"em_eo_start(EO B) failed! EO:%" PRI_EO "\n"
"ret:%" PRI_STAT ", EO-start-ret:%" PRI_STAT "",
eo_b, ret, eo_start_ret);
}
void test_stop(const appl_conf_t *appl_conf)
{
const int core = em_core_id();
const em_eo_t eo_a = test_shm->eo_context_a.this_eo;
const em_eo_t eo_b = test_shm->eo_context_b.this_eo;
(void)appl_conf;
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
stat =
test_fatal_if(stat != EM_OK, "enter_cb2() unregistering failed!");
stat =
test_fatal_if(stat != EM_OK, "enter_cb2() unregistering failed!");
stat =
test_fatal_if(stat != EM_OK, "exit_cb2() unregistering failed!");
stat =
test_fatal_if(stat != EM_OK, "exit_cb2() unregistering failed!");
stat = em_eo_stop_sync(eo_a);
if (stat != EM_OK)
APPL_EXIT_FAILURE("EO A stop failed!");
stat = em_eo_stop_sync(eo_b);
if (stat != EM_OK)
APPL_EXIT_FAILURE("EO B stop failed!");
stat = em_eo_delete(eo_a);
if (stat != EM_OK)
APPL_EXIT_FAILURE("EO A delete failed!");
stat = em_eo_delete(eo_b);
if (stat != EM_OK)
APPL_EXIT_FAILURE("EO B delete failed!");
}
void test_term(const appl_conf_t *appl_conf)
{
(void)appl_conf;
int core = em_core_id();
APPL_PRINT("%s() on EM-core %d\n", __func__, core);
if (core == 0) {
env_shared_free(test_shm);
}
}
/**
* @private
*
* EO start function.
*/
ping_start(void *eo_context, em_eo_t eo, const em_eo_conf_t *conf)
{
my_eo_context_t *eo_ctx = eo_context;
em_queue_t queue;
em_status_t status;
my_queue_context_t *q_ctx;
const char *queue_name;
(void)conf;
/* Copy EO name */
em_eo_get_name(eo, eo_ctx->name, sizeof(eo_ctx->name));
if (eo_ctx->is_a) {
queue_name = "queue A";
q_ctx = &test_shm->queue_context_a;
} else {
queue_name = "queue B";
q_ctx = &test_shm->queue_context_b;
}
NULL);
test_fatal_if(queue == EM_QUEUE_UNDEF, "%s creation failed!",
queue_name);
eo_ctx->my_queue = queue; /* for ping_stop() */
q_ctx->queue = queue;
status = em_queue_set_context(queue, q_ctx);
test_fatal_if(status != EM_OK,
"Set queue context:%" PRI_STAT "\n"
"EO:%" PRI_EO " queue:%" PRI_QUEUE "", status, eo, queue);
status = em_eo_add_queue_sync(eo, queue);
test_fatal_if(status != EM_OK,
"EO add queue:%" PRI_STAT "\n"
"EO:%" PRI_EO " Queue:%" PRI_QUEUE "", status, eo, queue);
APPL_PRINT("Test start %s: EO %" PRI_EO ", queue:%" PRI_QUEUE ".\n",
eo_ctx->name, eo, queue);
if (eo_ctx->is_a) {
/* Save queue ID for EO B. */
test_shm->queue_a = queue;
} else {
em_event_t event;
ping_event_t *ping;
/*
* Send the first event to EO A.
* Store queue ID as the destination queue for EO A.
*/
event = em_alloc(sizeof(ping_event_t), EM_EVENT_TYPE_SW,
test_shm->pool);
test_fatal_if(event == EM_EVENT_UNDEF,
"Event allocation failed!");
ping = em_event_pointer(event);
ping->dest = queue;
ping->seq = 0;
status = em_send(event, test_shm->queue_a);
test_fatal_if(status != EM_OK,
"em_send():%" PRI_STAT "\n"
"EO:%" PRI_EO " Queue:%" PRI_QUEUE "",
status, eo, test_shm->queue_a);
}
return EM_OK;
}
/**
* @private
*
* EO stop function.
*/
ping_stop(void *eo_context, em_eo_t eo)
{
my_eo_context_t *eo_ctx = eo_context;
em_queue_t queue = eo_ctx->my_queue;
em_status_t status;
APPL_PRINT("Dispatcher callback example stop (%s, eo id %" PRI_EO ")\n",
eo_ctx->name, eo);
status = em_eo_remove_queue_sync(eo, queue);
if (status != EM_OK)
return status;
status = em_queue_delete(queue);
if (status != EM_OK)
return status;
return EM_OK;
}
/**
* @private
*
* EO receive function.
*
* Print "Event received" and send back to the sender of the event.
*/
static void
ping_receive(void *eo_context, em_event_t event, em_event_type_t type,
em_queue_t queue, void *queue_context)
{
my_eo_context_t *eo_ctx = eo_context;
my_queue_context_t *q_ctx = queue_context;
em_queue_t dest;
em_status_t status;
ping_event_t *ping;
(void)type;
ping = em_event_pointer(event);
if (unlikely(appl_shm->exit_flag)) {
em_free(event);
return;
}
dest = ping->dest;
ping->dest = queue;
APPL_PRINT("Ping from %s!\t"
"Queue: %" PRI_QUEUE " on core %02i. Event seq: %u.\n",
eo_ctx->name, q_ctx->queue, em_core_id(), ping->seq++);
delay_spin(SPIN_COUNT);
status = em_send(event, dest);
if (unlikely(status != EM_OK)) {
em_free(event);
test_fatal_if(!appl_shm->exit_flag,
"em_send():%" PRI_STAT "\n"
"EO:%" PRI_EO " Queue:%" PRI_QUEUE "",
status, eo_ctx->this_eo, dest);
}
}
/**
* Callback functions
*/
static void
enter_cb1(em_eo_t eo, void **eo_context, em_event_t events[], int num,
em_queue_t *queue, void **queue_context)
{
my_eo_context_t *eo_ctx = *eo_context;
my_queue_context_t *q_ctx = *queue_context;
ping_event_t *ping = em_event_pointer(events[0]);
(void)num; /* 1 event at a time here */
(void)queue;
APPL_PRINT("++ Dispatcher enter callback 1 for EO: %" PRI_EO " (%s)\t"
"Queue: %" PRI_QUEUE " on core %02i. Event seq: %u.\n",
eo, eo_ctx->name, q_ctx->queue, em_core_id(),
ping->seq);
}
static void
enter_cb2(em_eo_t eo, void **eo_context, em_event_t events[], int num,
em_queue_t *queue, void **queue_context)
{
my_eo_context_t *eo_ctx = *eo_context;
my_queue_context_t *q_ctx = *queue_context;
ping_event_t *ping = em_event_pointer(events[0]);
(void)num; /* 1 event at a time here */
(void)queue;
APPL_PRINT("++ Dispatcher enter callback 2 for EO: %" PRI_EO " (%s)\t"
"Queue: %" PRI_QUEUE " on core %02i. Event seq: %u.\n",
eo, eo_ctx->name, q_ctx->queue, em_core_id(),
ping->seq);
}
static void
exit_cb1(em_eo_t eo)
{
APPL_PRINT("-- Dispatcher exit callback 1 for EO: %" PRI_EO "\n", eo);
}
static void
exit_cb2(em_eo_t eo)
{
APPL_PRINT("-- Dispatcher exit callback 2 for EO: %" PRI_EO "\n", eo);
}
em_eo_remove_queue_sync
em_status_t em_eo_remove_queue_sync(em_eo_t eo, em_queue_t queue)
Definition: event_machine_eo.c:409
EM_OK
#define EM_OK
Definition: event_machine_types.h:329
EM_EVENT_TYPE_SW
@ EM_EVENT_TYPE_SW
Definition: event_machine_hw_types.h:72
EM_QUEUE_PRIO_NORMAL
@ EM_QUEUE_PRIO_NORMAL
Definition: event_machine_hw_types.h:153
em_dispatch_exit_func_t
void(* em_dispatch_exit_func_t)(em_eo_t eo)
Definition: event_machine_dispatcher.h:548
EM_EVENT_UNDEF
#define EM_EVENT_UNDEF
Definition: event_machine_types.h:62
ENV_CACHE_LINE_SIZE
#define ENV_CACHE_LINE_SIZE
Definition: environment.h:62
EM_QUEUE_GROUP_DEFAULT
#define EM_QUEUE_GROUP_DEFAULT
Definition: event_machine_hw_config.h:147
EM_POOL_DEFAULT
#define EM_POOL_DEFAULT
Definition: event_machine_hw_config.h:191
em_eo_get_name
size_t em_eo_get_name(em_eo_t eo, char *name, size_t maxlen)
Definition: event_machine_eo.c:236
PRI_POOL
#define PRI_POOL
Definition: event_machine_hw_types.h:62
PRI_EO
#define PRI_EO
Definition: event_machine_types.h:97
em_free
void em_free(em_event_t event)
Definition: event_machine_event.c:261
em_send
em_status_t em_send(em_event_t event, em_queue_t queue)
Definition: event_machine_event.c:661
EM_EO_UNDEF
#define EM_EO_UNDEF
Definition: event_machine_types.h:95
em_eo_add_queue_sync
em_status_t em_eo_add_queue_sync(em_eo_t eo, em_queue_t queue)
Definition: event_machine_eo.c:344
EM_QUEUE_TYPE_ATOMIC
@ EM_QUEUE_TYPE_ATOMIC
Definition: event_machine_hw_types.h:112
event_machine.h
em_queue_create
em_queue_t em_queue_create(const char *name, em_queue_type_t type, em_queue_prio_t prio, em_queue_group_t group, const em_queue_conf_t *conf)
Definition: event_machine_queue.c:41
em_dispatch_register_enter_cb
em_status_t em_dispatch_register_enter_cb(em_dispatch_enter_func_t func)
Definition: event_machine_dispatcher.c:274
em_queue_delete
em_status_t em_queue_delete(em_queue_t queue)
Definition: event_machine_queue.c:95
ENV_CACHE_LINE_ALIGNED
#define ENV_CACHE_LINE_ALIGNED
Definition: environment.h:76
em_eo_delete
em_status_t em_eo_delete(em_eo_t eo)
Definition: event_machine_eo.c:205
em_alloc
em_event_t em_alloc(uint32_t size, em_event_type_t type, em_pool_t pool)
Definition: event_machine_event.c:33
em_eo_start_sync
em_status_t em_eo_start_sync(em_eo_t eo, em_status_t *result, const em_eo_conf_t *conf)
Definition: event_machine_eo.c:725
em_status_t
uint32_t em_status_t
Definition: event_machine_types.h:321
PRI_QUEUE
#define PRI_QUEUE
Definition: event_machine_types.h:109
EM_ERROR_SET_FATAL
#define EM_ERROR_SET_FATAL(error)
Definition: event_machine_hw_types.h:428
em_unregister_error_handler
em_status_t em_unregister_error_handler(void)
Definition: event_machine_error.c:50
em_event_type_t
uint32_t em_event_type_t
Definition: event_machine_types.h:85
EM_QUEUE_UNDEF
#define EM_QUEUE_UNDEF
Definition: event_machine_types.h:107
em_queue_set_context
em_status_t em_queue_set_context(em_queue_t queue, const void *context)
Definition: event_machine_queue.c:112
em_core_id
int em_core_id(void)
Definition: event_machine_core.c:34
EM_POOL_UNDEF
#define EM_POOL_UNDEF
Definition: event_machine_hw_types.h:60
environment.h
em_eo_create
em_eo_t em_eo_create(const char *name, em_start_func_t start, em_start_local_func_t local_start, em_stop_func_t stop, em_stop_local_func_t local_stop, em_receive_func_t receive, const void *eo_ctx)
Definition: event_machine_eo.c:40
em_dispatch_enter_func_t
void(* em_dispatch_enter_func_t)(em_eo_t eo, void **eo_ctx, em_event_t events[], int num, em_queue_t *queue, void **q_ctx)
Definition: event_machine_dispatcher.h:533
em_register_error_handler
em_status_t em_register_error_handler(em_error_handler_t handler)
Definition: event_machine_error.c:34
em_eo_stop_sync
em_status_t em_eo_stop_sync(em_eo_t eo)
Definition: event_machine_eo.c:897
em_dispatch_unregister_enter_cb
em_status_t em_dispatch_unregister_enter_cb(em_dispatch_enter_func_t func)
Definition: event_machine_dispatcher.c:293
ENV_LOCAL
#define ENV_LOCAL
Definition: environment.h:57
em_dispatch_unregister_exit_cb
em_status_t em_dispatch_unregister_exit_cb(em_dispatch_exit_func_t func)
Definition: event_machine_dispatcher.c:330
em_eo_conf_t
Definition: event_machine_types.h:242
em_event_pointer
void * em_event_pointer(em_event_t event)
Definition: event_machine_event.c:750
EM_ERROR
#define EM_ERROR
Definition: event_machine_types.h:337
em_dispatch_register_exit_cb
em_status_t em_dispatch_register_exit_cb(em_dispatch_exit_func_t func)
Definition: event_machine_dispatcher.c:312