1 ## the index structure for redwood. interacts with ferret.
5 require_gem 'ferret', ">= 0.10.13"
9 class IndexError < StandardError
12 def initialize source, s
21 LOAD_THREAD_PETIT_DELAY = 0.1
22 LOAD_THREAD_GRAND_DELAY = 5
24 MESSAGES_AT_A_TIME = 10
26 attr_reader :index # debugging only
28 def initialize dir=BASE_DIR
31 @load_thread = nil # loads new messages
33 @sources_dirty = false
35 self.class.i_am_the_instance self
44 FileUtils.mkdir_p @dir unless File.exists? @dir
50 raise "duplicate source!" if @sources.include? source
52 source.id ||= @sources.size
53 source.id += 1 while @sources.member? source.id
54 @sources[source.id] = source
## returns the first source in @sources whose is_source_for? method
## accepts +name+, or nil if no source does.
def source_for name
  @sources.values.find { |s| s.is_source_for? name }
end
## returns an array of every source in @sources whose usual? method
## is true (empty if there are none).
def usual_sources
  @sources.values.select { |s| s.usual? }
end
60 def load_index dir=File.join(@dir, "ferret")
61 wsa = Ferret::Analysis::WhiteSpaceAnalyzer.new false
62 sa = Ferret::Analysis::StandardAnalyzer.new
63 analyzer = Ferret::Analysis::PerFieldAnalyzer.new wsa
67 Redwood::log "loading index"
68 @index = Ferret::Index::Index.new(:path => dir, :analyzer => analyzer)
70 Redwood::log "creating index"
71 field_infos = Ferret::Index::FieldInfos.new :store => :yes
72 field_infos.add_field :message_id
73 field_infos.add_field :source_id
74 field_infos.add_field :source_info, :index => :no, :term_vector => :no
75 field_infos.add_field :date, :index => :untokenized
76 field_infos.add_field :body, :store => :no
77 field_infos.add_field :label
78 field_infos.add_field :subject
79 field_infos.add_field :from
80 field_infos.add_field :to
81 field_infos.add_field :refs
82 field_infos.add_field :snippet, :index => :no, :term_vector => :no
83 field_infos.create_index dir
84 @index = Ferret::Index::Index.new(:path => dir, :analyzer => analyzer)
88 ## update the message by deleting and re-adding
89 def update_message m, source=nil, source_info=nil
90 docid, entry = load_entry_for_id m.id
92 source ||= entry[:source_id].to_i
93 source_info ||= entry[:source_info].to_i
95 raise "no entry and no source info for message #{m.id}" unless source && source_info
97 raise "deleting non-corresponding entry #{docid}" unless @index[docid][:message_id] == m.id
102 def save_index fn=File.join(@dir, "ferret")
103 # don't have to do anything apparently
107 @index.search(Ferret::Search::TermQuery.new(:message_id, id)).total_hits > 0
## convenience wrapper: looks message +m+ up by its id via
## contains_id? and returns that result.
def contains? m
  contains_id? m.id
end
## size of the index; simply delegates to @index.size.
def size
  @index.size
end
## you should probably not call this with a block that doesn't break
## rather quickly, because the results will probably be, as we say
## in scotland, frikkin' huuuge.
115 EACH_BY_DATE_NUM = 100
116 def each_id_by_date opts={}
117 return if @index.size == 0 # otherwise ferret barfs
118 query = build_query opts
121 results = @index.search(query, :sort => "date DESC", :limit => EACH_BY_DATE_NUM, :offset => offset)
122 Redwood::log "got #{results.total_hits} results for query (offset #{offset}) #{query.inspect}"
123 results.hits.each { |hit| yield @index[hit.doc][:message_id], lambda { build_message hit.doc } }
124 break if offset >= results.total_hits - EACH_BY_DATE_NUM
125 offset += EACH_BY_DATE_NUM
129 def num_results_for opts={}
130 query = build_query opts
131 x = @index.search(query).total_hits
132 Redwood::log "num_results_for: have #{x} for query #{query}"
136 SAME_SUBJECT_DATE_LIMIT = 7
137 def each_message_in_thread_for m, opts={}
142 ## temporarily disabling subject searching because it's a
143 ## significant slowdown.
145 ## TODO: make this configurable, i guess
147 date_min = m.date - (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)
148 date_max = m.date + (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)
150 q = Ferret::Search::BooleanQuery.new true
151 sq = Ferret::Search::PhraseQuery.new(:subject)
152 wrap_subj(Message.normalize_subj(m.subj)).split(/\s+/).each do |t|
155 q.add_query sq, :must
156 q.add_query Ferret::Search::RangeQuery.new(:date, :>= => date_min.to_indexable_s, :<= => date_max.to_indexable_s), :must
158 pending = @index.search(q).hits.map { |hit| @index[hit.doc][:message_id] }
159 Redwood::log "found #{pending.size} results for subject query #{q}"
164 until pending.empty? || (opts[:limit] && messages.size >= opts[:limit])
166 next if searched.member? id
168 q = Ferret::Search::BooleanQuery.new true
169 q.add_query Ferret::Search::TermQuery.new(:message_id, id), :should
170 q.add_query Ferret::Search::TermQuery.new(:refs, id), :should
173 @index.search_each(q, :limit => :all) do |docid, score|
174 break if opts[:limit] && messages.size >= opts[:limit]
175 mid = @index[docid][:message_id]
176 unless messages.member? mid
177 messages[mid] ||= lambda { build_message docid }
178 refs = @index[docid][:refs].split(" ")
183 Redwood::log "ran #{num_queries} queries to build thread of #{messages.size} messages for #{m.id}"
184 messages.each { |mid, builder| yield mid, builder }
187 ## builds a message object from a ferret result
188 def build_message docid
190 source = @sources[doc[:source_id].to_i]
191 #puts "building message #{doc[:message_id]} (#{source}##{doc[:source_info]})"
192 raise "invalid source #{doc[:source_id]}" unless source
194 raise "no snippet" unless doc[:snippet]
195 Message.new source, doc[:source_info].to_i,
196 doc[:label].split(" ").map { |s| s.intern },
198 rescue MessageFormatError => e
199 raise IndexError.new(source, "error building message #{doc[:message_id]} at #{source}/#{doc[:source_info]}: #{e.message}")
204 def start_load_thread
205 return if @load_thread
207 @load_thread = ::Thread.new do
209 load_some_entries ENTRIES_AT_A_TIME, LOAD_THREAD_PETIT_DELAY, LOAD_THREAD_GRAND_DELAY
## clears @load_thread. note that this only drops the reference; it
## does not itself kill or join the thread object.
def end_load_thread
  @load_thread = nil
end
## increments @next_thread_id by one and returns the new value.
## @next_thread_id must already hold a number when this is called.
def fresh_thread_id
  @next_thread_id += 1
end
## surrounds +subj+ with the __START_SUBJECT__ and __END_SUBJECT__
## marker tokens, separated from it by single spaces. the wrapped
## form is what is written to the :subject field and what the
## subject phrase query is built from.
def wrap_subj subj
  "__START_SUBJECT__ #{subj} __END_SUBJECT__"
end
220 return false if contains? m
223 if m.source.is_a? Integer
226 m.source.id or raise "unregistered source #{m.source}"
229 to = (m.to + m.cc + m.bcc).map { |x| x.email }.join(" ")
232 :source_id => source_id,
233 :source_info => m.source_info,
234 :date => m.date.to_indexable_s,
236 :snippet => m.snippet,
237 :label => m.labels.join(" "),
238 :from => m.from ? m.from.email : "",
239 :to => (m.to + m.cc + m.bcc).map { |x| x.email }.join(" "),
240 :subject => wrap_subj(Message.normalize_subj(m.subj)),
241 :refs => (m.refs + m.replytos).join(" "),
244 @index.add_document d
246 ## TODO: figure out why this is sometimes triggered
247 #docid, entry = load_entry_for_id m.id
248 #raise "just added message #{m.id} but couldn't find it in a search" unless docid
## removes the entry identified by +docno+ from the index by
## calling @index.delete, and returns whatever that call returns.
def drop_entry docno
  @index.delete docno
end
254 def load_entry_for_id mid
255 results = @index.search(Ferret::Search::TermQuery.new(:message_id, mid))
256 return if results.total_hits == 0
257 docid = results.hits[0].doc
258 [docid, @index[docid]]
261 def load_contacts emails, h={}
262 q = Ferret::Search::BooleanQuery.new true
264 qq = Ferret::Search::BooleanQuery.new true
265 qq.add_query Ferret::Search::TermQuery.new(:from, e), :should
266 qq.add_query Ferret::Search::TermQuery.new(:to, e), :should
269 q.add_query Ferret::Search::TermQuery.new(:label, "spam"), :must_not
271 Redwood::log "contact search: #{q}"
274 @index.search_each(q, :sort => "date DESC", :limit => :all) do |docid, score|
275 break if contacts.size >= num
276 #Redwood::log "got message with to: #{@index[docid][:to].inspect} and from: #{@index[docid][:from].inspect}"
277 f = @index[docid][:from]
278 t = @index[docid][:to]
280 if AccountManager.is_account_email? f
281 t.split(" ").each { |e| #Redwood::log "adding #{e} because there's a message to him from account email #{f}";
282 contacts[Person.for(e)] = true }
284 #Redwood::log "adding from #{f} because there's a message from him to #{t}"
285 contacts[Person.for(f)] = true
289 contacts.keys.compact
294 ## TODO: convert this to query objects rather than strings
297 query += opts[:labels].map { |t| "+label:#{t}" }.join(" ") if opts[:labels]
298 query += " +label:#{opts[:label]}" if opts[:label]
299 query += " #{opts[:content]}" if opts[:content]
300 if opts[:participants]
302 opts[:participants].map { |p| "from:#{p.email} OR to:#{p.email}" }.join(" OR ") + ")"
305 query += " -label:spam" unless opts[:load_spam] || opts[:labels] == :spam ||
306 (opts[:labels] && opts[:labels].include?(:spam))
307 query += " -label:killed" unless opts[:load_killed] || opts[:labels] == :killed ||
308 (opts[:labels] && opts[:labels].include?(:killed))
312 def load_sources fn=Redwood::SOURCE_FN
313 @sources = Hash[*(Redwood::load_yaml_obj(fn) || []).map { |s| [s.id, s] }.flatten]
314 @sources_dirty = false
317 def save_sources fn=Redwood::SOURCE_FN
318 if @sources_dirty || @sources.any? { |id, s| s.dirty? }
319 FileUtils.mv fn, fn + ".bak", :force => true if File.exists? fn
320 Redwood::save_yaml_obj @sources.values, fn
322 @sources_dirty = false
325 def load_some_entries max=ENTRIES_AT_A_TIME, delay1=nil, delay2=nil
328 @sources.each_with_index do |source, source_id|
329 next if source.done? || num >= max
330 source.each do |source_info, label|
332 m = Message.new(source, source_info, label + [:inbox])
333 add_message m unless contains_id? m.id
334 puts m.content.inspect
336 rescue MessageFormatError => e
337 $stderr.puts "ignoring erroneous message at #{source}##{source_info}: #{e.message}"
340 sleep delay1 if delay1
342 Redwood::log "loaded #{num} entries from #{source}"
343 sleep delay2 if delay2