rpcFib¶
Three models (two clients, one server) run in a client-server, remote procedure call (RPC) communication pattern. The client models (rpcFibCli and rpcFibCliPar) send requests to and receive responses from the server model rpcFibSrv. The rpcFibCli model sends requests and receives responses via the deprecated rpcCall method (use call instead), while rpcFibCliPar uses the separate send/recv methods rpcSend and rpcRecv (use send & recv instead). The server model receives requests and sends responses. The rpcFibCli model also reads text data from an input file and sends each request/response pair to an output file. This example demonstrates the RPC pattern through the use of the client_of and is_server model parameters and the RPC interface classes/functions, including splitting calls into their send/recv components. The models also receive parameters via the command line as specified in the YAML args model parameter.
Mixed Version¶
Model Code:
1from __future__ import print_function
2import sys
3import numpy as np
4from yggdrasil.interface.YggInterface import (
5 YggRpcClient, YggInput, YggOutput)
6
7
8def fibClient(args):
9
10 iterations = int(args[0])
11 print('Hello from Python rpcFibCli: iterations = %d ' % iterations)
12
13 # Set up connections matching yaml
14 # RPC client-side connection will be $(server_name)_$(client_name)
15 ymlfile = YggInput("yaml_in")
16 rpc = YggRpcClient("rpcFibSrv_rpcFibCli", "%d", "%d %d")
17 log = YggOutput("output_log")
18
19 # Read entire contents of yaml
20 ret, ycontent = ymlfile.recv()
21 if not ret:
22 raise RuntimeError('rpcFibCli(P): RECV ERROR')
23 print('rpcFibCli: yaml has %d lines' % len(ycontent.split(b'\n')))
24
25 for i in range(1, iterations + 1):
26
27 # Call the server and receive response
28 print('rpcFibCli(P): fib(->%-2d) ::: ' % i, end='')
29 ret, fib = rpc.rpcCall(np.int32(i))
30 if not ret:
31 raise RuntimeError('rpcFibCli(P): RPC CALL ERROR')
32
33 # Log result by sending it to the log connection
34 s = 'fib(%2d<-) = %-2d<-\n' % tuple(fib)
35 print(s, end='')
36 ret = log.send(s)
37 if not ret:
38 raise RuntimeError('rpcFibCli(P): SEND ERROR')
39
40 print('Goodbye from Python rpcFibCli')
41
42
43if __name__ == '__main__':
44 fibClient(sys.argv[1:])
1function rpcFibCliPar(iterations)
2
3 iterations = str2num(iterations);
4 fprintf('Hello from Matlab rpcFibCliPar: iterations = %d\n', iterations);
5
6 % Create RPC connection with server
7 % RPC client-side connection will be $(server_name)_$(client_name)
8 rpc = YggInterface('YggRpcClient', 'rpcFibSrv_rpcFibCliPar', '%d', '%d %d');
9
10 % Send all of the requests to the server
11 for i = 1:iterations
12 fprintf('rpcFibCliPar(M): fib(->%-2d) ::: \n', i);
13 ret = rpc.send(int32(i));
14 if (~ret);
15 error('rpcFibCliPar(M): SEND FAILED');
16 end
17 end
18
19 % Receive responses for all requests that were sent
20 for i = 1:iterations
21 fprintf('rpcFibCliPar(M): fib(->%-2d) ::: ', i);
22 [ret, fib] = rpc.recv();
23 if (~ret);
24 error('rpcFibCliPar(M): RECV FAILED')
25 end
26 fprintf('rpcFibCliPar(M): fib(%2d<-) = %-2d<-\n', fib{1}, fib{2});
27 end
28
29 disp('Goodbye from Matlab rpcFibCliPar');
30
31end
1
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int main(int argc, char *argv[]) {
7
8 float timeSleep = atof(argv[1]);
9 printf("Hello from C rpcFibSrv: sleeptime = %f\n", timeSleep);
10
11 // Create server-side rpc connection using model name
12 yggRpc_t rpc = yggRpcServer("rpcFibSrv", "%d", "%d %d");
13
14 // Continue receiving requests until error occurs (the connection is closed
15 // by all clients that have connected).
16 int input;
17 while (1) {
18 printf("rpcFibSrv(C): receiving...\n");
19 int ret = rpcRecv(rpc, &input);
20 if (ret < 0) {
21 printf("rpcFibSrv(C): end of input\n");
22 break;
23 }
24
25 // Compute fibonacci number
26 printf("rpcFibSrv(C): <- input %d", input);
27 int result = 1;
28 int prevResult = 1;
29 int prevPrev = 0;
30 int idx = 1;
31 while(idx++ < input){
32 result = prevResult + prevPrev;
33 prevPrev = prevResult;
34 prevResult = result;
35 }
36 printf(" ::: ->(%2d %2d)\n", input, result);
37
38 // Sleep and then send response back
39 if (timeSleep)
40 sleep(timeSleep);
41 int flag = rpcSend(rpc, input, result);
42 if (flag < 0) {
43 printf("rpcFibSrv(C): ERROR sending\n");
44 break;
45 }
46 }
47
48 printf("Goodbye from C rpcFibSrv\n");
49 return 0;
50}
51
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: PythonModelDriver
6 args:
7 - ./src/rpcFibCli.py
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_python.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: MatlabModelDriver
6 args:
7 - ./src/rpcFibCliPar.m
8 - "{{ FIB_ITERATIONS }}"
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
1---
2
3model:
4 name: rpcFibSrv
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibSrv.c
8 - '{{ FIB_SERVER_SLEEP_SECONDS }}' # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
Mixed w/o Matlab Version¶
Model Code:
1from __future__ import print_function
2import sys
3import numpy as np
4from yggdrasil.interface.YggInterface import (
5 YggRpcClient, YggInput, YggOutput)
6
7
8def fibClient(args):
9
10 iterations = int(args[0])
11 print('Hello from Python rpcFibCli: iterations = %d ' % iterations)
12
13 # Set up connections matching yaml
14 # RPC client-side connection will be $(server_name)_$(client_name)
15 ymlfile = YggInput("yaml_in")
16 rpc = YggRpcClient("rpcFibSrv_rpcFibCli", "%d", "%d %d")
17 log = YggOutput("output_log")
18
19 # Read entire contents of yaml
20 ret, ycontent = ymlfile.recv()
21 if not ret:
22 raise RuntimeError('rpcFibCli(P): RECV ERROR')
23 print('rpcFibCli: yaml has %d lines' % len(ycontent.split(b'\n')))
24
25 for i in range(1, iterations + 1):
26
27 # Call the server and receive response
28 print('rpcFibCli(P): fib(->%-2d) ::: ' % i, end='')
29 ret, fib = rpc.rpcCall(np.int32(i))
30 if not ret:
31 raise RuntimeError('rpcFibCli(P): RPC CALL ERROR')
32
33 # Log result by sending it to the log connection
34 s = 'fib(%2d<-) = %-2d<-\n' % tuple(fib)
35 print(s, end='')
36 ret = log.send(s)
37 if not ret:
38 raise RuntimeError('rpcFibCli(P): SEND ERROR')
39
40 print('Goodbye from Python rpcFibCli')
41
42
43if __name__ == '__main__':
44 fibClient(sys.argv[1:])
1#include "YggInterface.hpp"
2#include <stdio.h>
3
4
5int main(int argc, char *argv[]) {
6
7 int iterations = atoi(argv[1]);
8 printf("Hello from C++ rpcFibCliPar: iterations = %d\n", iterations);
9
10 // Create RPC connection with server
11 // RPC client-side connection will be $(server_name)_$(client_name)
12 YggRpcClient rpc("rpcFibSrv_rpcFibCliPar", "%d", "%d %d");
13
14 // Send all of the requests to the server
15 int ret;
16 for (int i = 1; i <= iterations; i++) {
17 printf("rpcFibCliPar(CPP): fib(->%-2d) ::: \n", i);
18 ret = rpc.send(1, i);
19 if (ret < 0) {
20 printf("rpcFibCliPar(CPP): SEND FAILED\n");
21 return -1;
22 }
23 }
24
25 // Receive responses for all requests that were sent
26 int fib = -1;
27 int fibNo = -1;
28 for (int i = 1; i <= iterations; i++) {
29 ret = rpc.recv(2, &fibNo, &fib);
30 if (ret < 0) {
31 printf("rpcFibCliPar(CPP): RECV FAILED\n");
32 return -1;
33 }
34 printf("rpcFibCliPar(CPP): fib(%2d<-) = %-2d<-\n", fibNo, fib);
35 }
36
37 printf("Goodbye from C++ rpcFibCliPar\n");
38 return 0;
39}
1
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int main(int argc, char *argv[]) {
7
8 float timeSleep = atof(argv[1]);
9 printf("Hello from C rpcFibSrv: sleeptime = %f\n", timeSleep);
10
11 // Create server-side rpc connection using model name
12 yggRpc_t rpc = yggRpcServer("rpcFibSrv", "%d", "%d %d");
13
14 // Continue receiving requests until error occurs (the connection is closed
15 // by all clients that have connected).
16 int input;
17 while (1) {
18 printf("rpcFibSrv(C): receiving...\n");
19 int ret = rpcRecv(rpc, &input);
20 if (ret < 0) {
21 printf("rpcFibSrv(C): end of input\n");
22 break;
23 }
24
25 // Compute fibonacci number
26 printf("rpcFibSrv(C): <- input %d", input);
27 int result = 1;
28 int prevResult = 1;
29 int prevPrev = 0;
30 int idx = 1;
31 while(idx++ < input){
32 result = prevResult + prevPrev;
33 prevPrev = prevResult;
34 prevResult = result;
35 }
36 printf(" ::: ->(%2d %2d)\n", input, result);
37
38 // Sleep and then send response back
39 if (timeSleep)
40 sleep(timeSleep);
41 int flag = rpcSend(rpc, input, result);
42 if (flag < 0) {
43 printf("rpcFibSrv(C): ERROR sending\n");
44 break;
45 }
46 }
47
48 printf("Goodbye from C rpcFibSrv\n");
49 return 0;
50}
51
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: PythonModelDriver
6 args:
7 - ./src/rpcFibCli.py
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_python.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibCliPar.cpp
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibSrv.c
8 - '{{ FIB_SERVER_SLEEP_SECONDS }}' # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
C Version¶
Model Code:
1
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int count_lines(const char* str, const char* substr) {
7 int c = 0;
8 int inc = strlen(substr);
9 if (strlen(substr) == 0) return -1;
10 char *s;
11 for (s = (char*)str; (s = strstr(s, substr)); s += inc)
12 c++;
13 return c;
14}
15
16
17int main(int argc, char *argv[]) {
18
19 int iterations = atoi(argv[1]);
20 printf("Hello from C rpcFibCli: iterations %d\n", iterations);
21
22 // Set up connections matching yaml
23 // RPC client-side connection will be $(server_name)_$(client_name)
24 yggInput_t ymlfile = yggInput("yaml_in");
25 yggRpc_t rpc = yggRpcClient("rpcFibSrv_rpcFibCli", "%d", "%d %d");
26 yggOutput_t log = yggOutput("output_log");
27 int ret;
28
29 // Read entire contents of yaml
30 char *ycontent = (char*)malloc(YGG_MSG_MAX*sizeof(char));
31 ret = ygg_recv(ymlfile, ycontent, YGG_MSG_MAX);
32 if (ret < 0) {
33 printf("rpcFibCli(C): RECV ERROR\n");
34 free(ycontent);
35 exit(-1);
36 }
37 printf("rpcFibCli: yaml has %d lines\n", count_lines(ycontent, "\n") + 1);
38 ret = ygg_recv(ymlfile, ycontent, YGG_MSG_MAX);
39 free(ycontent);
40
41 int fib = -1;
42 int fibNo = -1;
43 char *logmsg = (char*)malloc(YGG_MSG_MAX*sizeof(char));
44 int i;
45 for (i = 1; i <= iterations; i++) {
46
47 // Call the server and receive response
48 printf("rpcFibCli(C): fib(->%-2d) ::: ", i);
49 ret = rpcCall(rpc, i, &fibNo, &fib);
50 if (ret < 0) {
51 printf("rpcFibCli(C): RPC CALL ERROR\n");
52 free(logmsg);
53 exit(-1);
54 }
55
56 // Log result by sending it to the log connection
57 sprintf(logmsg, "fib(%2d<-) = %-2d<-\n", fibNo, fib);
58 printf("%s", logmsg);
59 ret = ygg_send(log, logmsg, strlen(logmsg));
60 if (ret < 0) {
61 printf("rpcFibCli(C): SEND ERROR\n");
62 free(logmsg);
63 exit(-1);
64 }
65 }
66
67 free(logmsg);
68 printf("Goodbye from C rpcFibCli\n");
69 exit(0);
70
71}
72
1
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int main(int argc, char *argv[]) {
7
8 int iterations = atoi(argv[1]);
9 printf("Hello from C rpcFibCliPar: iterations = %d\n", iterations);
10
11 // Create RPC connection with server
12 // RPC client-side connection will be $(server_name)_$(client_name)
13 yggRpc_t rpc = yggRpcClient("rpcFibSrv_rpcFibCliPar", "%d", "%d %d");
14
15 // Send all of the requests to the server
16 int i;
17 int ret;
18 for (i = 1; i <= iterations; i++) {
19 printf("rpcFibCliPar(C): fib(->%-2d) ::: \n", i);
20 ret = rpcSend(rpc, i);
21 if (ret < 0) {
22 printf("rpcFibCliPar(C): SEND FAILED\n");
23 exit(-1);
24 }
25 }
26
27 // Receive responses for all requests that were sent
28 int fib = -1;
29 int fibNo = -1;
30 for (i = 1; i <= iterations; i++) {
31 ret = rpcRecv(rpc, &fibNo, &fib);
32 if (ret < 0) {
33 printf("rpcFibCliPar(C): RECV FAILED\n");
34 exit(-1);
35 }
36 printf("rpcFibCliPar(C): fib(%2d<-) = %-2d<-\n", fibNo, fib);
37 }
38
39 printf("Goodbye from C rpcFibCliPar\n");
40 exit(0);
41
42}
43
1
2#include "YggInterface.h"
3#include <stdio.h>
4
5
6int main(int argc, char *argv[]) {
7
8 float timeSleep = atof(argv[1]);
9 printf("Hello from C rpcFibSrv: sleeptime = %f\n", timeSleep);
10
11 // Create server-side rpc connection using model name
12 yggRpc_t rpc = yggRpcServer("rpcFibSrv", "%d", "%d %d");
13
14 // Continue receiving requests until error occurs (the connection is closed
15 // by all clients that have connected).
16 int input;
17 while (1) {
18 printf("rpcFibSrv(C): receiving...\n");
19 int ret = rpcRecv(rpc, &input);
20 if (ret < 0) {
21 printf("rpcFibSrv(C): end of input\n");
22 break;
23 }
24
25 // Compute fibonacci number
26 printf("rpcFibSrv(C): <- input %d", input);
27 int result = 1;
28 int prevResult = 1;
29 int prevPrev = 0;
30 int idx = 1;
31 while(idx++ < input){
32 result = prevResult + prevPrev;
33 prevPrev = prevResult;
34 prevResult = result;
35 }
36 printf(" ::: ->(%2d %2d)\n", input, result);
37
38 // Sleep and then send response back
39 if (timeSleep)
40 sleep(timeSleep);
41 int flag = rpcSend(rpc, input, result);
42 if (flag < 0) {
43 printf("rpcFibSrv(C): ERROR sending\n");
44 break;
45 }
46 }
47
48 printf("Goodbye from C rpcFibSrv\n");
49 return 0;
50}
51
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibCli.c
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_c.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibCliPar.c
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibSrv.c
8 - '{{ FIB_SERVER_SLEEP_SECONDS }}' # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
C++ Version¶
Model Code:
1#include "YggInterface.hpp"
2#include <stdio.h>
3
4
5int count_lines(const char* str, const char* substr) {
6 int c = 0;
7 int inc = strlen(substr);
8 if (strlen(substr) == 0) return -1;
9 for (char* s = (char*)str; (s = strstr(s, substr)); s += inc)
10 c++;
11 return c;
12}
13
14
15int main(int argc, char *argv[]) {
16
17 int iterations = atoi(argv[1]);
18 printf("Hello from C++ rpcFibCli: iterations %d\n", iterations);
19
20 // Set up connections matching yaml
21 // RPC client-side connection will be $(server_name)_$(client_name)
22 YggInput ymlfile("yaml_in");
23 YggRpcClient rpc("rpcFibSrv_rpcFibCli", "%d", "%d %d");
24 YggOutput log("output_log");
25
26 // Read entire contents of yaml
27 char *ycontent = (char*)malloc(YGG_MSG_MAX*sizeof(char));
28 int ret = ymlfile.recv(ycontent, YGG_MSG_MAX);
29 if (ret < 0) {
30 printf("rpcFibCli(CPP): RECV ERROR\n");
31 free(ycontent);
32 exit(-1);
33 }
34 printf("rpcFibCli: yaml has %d lines\n", count_lines(ycontent, "\n") + 1);
35 free(ycontent);
36
37 int fib = -1;
38 int fibNo = -1;
39 char *logmsg = (char*)malloc(YGG_MSG_MAX*sizeof(char));
40 for (int i = 1; i <= iterations; i++) {
41
42 // Call the server and receive response
43 printf("rpcFibCli(CPP): fib(->%-2d) ::: ", i);
44 ret = rpc.call(3, i, &fibNo, &fib);
45 if (ret < 0) {
46 printf("rpcFibCli(CPP): RPC CALL ERROR\n");
47 free(logmsg);
48 return -1;
49 }
50
51 // Log result by sending it to the log connection
52 sprintf(logmsg, "fib(%2d<-) = %-2d<-\n", fibNo, fib);
53 printf(logmsg);
54 ret = log.send(logmsg, strlen(logmsg));
55 if (ret < 0) {
56 printf("rpcFibCli(CPP): SEND ERROR\n");
57 free(logmsg);
58 return -1;
59 }
60 }
61
62 printf("Goodbye from C++ rpcFibCli\n");
63 free(logmsg);
64 return 0;
65}
1#include "YggInterface.hpp"
2#include <stdio.h>
3
4
5int main(int argc, char *argv[]) {
6
7 int iterations = atoi(argv[1]);
8 printf("Hello from C++ rpcFibCliPar: iterations = %d\n", iterations);
9
10 // Create RPC connection with server
11 // RPC client-side connection will be $(server_name)_$(client_name)
12 YggRpcClient rpc("rpcFibSrv_rpcFibCliPar", "%d", "%d %d");
13
14 // Send all of the requests to the server
15 int ret;
16 for (int i = 1; i <= iterations; i++) {
17 printf("rpcFibCliPar(CPP): fib(->%-2d) ::: \n", i);
18 ret = rpc.send(1, i);
19 if (ret < 0) {
20 printf("rpcFibCliPar(CPP): SEND FAILED\n");
21 return -1;
22 }
23 }
24
25 // Receive responses for all requests that were sent
26 int fib = -1;
27 int fibNo = -1;
28 for (int i = 1; i <= iterations; i++) {
29 ret = rpc.recv(2, &fibNo, &fib);
30 if (ret < 0) {
31 printf("rpcFibCliPar(CPP): RECV FAILED\n");
32 return -1;
33 }
34 printf("rpcFibCliPar(CPP): fib(%2d<-) = %-2d<-\n", fibNo, fib);
35 }
36
37 printf("Goodbye from C++ rpcFibCliPar\n");
38 return 0;
39}
1#include "YggInterface.hpp"
2#include <stdio.h>
3
4
5int main(int argc, char *argv[]) {
6
7 float timeSleep = atof(argv[1]);
8 printf("Hello from C++ rpcFibSrv: sleeptime = %f\n", timeSleep);
9
10 // Create server-side rpc connection using model name
11 YggRpcServer rpc("rpcFibSrv", "%d", "%d %d");
12
13 // Continue receiving requests until error occurs (the connection is closed
14 // by all clients that have connected).
15 int input;
16 while (1) {
17 printf("rpcFibSrv(CPP): receiving...\n");
18 int ret = rpc.recv(1, &input);
19 if (ret < 0) {
20 printf("rpcFibSrv(CPP): end of input\n");
21 break;
22 }
23
24 // Compute fibonacci number
25 printf("rpcFibSrv(CPP): <- input %d", input);
26 int result = 1;
27 int prevResult = 1;
28 int prevPrev = 0;
29 int idx = 1;
30 while(idx++ < input){
31 result = prevResult + prevPrev;
32 prevPrev = prevResult;
33 prevResult = result;
34 }
35 printf(" ::: ->(%2d %2d)\n", input, result);
36
37 // Sleep and then send response back
38 if (timeSleep)
39 sleep(timeSleep);
40 int flag = rpc.send(2, input, result);
41 if (flag < 0) {
42 printf("rpcFibSrv(CPP): ERROR sending\n");
43 break;
44 }
45 }
46
47 printf("Goodbye from C++ rpcFibSrv\n");
48 return 0;
49}
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibCli.cpp
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_cpp.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibCliPar.cpp
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: GCCModelDriver
6 args:
7 - ./src/rpcFibSrv.cpp
8 - '{{ FIB_SERVER_SLEEP_SECONDS }}' # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
Fortran Version¶
Model Code:
1program main
2 use fygg
3
4 character(len=32) :: arg
5 integer :: iterations
6 type(yggcomm) :: ymlfile
7 type(yggcomm) :: rpc
8 type(yggcomm) :: log
9 logical :: ret
10 integer, parameter :: msg_len = 2048 ! YGG_MSG_MAX
11 character(len=msg_len) :: ycontent
12 integer :: fib, fibNo, i, nlines, count_lines
13 character(len=msg_len) :: logmsg
14 integer :: ycontent_siz, logmsg_siz
15 character(len=2) :: fibStr
16
17 call get_command_argument(1, arg)
18 read(arg, *) iterations
19 write(*, '("Hello from Fortran rpcFibCli: iterations ",i2)') iterations
20
21 ! Set up connections matching yaml
22 ! RPC client-side connection will be $(server_name)_$(client_name)
23 ymlfile = ygg_input("yaml_in")
24 rpc = ygg_rpc_client("rpcFibSrv_rpcFibCli", "%d", "%d %d")
25 log = ygg_output("output_log")
26
27 ! Read entire contents of yaml
28 ycontent_siz = len(ycontent)
29 ret = ygg_recv(ymlfile, ycontent, ycontent_siz)
30 if (.not.ret) then
31 write(*, '("rpcFibCli(F): RECV ERROR")')
32 stop 1
33 end if
34 nlines = count_lines(ycontent, new_line('A'))
35 write(*, '("rpcFibCli: yaml has ",i4," lines")') nlines + 1
36 ret = ygg_recv(ymlfile, ycontent, ycontent_siz)
37
38 fib = -1
39 fibNo = -1
40 do i = 1, iterations
41
42 ! Call the server and receive response
43 write(*, '("rpcFibCli(F): fib(->",i2,") ::: ")') i
44 ret = ygg_rpc_call(rpc, yggarg(i), [yggarg(fibNo), yggarg(fib)])
45 if (.not.ret) then
46 write(*, '("rpcFibCli(F): RPC CALL ERROR")')
47 stop 1
48 end if
49
50 ! Log result by sending it to the log connection
51 write(fibStr, '(i2)') fib
52 write(logmsg, '("fib(",i2,"<-) = ",A,"<-",A)') fibNo, &
53 adjustl(fibStr), new_line('A')
54 write(*, '(A)', advance="no") trim(logmsg)
55 logmsg_siz = len_trim(logmsg)
56 ret = ygg_send(log, logmsg, logmsg_siz)
57 if (.not.ret) then
58 write(*, '("rpcFibCli(F): SEND ERROR")')
59 stop 1
60 end if
61
62 end do
63
64 write(*, '("Goodbye from Fortran rpcFibCli")')
65
66end program main
67
68function count_lines(str, substr) result(c)
69 implicit none
70 character(len=*) :: str
71 character(len=*) :: substr
72 integer :: c
73 integer :: k, inc, idx
74 c = 0
75 k = 1
76 inc = len_trim(substr)
77 idx = index(str(k:), substr)
78 do while (idx.ne.0)
79 c = c + 1
80 k = k + inc
81 idx = index(str(k:), substr)
82 end do
83end function count_lines
84
1program main
2 use fygg
3
4 character(len=32) :: arg
5 integer :: iterations
6 type(yggcomm) :: rpc
7 logical :: ret
8 integer :: fib, fibNo, i
9
10 call get_command_argument(1, arg)
11 read(arg, *) iterations
12 write(*, '("Hello from Fortran rpcFibCliPar: iterations = ",i2)') &
13 iterations
14
15 ! Create RPC connection with server
16 ! RPC client-side connection will be $(server_name)_$(client_name)
17 rpc = ygg_rpc_client("rpcFibSrv_rpcFibCliPar", "%d", "%d %d")
18
19 ! Send all of the requests to the server
20 do i = 1, iterations
21 write(*, '("rpcFibCliPar(F): fib(->",i2,") ::: ")') i
22 ret = ygg_send_var(rpc, yggarg(i))
23 if (.not.ret) then
24 write(*, '("rpcFibCliPar(F): SEND FAILED")')
25 stop 1
26 end if
27 end do
28
29 ! Receive responses for all requests that were sent
30 fib = -1
31 fibNo = -1
32 do i = 1, iterations
33 ret = ygg_recv_var(rpc, [yggarg(fibNo), yggarg(fib)])
34 if (.not.ret) then
35 write(*, '("rpcFibCliPar(F): RECV FAILED")')
36 stop 1
37 end if
38 write(*, '("rpcFibCliPar(F): fib(",i2,"<-) = ",i2,"<-")') &
39 fibNo, fib
40 end do
41
42 write(*, '("Goodbye from Fortran rpcFibCliPar")')
43
44end program main
1program main
2 use fygg
3
4 type(yggcomm) :: rpc
5 character(len=32) :: arg
6 real :: time_sleep
7 integer :: input
8 integer :: result0, prevResult, prevPrev, idx
9 logical :: ret
10
11 call get_command_argument(1, arg)
12 read(arg, *) time_sleep
13 write(*, '("Hello from Fortran rpcFibSrv: sleeptime = ",f10.4)') time_sleep
14
15 ! Create server-side rpc connection using model name
16 rpc = ygg_rpc_server("rpcFibSrv", "%d", "%d %d")
17
18 ! Continue receiving requests until error occurs (the connection is
19 ! closed by all clients that have connected).
20 do while (.true.)
21 write(*, '("rpcFibSrv(F): receiving...")')
22
23 ret = ygg_recv_var(rpc, yggarg(input))
24 if (.not.ret) then
25 write(*, '("rpcFibSrv(F): end of input")')
26 exit
27 end if
28
29 ! Compute fibonacci number
30 result0 = 1
31 prevResult = 1
32 prevPrev = 0
33 idx = 1
34 do while (idx.lt.input)
35 result0 = prevResult + prevPrev
36 prevPrev = prevResult
37 prevResult = result0
38 idx = idx + 1
39 end do
40 write(*, '("rpcFibSrv(F): <- input ",i2," ::: ->(",i2," ",i2,")")') &
41 input, input, result0
42
43 ! Sleep and then send response back
44 if (time_sleep.gt.0) call fsleep(int(time_sleep))
45 ret = ygg_send_var(rpc, [yggarg(input), yggarg(result0)])
46 if (.not.ret) then
47 write(*, '("rpcFibSrv(F): ERROR sending")')
48 exit
49 end if
50
51 end do
52
53 write(*, '("Goodbye from Fortran rpcFibSrv")')
54
55end program main
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: FortranModelDriver
6 args:
7 - ./src/rpcFibCli.f90
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_fortran.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: FortranModelDriver
6 args:
7 - ./src/rpcFibCliPar.f90
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: FortranModelDriver
6 args:
7 - ./src/rpcFibSrv.f90
8 - "{{ FIB_SERVER_SLEEP_SECONDS }}" # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
Julia Version¶
Model Code:
1using Yggdrasil
2using Printf
3
4
5function fibClient(args)
6
7 iterations = parse(Int64, args[1])
8 @printf("Hello from Julia rpcFibCli: iterations = %d\n", iterations)
9
10 # Set up connections matching yaml
11 # RPC client-side connection will be $(server_name)_$(client_name)
12 ymlfile = Yggdrasil.YggInterface("YggInput", "yaml_in")
13 rpc = Yggdrasil.YggInterface("YggRpcClient", "rpcFibSrv_rpcFibCli", "%d", "%d %d")
14 log = Yggdrasil.YggInterface("YggOutput", "output_log")
15
16 # Read entire contents of yaml
17 ret, ycontent = ymlfile.recv()
18 if (!ret)
19 error("rpcFibCli(Julia): RECV ERROR")
20 end
21 @printf("rpcFibCli: yaml has %d lines\n", countlines(IOBuffer(ycontent)))
22
23 for i = 1:iterations
24
25 # Call the server and receive response
26 @printf("rpcFibCli(Julia): fib(->%-2d) ::: ", i)
27 ret, fib = rpc.call(Int64(i))
28 if (!ret)
29 error("rpcFibCli(Julia): RPC CALL ERROR")
30 end
31
32 # Log result by sending it to the log connection
33 s = @sprintf("fib(%2d<-) = %-2d<-\n", fib[1], fib[2])
34 println(s)
35 ret = log.send(s)
36 if (!ret)
37 error("rpcFibCli(Julia): SEND ERROR")
38 end
39 end
40
41 println("Goodbye from Julia rpcFibCli")
42end
43
44
45fibClient(ARGS)
1using Yggdrasil
2using Printf
3
4
5function fibClientPar(args)
6
7 iterations = parse(Int64, args[1])
8 @printf("Hello from Julia rpcFibCliPar: iterations = %d\n", iterations)
9
10 # Create RPC connection with server
11 # RPC client-side connection will be $(server_name)_$(client_name)
12 rpc = Yggdrasil.YggInterface("YggRpcClient", "rpcFibSrv_rpcFibCliPar", "%d", "%d %d")
13
14 # Send all of the requests to the server
15 for i = 1:iterations
16 @printf("rpcFibCliPar(Julia): fib(->%-2d) :::\n", i)
17 ret = rpc.send(Int64(i))
18 if (!ret)
19 error("rpcFibCliPar(Julia): SEND FAILED")
20 end
21 end
22
23 # Receive responses for all requests that were sent
24 for i = 1:iterations
25 ret, fib = rpc.recv()
26 if (!ret)
27 error("rpcFibCliPar(Julia): RECV FAILED")
28 end
29 @printf("rpcFibCliPar(Julia): fib(%2d<-) = %-2d<-\n", fib[1], fib[2])
30 end
31
32 println("Goodbye from Julia rpcFibCliPar")
33end
34
35
36fibClientPar(ARGS)
1using Yggdrasil
2using Printf
3
4
5function fibServer(args)
6
7 sleeptime = parse(Float64, args[1])
8 @printf("Hello from Julia rpcFibSrv: sleeptime = %f\n", sleeptime)
9
10 # Create server-side rpc connection using model name
11 rpc = Yggdrasil.YggInterface("YggRpcServer", "rpcFibSrv", "%d", "%d %d")
12
13 # Continue receiving requests until error occurs (the connection is closed
14 # by all clients that have connected).
15 while (true)
16 println("rpcFibSrv(Julia): receiving...")
17 retval, rpc_in = rpc.recv()
18 if (!retval)
19 println("rpcFibSrv(Julia): end of input")
20 break
21 end
22
23 # Compute fibonacci number
24 @printf("rpcFibSrv(Julia): <- input %d", rpc_in[1])
25 pprev = 0
26 prev = 1
27 result = 1
28 fib_no = 1
29 arg = rpc_in[1]
30 while (fib_no < arg)
31 result = prev + pprev
32 pprev = prev
33 prev = result
34 fib_no = fib_no + 1
35 end
36 @printf(" ::: ->(%2d %2d)\n", arg, result)
37
38 # Sleep and then send response back
39 Sys.sleep(sleeptime)
40 flag = rpc.send(arg, Int64(result))
41 if (!flag)
42 error("rpcFibSrv(Julia): ERROR sending")
43 end
44 end
45 println("Goodbye from Julia rpcFibSrv")
46
47end
48
49fibServer(ARGS)
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: JuliaModelDriver
6 args:
7 - ./src/rpcFibCli.jl
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_r.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: JuliaModelDriver
6 args:
7 - ./src/rpcFibCliPar.jl
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: JuliaModelDriver
6 args:
7 - ./src/rpcFibSrv.jl
8 - "{{ FIB_SERVER_SLEEP_SECONDS }}" # env var - time to sleep before returning
9 is_server: True # Creates a RPC queue called $name
Matlab Version¶
Model Code:
1function rpcFibCli(iterations)
2
3 iterations = str2num(iterations);
4 fprintf('Hello from Matlab rpcFibCli: iterations = %d\n', iterations);
5
6 % Set up connections matching yaml
7 % RPC client-side connection will be $(server_name)_$(client_name)
8 ymlfile = YggInterface('YggInput', 'yaml_in');
9 rpc = YggInterface('YggRpcClient', 'rpcFibSrv_rpcFibCli', '%d', '%d %d');
10 log = YggInterface('YggOutput', 'output_log');
11
12 % Read entire contents of yaml
13 [ret, ycontent] = ymlfile.recv();
14 if (~ret);
15 error('rpcFibCli(M): RECV ERROR');
16 end
17 fprintf('rpcFibCli: yaml has %d lines\n', ...
18 length(strsplit(ycontent, '\n', 'CollapseDelimiters', false)));
19
20 for i = 1:iterations
21
22 % Call the server and receive response
23 fprintf('rpcFibCli(M): fib(->%-2d) ::: ', i);
24 [ret, fib] = rpc.call(int32(i));
25 if (~ret);
26 error('rpcFibCli(M): RPC CALL ERROR');
27 end
28
29 % Log result by sending it to the log connection
30 s = sprintf('fib(%2d<-) = %-2d<-\n', fib{1}, fib{2});
31 fprintf(s);
32 ret = log.send(s);
33 if (~ret);
34 error('rpcFibCli(M): SEND ERROR');
35 end
36 end
37
38 disp('Goodbye from Matlab rpcFibCli');
39 return;
40
41end
1function rpcFibCliPar(iterations)
2
3 iterations = str2num(iterations);
4 fprintf('Hello from Matlab rpcFibCliPar: iterations = %d\n', iterations);
5
6 % Create RPC connection with server
7 % RPC client-side connection will be $(server_name)_$(client_name)
8 rpc = YggInterface('YggRpcClient', 'rpcFibSrv_rpcFibCliPar', '%d', '%d %d');
9
10 % Send all of the requests to the server
11 for i = 1:iterations
12 fprintf('rpcFibCliPar(M): fib(->%-2d) ::: \n', i);
13 ret = rpc.send(int32(i));
14 if (~ret);
15 error('rpcFibCliPar(M): SEND FAILED');
16 end
17 end
18
19 % Receive responses for all requests that were sent
20 for i = 1:iterations
21 fprintf('rpcFibCliPar(M): fib(->%-2d) ::: ', i);
22 [ret, fib] = rpc.recv();
23 if (~ret);
24 error('rpcFibCliPar(M): RECV FAILED')
25 end
26 fprintf('rpcFibCliPar(M): fib(%2d<-) = %-2d<-\n', fib{1}, fib{2});
27 end
28
29 disp('Goodbye from Matlab rpcFibCliPar');
30
31end
1function rpcFibSrv(sleeptime)
2
3 sleeptime = str2num(sleeptime);
4 fprintf('Hello from Matlab rpcFibSrv: sleeptime = %f\n', sleeptime);
5
6 % Create server-side rpc connection using model name
7 rpc = YggInterface('YggRpcServer', 'rpcFibSrv', '%d', '%d %d');
8
9 % Continue receiving requests until error occurs (the connection is closed
10 % by all clients that have connected).
11 while 1
12 [flag, input] = rpc.recv();
13 if (~flag);
14 fprintf('rpcFibSrv(M): end of input');
15 return;
16 end
17
18 % Compute fibonacci number
19 fprintf('rpcFibSrv(M): <- input %d', input{1});
20 pprev = 0;
21 prev = 1;
22 result = 1;
23 n = 1;
24 while n < input{1}
25 result = prev + pprev;
26 pprev = prev;
27 prev = result;
28 n = n+1;
29 end
30 fprintf(' ::: ->(%2d %2d)\n', input{1}, result);
31
32 % Sleep and then send response back
33 pause(sleeptime);
34 flag = rpc.send(input{1}, int32(result));
35 if (~flag);
36 error('rpcFibSrv(M): ERROR sending');
37 end
38 end
39
40 disp('Goodbye from Matlab rpcFibSrv');
41
42end
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: MatlabModelDriver
6 args:
7 - ./src/rpcFibCli.m
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_matlab.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: MatlabModelDriver
6 args:
7 - ./src/rpcFibCliPar.m
8 - "{{ FIB_ITERATIONS }}"
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
1---
2
3model:
4 name: rpcFibSrv
5 driver: MatlabModelDriver
6 args:
7 - ./src/rpcFibSrv.m
8 - "{{ FIB_SERVER_SLEEP_SECONDS }}" # env var - time to sleep before returning
9 is_server: True # Creates an RPC queue called $name
Python Version¶
Model Code:
1from __future__ import print_function
2import sys
3import numpy as np
4from yggdrasil.interface.YggInterface import (
5 YggRpcClient, YggInput, YggOutput)
6
7
def fibClient(args):
    """Run the serial RPC client: one blocking call per Fibonacci index.

    Args:
        args: Command line arguments; ``args[0]`` is the number of
            requests to issue.

    Raises:
        RuntimeError: If reading the YAML input, an RPC call, or a send
            to the log connection reports failure.
    """
    iterations = int(args[0])
    print('Hello from Python rpcFibCli: iterations = %d ' % iterations)

    # Set up connections matching yaml
    # RPC client-side connection will be $(server_name)_$(client_name)
    ymlfile = YggInput("yaml_in")
    rpc = YggRpcClient("rpcFibSrv_rpcFibCli", "%d", "%d %d")
    log = YggOutput("output_log")

    # Read entire contents of yaml
    ret, ycontent = ymlfile.recv()
    if not ret:
        raise RuntimeError('rpcFibCli(P): RECV ERROR')
    print('rpcFibCli: yaml has %d lines' % len(ycontent.split(b'\n')))

    for i in range(1, iterations + 1):

        # Call the server and receive response. ``call`` is the supported
        # name for the deprecated ``rpcCall`` alias.
        print('rpcFibCli(P): fib(->%-2d) ::: ' % i, end='')
        ret, fib = rpc.call(np.int32(i))
        if not ret:
            raise RuntimeError('rpcFibCli(P): RPC CALL ERROR')

        # Log result by sending it to the log connection
        s = 'fib(%2d<-) = %-2d<-\n' % tuple(fib)
        print(s, end='')
        ret = log.send(s)
        if not ret:
            raise RuntimeError('rpcFibCli(P): SEND ERROR')

    print('Goodbye from Python rpcFibCli')


if __name__ == '__main__':
    fibClient(sys.argv[1:])
1from __future__ import print_function
2import sys
3import numpy as np
4from yggdrasil.interface.YggInterface import YggRpcClient
5
6
def fibClient(args):
    """Run the split-phase RPC client: send every request, then read replies.

    Args:
        args: Command line arguments; ``args[0]`` is the number of
            requests to issue.

    Raises:
        RuntimeError: If a send or a receive on the RPC connection
            reports failure.
    """
    iterations = int(args[0])
    print('Hello from Python rpcFibCliPar: iterations = %d' % iterations)

    # Create RPC connection with server
    # RPC client-side connection will be $(server_name)_$(client_name)
    rpc = YggRpcClient("rpcFibSrv_rpcFibCliPar", "%d", "%d %d")

    # Send all of the requests to the server. ``send`` is the supported
    # name for the deprecated ``rpcSend`` alias.
    for i in range(1, iterations + 1):
        print('rpcFibCliPar(P): fib(->%-2d) ::: ' % i)
        ret = rpc.send(np.int32(i))
        if not ret:
            raise RuntimeError('rpcFibCliPar(P): SEND FAILED')

    # Receive responses for all requests that were sent. ``recv`` is the
    # supported name for the deprecated ``rpcRecv`` alias.
    for _ in range(iterations):
        ret, fib = rpc.recv()
        if not ret:
            raise RuntimeError('rpcFibCliPar(P): RECV FAILED')
        print('rpcFibCliPar(P): fib(%2d<-) = %-2d<-' % tuple(fib))

    print('Goodbye from Python rpcFibCliPar')


if __name__ == '__main__':
    fibClient(sys.argv[1:])
1from __future__ import print_function
2import sys
3import numpy as np
4from yggdrasil.interface.YggInterface import YggRpcServer
5from yggdrasil.tools import sleep
6
7
def fibServer(args):
    """Serve Fibonacci requests over RPC until a receive reports failure.

    Each request carries an index ``n``; the response is the pair
    ``(n, fib(n))`` where ``fib(1) == fib(2) == 1``. The result stays 1
    for any ``n <= 1``.

    Args:
        args: Command line arguments; ``args[0]`` is the number of
            seconds to sleep before sending each response.

    Raises:
        RuntimeError: If sending a response reports failure.
    """
    sleeptime = float(args[0])
    print('Hello from Python rpcFibSrv: sleeptime = %f' % sleeptime)

    # Create server-side rpc connection using model name
    rpc = YggRpcServer("rpcFibSrv", "%d", "%d %d")

    # Continue receiving requests until error occurs (the connection is closed
    # by all clients that have connected).
    while True:
        print('rpcFibSrv(P): receiving...')
        # ``recv``/``send`` are the supported names for the deprecated
        # ``rpcRecv``/``rpcSend`` aliases.
        retval, rpc_in = rpc.recv()
        if not retval:
            print('rpcFibSrv(P): end of input')
            break
        arg = rpc_in[0]

        # Compute fibonacci number
        print('rpcFibSrv(P): <- input %d' % arg, end='')
        pprev = 0
        prev = 1
        result = 1
        fib_no = 1
        while fib_no < arg:
            result = prev + pprev
            pprev = prev
            prev = result
            fib_no = fib_no + 1
        print(' ::: ->(%2d %2d)' % (arg, result))

        # Sleep and then send response back (sleeptime is already a float)
        sleep(sleeptime)
        flag = rpc.send(arg, np.int32(result))
        if not flag:
            raise RuntimeError('rpcFibSrv(P): ERROR sending')

    print('Goodbye from Python rpcFibSrv')


if __name__ == '__main__':
    fibServer(sys.argv[1:])
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: PythonModelDriver
6 args:
7 - ./src/rpcFibCli.py
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_python.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: PythonModelDriver
6 args:
7 - ./src/rpcFibCliPar.py
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: PythonModelDriver
6 args:
7 - ./src/rpcFibSrv.py
8 - "{{ FIB_SERVER_SLEEP_SECONDS }}" # env var - time to sleep before returning
9 is_server: True # Creates an RPC queue called $name
R Version¶
Model Code:
1library(yggdrasil)
2
3
# Serial RPC client: issues one blocking call per Fibonacci index and logs
# each reply to the output connection.
#
# args: character vector of command line arguments; args[[1]] is the number
#       of requests to issue.
#
# Stops with an error if reading the YAML input, an RPC call, or a send to
# the log connection reports failure.
fibClient <- function(args) {

  iterations <- strtoi(args[[1]])
  fprintf('Hello from R rpcFibCli: iterations = %d ', iterations)

  # Set up connections matching yaml
  # RPC client-side connection will be $(server_name)_$(client_name)
  ymlfile <- YggInterface('YggInput', "yaml_in")
  rpc <- YggInterface('YggRpcClient', "rpcFibSrv_rpcFibCli", "%d", "%d %d")
  log <- YggInterface('YggOutput', "output_log")

  # Read entire contents of yaml
  c(ret, ycontent) %<-% ymlfile$recv()
  if (!ret) {
    stop('rpcFibCli(R): RECV ERROR')
  }
  # strsplit() returns a list with one element per input string, so take
  # the first element before counting; length() of the list itself is
  # always 1 for a single string.
  fprintf('rpcFibCli: yaml has %d lines',
          length(strsplit(ycontent, '\n')[[1]]))

  # seq_len() yields an empty sequence for 0 iterations, whereas
  # 1:iterations would count down over 1, 0.
  for (i in seq_len(iterations)) {

    # Call the server and receive response
    fprintf('rpcFibCli(R): fib(->%-2d) ::: ', i)
    c(ret, fib) %<-% rpc$call(as.integer(i))
    if (!ret) {
      stop('rpcFibCli(R): RPC CALL ERROR')
    }

    # Log result by sending it to the log connection
    s <- sprintf('fib(%2d<-) = %-2d<-\n', fib[[1]], fib[[2]])
    print(s)
    ret <- log$send(s)
    if (!ret) {
      stop('rpcFibCli(R): SEND ERROR')
    }
  }

  print('Goodbye from R rpcFibCli')
}


args <- commandArgs(trailingOnly=TRUE)
fibClient(args)
1library(yggdrasil)
2
3
# Split-phase RPC client: sends every request first, then reads one reply
# per request.
#
# args: character vector of command line arguments; args[[1]] is the number
#       of requests to issue.
#
# Stops with an error if a send or a receive on the RPC connection reports
# failure.
fibClientPar <- function(args) {

  iterations <- strtoi(args[[1]])
  fprintf('Hello from R rpcFibCliPar: iterations = %d ', iterations)

  # Create RPC connection with server
  # RPC client-side connection will be $(server_name)_$(client_name)
  rpc <- YggInterface('YggRpcClient', "rpcFibSrv_rpcFibCliPar", "%d", "%d %d")

  # Send all of the requests to the server.
  # seq_len() yields an empty sequence for 0 iterations, whereas
  # 1:iterations would count down over 1, 0.
  for (i in seq_len(iterations)) {
    fprintf('rpcFibCliPar(R): fib(->%-2d) ::: ', i)
    ret <- rpc$send(as.integer(i))
    if (!ret) {
      stop('rpcFibCliPar(R): SEND FAILED')
    }
  }

  # Receive responses for all requests that were sent
  for (i in seq_len(iterations)) {
    c(ret, fib) %<-% rpc$recv()
    if (!ret) {
      stop('rpcFibCliPar(R): RECV FAILED')
    }
    fprintf('rpcFibCliPar(R): fib(%2d<-) = %-2d<-', fib[[1]], fib[[2]])
  }

  print('Goodbye from R rpcFibCliPar')
}


args <- commandArgs(trailingOnly=TRUE)
fibClientPar(args)
1library(yggdrasil)
2
3
# RPC server: answers each request n with the pair (n, fib(n)) until a
# receive reports failure.
#
# args: character vector of command line arguments; args[[1]] is the number
#       of seconds to sleep before sending each response.
#
# Stops with an error if sending a response reports failure.
fibServer <- function(args) {

  sleeptime <- as.double(args[[1]])
  fprintf('Hello from R rpcFibSrv: sleeptime = %f', sleeptime)

  # Open the server side of the RPC connection, named after the model
  rpc <- YggInterface('YggRpcServer', "rpcFibSrv", "%d", "%d %d")

  # Keep serving until a receive fails (the connection is closed by all
  # clients that have connected).
  repeat {
    print('rpcFibSrv(R): receiving...')
    c(ok, request) %<-% rpc$recv()
    if (!ok) {
      print('rpcFibSrv(R): end of input')
      break
    }
    arg <- request[[1]]

    # Iteratively build the fibonacci number for the requested index
    fprintf('rpcFibSrv(R): <- input %d', arg)
    older <- 0
    newer <- 1
    result <- 1
    step <- 1
    while (step < arg) {
      result <- newer + older
      older <- newer
      newer <- result
      step <- step + 1
    }
    fprintf(' ::: ->(%2d %2d)', arg, result)

    # Wait for the configured delay, then reply
    Sys.sleep(sleeptime)
    sent <- rpc$send(arg, as.integer(result))
    if (!sent) {
      stop('rpcFibSrv(R): ERROR sending')
    }
  }
  print('Goodbye from R rpcFibSrv')

}

args <- commandArgs(trailingOnly=TRUE)
fibServer(args)
Model YAML:
1---
2
3model:
4 name: rpcFibCli
5 driver: RModelDriver
6 args:
7 - ./src/rpcFibCli.R
8 - "{{ FIB_ITERATIONS }}" # env_var passed as argument for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
10
11 outputs:
12 - name: output_log
13 driver: FileOutputDriver
14 args: fibCli.txt
15 in_temp: True
16
17 inputs:
18 - name: yaml_in
19 driver: FileInputDriver
20 args: rpcFibCli_r.yml
1---
2
3model:
4 name: rpcFibCliPar
5 driver: RModelDriver
6 args:
7 - ./src/rpcFibCliPar.R
8 - "{{ FIB_ITERATIONS }}" # env_var for number of loops
9 client_of: rpcFibSrv # Creates an RPC client queue $(client_of)_$(name)
1---
2
3model:
4 name: rpcFibSrv
5 driver: RModelDriver
6 args:
7 - ./src/rpcFibSrv.R
8 - "{{ FIB_SERVER_SLEEP_SECONDS }}" # env var - time to sleep before returning
9 is_server: True # Creates an RPC queue called $name