rpc_lesson3b

This example is identical to the rpc_lesson3 example, except that 10 copies of the server model are run via the copies model parameter and the client model makes its calls to the server from inside an OpenMP threaded loop. This example demonstrates the use of the RPC communication pattern in conjunction with automated model wrapping, OpenMP threading, and model duplication.

C Version

Model Code:

 1#include <stdio.h>
 2
 3int model_function(char *in_buf, uint64_t length_in_buf,
 4		   char **out_buf, uint64_t *length_out_buf) {
 5  printf("server%d(C): %s\n", atoi(getenv("YGG_MODEL_COPY")), in_buf);
 6  length_out_buf[0] = length_in_buf;
 7  out_buf[0] = (char*)malloc(length_in_buf);
 8  memcpy(out_buf[0], in_buf, length_in_buf);
 9  out_buf[0][length_in_buf] = '\0';
10  return 0;
11};
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

 1---
 2
 3model:
 4  name: server
 5  language: c
 6  args: ./src/server.c
 7  function: model_function
 8  is_server:  # Creates a RPC server queue called "server"
 9    input: in_buf
10    output: out_buf
11  inputs: in_buf
12  outputs: out_buf
13  copies: 10
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 10
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

C++ Version

Model Code:

 1#include <iostream>
 2
 3int model_function(char *in_buf, uint64_t length_in_buf,
 4		   char* &out_buf, uint64_t &length_out_buf) {
 5  std::cout << "server" << atoi(getenv("YGG_MODEL_COPY")) << "(C++): " << in_buf << std::endl;
 6  length_out_buf = length_in_buf;
 7  out_buf = (char*)realloc(out_buf, length_in_buf);
 8  memcpy(out_buf, in_buf, length_in_buf);
 9  out_buf[length_in_buf] = '\0';
10  return 0;
11};
 1#include <omp.h>
 2#include "YggInterface.hpp"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char* &out_buf, uint64_t &length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = &out_buf;
36	length_out_buf_temp = &length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(YggRpcClient rpc("server_client", "%s", "%s"));
42      std::cout << "client(C++:" << i << "): Sending " << in_buf
43		<< " (length = " << length_in_buf << ")" << std::endl;
44      int ret = rpc.callRealloc(4, in_buf, length_in_buf,
45				out_buf_temp, length_out_buf_temp);
46      std::cout << "client(C++:" << i << "): Received " << *out_buf_temp
47		<< " (length = " << *length_out_buf_temp << ")" << std::endl;
48      if (ret < 0) {
49	std::cout << "client(C++:" << i << "): RPC CALL ERROR" << std::endl;
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: c++
6  args: ./src/server.cpp
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 10
 1model:
 2  name: client
 3  language: cpp
 4  args: ./src/client.cpp
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 10
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Fortran Version

Model Code:

 1function model_function(in_buf, out_buf) result(out)
 2  character(len=*), intent(in) :: in_buf
 3  character(len=:), pointer :: out_buf
 4  logical :: out
 5  character(len=255) :: copy_str
 6  integer :: copy
 7  call get_environment_variable("YGG_MODEL_COPY", copy_str)
 8  read(copy_str,*) copy
 9  write(*, '("server",I1,"(Fortran): ",A)') copy, in_buf
10  out = .true.
11  allocate(character(len=len(in_buf)) :: out_buf)
12  out_buf = in_buf
13end function model_function
 1! use omp_lib
 2function model_function(in_buf, out_buf) result(ret)
 3  character(len=*), intent(in) :: in_buf
 4  type(yggchar_r) :: out_buf
 5  logical :: ret
 6  type(yggcomm) :: rpc
 7  character(len=255) :: nthreads_str
 8  integer :: nthreads, i, j, error_code, flag
 9  type(yggchar_r) :: out_temp
10  error_code = 0
11  flag = 0
12
13  ! Initialize yggdrasil outside the threaded section
14  ret = ygg_init()
15  if (.not.ret) then
16     write(*, '("client(F): ERROR initializing yggdrasil")')
17  end if
18
19  ! Get the number of threads from an environment variable set in the yaml
20  call get_environment_variable("NTHREAD", nthreads_str)
21  read(nthreads_str,*) nthreads
22  call omp_set_num_threads(nthreads)
23  
24  !$OMP PARALLEL DO PRIVATE(ret,flag,out_temp,i,j,rpc) SHARED(error_code,out_buf,in_buf,nthreads)
25  do i=1,nthreads
26     !$OMP CRITICAL
27     flag = error_code
28     !$OMP END CRITICAL
29
30     if (flag.eq.0) then
31        ! The WITH_GLOBAL_SCOPE macro is required to ensure that the
32        ! comm persists between function calls
33        !$OMP CRITICAL
34        WITH_GLOBAL_SCOPE(rpc = ygg_rpc_client("server_client"))
35        write(*, '("client(F:",i2,"): ",A," (length = ",I3,")")') i, in_buf, len(in_buf)
36        out_temp%x => null()
37        ret = ygg_rpc_call_realloc(rpc, yggarg(in_buf), yggarg(out_temp))
38        !$OMP END CRITICAL
39        if (.not.ret) then
40           write(*, '("client(F:",i2,"): RPC CALL ERROR")') i
41           !$OMP CRITICAL
42           error_code = -1
43           !$OMP END CRITICAL
44        end if
45
46        if (i.eq.1) then
47           !$OMP CRITICAL
48           allocate(out_buf%x(size(out_temp%x)))
49           out_buf%x = out_temp%x;
50           !$OMP END CRITICAL
51        end if
52     end if
53  end do
54  !$OMP END PARALLEL DO
55  if (error_code.ne.0) stop 1
56end function model_function

Model YAML:

1---
2
3model:
4  name: server
5  language: fortran
6  args: ./src/server.f90
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 10
 1model:
 2  name: client
 3  language: fortran
 4  args: ./src/client.f90
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 10
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Python Version

Model Code:

# src/server.py: RPC server model that echoes each request back to the caller.
import os


def model_function(in_buf):
    """Log the request together with this copy's index and echo it back.

    Args:
        in_buf: Request received from the client.

    Returns:
        The request itself, unchanged.

    Raises:
        KeyError: If the YGG_MODEL_COPY environment variable is not set.
    """
    copy_index = os.environ['YGG_MODEL_COPY']
    banner = "server%s(Python): %s" % (copy_index, in_buf)
    print(banner)
    return in_buf
 1import os
 2from yggdrasil.languages.Python.YggInterface import YggRpcClient
 3
 4
 5def model_function(in_buf):
 6    # Get the number of threads from an environment variable set in the yaml
 7    nthreads = int(os.environ["NTHREAD"])
 8    for i in range(nthreads):
 9    
10        # The global_scope keyword is required to ensure that the comm
11        # persists between function calls
12        rpc = YggRpcClient('server_client', global_scope=True)
13        print("client(Python:%d): %s" % (i, in_buf))
14        ret, result = rpc.call(in_buf)
15        if not ret:
16            raise RuntimeError('client(Python:%d): RPC CALL ERROR' % i)
17        out_buf = result
18
19    return out_buf

Model YAML:

1model:
2  name: server
3  language: python
4  args: ./src/server.py
5  function: model_function
6  is_server: True
7  copies: 10
 1model:
 2  name: client
 3  language: python
 4  args: ./src/client.py
 5  function: model_function
 6  client_of: server
 7  allow_threading: true
 8  env:
 9    NTHREAD: 10
10  inputs:
11    name: in_buf
12    default_file:
13      name: ./Input/input.txt
14      filetype: ascii
15  outputs:
16    name: out_buf
17    default_file:
18      name: ./client_output.txt
19      in_temp: true