]> git.cworth.org Git - sup/blob - lib/sup/mbox/loader.rb
grrr
[sup] / lib / sup / mbox / loader.rb
1 require 'rmail'
2 require 'uri'
3
4 module Redwood
5 module MBox
6
7 class Loader < Source
8   def initialize uri_or_fp, start_offset=nil, usual=true, archived=false, id=nil
9     super
10
11     @mutex = Mutex.new
12     @labels = [:unread]
13
14     case uri_or_fp
15     when String
16       uri = URI(uri_or_fp)
17       raise ArgumentError, "not an mbox uri" unless uri.scheme == "mbox"
18       raise ArgumentError, "mbox uri cannot have a host: #{uri.host}" if uri.host
19       ## heuristic: use the filename as a label, unless the file
20       ## has a path that probably represents an inbox.
21       @labels << File.basename(uri.path).intern unless File.dirname(uri.path) =~ /\b(var|usr|spool)\b/
22       @f = File.open uri.path
23     else
24       @f = uri_or_fp
25     end
26   end
27
28   def check
29     if (cur_offset ||= start_offset) > end_offset
30       raise OutOfSyncSourceError, "mbox file is smaller than last recorded message offset. Messages have probably been deleted by another client."
31     end
32   end
33     
34   def start_offset; 0; end
35   def end_offset; File.size @f; end
36
37   def load_header offset
38     header = nil
39     @mutex.synchronize do
40       @f.seek offset
41       l = @f.gets
42       unless l =~ BREAK_RE
43         raise OutOfSyncSourceError, "mismatch in mbox file offset #{offset.inspect}: #{l.inspect}." 
44       end
45       header = MBox::read_header @f
46     end
47     header
48   end
49
50   def load_message offset
51     @mutex.synchronize do
52       @f.seek offset
53       begin
54         RMail::Mailbox::MBoxReader.new(@f).each_message do |input|
55           return RMail::Parser.read(input)
56         end
57       rescue RMail::Parser::Error => e
58         raise FatalSourceError, "error parsing mbox file: #{e.message}"
59       end
60     end
61   end
62
63   def raw_header offset
64     ret = ""
65     @mutex.synchronize do
66       @f.seek offset
67       until @f.eof? || (l = @f.gets) =~ /^$/
68         ret += l
69       end
70     end
71     ret
72   end
73
74   def raw_full_message offset
75     ret = ""
76     @mutex.synchronize do
77       @f.seek offset
78       @f.gets # skip mbox header
79       until @f.eof? || (l = @f.gets) =~ BREAK_RE
80         ret += l
81       end
82     end
83     ret
84   end
85
86   def next
87     returned_offset = nil
88     next_offset = cur_offset
89
90     begin
91       @mutex.synchronize do
92         @f.seek cur_offset
93
94         ## cur_offset could be at one of two places here:
95
96         ## 1. before a \n and a mbox separator, if it was previously at
97         ##    EOF and a new message was added; or,
98         ## 2. at the beginning of an mbox separator (in all other
99         ##    cases).
100
101         l = @f.gets or raise "next while at EOF"
102         if l =~ /^\s*$/ # case 1
103           returned_offset = @f.tell
104           @f.gets # now we're at a BREAK_RE, so skip past it
105         else # case 2
106           returned_offset = cur_offset
107           ## we've already skipped past the BREAK_RE, so just go
108         end
109
110         while(line = @f.gets)
111           break if line =~ BREAK_RE
112           next_offset = @f.tell
113         end
114       end
115     rescue SystemCallError => e
116       raise FatalSourceError, "Error reading #{@f.path}: #{e.message}"
117     end
118
119     self.cur_offset = next_offset
120     [returned_offset, @labels.clone]
121   end
122 end
123
124 Redwood::register_yaml(Loader, %w(uri cur_offset usual archived id))
125
126 end
127 end