Discussion:
[PATCH 0/2] minor updates for readahead for -player
Eric Wong
2015-09-07 21:40:32 UTC
Permalink
I'll be adding userspace readahead for playback on slow network
filesystems. It'll start off as Linux-only and will stay as a
separate process/executable to:

a) allow tuning/modification without interrupting playback in
dtas-player

b) not place additional demands on the weak, non-RT (mainline)
Ruby 2.x threading/GC system we use for dtas-player.

Eric Wong (2):
use a common /dev/null
player: add "queue cat" command

Documentation/dtas-player_protocol.txt | 5 +++++
lib/dtas.rb | 5 +++++
lib/dtas/buffer/splice.rb | 3 +--
lib/dtas/format.rb | 4 ++--
lib/dtas/player.rb | 4 +++-
lib/dtas/sink.rb | 2 +-
6 files changed, 17 insertions(+), 6 deletions(-)
Eric Wong
2015-09-07 21:40:33 UTC
Permalink
This allows us to avoid wasting time reopening the same
device over and over again.
---
lib/dtas.rb | 5 +++++
lib/dtas/buffer/splice.rb | 3 +--
lib/dtas/format.rb | 4 ++--
lib/dtas/player.rb | 2 +-
lib/dtas/sink.rb | 2 +-
5 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/lib/dtas.rb b/lib/dtas.rb
index f11d549..0b704ca 100644
--- a/lib/dtas.rb
+++ b/lib/dtas.rb
@@ -13,6 +13,11 @@ module DTAS # :nodoc:
Time.now.to_f
end
end
+
+ @null = nil
+ def self.null
+ @null ||= File.open('/dev/null', 'r+')
+ end
end

require_relative 'dtas/compat_onenine'
diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb
index 02ce877..be40881 100644
--- a/lib/dtas/buffer/splice.rb
+++ b/lib/dtas/buffer/splice.rb
@@ -10,7 +10,6 @@ module DTAS::Buffer::Splice # :nodoc:
MAX_AT_ONCE = 4096 # page size in Linux
MAX_AT_ONCE_1 = 65536
MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i
- DEVNULL = File.open("/dev/null", "r+")
F_MOVE = IO::Splice::F_MOVE

def buffer_size
@@ -25,7 +24,7 @@ module DTAS::Buffer::Splice # :nodoc:

# be sure to only call this with nil when all writers to @wr are done
def discard(bytes)
- IO.splice(@to_io, nil, DEVNULL, nil, bytes)
+ IO.splice(@to_io, nil, DTAS.null, nil, bytes)
end

def broadcast_one(targets, limit = nil)
diff --git a/lib/dtas/format.rb b/lib/dtas/format.rb
index a6314bd..cfcec64 100644
--- a/lib/dtas/format.rb
+++ b/lib/dtas/format.rb
@@ -44,9 +44,9 @@ class DTAS::Format # :nodoc:

def self.precision(env, infile)
# sox.git f4562efd0aa3
- qx(env, %W(soxi -p #{infile}), err: "/dev/null").to_i
+ qx(env, %W(soxi -p #{infile}), err: DTAS.null).to_i
rescue # fallback to parsing the whole output
- s = qx(env, %W(soxi #{infile}), err: "/dev/null")
+ s = qx(env, %W(soxi #{infile}), err: DTAS.null)
s =~ /Precision\s+:\s*(\d+)-bit/n
v = $1.to_i
return v if v > 0
diff --git a/lib/dtas/player.rb b/lib/dtas/player.rb
index cdf1265..a102618 100644
--- a/lib/dtas/player.rb
+++ b/lib/dtas/player.rb
@@ -426,7 +426,7 @@ class DTAS::Player # :nodoc:

dst = @sink_buf
pending.dst_assoc(dst)
- pending.src_spawn(@format, @rg, out: dst.wr, in: "/dev/null")
+ pending.src_spawn(@format, @rg, out: dst.wr, in: DTAS.null)

# watch and restart on modifications
pending.respond_to?(:watch_begin) and
diff --git a/lib/dtas/sink.rb b/lib/dtas/sink.rb
index 70d6861..0bf49f4 100644
--- a/lib/dtas/sink.rb
+++ b/lib/dtas/sink.rb
@@ -91,7 +91,7 @@ class DTAS::Sink # :nodoc:
w.sink = self
rv << w
end
- opts[:in] = "/dev/null"
+ opts[:in] = DTAS.null

# map to real /dev/fd/* values and setup proper redirects
cmd = cmd.gsub(DEVFD_RE) do
--
EW
Eric Wong
2015-09-07 21:40:34 UTC
Permalink
This will dump the contents of the current queue, including
positional seeking information and commands. This is mainly
intended for debugging and tools which rely on dtas internals.
---
Documentation/dtas-player_protocol.txt | 5 +++++
lib/dtas/player.rb | 2 ++
2 files changed, 7 insertions(+)

diff --git a/Documentation/dtas-player_protocol.txt b/Documentation/dtas-player_protocol.txt
index e1b2487..b02d1e5 100644
--- a/Documentation/dtas-player_protocol.txt
+++ b/Documentation/dtas-player_protocol.txt
@@ -166,6 +166,11 @@ Commands here should be alphabetized according to `LC_ALL=C sort'
* play_pause - toggle the play/pause state. This starts playback if
paused, and pauses playback if playing.

+* queue cat - dump the contents of the queue as YAML
+ This may include arbitrary commands to be executed, filenames,
+ and offsets for playback. The format is not intended to be
+ stable and subject to internal changes in dtas-player.
+
* restart - restarts all processes in the current pipeline. Playback
will be momentarily interrupted while this change occurs. This is
necessary if one of the commands (e.g. sox or ecasound) or loaded
diff --git a/lib/dtas/player.rb b/lib/dtas/player.rb
index a102618..a1e2040 100644
--- a/lib/dtas/player.rb
+++ b/lib/dtas/player.rb
@@ -251,6 +251,8 @@ class DTAS::Player # :nodoc:
tl_handler(io, msg)
when "trim"
trim_handler(io, msg)
+ when "queue"
+ msg[0] == "cat" and io.emit(@queue.to_yaml)
end
end
--
EW
Eric Wong
2015-09-20 23:02:37 UTC
Permalink
This is dependent on Linux /proc/ (the "pos: " field
of /proc/$PID/fdinfo/$FD to be exact).

This was written to avoid seek latencies on a remote FUSE
filesystem with occasional packet loss.
---
Post by Eric Wong
I'll be adding userspace readahead for playback on slow network
filesystems. It'll start off as Linux-only and will stay as a
a) allow tuning/modification without interrupting playback in
dtas-player
b) not place additional demands on the weak, non-RT (mainline)
Ruby 2.x threading/GC system we use for dtas-player.
bin/dtas-readahead | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 207 insertions(+)
create mode 100644 bin/dtas-readahead

diff --git a/bin/dtas-readahead b/bin/dtas-readahead
new file mode 100644
index 0000000..f02bc35
--- /dev/null
+++ b/bin/dtas-readahead
@@ -0,0 +1,207 @@
+#!/usr/bin/env ruby
+# Copyright (C) 2015 all contributors <dtas-***@nongnu.org>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Really janky readahead script. Requires dtas-player to be
+# running and unlikely to work outside of Linux as it depends on
+# the contents of /proc
+unless RUBY_PLATFORM =~ /linux/
+ warn "this relies on Linux /proc and probably does not work well for you"
+end
+
+require 'yaml'
+require 'io/wait'
+require 'dtas/unix_client'
+require 'dtas/process'
+
+include DTAS::Process
+include DTAS::SpawnFix
+trap(:CHLD) { DTAS::Process.reaper {} }
+trap(:INT) { exit(0) }
+trap(:TERM) { exit(0) }
+w = DTAS::UNIXClient.new
+w.req_ok('watch')
+c = DTAS::UNIXClient.new
+@max_ra = 30 * 1024 * 1024
+null = DTAS.null
+@redir = { err: null, out: null, in: null }.freeze
+require 'pp'
+
+if RUBY_VERSION.to_r >= '2.3'.to_r
+ # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET
+ def wait_read(w, timeout)
+ w.to_io.wait_readable(timeout)
+ end
+else
+ def wait_read(w, timeout)
+ r = IO.select([w], nil, nil, timeout)
+ r ? r[0] : nil
+ end
+end
+
+def seek_to_cur_pos(cur_pid, fp)
+ cur_fd = []
+ fpst = fp.stat
+ begin
+ Dir["/proc/#{cur_pid}/fd/*"].each do |l|
+ path = File.readlink(l)
+ begin
+ st = File.stat(path)
+ if st.dev == fpst.dev && st.ino == fpst.ino
+ cur_fd << l.split('/')[-1]
+ end
+ rescue Errno::ENOENT, Errno::EPERM
+ end
+ end
+ rescue Errno::ENOENT => e # race, process is dead
+ return false
+ rescue => e
+ warn "error reading FDs from for PID:#{cur_pid}: #{e.message}"
+ end
+ pos = 0
+ # get the position of the file of the sox process
+ cur_fd.each do |fd|
+ if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/
+ n = $1.to_i
+ pos = n if n > pos
+ end
+ end
+ pos
+rescue Errno::ENOENT => e # race, process is dead
+ return false
+end
+
+def children_of(ppid)
+ `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i)
+end
+
+def expand_pid(pid)
+ to_scan = Array(pid)
+ pids = []
+ while pid = to_scan.shift
+ pid > 0 or next
+ to_scan.concat(children_of(pid))
+ pids << pid
+ end
+ pids.uniq
+end
+
+def do_ra(fp, pos, w)
+ size = fp.size
+ len = size - pos
+ len = @todo_ra if len > @todo_ra
+ return if len <= 0
+ path = fp.path
+ pp({start_ra: File.basename(path),
+ len: '%.3f' % (len / (1024 * 1024.0)),
+ pos: pos })
+ Process.spawn('soxi', path, @redir)
+ Process.spawn('avprobe', path, @redir)
+ Process.spawn('ffprobe', path, @redir)
+ fp.advise(:sequential, pos, len)
+ Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close }
+
+ at_once = 8192
+ adj = len
+ while len > 0
+ n = len > at_once ? at_once : len
+ n = IO.copy_stream(fp, DTAS.null, n, pos)
+ pos += n
+ len -= n
+
+ # stop reading immediately if there's an event
+ if wait_read(w, 0)
+ adj = @todo_ra
+ pos += size
+ break
+ end
+ end
+ @todo_ra -= adj
+ (pos + len) >= size ? fp.close : nil
+end
+
+def do_open(path)
+ if path =~ /\.ya?ml\z/
+ File.open(path) do |fp|
+ buf = fp.read(4)
+ case buf
+ when "---\n"
+ buf << fp.read(fp.size - 4)
+ Dir.chdir(File.dirname(path)) do
+ yml = YAML.load(buf)
+ x = yml['infile'] and return File.open(File.expand_path(x).freeze)
+ end
+ end
+ end
+ end
+ File.open(path)
+end
+
+begin
+ work = {}
+ cur_pid = nil
+ @todo_ra = @max_ra
+ t0 = DTAS.now
+ fp = nil
+ cur = YAML.load(c.req('current'))
+ while @todo_ra > 0 && fp.nil?
+ if current = cur['current']
+ track = current['infile'].freeze
+ work[track] ||= fp = do_open(track)
+ cur_pid = current['pid']
+ if fp
+ pos = expand_pid(cur_pid).map do |pid|
+ seek_to_cur_pos(pid, fp)
+ end.compact.max
+ pos and fp = do_ra(fp, pos, w)
+ end
+ else
+ break
+ end
+
+ # queue has priority, work on it, first
+ queue = YAML.load(c.req('queue cat'))
+ while @todo_ra > 0 && track = queue.shift
+ fp = nil
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ end
+ break if @todo_ra <= 0
+
+ # the normal tracklist
+ ids = c.req('tl tracks').split
+ ids.shift # ignore count
+ idx = ids.find_index(c.req('tl current-id'))
+ repeat = c.req('tl repeat').split[-1]
+ while @todo_ra > 0 && idx && (cid = ids[idx])
+ fp = nil
+ track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil?
+ idx = repeat == 'true' ? 0 : nil
+ end
+ end
+ idx or break
+ cur = YAML.load(c.req('current'))
+ cur['current'] or break
+ end
+ elapsed = DTAS.now - t0
+ p [:elapsed, elapsed]
+ timeout = 5 - elapsed
+ timeout = 0 if timeout < 0
+ r = wait_read(w, timeout)
+ p w.res_wait if r
+rescue EOFError
+ abort "dtas-player exited"
+rescue => e
+ warn "#{e.message} #{e.class})"
+ e.backtrace.each {|l| warn l }
+ sleep 5
+end while true
--
EW
Eric Wong
2015-10-03 09:38:07 UTC
Permalink
Oops, files in bin/ should be executable.
---
bin/dtas-readahead | 0
1 file changed, 0 insertions(+), 0 deletions(-)
mode change 100644 => 100755 bin/dtas-readahead

diff --git a/bin/dtas-readahead b/bin/dtas-readahead
old mode 100644
new mode 100755
--
EW
Loading...