rpc_lesson3b¶
This example is identical to the rpc_lesson3 example except that 10 copies of the server model are run via the copies model parameter and the client model makes calls to the server from inside an OpenMP threaded loop. This example demonstrates the use of the RPC communication pattern in conjunction with automated model wrapping, OpenMP threading, and model duplication.
C Version¶
Model Code:
1#include <stdio.h>
2
/**
 * Echo server model: log the request and return a NUL-terminated copy of it.
 *
 * @param in_buf          Request bytes; printed with %s, so it must be
 *                        NUL-terminated.
 * @param length_in_buf   Number of bytes in in_buf (terminator excluded).
 * @param out_buf         out_buf[0] receives a newly malloc'd copy of the
 *                        request; it is left untouched on failure.
 * @param length_out_buf  length_out_buf[0] receives the reply length
 *                        (terminator excluded).
 * @return 0 on success, -1 if the reply buffer could not be allocated.
 */
int model_function(char *in_buf, uint64_t length_in_buf,
                   char **out_buf, uint64_t *length_out_buf) {
  /* YGG_MODEL_COPY supplies the number shown in the "server<N>" log prefix.
     Fall back to 0 instead of handing NULL to atoi when it is not set. */
  const char *copy_env = getenv("YGG_MODEL_COPY");
  int copy = (copy_env != NULL) ? atoi(copy_env) : 0;
  printf("server%d(C): %s\n", copy, in_buf);

  /* One extra byte is required for the terminator written below. */
  char *reply = (char*)malloc(length_in_buf + 1);
  if (reply == NULL) {
    return -1;
  }
  memcpy(reply, in_buf, length_in_buf);
  reply[length_in_buf] = '\0';

  out_buf[0] = reply;
  length_out_buf[0] = length_in_buf;
  return 0;
}
1#include <omp.h>
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char** out_buf, uint64_t* length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = out_buf;
36 length_out_buf_temp = length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42 printf("client(C:%d): Sending %s (length = %d)\n",
43 i, in_buf, (int)(length_in_buf));
44 int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 printf("client(C:%d): Received %s (length = %d)\n",
47 i, in_buf, (int)(length_in_buf));
48 if (ret < 0) {
49 printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: c
6 args: ./src/server.c # source file that defines the model function
7 function: model_function # function in the source file that is wrapped as the model
8 is_server: # Creates an RPC server queue called "server"
9 input: in_buf # NOTE(review): presumably the function argument that carries the RPC request; confirm against yggdrasil docs
10 output: out_buf # NOTE(review): presumably the function argument that carries the RPC response; confirm against yggdrasil docs
11 inputs: in_buf
12 outputs: out_buf
13 copies: 10 # run 10 copies of this server model
1model:
2 name: client
3 language: c
4 args: ./src/client.c # source file that defines the model function
5 function: model_function # function in the source file that is wrapped as the model
6 client_of: server # this model makes RPC calls to the model named "server"
7 compiler_flags: [-fopenmp] # build with OpenMP so the _OPENMP-guarded pragmas in client.c are active
8 linker_flags: [-fopenmp]
9 allow_threading: true # NOTE(review): presumably allows the comm to be used from multiple threads; confirm against yggdrasil docs
10 env:
11 NTHREAD: 10 # read in client.c via getenv("NTHREAD"); sets the thread count and number of RPC calls
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
C++ Version¶
Model Code:
1#include <iostream>
2
/// @brief Echo server model: log the request and return a NUL-terminated copy.
/// @param in_buf          Request bytes; streamed as a C string, so it must be
///                        NUL-terminated.
/// @param length_in_buf   Number of bytes in in_buf (terminator excluded).
/// @param out_buf         Reply buffer; resized with realloc to hold the copy.
///                        Left untouched if the resize fails.
/// @param length_out_buf  Receives the reply length (terminator excluded).
/// @return 0 on success, -1 if the reply buffer could not be resized.
int model_function(char *in_buf, uint64_t length_in_buf,
                   char* &out_buf, uint64_t &length_out_buf) {
  // YGG_MODEL_COPY supplies the number shown in the "server<N>" log prefix.
  // Fall back to 0 instead of handing a null pointer to atoi when it is unset.
  const char* copy_env = getenv("YGG_MODEL_COPY");
  const int copy = (copy_env != nullptr) ? atoi(copy_env) : 0;
  std::cout << "server" << copy << "(C++): " << in_buf << std::endl;

  // One extra byte is required for the terminator written below. Keep the
  // result in a temporary so the caller's buffer is not lost on failure.
  char* resized = static_cast<char*>(realloc(out_buf, length_in_buf + 1));
  if (resized == nullptr) {
    return -1;
  }
  memcpy(resized, in_buf, length_in_buf);
  resized[length_in_buf] = '\0';

  out_buf = resized;
  length_out_buf = length_in_buf;
  return 0;
}
1#include <omp.h>
2#include "YggInterface.hpp"
3#include <stdio.h>
4
5
6int model_function(char* in_buf, uint64_t length_in_buf,
7 char* &out_buf, uint64_t &length_out_buf) {
8 // Initialize yggdrasil outside the threaded section
9 ygg_init();
10
11 // Get the number of threads from an environment variable set in the yaml
12 int nthreads = atoi(getenv("NTHREAD"));
13
14 int error_code = 0;
15#ifdef _OPENMP
16 omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19 for (int i = 0; i < nthreads; ++i){
20 int flag;
21#ifdef _OPENMP
22#pragma omp critical
23 {
24#endif
25 flag = error_code;
26#ifdef _OPENMP
27 }
28#endif
29 if (flag == 0) {
30 char* out_temp = NULL;
31 uint64_t length_out_temp = 0;
32 char** out_buf_temp = &out_temp;
33 uint64_t* length_out_buf_temp = &length_out_temp;
34 if (i == 0) {
35 out_buf_temp = &out_buf;
36 length_out_buf_temp = &length_out_buf;
37 }
38
39 // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40 // comm persists between function calls
41 WITH_GLOBAL_SCOPE(YggRpcClient rpc("server_client", "%s", "%s"));
42 std::cout << "client(C++:" << i << "): Sending " << in_buf
43 << " (length = " << length_in_buf << ")" << std::endl;
44 int ret = rpc.callRealloc(4, in_buf, length_in_buf,
45 out_buf_temp, length_out_buf_temp);
46 std::cout << "client(C++:" << i << "): Received " << *out_buf_temp
47 << " (length = " << *length_out_buf_temp << ")" << std::endl;
48 if (ret < 0) {
49 std::cout << "client(C++:" << i << "): RPC CALL ERROR" << std::endl;
50#ifdef _OPENMP
51#pragma omp critical
52 {
53#endif
54 error_code = -1;
55#ifdef _OPENMP
56 }
57#endif
58 }
59 if (i != 0) {
60 free(out_temp);
61 }
62 }
63 }
64 return error_code;
65}
Model YAML:
1---
2
3model:
4 name: server
5 language: c++
6 args: ./src/server.cpp # source file that defines the model function
7 function: model_function # function in the source file that is wrapped as the model
8 is_server: True # Creates an RPC server queue called "server"
9 copies: 10 # run 10 copies of this server model
1model:
2 name: client
3 language: cpp
4 args: ./src/client.cpp # source file that defines the model function
5 function: model_function # function in the source file that is wrapped as the model
6 client_of: server # this model makes RPC calls to the model named "server"
7 compiler_flags: [-fopenmp] # build with OpenMP so the _OPENMP-guarded pragmas in client.cpp are active
8 linker_flags: [-fopenmp]
9 allow_threading: true # NOTE(review): presumably allows the comm to be used from multiple threads; confirm against yggdrasil docs
10 env:
11 NTHREAD: 10 # read in client.cpp via getenv("NTHREAD"); sets the thread count and number of RPC calls
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Fortran Version¶
Model Code:
! Echo server model: log the request and return a copy of it.
!
!   in_buf   request string
!   out_buf  pointer that is allocated here and filled with a copy of in_buf
!   out      always .true.
function model_function(in_buf, out_buf) result(out)
  character(len=*), intent(in) :: in_buf
  character(len=:), pointer :: out_buf
  logical :: out
  character(len=255) :: copy_str
  integer :: copy, env_status, read_status

  ! YGG_MODEL_COPY supplies the number shown in the "server<N>" log prefix.
  ! Fall back to 0 when it is unset or not an integer instead of aborting
  ! in the list-directed read.
  copy = 0
  call get_environment_variable("YGG_MODEL_COPY", copy_str, status=env_status)
  if (env_status.eq.0) then
     read(copy_str, *, iostat=read_status) copy
     if (read_status.ne.0) copy = 0
  end if

  ! I0 prints the index at its natural width; a fixed I1 field would be
  ! filled with '*' for any index of 10 or more.
  write(*, '("server",I0,"(Fortran): ",A)') copy, in_buf

  allocate(character(len=len(in_buf)) :: out_buf)
  out_buf = in_buf
  out = .true.
end function model_function
1! use omp_lib
! Client model: send the input to the "server" RPC once per loop iteration
! inside an OpenMP parallel loop.
!
!   in_buf   request string to send
!   out_buf  receives a copy of the reply obtained by iteration 1
!   ret      .true. on success; .false. if yggdrasil could not be initialized
!
! The program is stopped with code 1 if any RPC call fails.
function model_function(in_buf, out_buf) result(ret)
  character(len=*), intent(in) :: in_buf
  type(yggchar_r) :: out_buf
  logical :: ret
  type(yggcomm) :: rpc
  character(len=255) :: nthreads_str
  integer :: nthreads, i, error_code, flag
  type(yggchar_r) :: out_temp
  error_code = 0
  flag = 0

  ! Initialize yggdrasil outside the threaded section
  ret = ygg_init()
  if (.not.ret) then
     write(*, '("client(F): ERROR initializing yggdrasil")')
     ! Do not attempt any calls without an initialized interface;
     ! ret is already .false. here.
     return
  end if

  ! Get the number of threads from an environment variable set in the yaml
  call get_environment_variable("NTHREAD", nthreads_str)
  read(nthreads_str,*) nthreads
  call omp_set_num_threads(nthreads)

  !$OMP PARALLEL DO PRIVATE(ret,flag,out_temp,i,rpc) SHARED(error_code,out_buf,in_buf,nthreads)
  do i=1,nthreads
     ! Snapshot the shared error flag; iterations that start after a
     ! failure has been recorded skip their call.
     !$OMP CRITICAL
     flag = error_code
     !$OMP END CRITICAL

     if (flag.eq.0) then
        ! The WITH_GLOBAL_SCOPE macro is required to ensure that the
        ! comm persists between function calls
        !$OMP CRITICAL
        WITH_GLOBAL_SCOPE(rpc = ygg_rpc_client("server_client"))
        write(*, '("client(F:",i2,"): ",A," (length = ",I3,")")') i, in_buf, len(in_buf)
        out_temp%x => null()
        ret = ygg_rpc_call_realloc(rpc, yggarg(in_buf), yggarg(out_temp))
        !$OMP END CRITICAL
        if (.not.ret) then
           write(*, '("client(F:",i2,"): RPC CALL ERROR")') i
           !$OMP CRITICAL
           error_code = -1
           !$OMP END CRITICAL
        else if (i.eq.1) then
           ! Only the first iteration publishes its reply, and only after a
           ! successful call so out_temp%x is known to be associated.
           !$OMP CRITICAL
           allocate(out_buf%x(size(out_temp%x)))
           out_buf%x = out_temp%x
           !$OMP END CRITICAL
        end if
     end if
  end do
  !$OMP END PARALLEL DO
  if (error_code.ne.0) stop 1
  ! ret was PRIVATE inside the parallel region, so set the result explicitly.
  ret = .true.
end function model_function
Model YAML:
1---
2
3model:
4 name: server
5 language: fortran
6 args: ./src/server.f90 # source file that defines the model function
7 function: model_function # function in the source file that is wrapped as the model
8 is_server: True # Creates an RPC server queue called "server"
9 copies: 10 # run 10 copies of this server model
1model:
2 name: client
3 language: fortran
4 args: ./src/client.f90 # source file that defines the model function
5 function: model_function # function in the source file that is wrapped as the model
6 client_of: server # this model makes RPC calls to the model named "server"
7 compiler_flags: [-fopenmp] # build with OpenMP so the !$OMP directives in client.f90 are honored
8 linker_flags: [-fopenmp]
9 allow_threading: true # NOTE(review): presumably allows the comm to be used from multiple threads; confirm against yggdrasil docs
10 env:
11 NTHREAD: 10 # read in client.f90 via get_environment_variable("NTHREAD"); sets the thread count and number of RPC calls
12 inputs:
13 name: in_buf
14 default_file:
15 name: ./Input/input.txt
16 filetype: ascii
17 outputs:
18 name: out_buf
19 default_file:
20 name: ./client_output.txt
21 in_temp: true
Python Version¶
Model Code:
1import os
2
3
def model_function(in_buf):
    """Echo server model: log the request and hand it back unchanged.

    Args:
        in_buf: Request received from a client.

    Returns:
        The same object that was received.
    """
    # YGG_MODEL_COPY supplies the suffix shown in the "server<N>" log prefix
    copy_id = os.environ['YGG_MODEL_COPY']
    message = "server%s(Python): %s" % (copy_id, in_buf)
    print(message)
    return in_buf
1import os
2from yggdrasil.languages.Python.YggInterface import YggRpcClient
3
4
def model_function(in_buf):
    """Client model: send the input to the "server" RPC NTHREAD times.

    The calls are made one after another; the result of the last call is
    returned.

    Args:
        in_buf: Request to send to the server.

    Returns:
        The response from the final RPC call.

    Raises:
        RuntimeError: If NTHREAD is less than 1 or an RPC call fails.
    """
    # Get the number of threads from an environment variable set in the yaml
    nthreads = int(os.environ["NTHREAD"])
    if nthreads < 1:
        # Without at least one call there is no response to return
        raise RuntimeError(
            'client(Python): NTHREAD must be at least 1 (got %d)' % nthreads)

    out_buf = None
    for i in range(nthreads):

        # The global_scope keyword is required to ensure that the comm
        # persists between function calls
        rpc = YggRpcClient('server_client', global_scope=True)
        print("client(Python:%d): %s" % (i, in_buf))
        ret, result = rpc.call(in_buf)
        if not ret:
            raise RuntimeError('client(Python:%d): RPC CALL ERROR' % i)
        out_buf = result

    return out_buf
Model YAML:
1model:
2 name: server
3 language: python
4 args: ./src/server.py # source file that defines the model function
5 function: model_function # function in the source file that is wrapped as the model
6 is_server: True # Creates an RPC server queue called "server"
7 copies: 10 # run 10 copies of this server model
1model:
2 name: client
3 language: python
4 args: ./src/client.py # source file that defines the model function
5 function: model_function # function in the source file that is wrapped as the model
6 client_of: server # this model makes RPC calls to the model named "server"
7 allow_threading: true # NOTE(review): client.py makes its calls in a plain sequential loop; confirm whether this option is needed here
8 env:
9 NTHREAD: 10 # read in client.py via os.environ["NTHREAD"]; sets the number of RPC calls
10 inputs:
11 name: in_buf
12 default_file:
13 name: ./Input/input.txt
14 filetype: ascii
15 outputs:
16 name: out_buf
17 default_file:
18 name: ./client_output.txt
19 in_temp: true