async_select  2.0.2
async_select
async_select.c
Go to the documentation of this file.
1 /*
2  * C
3  *
4  * Copyright 2017-2020 MicroEJ Corp. All rights reserved.
5  * This library is provided in source code for use, modification and test, subject to license terms.
6  * Any modification of the source code will break MicroEJ Corp. warranties on the whole library.
7  */
8 
#include "async_select.h"
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <unistd.h>
#include "LLNET_Common.h"
26 
27 #ifdef __cplusplus
28  extern "C" {
29 #endif
30 
38 #if ASYNC_SELECT_CONFIGURATION_VERSION != 2
39 
40  #error "Version of the configuration file async_select_configuration.h is not compatible with this implementation."
41 
42 #endif
43 
45 typedef struct async_select_Request{
46  int32_t fd;
47  int32_t java_thread_id;
48  // Absolute time for timeout in milliseconds, 0 if no timeout
49  int64_t absolute_timeout_ms;
50  SELECT_Operation operation;
51  struct async_select_Request* next;
53 
57 extern void async_select_lock(void);
61 extern void async_select_unlock(void);
62 
66 extern int64_t LLMJVM_IMPL_getCurrentTime__Z(uint8_t system);
67 
72 #define async_select_get_current_time_ms() LLMJVM_IMPL_getCurrentTime__Z(1) // 1 means that system time is required
73 
74 
75 /*
76  * See implementations for descriptions.
77  */
78 static void async_select_do_select(void);
79 static void async_select_update_notified_requests(void);
80 static int32_t async_select_get_notify_fd(void);
81 static async_select_Request* async_select_allocate_request(void);
82 static async_select_Request* async_select_free_used_request(async_select_Request* request, async_select_Request* previous_request_in_used_fifo);
83 static void async_select_free_unused_request(async_select_Request* request);
84 static int32_t async_select_send_new_request(async_select_Request* request);
85 static void async_select_notify_select(void);
87 
91 static async_select_Request all_requests[MAX_NB_ASYNC_SELECT];
95 static async_select_Request* free_requests_fifo;
99 static async_select_Request* used_requests_fifo;
103 static fd_set read_fds;
107 static fd_set write_fds;
112 #ifdef ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
113 
114 static int8_t pipe_fds_initialized = 0;
115 static int32_t pipe_fds[2];
116 
117 #else
118 
119 volatile static int32_t notify_fd_cache = -1;
120 
121 #endif //ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
122 
126 volatile static uint8_t async_select_fifo_initialized = 0;
127 
136 int32_t non_blocking_select(int32_t fd, SELECT_Operation operation){
137  struct timeval zero_timeout;
138  zero_timeout.tv_sec = 0;
139  zero_timeout.tv_usec = 0;
140  fd_set io_fds;
141  fd_set* read_fds_ptr;
142  fd_set* write_fds_ptr;
143  FD_ZERO(&io_fds);
144  FD_SET(fd, &io_fds);
145 
146  if(operation == SELECT_READ){
147  read_fds_ptr = &io_fds;
148  write_fds_ptr = NULL;
149  }
150  else { // SELECT_WRITE
151  read_fds_ptr = NULL;
152  write_fds_ptr = &io_fds;
153  }
154 
155  int32_t selectRes = select(fd+1, read_fds_ptr, write_fds_ptr, NULL, &zero_timeout);
156 
157  return selectRes;
158 }
159 
174 int32_t async_select(int32_t fd, SELECT_Operation operation, int64_t timeout_ms, SNI_callback callback){
175 
176  int32_t res;
177 
178  async_select_Request* request = async_select_allocate_request();
179  if(request == NULL){
180  // No request available :-(
181  return -1;
182  }
183 
184  int32_t java_thread_id = SNI_getCurrentJavaThreadID();
185  if(java_thread_id == SNI_ERROR){
186  // Not called from the VM task
187  async_select_free_unused_request(request);
188  return -1;
189  }
190 
191  LLNET_DEBUG_TRACE("async_select: async_select on fd=0x%X operation=%s thread 0x%X\n", fd, operation==SELECT_READ ? "read":"write", java_thread_id);
192  request->java_thread_id = java_thread_id;
193  request->fd = fd;
194  request->operation = operation;
195  if(timeout_ms != 0){
196  request->absolute_timeout_ms = async_select_get_current_time_ms() + timeout_ms;
197  }
198  else { // infinite timeout
199  request->absolute_timeout_ms = 0;
200  }
201 
202  SNI_suspendCurrentJavaThreadWithCallback(0, callback, NULL);
203  res = async_select_send_new_request(request);
204 
205  return res;
206 }
207 
214  // Init free requests FIFO
216  if(async_select_fifo_initialized == 0){
217  free_requests_fifo = &all_requests[0];
218  for(int i=0 ; i<MAX_NB_ASYNC_SELECT-1 ; i++){
219  all_requests[i].next = &all_requests[i+1];
220  }
221  all_requests[MAX_NB_ASYNC_SELECT-1].next = NULL;
222 
223  // Init used requests FIFO
224  used_requests_fifo = NULL;
225  async_select_fifo_initialized = 1;
226  }
228 }
229 
230 
237 
238 // If closing the file descriptor already unblocks select(), there is nothing to do here.
239 #ifndef ASYNC_SELECT_CLOSE_UNBLOCK_SELECT
240 
241  // Search for the file descriptor in the used requests FIFO.
242  // For the requests that match the given fd, set the timeout
244 
245  async_select_Request* request = used_requests_fifo;
246  while(request != NULL){
247  if(request->fd == fd){
248 // Set the deadline to the smallest non-zero value (0 would mean "no timeout") so that
249 // the async_select task detects a timeout the next time it checks this request.
250  request->absolute_timeout_ms = 1;
251  }
252  request = request->next;
253  }
254 
256 
257  async_select_notify_select();
258 
259 #endif //ASYNC_SELECT_CLOSE_UNBLOCK_SELECT
260 
261 }
262 
268 
269  llnet_init();
270 
272 
273  while(true){
274  // Execute a select().
275  async_select_do_select();
276  // Update the received request depending on the select() results.
277  async_select_update_notified_requests();
278  }
279 }
280 
286 static int32_t async_select_get_notify_fd(){
287 
288 
289 #ifdef ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
290 
291  if(pipe_fds_initialized == 0){
292  if(pipe(pipe_fds) == -1 ||
293  set_socket_non_blocking(pipe_fds[0], true) != 0 ||
294  set_socket_non_blocking(pipe_fds[1], true) != 0){
295  //error : can not create the pipe
296  return -1;
297  }
298  pipe_fds_initialized = 1;
299  }
300  return pipe_fds[0];
301 
302 #else
303 
304  // Just take and release the lock to ensure that the VM task has not been preempted in the
305  // middle of async_select_notify_select() (i.e. notify_fd_cache has been set to -1 but socket
306  // has not been closed yet).
308  int32_t notify_fd = notify_fd_cache;
310  if(notify_fd != -1){
311  // fd already exists
312  return notify_fd;
313  }
314 
315  int domain;
316 
317 // If IPv6 or IPv4+IPv6 configuration, then use IPv6. Otherwise (only IPv4 configuration) use IPv4.
318 #if LLNET_AF & LLNET_AF_IPV6
319  domain = AF_INET6;
320 #else // only IPv4
321  domain = AF_INET;
322 #endif
323  // Create a simple local TCP server to wait on and call close when we want to unblock the select
324  notify_fd = llnet_socket(domain, SOCK_STREAM, IPPROTO_TCP);
325  if(notify_fd != -1){
326 
327 
328 #if LLNET_AF & LLNET_AF_IPV6
329  struct sockaddr_in6 sockaddr = {0};
330  sockaddr.sin6_family = AF_INET6;
331  sockaddr.sin6_port = llnet_htons(0);
332  sockaddr.sin6_addr = in6addr_loopback;
333 #else
334  struct sockaddr_in sockaddr = {0};
335  sockaddr.sin_family = AF_INET;
336  sockaddr.sin_port = htons(0);
337  sockaddr.sin_addr.s_addr = INADDR_LOOPBACK;
338 #endif
339  int32_t ret = llnet_bind(notify_fd, (struct sockaddr*)&sockaddr, sizeof(sockaddr));
340  if(ret != -1){
341  ret = llnet_listen(notify_fd, 1);
342  if(ret != -1){
343  // Save it for next time to avoid a new creation.
344  notify_fd_cache = notify_fd;
345  return notify_fd;
346  }
347  }
348 
349  // Something was wrong when configuring the notify_fd
350  llnet_close(notify_fd);
351  }
352 
353  return -1;
354 
355 #endif // ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
356 }
357 
361 static void async_select_do_select(){
362 
363  async_select_Request* request;
364 
365  int32_t notify_fd = async_select_get_notify_fd();
366  // Used to save the highest fd found in the requests.
367  int32_t max_request_fd = notify_fd;
368  // Used to save the lower timeout found in the requests.
369  int64_t min_absolute_timeout_ms = INT64_MAX;
370 
371  FD_ZERO(&read_fds);
372  FD_ZERO(&write_fds);
373 
374  // add the notify file descriptor to the read select list
375  if(notify_fd != -1){
376  FD_SET(notify_fd, &read_fds);
377  }
378  else {
379  // We were not able to create the socket to unlock the select.
380  // To prevent an infinite lock of the select we will poll for
381  // incoming messages by setting a timeout to the select.
383  LLNET_DEBUG_TRACE("async_select: WARNING: notify_fd cannot be allocated, fall back in polling mode\n");
384  }
385 
386  // -----------------------------------------------------------------
387  // Add read/write waiting operations in file descriptors select list
388  // -----------------------------------------------------------------
389  request = used_requests_fifo;
390  while(request != NULL){
391  int32_t request_fd = request->fd;
392  if(request_fd > max_request_fd){
393  // Save the highest fd
394  max_request_fd = request_fd;
395  }
396 
397  int64_t request_absolute_timeout_ms = request->absolute_timeout_ms;
398  if(request_absolute_timeout_ms != 0 && request_absolute_timeout_ms < min_absolute_timeout_ms){
399  // Save the lowest timeout
400  min_absolute_timeout_ms = request_absolute_timeout_ms;
401  }
402 
403  if(request->operation == SELECT_READ){
404  FD_SET(request_fd, &read_fds);
405  }
406  else { // operation == SELECT_WRITE
407  FD_SET(request_fd, &write_fds);
408  }
409 
410  request = request->next;
411  }
412 
413  // -----------------------------
414  // Compute select timeout value
415  // -----------------------------
416 
417  // Timeout to use for the select
418  struct timeval select_timeout = {0};
419  struct timeval* select_timeout_ptr;
420 
421  if(min_absolute_timeout_ms != INT64_MAX){
422  // At least one request has a timeout.
423  select_timeout_ptr = &select_timeout;
424  int64_t min_relative_timeout_ms = min_absolute_timeout_ms - async_select_get_current_time_ms();
425  // Saturate the relative timeout to a positive value
426  if(min_relative_timeout_ms < 0){
427  // 0 means no timeout
428  min_relative_timeout_ms = 0;
429  }
430  time_ms_to_timeval(min_relative_timeout_ms, select_timeout_ptr);
431  }
432  else {
433  // No request has timeout -> NULL timeout means infinite timeout
434  select_timeout_ptr = NULL;
435  }
436 
437 
438  // --------------
439  // Do the select
440  // --------------
441  LLNET_DEBUG_TRACE("async_select: select (timeout sec=%d usec=%d)\n", (int32_t)select_timeout.tv_sec, (int32_t)select_timeout.tv_usec);
442  int32_t res = select(max_request_fd+1, &read_fds, &write_fds, NULL, select_timeout_ptr);
443 
444  if(res >= 0 || llnet_errno(-1) == EBADF){
445  //errno == EBADF when one of fd in the fdset is invalid/closed
446  //We consider that the select was succeeded in this case
447  //because all operations through an invalid/closed fd would not block
448 
449 #ifdef ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
450  //check if notify_fd is selected and cleanup the pipe
451  if(FD_ISSET(notify_fd, &read_fds)){
452  //cleanup pipe
453  char bytes[1];
454  while(read(notify_fd, (void*)bytes, 1) > 0); //non blocking pipe fds
455  }
456 #endif
457 
458  LLNET_DEBUG_TRACE("async_select: select finished %d sockets available\n", res);
459  }
460 }
461 
466 static void async_select_update_notified_requests(){
467 
468  async_select_Request* request;
469  async_select_Request* previous_request = NULL;
470  int64_t current_time_ms = async_select_get_current_time_ms();
471 
473  // Browse all the requests to find which have been modified
474  request = used_requests_fifo;
475  while(request != NULL){
476  int32_t request_fd = request->fd;
477  bool request_timeout_reached;
478 
479  // Check if timeout has been reached.
480  if(request->absolute_timeout_ms != 0 && request->absolute_timeout_ms <= current_time_ms){
481  request_timeout_reached = true;
482  }
483  else {
484  request_timeout_reached = false;
485  }
486 
487  if((request->operation == SELECT_READ && FD_ISSET(request_fd, &read_fds)) // data received
488  || (request->operation == SELECT_WRITE && FD_ISSET(request_fd, &write_fds)) // or data sent
489  || (request_timeout_reached) // or timeout reached
490  ){
491  // Request done.
492  LLNET_DEBUG_TRACE("async_select: request done for fd=0x%X operation=%s notify thread 0x%X (%s)\n", request_fd, request->operation==SELECT_READ ? "read":"write", request->java_thread_id, request_timeout_reached==true ? "timeout":"no timeout");
493  SNI_resumeJavaThread(request->java_thread_id);
494  request = async_select_free_used_request(request, previous_request);
495  //previous_request is still the same because we have removed request from the used FIFO
496  }
497  else {
498  previous_request = request;
499  request = request->next;
500  }
501  }
503 }
504 
512 static async_select_Request* async_select_free_used_request(async_select_Request* request, async_select_Request* previous_request_in_used_fifo){
513 
514  async_select_Request* next_request;
515 
516  next_request = request->next;
517 
518  // Remove the request from the used FIFO
519  if(previous_request_in_used_fifo != NULL){
520  previous_request_in_used_fifo->next = next_request;
521  }
522  else{
523  // The request was the first in the used list
524  used_requests_fifo = next_request;
525  }
526 
527  // Add the request into the free FIFO
528  request->next = free_requests_fifo;
529  free_requests_fifo = request;
530 
531  return next_request;
532 }
533 
540 static void async_select_free_unused_request(async_select_Request* request){
541 
543 
544  // Add the request into the free FIFO
545  request->next = free_requests_fifo;
546  free_requests_fifo = request;
547 
549 }
550 
554 static int32_t async_select_send_new_request(async_select_Request* request){
555 
557  // Add the request in the used FIFO
558  request->next = used_requests_fifo;
559  used_requests_fifo = request;
561 
562  // Notify the async_select task
563  async_select_notify_select();
564 
565  return 0;
566 }
567 
578 static async_select_Request* async_select_allocate_request(){
579 
581 
582  async_select_Request* new_request = free_requests_fifo;
583  if(new_request != NULL){
584  // Remove the request from the free FIFO
585  free_requests_fifo = new_request->next;
586  }
587  // else: no request available
588 
590 
591  return new_request;
592 }
593 
599 static void async_select_notify_select(){
600 
601  int32_t res = 0;
602  int32_t notify_fd;
603 
604 #ifdef ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
605 
606  if(pipe_fds_initialized == 1){
607  //Write through the pipe to cancel the current (or the next) blocking select operation.
608  char bytes[1] = {1};
609  notify_fd = pipe_fds[1];
610  res = write(notify_fd, (void*)bytes, 1); //pipe_fds[1] refers to the write end of the pipe.
611  }
612  else {
613  notify_fd = -1;
614  }
615 
616 #else
617 
619 
620  notify_fd = notify_fd_cache;
621  if(notify_fd != -1){
622  // async_select task is blocked on select operation. Call
623  // close function on notify_fd socket to unblock the select.
624  // WARNING: these two operations must be atomic because we don't want
625  // the async_selec task to create a new socket while we have not closed
626  // this one.
627  notify_fd_cache = -1;
628  res = llnet_close(notify_fd);
629  }
630  // else:
631  // The notify_fd_cache is not yet defined. It is not a problem because the
632  // select task will browse the requests list later (after notify_fd_cache creation).
633  // and so manage the request modification.
634 
636 
637 
638 #endif // ASYNC_SELECT_USE_PIPE_FOR_NOTIFICATION
639 
640  if(res == -1){
641  LLNET_DEBUG_TRACE("Error on notify select (notify_fd: 0x%X errno: %d)\n", notify_fd, llnet_errno(notify_fd));
642  }
643 }
644 
645 #ifdef __cplusplus
646  }
647 #endif
int64_t LLMJVM_IMPL_getCurrentTime__Z(uint8_t system)
External function used to retrieve currentTime (defined in LLMJVM)
void async_select_unlock(void)
Exit critical section for the async_select component.
int32_t non_blocking_select(int32_t fd, SELECT_Operation operation)
Execute a select() for the given file descriptor and operation without blocking.
Definition: async_select.c:136
void async_select_lock(void)
Enter critical section for the async_select component.
#define async_select_get_current_time_ms()
Definition: async_select.c:72
void async_select_request_fifo_init(void)
Initializes the requests FIFOs. This function must be called prior to any call of async_select()...
Definition: async_select.c:213
An asynchronous select request.
Definition: async_select.c:45
void async_select_task_main()
The entry point for the async_select task. This function must be called from a dedicated task...
Definition: async_select.c:267
Asynchronous network select configuration.
#define ASYNC_SELECT_POLLING_MODE_TIMEOUT_MS
Timeout in milliseconds used when the async_select task cannot allocate a socket for notifications...
struct async_select_Request async_select_Request
An asynchronous select request.
void async_select_notify_closed_fd(int32_t fd)
Notifies the async_select task that a file descriptor has been closed. On some systems the close of a...
Definition: async_select.c:236
#define MAX_NB_ASYNC_SELECT
Maximum number of asynchronous select that can be done at the same moment.
SELECT_Operation
Select operations list.
Definition: async_select.h:28
int32_t async_select(int32_t fd, SELECT_Operation operation, int64_t timeout_ms, SNI_callback callback)
Executes asynchronously a select() operation for the given file descriptor.
Definition: async_select.c:174
Asynchronous network select API.