bsp-microej-async-worker  0.2.1
bsp-microej-async-worker
microej_async_worker.c
Go to the documentation of this file.
1 /*
2  * C
3  *
4  * Copyright 2018-2019 MicroEJ Corp. All rights reserved.
5  * This library is provided in source code for use, modification and test, subject to license terms.
6  * Any modification of the source code will break MicroEJ Corp. warranties on the whole library.
7  */
8 
17 #include "microej_async_worker.h"
18 #include <stdlib.h>
19 #include <string.h>
20 #include <stdbool.h>
21 
22 
23 #ifdef __cplusplus
24  extern "C" {
25 #endif
26 
27 // Entry point of the async worker task.
28 static void* MICROEJ_ASYNC_WORKER_loop(void* args);
29 
30 // Generic method for MICROEJ_ASYNC_WORKER_async_exec and MICROEJ_ASYNC_WORKER_async_exec_no_wait
31 static MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_async_exec_intern(MICROEJ_ASYNC_WORKER_handle_t* async_worker, MICROEJ_ASYNC_WORKER_job_t* job, MICROEJ_ASYNC_WORKER_action_t action, SNI_callback on_done_callback, bool wait);
32 
33 MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_initialize(MICROEJ_ASYNC_WORKER_handle_t* async_worker, uint8_t* name, OSAL_task_stack_t stack, int32_t priority){
34  // Check configuration
35  int32_t job_count = async_worker->job_count;
36  if(job_count <= 0
37  || async_worker->waiting_threads_length <= 1 // compare with 1 because '+1' is added when declaring the array
38  ){
39  return MICROEJ_ASYNC_WORKER_INVALID_ARGS;
40  }
41 
42  // Init jobs
43  MICROEJ_ASYNC_WORKER_job_t* jobs = async_worker->free_jobs;
44  void* params = async_worker->params;
45  int32_t params_sizeof = async_worker->params_sizeof;
46  for(int i=0 ; i<job_count-1 ; i++){
47  jobs[i]._intern.next_free_job = &jobs[i+1];
48  jobs[i].params = params;
49  params = ( void *) ( (int32_t)params + params_sizeof );
50  }
51  jobs[job_count-1]._intern.next_free_job = NULL;
52  jobs[job_count-1].params = params;
53 
54  // Create queue
55  OSAL_status_t res = OSAL_queue_create(name, async_worker->job_count, &async_worker->jobs_queue);
56  if(res != OSAL_OK){
57  return MICROEJ_ASYNC_WORKER_ERROR;
58  }
59 
60  // Create mutex
61  res = OSAL_mutex_create(name, &async_worker->mutex);
62  if(res != OSAL_OK){
63  return MICROEJ_ASYNC_WORKER_ERROR;
64  }
65 
66  // Create task
67  res = OSAL_task_create(MICROEJ_ASYNC_WORKER_loop, name, stack, priority, async_worker, &async_worker->task);
68  if(res != OSAL_OK){
69  return MICROEJ_ASYNC_WORKER_ERROR;
70  }
71 
72  return MICROEJ_ASYNC_WORKER_OK;
73 }
74 
75 
77 
78  MICROEJ_ASYNC_WORKER_job_t* job = NULL;
79 
80  OSAL_mutex_take(&async_worker->mutex, OSAL_INFINITE_TIME);
81  {
82  job = async_worker->free_jobs;
83  if(job != NULL){
84  // Free job found: remove it from the free list
85  async_worker->free_jobs = job->_intern.next_free_job;
86  job->_intern.next_free_job = NULL;
87  }
88  }
89  OSAL_mutex_give(&async_worker->mutex);
90 
91  if(job == NULL){
92  // No free job available: wait for a free job.
93  // Store the current thread id in the waiting list.
94  // First check if there is a free element in the waiting list.
95  int32_t free_waiting_thread_offset = async_worker->free_waiting_thread_offset;
96  int32_t new_free_waiting_thread_offset = free_waiting_thread_offset + 1;
97  if(new_free_waiting_thread_offset >= async_worker->waiting_threads_length){
98  new_free_waiting_thread_offset = 0;
99  }
100 
101  if(new_free_waiting_thread_offset == async_worker->waiting_thread_offset){
102  // The waiting list is full.
103  SNI_throwNativeIOException(-1, "MICROEJ_ASYNC_WORKER: thread cannot be suspended, waiting list is full.");
104  }
105  else {
106  async_worker->free_waiting_thread_offset = (uint16_t)new_free_waiting_thread_offset;
107  int32_t thread_id = SNI_getCurrentJavaThreadID();
108  async_worker->waiting_threads[free_waiting_thread_offset] = thread_id;
109  SNI_suspendCurrentJavaThreadWithCallback(0, (SNI_callback) sni_retry_callback, NULL);
110  }
111  }
112 
113  return job;
114 }
115 
116 
118  OSAL_mutex_take(&async_worker->mutex, OSAL_INFINITE_TIME);
119  {
120  job->_intern.next_free_job = async_worker->free_jobs;
121  async_worker->free_jobs = job;
122 
123  int32_t waiting_thread_offset = async_worker->waiting_thread_offset;
124  if(waiting_thread_offset != async_worker->free_waiting_thread_offset){
125  // A thread was waiting for a free job: notify it
126  int32_t thread_id = async_worker->waiting_threads[waiting_thread_offset];
127  int32_t new_waiting_thread_offset = waiting_thread_offset + 1;
128  if(new_waiting_thread_offset >= async_worker->waiting_threads_length){
129  new_waiting_thread_offset = 0;
130  }
131  async_worker->waiting_thread_offset = new_waiting_thread_offset;
132  SNI_resumeJavaThread(thread_id);
133  }
134  }
135  OSAL_mutex_give(&async_worker->mutex);
136 
137  return MICROEJ_ASYNC_WORKER_OK;
138 }
139 
141  return MICROEJ_ASYNC_WORKER_async_exec_intern(async_worker, job, action, on_done_callback, true);
142 }
143 
145  return MICROEJ_ASYNC_WORKER_async_exec_intern(async_worker, job, action, NULL, false);
146 }
147 
148 MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_async_exec_intern(MICROEJ_ASYNC_WORKER_handle_t* async_worker, MICROEJ_ASYNC_WORKER_job_t* job, MICROEJ_ASYNC_WORKER_action_t action, SNI_callback on_done_callback, bool wait){
149  job->_intern.action = action;
150 
151  if(wait == true){
152  job->_intern.thread_id = SNI_getCurrentJavaThreadID();
153  }
154  else {
155  job->_intern.thread_id = SNI_ERROR;
156  }
157 
158  OSAL_status_t res = OSAL_queue_post(&async_worker->jobs_queue, job);
159  if(res == OSAL_OK){
160  if(wait == true){
161  SNI_suspendCurrentJavaThreadWithCallback(0, (SNI_callback)on_done_callback, job);
162  }
163  return MICROEJ_ASYNC_WORKER_OK;
164  }
165  else {
166  SNI_throwNativeIOException(-1, "MICROEJ_ASYNC_WORKER: Internal error.");
167  return MICROEJ_ASYNC_WORKER_ERROR;
168  }
169 }
170 
171 
173  MICROEJ_ASYNC_WORKER_job_t* job = NULL;
174  SNI_getCallbackArgs((void**)&job, NULL);
175  return job;
176 }
177 
178 static void* MICROEJ_ASYNC_WORKER_loop(void* args){
180 
181  while(1){
183  OSAL_status_t res = OSAL_queue_fetch(&async_worker->jobs_queue, (void**)&job, OSAL_INFINITE_TIME);
184 
185  if(res == OSAL_OK){
186  // New job to execute
187  job->_intern.action(job);
188  if(job->_intern.thread_id != SNI_ERROR){
189  SNI_resumeJavaThread(job->_intern.thread_id);
190  }
191  else {
192  MICROEJ_ASYNC_WORKER_free_job(async_worker, job);
193  }
194  }
195  }
196  return NULL;
197 }
198 
199 #ifdef __cplusplus
200  }
201 #endif
202 
203 
Asynchronous Worker API. This library helps writing SNI functions that must be executed asynchronousl...
MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_async_exec(MICROEJ_ASYNC_WORKER_handle_t *async_worker, MICROEJ_ASYNC_WORKER_job_t *job, MICROEJ_ASYNC_WORKER_action_t action, SNI_callback on_done_callback)
Executes the given job asynchronously.
void(* MICROEJ_ASYNC_WORKER_action_t)(MICROEJ_ASYNC_WORKER_job_t *job)
Pointer to a function to call asynchronously.
MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_free_job(MICROEJ_ASYNC_WORKER_handle_t *async_worker, MICROEJ_ASYNC_WORKER_job_t *job)
Frees a job previously allocated with MICROEJ_ASYNC_WORKER_allocate_job().
void * params
Pointers to the parameters.
A job to execute in a worker.
MICROEJ_ASYNC_WORKER_job_t * MICROEJ_ASYNC_WORKER_allocate_job(MICROEJ_ASYNC_WORKER_handle_t *async_worker, SNI_callback sni_retry_callback)
Allocates a new job for the given worker.
MICROEJ_ASYNC_WORKER_status_t
Return codes list.
struct MICROEJ_ASYNC_WORKER_job::@0 _intern
Structure internal data. Must not be modified.
MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_async_exec_no_wait(MICROEJ_ASYNC_WORKER_handle_t *async_worker, MICROEJ_ASYNC_WORKER_job_t *job, MICROEJ_ASYNC_WORKER_action_t action)
Executes the given job asynchronously.
MICROEJ_ASYNC_WORKER_status_t MICROEJ_ASYNC_WORKER_initialize(MICROEJ_ASYNC_WORKER_handle_t *async_worker, uint8_t *name, OSAL_task_stack_t stack, int32_t priority)
Initializes and starts a worker previously declared with MICROEJ_ASYNC_WORKER_worker_declare() macro...
MICROEJ_ASYNC_WORKER_job_t * MICROEJ_ASYNC_WORKER_get_job_done(void)
Returns the job that has been executed.