OpenMP Threading in ModelsΒΆ

Models written in C, C++, or Fortran can make use of threading via OpenMP, but need to follow a few rules to work nicely with yggdrasil.

  1. Models that will use threading need to have allow_threading: true in their YAML specification to tell yggdrasil that comms should allow multiple threads to connect to the same comm.

  2. In their source code, models need to call the ygg_init() funciton before any threaded sections that make calls to the yggdrasil interface.

  3. Comms that are used inside threads must be initialized by the thread that will use it. Each thread can connect to the same channel (use the same name etc.), but it must be initialized on the thread.

  4. Comms that are used inside threads must be initialized using the WITH_GLOBAL_SCOPE macro so that comms are stored for reuse during subsequent calls to the same interface initialization.

  5. Do not explicitly cleanup comms that are used inside threads. Doing so may cause the connection to be permanently disconnected. yggdrasil will clean these up at exit.

An example using threading can be seen below that follows these rules

Model Code:

 1#include <iostream>
 2
 3int model_function(char *in_buf, uint64_t length_in_buf,
 4		   char* &out_buf, uint64_t &length_out_buf) {
 5  std::cout << "server" << atoi(getenv("YGG_MODEL_COPY")) << "(C++): " << in_buf << std::endl;
 6  length_out_buf = length_in_buf;
 7  out_buf = (char*)realloc(out_buf, length_in_buf);
 8  memcpy(out_buf, in_buf, length_in_buf);
 9  out_buf[length_in_buf] = '\0';
10  return 0;
11};
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

(Example in other languages)

Model YAML:

1---
2
3model:
4  name: server
5  language: c++
6  args: ./src/server.cpp
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

(Example in other languages)