Eric Wong
2017-04-28 20:08:09 UTC
This should allow us easily to manipulate process pipelines
as an array of arrays.
Originally posted at
lib/dtas/pipeline.rb | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++
test/test_pipeline.rb | 47 ++++++++++++++++++++++++++++++++
2 files changed, 122 insertions(+)
create mode 100644 lib/dtas/pipeline.rb
create mode 100644 test/test_pipeline.rb
diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors <dtas-***>
+# License: GPL-3.0+ <>
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+module DTAS::Pipeline # :nodoc:
+ include DTAS::SpawnFix
+ # Process.spawn wrapper which supports running Proc-like objects in
+ # a separate process, not just external commands.
+ # Returns the pid of the spawned process
+ def pspawn(env, cmd, rdr = {})
+ case cmd
+ when Array
+ spawn(env, *cmd, rdr)
+ else # support running Proc-like objects, too:
+ fork do
+ ENV.update(env) if env
+ # setup redirects
+ [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+ dst = rdr[fd] and io.reopen(dst)
+ end
+ # close all other pipes, since we can't rely on FD_CLOEXEC
+ # (as we do not exec, here)
+ rdr.each do |k, v|
+ k.close if v == :close
+ end
+ end
+ end
+ end
+ # +pipeline+ is an Array of (Arrays or Procs)
+ def run_pipeline(env, pipeline)
+ pids = {} # pid => pipeline index
+ work = pipeline.dup
+ last = work.pop
+ nr = work.size
+ rdr = {} # redirect mapping for Process.spawn
+ # we need to make sure pipes are closed in any forked processes
+ # (they are redirected to stdin or stdout, first)
+ pipes = { IO.pipe.each { |io| rdr[io] = :close } }
+ # start the first and last commands first, they only have one pipe, each
+ last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
+ pids[last_pid] = nr
+ first = work.shift
+ first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
+ pids[first_pid] = 0
+ # start the middle commands, they both have two pipes:
+ work.each_with_index do |cmd, i|
+ pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
+ pids[pid] = i + 1
+ end
+ # all pipes handed off to children, close so they see EOF
+ pipes.flatten!.each(&:close).clear
+ # wait for children to finish
+ fails = []
+ until pids.empty?
+ pid, status = Process.waitpid2(-1)
+ nr = pids.delete(pid)
+ status.success? or
+ fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+ end
+ # behave like "set -o pipefail" in bash
+ raise fails.join("\n") if fails[0]
+ end
diff --git a/test/test_pipeline.rb b/test/test_pipeline.rb
new file mode 100644
index 0000000..3cc32cc
--- /dev/null
+++ b/test/test_pipeline.rb
@@ -0,0 +1,47 @@
+# Copyright (C) 2017 all contributors <dtas-***>
+# License: GPL-3.0+ <>
+# frozen_string_literal: true
+require './test/helper'
+require 'dtas/pipeline'
+class TestPipeline < Testcase
+ include DTAS::Pipeline
+ def setup
+ @env = ENV.to_hash
+ end
+ def pipeline_result
+ IO.pipe do |rd, wr|
+ begin
+ pid = fork do
+ rd.close
+ $stdout.reopen(wr)
+ yield
+ exit!(0)
+ end
+ wr.close
+ return
+ ensure
+ _, status = Process.waitpid2(pid)
+ assert_predicate status, :success?
+ end
+ end
+ nil
+ end
+ def test_pipeline
+ assert_equal("BYYRU\n", pipeline_result do
+ run_pipeline(@env, [
+ %w(echo hello), # anything which generates something to stdout
+ %w(tr [a-z] [A-Z]), # upcase
+ # this lambda runs inside its own process
+ lambda do
+ $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
+ exit!(0)
+ end,
+ # rot13
+ %w(tr [a-m][n-z][A-M][N-Z] [n-z][a-m][N-Z][A-M])
+ ])
+ end)
+ end
as an array of arrays.
Originally posted at
lib/dtas/pipeline.rb | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++
test/test_pipeline.rb | 47 ++++++++++++++++++++++++++++++++
2 files changed, 122 insertions(+)
create mode 100644 lib/dtas/pipeline.rb
create mode 100644 test/test_pipeline.rb
diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors <dtas-***>
+# License: GPL-3.0+ <>
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+module DTAS::Pipeline # :nodoc:
+ include DTAS::SpawnFix
+ # Process.spawn wrapper which supports running Proc-like objects in
+ # a separate process, not just external commands.
+ # Returns the pid of the spawned process
+ def pspawn(env, cmd, rdr = {})
+ case cmd
+ when Array
+ spawn(env, *cmd, rdr)
+ else # support running Proc-like objects, too:
+ fork do
+ ENV.update(env) if env
+ # setup redirects
+ [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+ dst = rdr[fd] and io.reopen(dst)
+ end
+ # close all other pipes, since we can't rely on FD_CLOEXEC
+ # (as we do not exec, here)
+ rdr.each do |k, v|
+ k.close if v == :close
+ end
+ end
+ end
+ end
+ # +pipeline+ is an Array of (Arrays or Procs)
+ def run_pipeline(env, pipeline)
+ pids = {} # pid => pipeline index
+ work = pipeline.dup
+ last = work.pop
+ nr = work.size
+ rdr = {} # redirect mapping for Process.spawn
+ # we need to make sure pipes are closed in any forked processes
+ # (they are redirected to stdin or stdout, first)
+ pipes = { IO.pipe.each { |io| rdr[io] = :close } }
+ # start the first and last commands first, they only have one pipe, each
+ last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
+ pids[last_pid] = nr
+ first = work.shift
+ first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
+ pids[first_pid] = 0
+ # start the middle commands, they both have two pipes:
+ work.each_with_index do |cmd, i|
+ pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
+ pids[pid] = i + 1
+ end
+ # all pipes handed off to children, close so they see EOF
+ pipes.flatten!.each(&:close).clear
+ # wait for children to finish
+ fails = []
+ until pids.empty?
+ pid, status = Process.waitpid2(-1)
+ nr = pids.delete(pid)
+ status.success? or
+ fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+ end
+ # behave like "set -o pipefail" in bash
+ raise fails.join("\n") if fails[0]
+ end
diff --git a/test/test_pipeline.rb b/test/test_pipeline.rb
new file mode 100644
index 0000000..3cc32cc
--- /dev/null
+++ b/test/test_pipeline.rb
@@ -0,0 +1,47 @@
+# Copyright (C) 2017 all contributors <dtas-***>
+# License: GPL-3.0+ <>
+# frozen_string_literal: true
+require './test/helper'
+require 'dtas/pipeline'
+class TestPipeline < Testcase
+ include DTAS::Pipeline
+ def setup
+ @env = ENV.to_hash
+ end
+ def pipeline_result
+ IO.pipe do |rd, wr|
+ begin
+ pid = fork do
+ rd.close
+ $stdout.reopen(wr)
+ yield
+ exit!(0)
+ end
+ wr.close
+ return
+ ensure
+ _, status = Process.waitpid2(pid)
+ assert_predicate status, :success?
+ end
+ end
+ nil
+ end
+ def test_pipeline
+ assert_equal("BYYRU\n", pipeline_result do
+ run_pipeline(@env, [
+ %w(echo hello), # anything which generates something to stdout
+ %w(tr [a-z] [A-Z]), # upcase
+ # this lambda runs inside its own process
+ lambda do
+ $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
+ exit!(0)
+ end,
+ # rot13
+ %w(tr [a-m][n-z][A-M][N-Z] [n-z][a-m][N-Z][A-M])
+ ])
+ end)
+ end