rpc_lesson3b

This example is identical to the rpc_lesson3 example except that 5 copies of the server model are run via the copies model parameter and the client model makes calls to the server from inside an OpenMP-threaded loop. This example demonstrates the use of the RPC communication pattern in conjunction with automated model wrapping, OpenMP threading, and model duplication.

C Version

Model Code:

 1#include <stdio.h>
 2
 3int model_function(char *in_buf, uint64_t length_in_buf,
 4		   char **out_buf, uint64_t *length_out_buf) {
 5  printf("server%d(C): %s\n", atoi(getenv("YGG_MODEL_COPY")), in_buf);
 6  length_out_buf[0] = length_in_buf;
 7  out_buf[0] = (char*)malloc(length_in_buf);
 8  memcpy(out_buf[0], in_buf, length_in_buf);
 9  out_buf[0][length_in_buf] = '\0';
10  return 0;
11};
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

 1---
 2
 3model:
 4  name: server
 5  language: c
 6  args: ./src/server.c
 7  function: model_function
 8  is_server:  # Creates a RPC server queue called "server"
 9    input: in_buf
10    output: out_buf
11  inputs: in_buf
12  outputs: out_buf
13  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

C++ Version

Model Code:

 1#include <iostream>
 2
 3int model_function(char *in_buf, uint64_t length_in_buf,
 4		   char* &out_buf, uint64_t &length_out_buf) {
 5  std::cout << "server" << atoi(getenv("YGG_MODEL_COPY")) << "(C++): " << in_buf << std::endl;
 6  length_out_buf = length_in_buf;
 7  out_buf = (char*)realloc(out_buf, length_in_buf);
 8  memcpy(out_buf, in_buf, length_in_buf);
 9  out_buf[length_in_buf] = '\0';
10  return 0;
11};
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: c++
6  args: ./src/server.cpp
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Fortran Version

Model Code:

 1function model_function(in_buf, out_buf) result(out)
 2  character(len=*), intent(in) :: in_buf
 3  character(len=:), pointer :: out_buf
 4  logical :: out
 5  character(len=255) :: copy_str
 6  integer :: copy
 7  call get_environment_variable("YGG_MODEL_COPY", copy_str)
 8  read(copy_str,*) copy
 9  write(*, '("server",I1,"(Fortran): ",A)') copy, in_buf
10  out = .true.
11  allocate(character(len=len(in_buf)) :: out_buf)
12  out_buf = in_buf
13end function model_function
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: fortran
6  args: ./src/server.f90
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Julia Version

Model Code:

1using Printf
2
3function model_function(in_buf)
4  @printf("server(Julia): %s\n", in_buf)
5  out_buf = in_buf
6  return out_buf
7end
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: julia
6  args: ./src/server.jl
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Matlab Version

Model Code:

1function out_buf = server(in_buf)
2  fprintf('server(Matlab): %s\n', in_buf);
3  out_buf = in_buf;
4end
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: matlab
6  args: ./src/server.m
7  function: server  # matlab requires function to match file
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

Python Version

Model Code:

1import os
2
3
4def model_function(in_buf):
5    print("server%s(Python): %s" % (os.environ['YGG_MODEL_COPY'], in_buf))
6    out_buf = in_buf
7    return out_buf
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1model:
2  name: server
3  language: python
4  args: ./src/server.py
5  function: model_function
6  is_server: True
7  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true

R Version

Model Code:

1model_function <- function(in_buf) {
2  fprintf('server(R): %s', in_buf)
3  out_buf <- in_buf
4  return(out_buf);
5}
 1#include <omp.h>
 2#include "YggInterface.h"
 3#include <stdio.h>
 4
 5
 6int model_function(char* in_buf, uint64_t length_in_buf,
 7		   char** out_buf, uint64_t* length_out_buf) {
 8  // Initialize yggdrasil outside the threaded section
 9  ygg_init();
10  
11  // Get the number of threads from an environment variable set in the yaml
12  int nthreads = atoi(getenv("NTHREAD"));
13  
14  int error_code = 0;
15#ifdef _OPENMP
16  omp_set_num_threads(nthreads);
17#pragma omp parallel for shared(error_code)
18#endif
19  for (int i = 0; i < nthreads; ++i){
20    int flag;
21#ifdef _OPENMP
22#pragma omp critical
23    {
24#endif
25      flag = error_code;
26#ifdef _OPENMP
27    }
28#endif
29    if (flag == 0) {
30      char* out_temp = NULL;
31      uint64_t length_out_temp = 0;
32      char** out_buf_temp = &out_temp;
33      uint64_t* length_out_buf_temp = &length_out_temp;
34      if (i == 0) {
35	out_buf_temp = out_buf;
36	length_out_buf_temp = length_out_buf;
37      }
38      
39      // The WITH_GLOBAL_SCOPE macro is required to ensure that the
40      // comm persists between function calls
41      WITH_GLOBAL_SCOPE(yggRpc_t rpc = yggRpcClient("server_client", "%s", "%s"));
42      printf("client(C:%d): Sending %s (length = %d)\n",
43	     i, in_buf, (int)(length_in_buf));
44      int ret = rpcCallRealloc(rpc, in_buf, length_in_buf,
45			       out_buf_temp, length_out_buf_temp);
46      printf("client(C:%d): Received %s (length = %d)\n",
47	     i, in_buf, (int)(length_in_buf));
48      if (ret < 0) {
49	printf("client(C:%d): RPC CALL ERROR\n", i);
50#ifdef _OPENMP
51#pragma omp critical
52	{
53#endif
54	  error_code = -1;
55#ifdef _OPENMP
56	}
57#endif
58      }
59      if (i != 0) {
60	free(out_temp);
61      }
62    }
63  }
64  return error_code;
65}

Model YAML:

1---
2
3model:
4  name: server
5  language: R
6  args: ./src/server.R
7  function: model_function
8  is_server: True  # Creates a RPC server queue called "server"
9  copies: 5
 1model:
 2  name: client
 3  language: c
 4  args: ./src/client.c
 5  function: model_function
 6  client_of: server
 7  compiler_flags: [-fopenmp]
 8  linker_flags: [-fopenmp]
 9  allow_threading: true
10  env:
11    NTHREAD: 5
12  inputs:
13    name: in_buf
14    default_file:
15      name: ./Input/input.txt
16      filetype: ascii
17  outputs:
18    name: out_buf
19    default_file:
20      name: ./client_output.txt
21      in_temp: true