OpenMP Threading in ModelsΒΆ
Models written in C, C++, or Fortran can make use of threading via OpenMP, but need to follow a few rules to work nicely with yggdrasil.
Models that will use threading need to have
allow_threading: true
in their YAML specification to tell yggdrasil that comms should allow multiple threads to connect to the same comm.In their source code, models need to call the
ygg_init()
funciton before any threaded sections that make calls to the yggdrasil interface.Comms that are used inside threads must be initialized by the thread that will use it. Each thread can connect to the same channel (use the same name etc.), but it must be initialized on the thread.
Comms that are used inside threads must be initialized using the
WITH_GLOBAL_SCOPE
macro so that comms are stored for reuse during subsequent calls to the same interface initialization.Do not explicitly cleanup comms that are used inside threads. Doing so may cause the connection to be permanently disconnected. yggdrasil will clean these up at exit.
An example using threading can be seen below that follows these rules
Model Code:
1#include <iostream>
2
3int model_function(char *in_buf, uint64_t length_in_buf,
4 char* &out_buf, uint64_t &length_out_buf) {
5 std::cout << "server" << atoi(getenv("YGG_MODEL_COPY")) << "(C++): " << in_buf << std::endl;
6 length_out_buf = length_in_buf;
7 out_buf = (char*)realloc(out_buf, length_in_buf);
8 memcpy(out_buf, in_buf, length_in_buf);
9 out_buf[length_in_buf] = '\0';
10 return 0;
11};
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: c++
6 args: ./src/server.cpp
7 function: model_function
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true