rpc_lesson3b¶
This example is identical to the rpc_lesson3 example except that 5 copies of the server model are run via the copies model parameter and the client model calls the server from inside an OpenMP threaded loop (one call per thread, with the thread count set by the NTHREAD environment variable in the client YAML). This example demonstrates the use of the RPC communication pattern in conjunction with automated model wrapping, OpenMP threading, and model duplication.
C Version¶
Model Code:
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
2
/**
 * Server model: echo the request back to the client.
 *
 * Prints the request together with the index of this server copy (read
 * from the YGG_MODEL_COPY environment variable) and returns a freshly
 * allocated, NUL-terminated copy of the request.
 *
 * @param in_buf          Request bytes; printed with %s, so it must be
 *                        NUL-terminated.
 * @param length_in_buf   Number of bytes in in_buf (terminator excluded).
 * @param out_buf         out_buf[0] receives the heap-allocated response.
 *                        NOTE(review): presumably released by the caller.
 * @param length_out_buf  length_out_buf[0] receives the response length
 *                        (terminator excluded).
 * @return 0 on success, -1 if the response buffer could not be allocated.
 */
int model_function(char *in_buf, uint64_t length_in_buf,
                   char **out_buf, uint64_t *length_out_buf) {
  // getenv returns NULL when the variable is unset; report copy -1 in that
  // case rather than handing NULL to atoi.
  const char *copy_env = getenv("YGG_MODEL_COPY");
  int copy = (copy_env != NULL) ? atoi(copy_env) : -1;
  printf("server%d(C): %s\n", copy, in_buf);

  // One extra byte is needed for the terminator written below.
  char *reply = malloc(length_in_buf + 1);
  if (reply == NULL) {
    return -1;
  }
  memcpy(reply, in_buf, length_in_buf);
  reply[length_in_buf] = '\0';

  out_buf[0] = reply;
  length_out_buf[0] = length_in_buf;
  return 0;
}
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: c
6 args: ./src/server.c
7 function: model_function
8 is_server: # Creates a RPC server queue called "server"
9 input: in_buf
10 output: out_buf
11 inputs: in_buf
12 outputs: out_buf
13 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
C++ Version¶
Model Code:
1#include <iostream>
2
/**
 * Server model: echo the request back to the client.
 *
 * Prints the request together with the index of this server copy (read
 * from the YGG_MODEL_COPY environment variable) and stores a
 * NUL-terminated copy of the request in out_buf.
 *
 * @param in_buf          Request bytes; streamed as a C string, so it must
 *                        be NUL-terminated.
 * @param length_in_buf   Number of bytes in in_buf (terminator excluded).
 * @param out_buf         Resized with realloc to hold the response; must be
 *                        NULL or a pointer realloc may be called on.
 * @param length_out_buf  Receives the response length (terminator excluded).
 * @return 0 on success, -1 if the buffer could not be resized (out_buf is
 *         left untouched in that case).
 */
int model_function(char *in_buf, uint64_t length_in_buf,
                   char* &out_buf, uint64_t &length_out_buf) {
  // getenv returns NULL when the variable is unset; report copy -1 in that
  // case rather than handing NULL to atoi.
  const char *copy_env = getenv("YGG_MODEL_COPY");
  const int copy = (copy_env != NULL) ? atoi(copy_env) : -1;
  std::cout << "server" << copy << "(C++): " << in_buf << std::endl;

  // One extra byte for the terminator written below. Assign through a
  // temporary so the original buffer is not lost if realloc fails.
  char *resized = static_cast<char*>(realloc(out_buf, length_in_buf + 1));
  if (resized == NULL) {
    return -1;
  }
  out_buf = resized;
  memcpy(out_buf, in_buf, length_in_buf);
  out_buf[length_in_buf] = '\0';
  length_out_buf = length_in_buf;
  return 0;
}
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: c++
6 args: ./src/server.cpp
7 function: model_function
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Fortran Version¶
Model Code:
! Server model: echo the request back to the client.
!
! Prints the request together with the index of this server copy (read
! from the YGG_MODEL_COPY environment variable) and returns a newly
! allocated copy of the request through out_buf.
!
! in_buf  : request string
! out_buf : pointer that is allocated here with the length of in_buf and
!           filled with a copy of it
! result  : always .true.
function model_function(in_buf, out_buf) result(out)
  character(len=*), intent(in) :: in_buf
  character(len=:), pointer :: out_buf
  logical :: out
  character(len=255) :: copy_str
  integer :: copy
  integer :: env_status, read_status
  ! Report copy -1 when the variable is missing or is not an integer,
  ! instead of aborting in the list-directed read.
  copy = -1
  call get_environment_variable("YGG_MODEL_COPY", copy_str, status=env_status)
  if (env_status == 0) then
     read(copy_str, *, iostat=read_status) copy
     if (read_status /= 0) copy = -1
  end if
  ! I0 prints as many digits as needed (I1 turns into '*' for values
  ! that need more than one character).
  write(*, '("server",I0,"(Fortran): ",A)') copy, in_buf
  allocate(character(len=len(in_buf)) :: out_buf)
  out_buf = in_buf
  out = .true.
end function model_function
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: fortran
6 args: ./src/server.f90
7 function: model_function
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Julia Version¶
Model Code:
1using Printf
2
# Server model: print the request and return it unchanged as the response.
function model_function(in_buf)
  @printf("server(Julia): %s\n", in_buf)
  return in_buf
end
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: julia
6 args: ./src/server.jl
7 function: model_function
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Matlab Version¶
Model Code:
function out_buf = server(in_buf)
  % Server model: print the request and return it unchanged as the response.
  out_buf = in_buf;
  fprintf('server(Matlab): %s\n', out_buf);
end
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: matlab
6 args: ./src/server.m
7 function: server # matlab requires function to match file
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Python Version¶
Model Code:
1import os
2
3
def model_function(in_buf):
    """Print the request, tagged with this server copy's index, and echo it.

    The copy index is read from the YGG_MODEL_COPY environment variable; a
    KeyError is raised if that variable is not set.

    Args:
        in_buf: Request received from the client.

    Returns:
        The request, unchanged.
    """
    copy_index = os.environ['YGG_MODEL_COPY']
    print("server%s(Python): %s" % (copy_index, in_buf))
    return in_buf
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1model:
2 name: server
3 language: python
4 args: ./src/server.py
5 function: model_function
6 is_server: True
7 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
R Version¶
Model Code:
# Server model: print the request and return it unchanged as the response.
# NOTE(review): fprintf is not defined in this listing; presumably it is
# provided by the yggdrasil R interface - confirm.
model_function <- function(in_buf) {
  fprintf('server(R): %s', in_buf)
  in_buf
}
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: R
6 args: ./src/server.R
7 function: model_function
8 is_server: True # Creates a RPC server queue called "server"
9 copies: 5
1model:
2 name: client
3 language: c
4 args: ./src/client.c
5 function: model_function
6 client_of: server
7 compiler_flags: [-fopenmp]
8 linker_flags: [-fopenmp]
9 allow_threading: true
10 env:
11 NTHREAD: 5
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true