]> git.cworth.org Git - sup/blob - lib/sup/mbox/ssh-file.rb
Merge branch 'logging'
[sup] / lib / sup / mbox / ssh-file.rb
1 require 'net/ssh'
2
3 module Redwood
4 module MBox
5
6 class SSHFileError < StandardError; end
7
8 ## this is a file-like interface to a file that actually lives on the
9 ## other end of an ssh connection. it works by using wc, head and tail
10 ## to simulate (buffered) random access. on a fast connection, this
11 ## can have a good bandwidth, but the latency is pretty terrible:
12 ## about 1 second (!) per request.  luckily, we're either just reading
13 ## straight through the mbox (an import) or we're reading a few
14 ## messages at a time (viewing messages) so the latency is not a problem.
15
16 ## all of the methods here can throw SSHFileErrors, SocketErrors,
17 ## Net::SSH::Exceptions and Errno::ENOENTs.
18
19 ## a simple buffer of contiguous data
20 class Buffer
21   def initialize
22     clear!
23   end
24
25   def clear!
26     @start = nil
27     @buf = ""
28   end
29
30   def empty?; @start.nil?; end
31   def start; @start; end
32   def endd; @start + @buf.length; end
33
34   def add data, offset=endd
35     #MBox::debug "+ adding #{data.length} bytes; size will be #{size + data.length}; limit #{SSHFile::MAX_BUF_SIZE}"
36
37     if start.nil?
38       @buf = data
39       @start = offset
40       return
41     end
42
43     raise "non-continguous data added to buffer (data #{offset}:#{offset + data.length}, buf range #{start}:#{endd})" if offset + data.length < start || offset > endd
44
45     if offset < start
46       @buf = data[0 ... (start - offset)] + @buf
47       @start = offset
48     else
49       return if offset + data.length < endd
50       @buf += data[(endd - offset) .. -1]
51     end
52   end
53
54   def [](o)
55     raise "only ranges supported due to programmer's laziness" unless o.is_a? Range
56     @buf[Range.new(o.first - @start, o.last - @start, o.exclude_end?)]
57   end
58
59   def index what, start=0
60     x = @buf.index(what, start - @start)
61     x.nil? ? nil : x + @start
62   end
63
64   def rindex what, start=0
65     x = @buf.rindex(what, start - @start)
66     x.nil? ? nil : x + @start
67   end
68
69   def size; empty? ? 0 : @buf.size; end
70   def to_s; empty? ? "<empty>" : "[#{start}, #{endd})"; end # for debugging
71 end
72
73 ## sharing a ssh connection to one machines between sources seems to
74 ## create lots of broken situations: commands returning bizarre (large
75 ## positive integer) return codes despite working; commands
76 ## occasionally not working, etc. i suspect this is because of the
77 ## fragile nature of the ssh syncshell. 
78 ##
79 ## at any rate, we now open up one ssh connection per file, which is
80 ## probably silly in the extreme case.
81
82 ## the file-like interface to a remote file
83 class SSHFile
84   MAX_BUF_SIZE = 1024 * 1024 # bytes
85   MAX_TRANSFER_SIZE = 1024 * 128
86   REASONABLE_TRANSFER_SIZE = 1024 * 32
87   SIZE_CHECK_INTERVAL = 60 * 1 # seconds
88
89   ## upon these errors we'll try to rereconnect a few times
90   RECOVERABLE_ERRORS = [ Errno::EPIPE, Errno::ETIMEDOUT ]
91
92   @@shells = {}
93   @@shells_mutex = Mutex.new
94
95   def initialize host, fn, ssh_opts={}
96     @buf = Buffer.new
97     @host = host
98     @fn = fn
99     @ssh_opts = ssh_opts
100     @file_size = nil
101     @offset = 0
102     @say_id = nil
103     @shell = nil
104     @shell_mutex = nil
105     @buf_mutex = Mutex.new
106   end
107
108   def to_s; "mbox+ssh://#@host/#@fn"; end ## TODO: remove this EVILness
109
110   def connect
111     do_remote nil
112   end
113
114   def eof?; @offset >= size; end
115   def eof; eof?; end # lame but IO's method is named this and rmail calls that
116   def seek loc; @offset = loc; end
117   def tell; @offset; end
118   def total; size; end
119   def path; @fn end
120
121   def size
122     if @file_size.nil? || (Time.now - @last_size_check) > SIZE_CHECK_INTERVAL
123       @last_size_check = Time.now
124       @file_size = do_remote("wc -c #@fn").split.first.to_i
125     end
126     @file_size
127   end
128
129   def gets
130     return nil if eof?
131     @buf_mutex.synchronize do
132       make_buf_include @offset
133       expand_buf_forward while @buf.index("\n", @offset).nil? && @buf.endd < size
134       returning(@buf[@offset .. (@buf.index("\n", @offset) || -1)]) { |line| @offset += line.length }
135     end
136   end
137
138   def read n
139     return nil if eof?
140     @buf_mutex.synchronize do
141       make_buf_include @offset, n
142       @buf[@offset ... (@offset += n)]
143     end
144   end
145
146 private
147
148   ## TODO: share this code with imap
149   def say s
150     @say_id = BufferManager.say s, @say_id if BufferManager.instantiated?
151     info s
152   end
153
154   def shutup
155     BufferManager.clear @say_id if BufferManager.instantiated? && @say_id
156     @say_id = nil
157   end
158
159   def unsafe_connect
160     return if @shell
161
162     @key = [@host, @ssh_opts[:username]]
163     begin
164       @shell, @shell_mutex = @@shells_mutex.synchronize do
165         unless @@shells.member? @key
166           say "Opening SSH connection to #{@host} for #@fn..."
167           session = Net::SSH.start @host, @ssh_opts
168           say "Starting SSH shell..."
169           @@shells[@key] = [session.shell.sync, Mutex.new]
170         end
171         @@shells[@key]
172       end
173       
174       say "Checking for #@fn..."
175       @shell_mutex.synchronize { raise Errno::ENOENT, @fn unless @shell.test("-e #@fn").status == 0 }
176     ensure
177       shutup
178     end
179   end
180
181   def do_remote cmd, expected_size=0
182     retries = 0
183     result = nil
184
185     begin
186       unsafe_connect
187       if cmd
188         # MBox::debug "sending command: #{cmd.inspect}"
189         result = @shell_mutex.synchronize { x = @shell.send_command cmd; sleep 0.25; x }
190         raise SSHFileError, "Failure during remote command #{cmd.inspect}: #{(result.stderr || result.stdout || "")[0 .. 100]}" unless result.status == 0
191       end
192       ## Net::SSH::Exceptions seem to happen every once in a while for
193       ## no good reason.
194     rescue Net::SSH::Exception, *RECOVERABLE_ERRORS
195       if (retries += 1) <= 3
196         @@shells_mutex.synchronize do
197           @shell = nil
198           @@shells[@key] = nil
199         end
200         retry
201       end
202       raise
203     end
204
205     result.stdout if cmd
206   end
207
208   def get_bytes offset, size
209     do_remote "tail -c +#{offset + 1} #@fn | head -c #{size}", size
210   end
211
212   def expand_buf_forward n=REASONABLE_TRANSFER_SIZE
213     @buf.add get_bytes(@buf.endd, n)
214   end
215
216   ## try our best to transfer somewhere between
217   ## REASONABLE_TRANSFER_SIZE and MAX_TRANSFER_SIZE bytes
218   def make_buf_include offset, size=0
219     good_size = [size, REASONABLE_TRANSFER_SIZE].max
220
221     trans_start, trans_size = 
222       if @buf.empty?
223         [offset, good_size]
224       elsif offset < @buf.start
225         if @buf.start - offset <= good_size
226           start = [@buf.start - good_size, 0].max
227           [start, @buf.start - start]
228         elsif @buf.start - offset < MAX_TRANSFER_SIZE
229           [offset, @buf.start - offset]
230         else
231           MBox::debug "clearing SSH buffer because buf.start #{@buf.start} - offset #{offset} >= #{MAX_TRANSFER_SIZE}"
232           @buf.clear!
233           [offset, good_size]
234         end
235       else
236         return if [offset + size, self.size].min <= @buf.endd # whoohoo!
237         if offset - @buf.endd <= good_size
238           [@buf.endd, good_size]
239         elsif offset - @buf.endd < MAX_TRANSFER_SIZE
240           [@buf.endd, offset - @buf.endd]
241         else
242           MBox::debug "clearing SSH buffer because offset #{offset} - buf.end #{@buf.endd} >= #{MAX_TRANSFER_SIZE}"
243           @buf.clear!
244           [offset, good_size]
245         end
246       end          
247
248     @buf.clear! if @buf.size > MAX_BUF_SIZE
249     @buf.add get_bytes(trans_start, trans_size), trans_start
250   end
251 end
252
253 end
254 end