timed_pipe

This example is used for gathering performance information about communication between models in yggdrasil integrations and should only be run as part of the timing module.

C Version

Model Code:

 1#include <stdio.h>
 2#include "YggInterface.h"
 3
 4/*
 5 * Destination model of the timed_pipe example.
 6 *
 7 * Receives messages from "input_pipe" and forwards each one unchanged to
 8 * "output_file" until the receive call returns a negative value.
 9 *
10 * Returns 0 on success, or -1 if the receive buffer cannot be allocated
11 * or a send fails.
12 */
13int main() {
14  int exit_code = 0;
15  int ret = 0;
16  int bufsiz = 512;
17  char *buf = (char*)malloc(bufsiz);
18  if (buf == NULL) {
19    // Nothing can be received without a buffer; fail before opening channels.
20    printf("pipe_dst(C): Failed to allocate %d byte buffer\n", bufsiz);
21    return -1;
22  }
23
24  printf("Hello from C pipe_dst\n");
25
26  // Ins/outs matching with the model yaml
27  yggInput_t inq = yggInput("input_pipe");
28  yggOutput_t outf = yggOutput("output_file");
29  printf("pipe_dst(C): Created I/O channels\n");
30
31  // Continue receiving input from the queue
32  int count = 0;
33  while (1) {
34    // buf is passed by address; NOTE(review): presumably so the library can
35    // grow it for messages larger than bufsiz -- confirm against the API.
36    ret = ygg_recv_nolimit(inq, &buf, bufsiz);
37    if (ret < 0) {
38      printf("pipe_dst(C): Input channel closed\n");
39      break;
40    }
41    // Keep bufsiz in step with the largest message (plus one byte) seen so far.
42    if (ret > (bufsiz - 1)) {
43      bufsiz = ret + 1;
44      printf("pipe_dst(C): Buffer increased to %d bytes\n", bufsiz);
45    }
46    ret = ygg_send_nolimit(outf, buf, ret);
47    if (ret < 0) {
48      printf("pipe_dst(C): SEND ERROR ON MSG %d\n", count);
49      exit_code = -1;
50      break;
51    }
52    count++;
53  }
54
55  printf("Goodbye from C destination. Received %d messages.\n", count);
56
57  free(buf);
58  return exit_code;
59}
60
 1#include <stdio.h>
 2#include "YggInterface.h"
 3
 4/*
 5 * Source model of the timed_pipe example.
 6 *
 7 * Usage: <program> <msg_count> <msg_size>
 8 * Sends msg_count messages, each made of msg_size '0' characters, to
 9 * "output_pipe".
10 *
11 * Returns 0 on success, or -1 on bad arguments, allocation failure or a
12 * failed send.
13 */
14int main(int argc, char *argv[]) {
15  if (argc != 3) {
16    printf("Error in C pipe_src: The message count and size must be provided as input arguments.\n");
17    return -1;
18  }
19  int exit_code = 0;
20  int ret = 0;
21
22  int msg_count = atoi(argv[1]);
23  int msg_size = atoi(argv[2]);
24  if ((msg_count < 0) || (msg_size < 0)) {
25    // A negative size would make the allocation below request a bogus length.
26    printf("Error in C pipe_src: The message count and size must be non-negative.\n");
27    return -1;
28  }
29  printf("Hello from C pipe_src: msg_count = %d, msg_size = %d\n",
30	 msg_count, msg_size);
31
32  // Ins/outs matching with the model yaml
33  yggOutput_t outq = yggOutput("output_pipe");
34  printf("pipe_src(C): Created I/O channels\n");
35
36  // Create test message (size computed as size_t so "+ 1" cannot overflow int)
37  char *test_msg = (char*)malloc((size_t)msg_size + 1);
38  if (test_msg == NULL) {
39    printf("pipe_src(C): Failed to allocate %d byte test message\n", msg_size);
40    return -1;
41  }
42  int i;
43  for (i = 0; i < msg_size; i++)
44    test_msg[i] = '0';
45  test_msg[i] = '\0';
46
47  // Send test message multiple times
48  int count = 0;
49  for (i = 0; i < msg_count; i++) {
50    ret = ygg_send(outq, test_msg, msg_size);
51    if (ret < 0) {
52      printf("pipe_src(C): SEND ERROR ON MSG %d\n", i);
53      exit_code = -1;
54      break;
55    }
56    count++;
57  }
58
59  printf("Goodbye from C source. Sent %d messages.\n", count);
60  free(test_msg);
61  return exit_code;
62}
63

Model YAML:

 1---
 2# Two C models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: c
 6    args:
 7      - ./src/timed_pipe_src.c
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; the source parses its first argument as the message count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; the source parses its second argument as the message size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: c
15    args: ./src/timed_pipe_dst.c
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm

C++ Version

Model Code:

 1#include <stdio.h>
 2#include "YggInterface.hpp"
 3
 4// Destination model of the timed_pipe example.
 5//
 6// Receives messages from "input_pipe" and forwards each one unchanged to
 7// "output_file" until the receive call returns a negative value.
 8//
 9// Returns 0 on success, or -1 if the receive buffer cannot be allocated
10// or a send fails.
11int main() {
12  int exit_code = 0;
13  int ret = 0;
14  int bufsiz = 512;
15  char *buf = (char*)malloc(bufsiz);
16  if (buf == NULL) {
17    // Nothing can be received without a buffer; fail before opening channels.
18    printf("pipe_dst(CPP): Failed to allocate %d byte buffer\n", bufsiz);
19    return -1;
20  }
21
22  printf("Hello from C++ pipe_dst\n");
23
24  // Ins/outs matching with the model yaml
25  YggInput inq("input_pipe");
26  YggOutput outf("output_file");
27  printf("pipe_dst(CPP): Created I/O channels\n");
28
29  // Continue receiving input from the queue
30  int count = 0;
31  while (1) {
32    // buf is passed by address; NOTE(review): presumably so the library can
33    // grow it for messages larger than bufsiz -- confirm against the API.
34    ret = inq.recv_nolimit(&buf, bufsiz);
35    if (ret < 0) {
36      printf("pipe_dst(CPP): Input channel closed\n");
37      break;
38    }
39    // Keep bufsiz in step with the largest message (plus one byte) seen so far.
40    if (ret > (bufsiz - 1)) {
41      bufsiz = ret + 1;
42      printf("pipe_dst(CPP): Buffer increased to %d bytes\n", bufsiz);
43    }
44    ret = outf.send_nolimit(buf, ret);
45    if (ret < 0) {
46      // Record the failure and fall through to the single cleanup path below.
47      printf("pipe_dst(CPP): SEND ERROR ON MSG %d\n", count);
48      exit_code = -1;
49      break;
50    }
51    count++;
52  }
53
54  printf("Goodbye from C++ destination. Received %d messages.\n", count);
55
56  free(buf);
57  return exit_code;
58}
 1#include <stdio.h>
 2#include "YggInterface.hpp"
 3
 4// Source model of the timed_pipe example.
 5//
 6// Usage: <program> <msg_count> <msg_size>
 7// Sends msg_count messages, each made of msg_size '0' characters, to
 8// "output_pipe".
 9//
10// Returns 0 on success, or -1 on bad arguments, allocation failure or a
11// failed send.
12int main(int argc, char *argv[]) {
13  if (argc != 3) {
14    printf("Error in C++ pipe_src: The message count and size must be provided as input arguments.\n");
15    return -1;
16  }
17  int exit_code = 0;
18  int ret = 0;
19
20  int msg_count = atoi(argv[1]);
21  int msg_size = atoi(argv[2]);
22  if ((msg_count < 0) || (msg_size < 0)) {
23    // A negative size would make the allocation below request a bogus length.
24    printf("Error in C++ pipe_src: The message count and size must be non-negative.\n");
25    return -1;
26  }
27  printf("Hello from C++ pipe_src: msg_count = %d, msg_size = %d\n",
28	 msg_count, msg_size);
29
30  // Ins/outs matching with the model yaml
31  YggOutput outq("output_pipe");
32  printf("pipe_src(CPP): Created I/O channels\n");
33
34  // Create test message (size computed as size_t so "+ 1" cannot overflow int)
35  char *test_msg = (char*)malloc((size_t)msg_size + 1);
36  if (test_msg == NULL) {
37    printf("pipe_src(CPP): Failed to allocate %d byte test message\n", msg_size);
38    return -1;
39  }
40  int i;
41  for (i = 0; i < msg_size; i++)
42    test_msg[i] = '0';
43  test_msg[i] = '\0';
44
45  // Send test message multiple times
46  int count = 0;
47  for (i = 0; i < msg_count; i++) {
48    ret = outq.send(test_msg, msg_size);
49    if (ret < 0) {
50      printf("pipe_src(CPP): SEND ERROR ON MSG %d\n", i);
51      exit_code = -1;
52      break;
53    }
54    count++;
55  }
56
57  printf("Goodbye from C++ source. Sent %d messages.\n", count);
58  free(test_msg);
59  return exit_code;
60}

Model YAML:

 1---
 2# Two C++ models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: c++
 6    args:
 7      - ./src/timed_pipe_src.cpp
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; the source parses its first argument as the message count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; the source parses its second argument as the message size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: c++
15    args: ./src/timed_pipe_dst.cpp
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm

Fortran Version

Model Code:

 1! Destination model of the timed_pipe example: forwards every message
 2! received on "input_pipe" to "output_file" until a receive reports
 3! failure, then stops with status 1 if a send failed.
 4program main
 5  use fygg
 6  implicit none
 7
 8  type(yggcomm) :: inq, outf
 9  logical :: ret
10  integer :: count
11  integer :: exit_code = 0
12  integer :: bufsiz = 0, old_bufsiz = 0
13  type(yggchar_r) :: buf
14
15  write(*, '("Hello from Fortran pipe_dst")')
16
17  ! Ins/outs matching with the model yaml
18  inq = ygg_input("input_pipe")
19  outf = ygg_output("output_file")
20  write(*, '("pipe_dst(F): Created I/O channels")')
21
22  ! Continue receiving input from the queue
23  count = 0
24  do while(.true.)
25     ! Remember the size before the receive so any growth can be reported.
26     bufsiz = size(buf%x)
27     old_bufsiz = bufsiz
28     ret = ygg_recv_nolimit(inq, buf, bufsiz)
29     if (.not.ret) then
30        write(*, '("pipe_dst(F): Input channel closed")')
31        exit
32     end if
33     if (bufsiz.gt.old_bufsiz) then
34        write(*, '("pipe_dst(F): Buffer increased from ",&
35             &i5.1," to ",i5.1," bytes")') old_bufsiz, bufsiz
36     end if
37     ret = ygg_send_nolimit(outf, buf, bufsiz)
38     if (.not.ret) then
39        write(*, '("pipe_dst(F): SEND ERROR ON MSG ",i5.1)') count
40        exit_code = -1
41        exit
42     end if
43     count = count + 1
44  end do
45
46  write(*, '("Goodbye from Fortran destination. Received ",&
47       &i5.1," messages.")') count
48  if (exit_code.lt.0) then
49     stop 1
50  end if
51
52end program main
 1! Source model of the timed_pipe example.
 2!
 3! Takes two command line arguments, the message count and the message size,
 4! and sends that many messages of '0' characters to "output_pipe".
 5! Stops with status 1 on bad arguments or a failed send.
 6program main
 7  use fygg
 8  implicit none
 9
10  character(len=32) :: arg
11  integer :: msg_count, msg_size
12  integer :: ios
13  type(yggcomm) :: outq
14  logical :: ret, valid
15  integer :: i, count
16  integer :: exit_code = 0
17  character(len=:), allocatable :: test_msg
18
19  if (command_argument_count().ne.2) then
20     write(*, '("Error in Fortran pipe_src: The message count and size &
21          &must be provided as input arguments.")')
22     stop 1
23  end if
24
25  ! Parse both arguments; only inspect the values once both reads succeeded.
26  valid = .false.
27  call get_command_argument(1, arg)
28  read(arg, *, iostat=ios) msg_count
29  if (ios.eq.0) then
30     call get_command_argument(2, arg)
31     read(arg, *, iostat=ios) msg_size
32     if (ios.eq.0) valid = ((msg_count.ge.0).and.(msg_size.ge.0))
33  end if
34  if (.not.valid) then
35     write(*, '("Error in Fortran pipe_src: The message count and size &
36          &must be non-negative integers.")')
37     stop 1
38  end if
39  write(*, '("Hello from Fortran pipe_src: msg_count = ",i5.1,", &
40       &msg_size = ",i5.1)') msg_count, msg_size
41
42  ! Ins/outs matching with the model yaml
43  outq = ygg_output("output_pipe")
44  write(*, '("pipe_src(F): Created I/O channels")')
45
46  ! Create test message: msg_size '0' characters followed by a null character
47  allocate(character(len=(msg_size + 1)) :: test_msg)
48  do i = 1, msg_size
49     test_msg(i:i) = '0'
50  end do
51  test_msg((msg_size+1):(msg_size+1)) = c_null_char
52
53  ! Send test message multiple times
54  count = 0
55  do i = 1, msg_count
56     ret = ygg_send(outq, test_msg, msg_size)
57     if (.not.ret) then
58        write(*, '("pipe_src(F): SEND ERROR ON MSG ",i5.1)') i
59        exit_code = -1
60        exit
61     end if
62     count = count + 1
63  end do
64  
65  write(*, '("Goodbye from Fortran source. Sent ",i5.1," messages.")') count
66  if (allocated(test_msg)) deallocate(test_msg)
67  if (exit_code.lt.0) then
68     stop 1
69  end if
70
71end program main

Model YAML:

 1---
 2# Two Fortran models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: fortran
 6    args:
 7      - ./src/timed_pipe_src.f90
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; the source reads its first argument as the message count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; the source reads its second argument as the message size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: fortran
15    args: ./src/timed_pipe_dst.f90
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm

Matlab Version

Model Code:

 1% Destination model of the timed_pipe example: relays every message from
 2% 'input_pipe' to 'output_file' until the receive flag comes back false.
 3disp('Hello from Matlab pipe_dst');
 4
 5% Ins/outs matching with the model yaml
 6inq = YggInterface('YggInput', 'input_pipe');
 7outf = YggInterface('YggOutput', 'output_file');
 8disp('pipe_dst(M): Created I/O channels');
 9
10% Relay messages, counting each one that was forwarded successfully
11count = 0;
12while true
13  [ok, msg] = inq.recv();
14  if ~ok
15    disp('pipe_dst(M): Input channel closed');
16    break;
17  end
18  sent = outf.send(msg);
19  if ~sent
20    err_text = sprintf('pipe_dst(M): SEND ERROR ON MSG %d\n', count);
21    error(err_text);
22  end
23  count = count + 1;
24end
25
26fprintf('Goodbye from Matlab destination. Received %d messages.\n', count);
 1function timed_pipe_src(msg_count, msg_size)
 2% TIMED_PIPE_SRC Source model of the timed_pipe example.
 3%   timed_pipe_src(msg_count, msg_size) sends msg_count messages, each made
 4%   of msg_size '0' characters, to 'output_pipe'. Both arguments are given
 5%   as text and converted to numbers here. Raises an error if either value
 6%   is not a non-negative number or if a send reports failure.
 7
 8  % str2double only parses numbers; it never evaluates its input as code.
 9  msg_count = str2double(msg_count);
10  msg_size = str2double(msg_size);
11  if isnan(msg_count) || isnan(msg_size) || (msg_count < 0) || (msg_size < 0)
12    error('pipe_src(M): The message count and size must be non-negative numbers.');
13  end
14  fprintf('Hello from Matlab pipe_src: msg_count = %d, msg_size = %d\n', ...
15          msg_count, msg_size);
16
17  % Ins/outs matching with the model yaml
18  outq = YggInterface('YggOutput', 'output_pipe');
19  disp('pipe_src(M): Created I/O channels');
20
21  % Send test message multiple times
22  % repmat always defines test_msg, including the empty case msg_size == 0.
23  test_msg = repmat('0', 1, msg_size);
24  count = 0;
25  for i = 1:msg_count
26      ret = outq.send(test_msg);
27      if (~ret)
28          error(sprintf('pipe_src(M): SEND ERROR ON MSG %d\n', i));
29      end;
30      count = count + 1;
31  end;
32
33  fprintf('Goodbye from Matlab source. Sent %d messages.\n', count);
34
35end

Model YAML:

 1---
 2# Two Matlab models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: matlab
 6    args:
 7      - ./src/timed_pipe_src.m
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; received by the source function as msg_count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; received by the source function as msg_size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: matlab
15    args: ./src/timed_pipe_dst.m
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm

Python Version

Model Code:

 1from __future__ import print_function
 2from yggdrasil.interface.YggInterface import YggInput, YggOutput
 3
 4
 5def run():
 6    print('Hello from Python pipe_dst')
 7
 8    # Ins/outs matching with the the model yaml
 9    inq = YggInput('input_pipe')
10    outf = YggOutput('output_file')
11    print("pipe_dst(P): Created I/O channels")
12
13    # Continue receiving input from the queue
14    count = 0
15    while True:
16        ret, buf = inq.recv()
17        if not ret:
18            print("pipe_dst(P): Input channel closed")
19            break
20        ret = outf.send(buf)
21        if not ret:
22            raise RuntimeError("pipe_dst(P): SEND ERROR ON MSG %d" % count)
23        count += 1
24
25    print('Goodbye from Python destination. Received %d messages.' % count)
26
27    
28if __name__ == '__main__':
29    run()
 1from __future__ import print_function
 2import sys
 3from yggdrasil.interface.YggInterface import YggOutput
 4
 5
 6def run(args):
 7    msg_count = int(args[0])
 8    msg_size = int(args[1])
 9    print('Hello from Python pipe_src: msg_count = %d, msg_size = %d' % (
10        msg_count, msg_size))
11
12    # Ins/outs matching with the the model yaml
13    outq = YggOutput('output_pipe')
14    print("pipe_src(P): Created I/O channels")
15
16    # Send test message multiple times
17    test_msg = b'0' * msg_size
18    count = 0
19    for i in range(msg_count):
20        ret = outq.send(test_msg)
21        if not ret:
22            raise RuntimeError('pipe_src(P): SEND ERROR ON MSG %d' % i)
23        count += 1
24
25    print('Goodbye from Python source. Sent %d messages.' % count)
26    
27
28if __name__ == '__main__':
29    run(sys.argv[1:])

Model YAML:

 1---
 2# Two Python models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: python
 6    args:
 7      - ./src/timed_pipe_src.py
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; the source parses its first argument as the message count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; the source parses its second argument as the message size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: python
15    args: ./src/timed_pipe_dst.py
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm

R Version

Model Code:

 1library(yggdrasil)
 2
 3
 4run <- function() {
 5  # Relay every message from 'input_pipe' to 'output_file' until the
 6  # receive flag is FALSE; stop() if a send reports failure.
 7  print('Hello from R pipe_dst')
 8
 9  # Ins/outs matching with the model yaml
10  source_comm <- YggInterface('YggInput', 'input_pipe')
11  sink_comm <- YggInterface('YggOutput', 'output_file')
12  print("pipe_dst(R): Created I/O channels")
13
14  # Relay messages until the input has nothing more
15  n_forwarded <- 0
16  repeat {
17    c(ok, payload) %<-% source_comm$recv()
18    if (!ok) {
19      print("pipe_dst(R): Input channel closed")
20      break
21    }
22    sent <- sink_comm$send(payload)
23    if (!sent) {
24      stop(sprintf("pipe_dst(R): SEND ERROR ON MSG %d", n_forwarded))
25    }
26    n_forwarded <- n_forwarded + 1
27  }
28
29  fprintf('Goodbye from R destination. Received %d messages.', n_forwarded)
30}
31
32
33run()
 1library(yggdrasil)
 2
 3
 4run <- function(args) {
 5  # Send a test message of msg_size '0' characters to 'output_pipe'
 6  # msg_count times.
 7  #
 8  # args: character vector holding the message count and message size.
 9  # Stops with an error on missing or invalid arguments, or on a failed send.
10  if (length(args) != 2) {
11    stop('Error in R pipe_src: The message count and size must be provided as input arguments.')
12  }
13  msg_count <- strtoi(args[[1]])
14  msg_size <- strtoi(args[[2]])
15  # strtoi() yields NA for text that is not an integer.
16  if (is.na(msg_count) || is.na(msg_size) || (msg_count < 0) || (msg_size < 0)) {
17    stop('Error in R pipe_src: The message count and size must be non-negative integers.')
18  }
19  fprintf('Hello from R pipe_src: msg_count = %d, msg_size = %d',
20          msg_count, msg_size)
21
22  # Ins/outs matching with the model yaml
23  outq <- YggInterface('YggOutput', 'output_pipe')
24  print("pipe_src(R): Created I/O channels")
25
26  # Send test message multiple times
27  test_msg <- strrep('0', msg_size)
28  count <- 0
29  # seq_len() is empty for a count of 0, whereas 1:0 would loop over c(1, 0).
30  for (i in seq_len(msg_count)) {
31    ret <- outq$send(test_msg)
32    if (!ret) {
33      stop(sprintf('pipe_src(R): SEND ERROR ON MSG %d', i))
34    }
35    count <- count + 1
36  }
37
38  fprintf('Goodbye from R source. Sent %d messages.', count)
39}
40
41
42args <- commandArgs(trailingOnly=TRUE)
43run(args)

Model YAML:

 1---
 2# Two R models joined by a pipe; the destination's output is written to a file.
 3models:
 4  - name: timed_pipe_src
 5    language: r
 6    args:
 7      - ./src/timed_pipe_src.R
 8      - "{{PIPE_MSG_COUNT}}"  # placeholder; the source parses its first argument as the message count
 9      - "{{PIPE_MSG_SIZE}}"  # placeholder; the source parses its second argument as the message size
10    outputs:
11      - output_pipe
12# Destination model: reads input_pipe and writes output_file.
13  - name: timed_pipe_dst
14    language: r
15    args: ./src/timed_pipe_dst.R
16    inputs:
17      - input_pipe
18    outputs:
19      - output_file
20# The source's output_pipe feeds the destination's input_pipe; output_file goes to a text file.
21connections:
22  - input: output_pipe
23    output: input_pipe
24  - input: output_file
25    output_file:
26      name: output_timed_pipe.txt
27      filetype: ascii
28      in_temp: True  # NOTE(review): presumably places the file in a temporary directory -- confirm