]> git.cworth.org Git - sup/blob - lib/sup/mbox/ssh-file.rb
automatically jump cursor to first open message in thread-view-mode
[sup] / lib / sup / mbox / ssh-file.rb
1 require 'net/ssh'
2
3 module Redwood
4 module MBox
5
6 class SSHFileError < StandardError; end
7
8 ## this is a file-like interface to a file that actually lives on the
9 ## other end of an ssh connection. it works by using wc, head and tail
10 ## to simulate (buffered) random access. on a fast connection, this
11 ## can have a good bandwidth, but the latency is pretty terrible:
12 ## about 1 second (!) per request.  luckily, we're either just reading
13 ## straight through the mbox (an import) or we're reading a few
14 ## messages at a time (viewing messages) so the latency is not a problem.
15
16 ## all of the methods here can throw SSHFileErrors, SocketErrors,
17 ## Net::SSH::Exceptions and Errno::ENOENTs.
18
19 ## debugging TODO: remove me
20 def debug s
21   Redwood::log s
22 end
23 module_function :debug
24
25 ## a simple buffer of contiguous data
26 class Buffer
27   def initialize
28     clear!
29   end
30
31   def clear!
32     @start = nil
33     @buf = ""
34   end
35
36   def empty?; @start.nil?; end
37   def start; @start; end
38   def endd; @start + @buf.length; end
39
40   def add data, offset=endd
41     #MBox::debug "+ adding #{data.length} bytes; size will be #{size + data.length}; limit #{SSHFile::MAX_BUF_SIZE}"
42
43     if start.nil?
44       @buf = data
45       @start = offset
46       return
47     end
48
49     raise "non-continguous data added to buffer (data #{offset}:#{offset + data.length}, buf range #{start}:#{endd})" if offset + data.length < start || offset > endd
50
51     if offset < start
52       @buf = data[0 ... (start - offset)] + @buf
53       @start = offset
54     else
55       return if offset + data.length < endd
56       @buf += data[(endd - offset) .. -1]
57     end
58   end
59
60   def [](o)
61     raise "only ranges supported due to programmer's laziness" unless o.is_a? Range
62     @buf[Range.new(o.first - @start, o.last - @start, o.exclude_end?)]
63   end
64
65   def index what, start=0
66     x = @buf.index(what, start - @start)
67     x.nil? ? nil : x + @start
68   end
69
70   def rindex what, start=0
71     x = @buf.rindex(what, start - @start)
72     x.nil? ? nil : x + @start
73   end
74
75   def size; empty? ? 0 : @buf.size; end
76   def to_s; empty? ? "<empty>" : "[#{start}, #{endd})"; end # for debugging
77 end
78
79 ## sharing a ssh connection to one machines between sources seems to
80 ## create lots of broken situations: commands returning bizarre (large
81 ## positive integer) return codes despite working; commands
82 ## occasionally not working, etc. i suspect this is because of the
83 ## fragile nature of the ssh syncshell. 
84 ##
85 ## at any rate, we now open up one ssh connection per file, which is
86 ## probably silly in the extreme case.
87
88 ## the file-like interface to a remote file
89 class SSHFile
90   MAX_BUF_SIZE = 1024 * 1024 # bytes
91   MAX_TRANSFER_SIZE = 1024 * 128
92   REASONABLE_TRANSFER_SIZE = 1024 * 32
93   SIZE_CHECK_INTERVAL = 60 * 1 # seconds
94
95   @@shells = {}
96   @@shells_mutex = Mutex.new
97
98   def initialize host, fn, ssh_opts={}
99     @buf = Buffer.new
100     @host = host
101     @fn = fn
102     @ssh_opts = ssh_opts
103     @file_size = nil
104     @offset = 0
105     @say_id = nil
106     @broken_msg = nil
107     @shell = nil
108     @shell_mutex = Mutex.new
109     @buf_mutex = Mutex.new
110   end
111
112   def to_s; "mbox+ssh://#@host/#@fn"; end ## TODO: remove thisis EVILness
113   def broken?; !@broken_msg.nil?; end
114
115   ## TODO: share this code with imap
116   def say s
117     @say_id = BufferManager.say s, @say_id if BufferManager.instantiated?
118     Redwood::log s
119   end
120   def shutup
121     BufferManager.clear @say_id if BufferManager.instantiated? && @say_id
122     @say_id = nil
123   end
124   private :say, :shutup
125
126   def connect
127     raise SSHFileError, @broken_msg if broken?
128     return if @shell
129
130     @key = [@host, @ssh_opts[:username]]
131     begin
132       @shell = @@shells_mutex.synchronize do
133         unless @@shells.member? @key
134           say "Opening SSH connection to #{@host} for #@fn..."
135           #raise SSHFileError, "simulated SSH file error"
136           session = Net::SSH.start @host, @ssh_opts
137           say "Starting SSH shell..."
138           @@shells[@key] = session.shell.sync
139         end
140         @@shells[@key]
141       end
142
143       say "Checking for #@fn..."
144       @shell_mutex.synchronize { raise Errno::ENOENT, @fn unless @shell.test("-e #@fn").status == 0 }
145     ensure
146       shutup
147     end
148   end
149
150   def eof?; @offset >= size; end
151   def eof; eof?; end # lame but IO's method is named this and rmail calls that
152   def seek loc; @offset = loc; end
153   def tell; @offset; end
154   def total; size; end
155
156   def size
157     if @file_size.nil? || (Time.now - @last_size_check) > SIZE_CHECK_INTERVAL
158       @last_size_check = Time.now
159       @file_size = do_remote("wc -c #@fn").split.first.to_i
160     end
161     @file_size
162   end
163
164   def gets
165     return nil if eof?
166     @buf_mutex.synchronize do
167       make_buf_include @offset
168       expand_buf_forward while @buf.index("\n", @offset).nil? && @buf.endd < size
169       returning(@buf[@offset .. (@buf.index("\n", @offset) || -1)]) { |line| @offset += line.length }
170     end
171   end
172
173   def read n
174     return nil if eof?
175     @buf_mutex.synchronize do
176       make_buf_include @offset, n
177       @buf[@offset ... (@offset += n)]
178     end
179   end
180
181 private
182
183   def do_remote cmd, expected_size=0
184     begin
185       retries = 0
186       connect
187       # MBox::debug "sending command: #{cmd.inspect}"
188       begin
189         result = @shell_mutex.synchronize { x = @shell.send_command cmd; sleep 0.25; x }
190         raise SSHFileError, "Failure during remote command #{cmd.inspect}: #{(result.stderr || result.stdout || "")[0 .. 100]}" unless result.status == 0
191       rescue Net::SSH::Exception # these happen occasionally for no apparent reason. gotta love that nondeterminism!
192         retry if (retries += 1) <= 3
193         raise
194       rescue Errno::EPIPE
195         if (retries += 1) <= e
196           @@shells_mutex.synchronize do
197             @shell = nil
198             @@shells[@key] = nil
199           end
200           connect
201           retry
202         end
203       end
204     rescue Net::SSH::Exception, SSHFileError, Errno::ENOENT => e
205       @broken_msg = e.message
206       raise
207     end
208     result.stdout
209   end
210
211   def get_bytes offset, size
212     do_remote "tail -c +#{offset + 1} #@fn | head -c #{size}", size
213   end
214
215   def expand_buf_forward n=REASONABLE_TRANSFER_SIZE
216     @buf.add get_bytes(@buf.endd, n)
217   end
218
219   ## try our best to transfer somewhere between
220   ## REASONABLE_TRANSFER_SIZE and MAX_TRANSFER_SIZE bytes
221   def make_buf_include offset, size=0
222     good_size = [size, REASONABLE_TRANSFER_SIZE].max
223
224     trans_start, trans_size = 
225       if @buf.empty?
226         [offset, good_size]
227       elsif offset < @buf.start
228         if @buf.start - offset <= good_size
229           start = [@buf.start - good_size, 0].max
230           [start, @buf.start - start]
231         elsif @buf.start - offset < MAX_TRANSFER_SIZE
232           [offset, @buf.start - offset]
233         else
234           MBox::debug "clearing SSH buffer because buf.start #{@buf.start} - offset #{offset} >= #{MAX_TRANSFER_SIZE}"
235           @buf.clear!
236           [offset, good_size]
237         end
238       else
239         return if [offset + size, self.size].min <= @buf.endd # whoohoo!
240         if offset - @buf.endd <= good_size
241           [@buf.endd, good_size]
242         elsif offset - @buf.endd < MAX_TRANSFER_SIZE
243           [@buf.endd, offset - @buf.endd]
244         else
245           MBox::debug "clearing SSH buffer because offset #{offset} - buf.end #{@buf.endd} >= #{MAX_TRANSFER_SIZE}"
246           @buf.clear!
247           [offset, good_size]
248         end
249       end          
250
251     @buf.clear! if @buf.size > MAX_BUF_SIZE
252     @buf.add get_bytes(trans_start, trans_size), trans_start
253   end
254 end
255
256 end
257 end