This is dependent on Linux /proc/ (the "pos: " field
of /proc/$PID/fdinfo/$FD to be exact).
This was written to avoid seek latencies on a remote FUSE
filesystem with occasional packet loss.
---
Post by Eric WongI'll be adding userspace readahead for playback on slow network
filesystems. It'll start off as Linux-only and will stay as a
a) allow tuning/modification without interrupting playback in
dtas-player
b) not place additional demands on the weak, non-RT (mainline)
Ruby 2.x threading/GC system we use for dtas-player.
bin/dtas-readahead | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 207 insertions(+)
create mode 100644 bin/dtas-readahead
diff --git a/bin/dtas-readahead b/bin/dtas-readahead
new file mode 100644
index 0000000..f02bc35
--- /dev/null
+++ b/bin/dtas-readahead
@@ -0,0 +1,207 @@
+#!/usr/bin/env ruby
+# Copyright (C) 2015 all contributors <dtas-***@nongnu.org>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Really janky readahead script. Requires dtas-player to be
+# running and unlikely to work outside of Linux as it depends on
+# the contents of /proc
+unless RUBY_PLATFORM =~ /linux/
+ warn "this relies on Linux /proc and probably does not work well for you"
+end
+
+require 'yaml'
+require 'io/wait'
+require 'dtas/unix_client'
+require 'dtas/process'
+
+include DTAS::Process
+include DTAS::SpawnFix
+trap(:CHLD) { DTAS::Process.reaper {} }
+trap(:INT) { exit(0) }
+trap(:TERM) { exit(0) }
+w = DTAS::UNIXClient.new
+w.req_ok('watch')
+c = DTAS::UNIXClient.new
+@max_ra = 30 * 1024 * 1024
+null = DTAS.null
+@redir = { err: null, out: null, in: null }.freeze
+require 'pp'
+
+if RUBY_VERSION.to_r >= '2.3'.to_r
+ # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET
+ def wait_read(w, timeout)
+ w.to_io.wait_readable(timeout)
+ end
+else
+ def wait_read(w, timeout)
+ r = IO.select([w], nil, nil, timeout)
+ r ? r[0] : nil
+ end
+end
+
+def seek_to_cur_pos(cur_pid, fp)
+ cur_fd = []
+ fpst = fp.stat
+ begin
+ Dir["/proc/#{cur_pid}/fd/*"].each do |l|
+ path = File.readlink(l)
+ begin
+ st = File.stat(path)
+ if st.dev == fpst.dev && st.ino == fpst.ino
+ cur_fd << l.split('/')[-1]
+ end
+ rescue Errno::ENOENT, Errno::EPERM
+ end
+ end
+ rescue Errno::ENOENT => e # race, process is dead
+ return false
+ rescue => e
+ warn "error reading FDs from for PID:#{cur_pid}: #{e.message}"
+ end
+ pos = 0
+ # get the position of the file of the sox process
+ cur_fd.each do |fd|
+ if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/
+ n = $1.to_i
+ pos = n if n > pos
+ end
+ end
+ pos
+rescue Errno::ENOENT => e # race, process is dead
+ return false
+end
+
+def children_of(ppid)
+ `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i)
+end
+
+def expand_pid(pid)
+ to_scan = Array(pid)
+ pids = []
+ while pid = to_scan.shift
+ pid > 0 or next
+ to_scan.concat(children_of(pid))
+ pids << pid
+ end
+ pids.uniq
+end
+
+def do_ra(fp, pos, w)
+ size = fp.size
+ len = size - pos
+ len = @todo_ra if len > @todo_ra
+ return if len <= 0
+ path = fp.path
+ pp({start_ra: File.basename(path),
+ len: '%.3f' % (len / (1024 * 1024.0)),
+ pos: pos })
+ Process.spawn('soxi', path, @redir)
+ Process.spawn('avprobe', path, @redir)
+ Process.spawn('ffprobe', path, @redir)
+ fp.advise(:sequential, pos, len)
+ Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close }
+
+ at_once = 8192
+ adj = len
+ while len > 0
+ n = len > at_once ? at_once : len
+ n = IO.copy_stream(fp, DTAS.null, n, pos)
+ pos += n
+ len -= n
+
+ # stop reading immediately if there's an event
+ if wait_read(w, 0)
+ adj = @todo_ra
+ pos += size
+ break
+ end
+ end
+ @todo_ra -= adj
+ (pos + len) >= size ? fp.close : nil
+end
+
+def do_open(path)
+ if path =~ /\.ya?ml\z/
+ File.open(path) do |fp|
+ buf = fp.read(4)
+ case buf
+ when "---\n"
+ buf << fp.read(fp.size - 4)
+ Dir.chdir(File.dirname(path)) do
+ yml = YAML.load(buf)
+ x = yml['infile'] and return File.open(File.expand_path(x).freeze)
+ end
+ end
+ end
+ end
+ File.open(path)
+end
+
+begin
+ work = {}
+ cur_pid = nil
+ @todo_ra = @max_ra
+ t0 = DTAS.now
+ fp = nil
+ cur = YAML.load(c.req('current'))
+ while @todo_ra > 0 && fp.nil?
+ if current = cur['current']
+ track = current['infile'].freeze
+ work[track] ||= fp = do_open(track)
+ cur_pid = current['pid']
+ if fp
+ pos = expand_pid(cur_pid).map do |pid|
+ seek_to_cur_pos(pid, fp)
+ end.compact.max
+ pos and fp = do_ra(fp, pos, w)
+ end
+ else
+ break
+ end
+
+ # queue has priority, work on it, first
+ queue = YAML.load(c.req('queue cat'))
+ while @todo_ra > 0 && track = queue.shift
+ fp = nil
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ end
+ break if @todo_ra <= 0
+
+ # the normal tracklist
+ ids = c.req('tl tracks').split
+ ids.shift # ignore count
+ idx = ids.find_index(c.req('tl current-id'))
+ repeat = c.req('tl repeat').split[-1]
+ while @todo_ra > 0 && idx && (cid = ids[idx])
+ fp = nil
+ track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil?
+ idx = repeat == 'true' ? 0 : nil
+ end
+ end
+ idx or break
+ cur = YAML.load(c.req('current'))
+ cur['current'] or break
+ end
+ elapsed = DTAS.now - t0
+ p [:elapsed, elapsed]
+ timeout = 5 - elapsed
+ timeout = 0 if timeout < 0
+ r = wait_read(w, timeout)
+ p w.res_wait if r
+rescue EOFError
+ abort "dtas-player exited"
+rescue => e
+ warn "#{e.message} #{e.class})"
+ e.backtrace.each {|l| warn l }
+ sleep 5
+end while true
--
EW