timed_pipe¶
This example is used for gathering performance information about communication between model in yggdrasil integrations and should only be run as part of the timing module.
C Version¶
Model Code:
1#include <stdio.h>
2#include "YggInterface.h"
3
4int main() {
5 int exit_code = 0;
6 int ret = 0;
7 int bufsiz = 512;
8 char *buf = (char*)malloc(bufsiz);
9
10 printf("Hello from C pipe_dst\n");
11
12 // Ins/outs matching with the the model yaml
13 yggInput_t inq = yggInput("input_pipe");
14 yggOutput_t outf = yggOutput("output_file");
15 printf("pipe_dst(C): Created I/O channels\n");
16
17 // Continue receiving input from the queue
18 int count = 0;
19 while (1) {
20 ret = ygg_recv_nolimit(inq, &buf, bufsiz);
21 if (ret < 0) {
22 printf("pipe_dst(C): Input channel closed\n");
23 break;
24 }
25 if (ret > (bufsiz - 1)) {
26 bufsiz = ret + 1;
27 printf("pipe_dst(C): Buffer increased to %d bytes\n", bufsiz);
28 }
29 ret = ygg_send_nolimit(outf, buf, ret);
30 if (ret < 0) {
31 printf("pipe_dst(C): SEND ERROR ON MSG %d\n", count);
32 exit_code = -1;
33 break;
34 }
35 count++;
36 }
37
38 printf("Goodbye from C destination. Received %d messages.\n", count);
39
40 free(buf);
41 return exit_code;
42}
43
1#include <stdio.h>
2#include "YggInterface.h"
3
4int main(int argc, char *argv[]) {
5 if (argc != 3) {
6 printf("Error in C pipe_src: The message count and size must be provided as input arguments.\n");
7 return -1;
8 }
9 int exit_code = 0;
10 int ret = 0;
11
12 int msg_count = atoi(argv[1]);
13 int msg_size = atoi(argv[2]);
14 printf("Hello from C pipe_src: msg_count = %d, msg_size = %d\n",
15 msg_count, msg_size);
16
17 // Ins/outs matching with the the model yaml
18 yggOutput_t outq = yggOutput("output_pipe");
19 printf("pipe_src(C): Created I/O channels\n");
20
21 // Create test message
22 char *test_msg = (char*)malloc(msg_size + 1);
23 int i;
24 for (i = 0; i < msg_size; i++)
25 test_msg[i] = '0';
26 test_msg[i] = '\0';
27
28 // Send test message multiple times
29 int count = 0;
30 for (i = 0; i < msg_count; i++) {
31 ret = ygg_send(outq, test_msg, msg_size);
32 if (ret < 0) {
33 printf("pipe_src(C): SEND ERROR ON MSG %d\n", i);
34 exit_code = -1;
35 break;
36 }
37 count++;
38 }
39
40 printf("Goodbye from C source. Sent %d messages.\n", count);
41 free(test_msg);
42 return exit_code;
43}
44
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: c
6 args:
7 - ./src/timed_pipe_src.c
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: c
15 args: ./src/timed_pipe_dst.c
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
C++ Version¶
Model Code:
1#include <stdio.h>
2#include "YggInterface.hpp"
3
4int main() {
5 int ret = 0;
6 int bufsiz = 512;
7 char *buf = (char*)malloc(bufsiz);
8
9 printf("Hello from C++ pipe_dst\n");
10
11 // Ins/outs matching with the the model yaml
12 YggInput inq("input_pipe");
13 YggOutput outf("output_file");
14 printf("pipe_dst(CPP): Created I/O channels\n");
15
16 // Continue receiving input from the queue
17 int count = 0;
18 while (1) {
19 ret = inq.recv_nolimit(&buf, bufsiz);
20 if (ret < 0) {
21 printf("pipe_dst(CPP): Input channel closed\n");
22 break;
23 }
24 if (ret > (bufsiz - 1)) {
25 bufsiz = ret + 1;
26 printf("pipe_dst(CPP): Buffer increased to %d bytes\n", bufsiz);
27 }
28 ret = outf.send_nolimit(buf, ret);
29 if (ret < 0) {
30 printf("pipe_dst(CPP): SEND ERROR ON MSG %d\n", count);
31 free(buf);
32 return -1;
33 }
34 count++;
35 }
36
37 printf("Goodbye from C++ destination. Received %d messages.\n", count);
38
39 free(buf);
40 return 0;
41}
1#include <stdio.h>
2#include "YggInterface.hpp"
3
4int main(int argc, char *argv[]) {
5 if (argc != 3) {
6 printf("Error in C++ pipe_src: The message count and size must be provided as input arguments.\n");
7 return -1;
8 }
9 int exit_code = 0;
10 int ret = 0;
11
12 int msg_count = atoi(argv[1]);
13 int msg_size = atoi(argv[2]);
14 printf("Hello from C++ pipe_src: msg_count = %d, msg_size = %d\n",
15 msg_count, msg_size);
16
17 // Ins/outs matching with the the model yaml
18 YggOutput outq("output_pipe");
19 printf("pipe_src(CPP): Created I/O channels\n");
20
21 // Create test message
22 char *test_msg = (char*)malloc(msg_size + 1);
23 int i;
24 for (i = 0; i < msg_size; i++)
25 test_msg[i] = '0';
26 test_msg[i] = '\0';
27
28 // Send test message multiple times
29 int count = 0;
30 for (i = 0; i < msg_count; i++) {
31 ret = outq.send(test_msg, msg_size);
32 if (ret < 0) {
33 printf("pipe_src(CPP): SEND ERROR ON MSG %d\n", i);
34 exit_code = -1;
35 break;
36 }
37 count++;
38 }
39
40 printf("Goodbye from C++ source. Sent %d messages.\n", count);
41 free(test_msg);
42 return exit_code;
43}
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: c++
6 args:
7 - ./src/timed_pipe_src.cpp
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: c++
15 args: ./src/timed_pipe_dst.cpp
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
Fortran Version¶
Model Code:
1program main
2 use fygg
3
4 character(len=32) :: arg
5 integer :: msg_count, msg_size
6 type(yggcomm) :: inq, outf
7 logical :: ret
8 integer :: i, count
9 integer :: exit_code = 0
10 integer :: bufsiz = 0, old_bufsiz = 0
11 type(yggchar_r) :: buf
12
13 write(*, '("Hello from Fortran pipe_dst")')
14
15 ! Ins/outs matching with the the model yaml
16 inq = ygg_input("input_pipe")
17 outf = ygg_output("output_file")
18 write(*, '("pipe_dst(F): Created I/O channels")')
19
20 ! Continue receiving input from the queue
21 count = 0
22 do while(.true.)
23 bufsiz = size(buf%x)
24 old_bufsiz = bufsiz
25 ret = ygg_recv_nolimit(inq, buf, bufsiz)
26 if (.not.ret) then
27 write(*, '("pipe_dst(F): Input channel closed")')
28 exit
29 end if
30 if (bufsiz.gt.old_bufsiz) then
31 write(*, '("pipe_dst(F): Buffer increased from ",&
32 &i5.1," to ",i5.1," bytes")') old_bufsiz, bufsiz
33 end if
34 ret = ygg_send_nolimit(outf, buf, bufsiz)
35 if (.not.ret) then
36 write(*, '("pipe_dst(F): SEND ERROR ON MSG ",i5.1)') count
37 exit_code = -1
38 exit
39 end if
40 count = count + 1
41 end do
42
43 write(*, '("Goodbye from Fortran destination. Received ",&
44 &i5.1," messages.")') count
45 if (exit_code.lt.0) then
46 stop 1
47 end if
48
49end program main
1program main
2 use fygg
3
4 character(len=32) :: arg
5 integer :: msg_count, msg_size
6 type(yggcomm) :: outq
7 logical :: ret
8 integer :: i, count
9 integer :: exit_code = 0
10 character(len=:), allocatable :: test_msg
11
12 if (command_argument_count().ne.2) then
13 write(*, '("Error in Fortran pipe_src: The message count and size &
14 &must be provided as input arguments.")')
15 stop 1
16 end if
17
18 call get_command_argument(1, arg)
19 read(arg, *) msg_count
20 call get_command_argument(2, arg)
21 read(arg, *) msg_size
22 write(*, '("Hello from Fortran pipe_src: msg_count = ",i5.1,", &
23 &msg_size = ",i5.1)') msg_count, msg_size
24
25 ! Ins/outs matching with the the model yaml
26 outq = ygg_output("output_pipe")
27 write(*, '("pipe_src(F): Created I/O channels")')
28
29 ! Create test message
30 allocate(character(len=(msg_size + 1)) :: test_msg)
31 do i = 1, msg_size
32 test_msg(i:i) = '0'
33 end do
34 test_msg((msg_size+1):(msg_size+1)) = c_null_char
35
36 ! Send test message multiple times
37 count = 0
38 do i = 1, msg_count
39 ret = ygg_send(outq, test_msg, msg_size)
40 if (.not.ret) then
41 write(*, '("pipe_src(F): SEND ERROR ON MSG ",i5.1)') i
42 exit_code = -1
43 exit
44 end if
45 count = count + 1
46 end do
47
48 write(*, '("Goodbye from Fortran source. Sent ",i5.1," messages.")') count
49 if (allocated(test_msg)) deallocate(test_msg)
50 if (exit_code.lt.0) then
51 stop 1
52 end if
53
54end program main
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: fortran
6 args:
7 - ./src/timed_pipe_src.f90
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: fortran
15 args: ./src/timed_pipe_dst.f90
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
Julia Version¶
Model Code:
1using Yggdrasil
2using Printf
3
4
5function run(args)
6 println("Hello from Julia pipe_dst")
7
8 # Ins/outs matching with the the model yaml
9 inq = Yggdrasil.YggInterface("YggInput", "input_pipe")
10 outf = Yggdrasil.YggInterface("YggOutput", "output_file")
11 println("pipe_dst(Julia): Created I/O channels")
12
13 # Continue receiving input from the queue
14 global count = 0
15 while (true)
16 ret, buf = inq.recv()
17 if (!ret)
18 println("pipe_dst(Julia): Input channel closed")
19 break
20 end
21 ret = outf.send(buf)
22 if (!ret)
23 error(@sprintf("pipe_dst(Julia): SEND ERROR ON MSG %d\n", count))
24 end
25 global count = count + 1
26 end
27
28 @printf("Goodbye from Julia destination. Received %d messages.\n", count)
29end
30
31
32run(ARGS)
1using Yggdrasil
2using Printf
3
4
5function run(args)
6 msg_count = parse(Int64, args[1])
7 msg_size = parse(Int64, args[2])
8 @printf("Hello from Julia pipe_src: msg_count = %d, msg_size = %d\n",
9 msg_count, msg_size)
10
11 # Ins/outs matching with the the model yaml
12 outq = Yggdrasil.YggInterface("YggOutput", "output_pipe")
13 println("pipe_src(Julia): Created I/O channels")
14
15 # Send test message multiple times
16 test_msg = "0"^msg_size
17 global count = 0
18 for i = 1:msg_count
19 ret = outq.send(test_msg)
20 if (!ret)
21 error(@sprintf("pipe_src(Julia): SEND ERROR ON MSG %d", i))
22 end
23 global count = count + 1
24 end
25
26 @printf("Goodbye from Julia source. Sent %d messages.\n", count)
27end
28
29
30run(ARGS)
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: julia
6 args:
7 - ./src/timed_pipe_src.jl
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: julia
15 args: ./src/timed_pipe_dst.jl
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
Matlab Version¶
Model Code:
1disp('Hello from Matlab pipe_dst');
2
3% Ins/outs matching with the the model yaml
4inq = YggInterface('YggInput', 'input_pipe');
5outf = YggInterface('YggOutput', 'output_file');
6disp('pipe_dst(M): Created I/O channels');
7
8% Continue receiving input from the queue
9count = 0;
10while (1);
11 [flag, buf] = inq.recv();
12 if (~flag);
13 disp('pipe_dst(M): Input channel closed');
14 break;
15 end;
16 ret = outf.send(buf);
17 if (~ret);
18 error(sprintf('pipe_dst(M): SEND ERROR ON MSG %d\n', count));
19 end;
20 count = count + 1;
21end;
22
23fprintf('Goodbye from Matlab destination. Received %d messages.\n', count);
1function timed_pipe_src(msg_count, msg_size)
2
3 msg_count = str2num(msg_count);
4 msg_size = str2num(msg_size);
5 fprintf('Hello from Matlab pipe_src: msg_count = %d, msg_size = %d\n', ...
6 msg_count, msg_size);
7
8 % Ins/outs matching with the the model yaml
9 outq = YggInterface('YggOutput', 'output_pipe');
10 disp('pipe_src(M): Created I/O channels');
11
12 % Send test message multiple times
13 test_msg(1:msg_size) = '0';
14 count = 0;
15 for i = 1:msg_count
16 ret = outq.send(test_msg);
17 if (~ret)
18 error(sprintf('pipe_src(M): SEND ERROR ON MSG %d\n', i));
19 end;
20 count = count + 1;
21 end;
22
23 fprintf('Goodbye from Matlab source. Sent %d messages.\n', count);
24
25end
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: matlab
6 args:
7 - ./src/timed_pipe_src.m
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: matlab
15 args: ./src/timed_pipe_dst.m
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
Python Version¶
Model Code:
1from __future__ import print_function
2from yggdrasil.interface.YggInterface import YggInput, YggOutput
3
4
5def run():
6 print('Hello from Python pipe_dst')
7
8 # Ins/outs matching with the the model yaml
9 inq = YggInput('input_pipe')
10 outf = YggOutput('output_file')
11 print("pipe_dst(P): Created I/O channels")
12
13 # Continue receiving input from the queue
14 count = 0
15 while True:
16 ret, buf = inq.recv()
17 if not ret:
18 print("pipe_dst(P): Input channel closed")
19 break
20 ret = outf.send(buf)
21 if not ret:
22 raise RuntimeError("pipe_dst(P): SEND ERROR ON MSG %d" % count)
23 count += 1
24
25 print('Goodbye from Python destination. Received %d messages.' % count)
26
27
28if __name__ == '__main__':
29 run()
1from __future__ import print_function
2import sys
3from yggdrasil.interface.YggInterface import YggOutput
4
5
6def run(args):
7 msg_count = int(args[0])
8 msg_size = int(args[1])
9 print('Hello from Python pipe_src: msg_count = %d, msg_size = %d' % (
10 msg_count, msg_size))
11
12 # Ins/outs matching with the the model yaml
13 outq = YggOutput('output_pipe')
14 print("pipe_src(P): Created I/O channels")
15
16 # Send test message multiple times
17 test_msg = b'0' * msg_size
18 count = 0
19 for i in range(msg_count):
20 ret = outq.send(test_msg)
21 if not ret:
22 raise RuntimeError('pipe_src(P): SEND ERROR ON MSG %d' % i)
23 count += 1
24
25 print('Goodbye from Python source. Sent %d messages.' % count)
26
27
28if __name__ == '__main__':
29 run(sys.argv[1:])
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: python
6 args:
7 - ./src/timed_pipe_src.py
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: python
15 args: ./src/timed_pipe_dst.py
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True
R Version¶
Model Code:
1library(yggdrasil)
2
3
4run <- function() {
5 print('Hello from R pipe_dst')
6
7 # Ins/outs matching with the the model yaml
8 inq <- YggInterface('YggInput', 'input_pipe')
9 outf <- YggInterface('YggOutput', 'output_file')
10 print("pipe_dst(R): Created I/O channels")
11
12 # Continue receiving input from the queue
13 count <- 0
14 while (TRUE) {
15 c(ret, buf) %<-% inq$recv()
16 if (!ret) {
17 print("pipe_dst(R): Input channel closed")
18 break
19 }
20 ret <- outf$send(buf)
21 if (!ret) {
22 stop(sprintf("pipe_dst(R): SEND ERROR ON MSG %d", count))
23 }
24 count <- count + 1
25 }
26
27 fprintf('Goodbye from R destination. Received %d messages.', count)
28}
29
30
31run()
1library(yggdrasil)
2
3
4run <- function(args) {
5 msg_count <- strtoi(args[[1]])
6 msg_size <- strtoi(args[[2]])
7 fprintf('Hello from R pipe_src: msg_count = %d, msg_size = %d',
8 msg_count, msg_size)
9
10 # Ins/outs matching with the the model yaml
11 outq <- YggInterface('YggOutput', 'output_pipe')
12 print("pipe_src(R): Created I/O channels")
13
14 # Send test message multiple times
15 test_msg <- paste(replicate(msg_size, '0'), collapse="")
16 count <- 0
17 for (i in 1:msg_count) {
18 ret <- outq$send(test_msg)
19 if (!ret) {
20 stop(sprintf('pipe_src(R): SEND ERROR ON MSG %d', i))
21 }
22 count <- count + 1
23 }
24
25 fprintf('Goodbye from R source. Sent %d messages.', count)
26}
27
28
29args <- commandArgs(trailingOnly=TRUE)
30run(args)
Model YAML:
1---
2
3models:
4 - name: timed_pipe_src
5 language: r
6 args:
7 - ./src/timed_pipe_src.R
8 - "{{PIPE_MSG_COUNT}}"
9 - "{{PIPE_MSG_SIZE}}"
10 outputs:
11 - output_pipe
12
13 - name: timed_pipe_dst
14 language: r
15 args: ./src/timed_pipe_dst.R
16 inputs:
17 - input_pipe
18 outputs:
19 - output_file
20
21connections:
22 - input: output_pipe
23 output: input_pipe
24 - input: output_file
25 output_file:
26 name: output_timed_pipe.txt
27 filetype: ascii
28 in_temp: True