]> git.cworth.org Git - sup/blob - lib/sup/xapian_index.rb
maintain labels as Sets rather than arrays
[sup] / lib / sup / xapian_index.rb
1 require 'xapian'
2 require 'gdbm'
3 require 'set'
4
5 module Redwood
6
7 # This index implementation uses Xapian for searching and GDBM for storage. It
8 # tends to be slightly faster than Ferret for indexing and significantly faster
9 # for searching due to precomputing thread membership.
10 class XapianIndex < BaseIndex
11   STEM_LANGUAGE = "english"
12
13   ## dates are converted to integers for xapian, and are used for document ids,
14   ## so we must ensure they're reasonably valid. this typically only affect
15   ## spam.
16   MIN_DATE = Time.at 0
17   MAX_DATE = Time.at(2**31-1)
18
19   def initialize dir=BASE_DIR
20     super
21
22     @index_mutex = Monitor.new
23   end
24
25   def load_index
26     @entries = MarshalledGDBM.new File.join(@dir, "entries.db")
27     @docids = MarshalledGDBM.new File.join(@dir, "docids.db")
28     @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db")
29     @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db")
30     @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db")
31
32     @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN)
33     @term_generator = Xapian::TermGenerator.new()
34     @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
35     @enquire = Xapian::Enquire.new @xapian
36     @enquire.weighting_scheme = Xapian::BoolWeight.new
37     @enquire.docid_order = Xapian::Enquire::ASCENDING
38   end
39
40   def save_index
41   end
42
43   def optimize
44   end
45
46   def size
47     synchronize { @xapian.doccount }
48   end
49
50   def contains_id? id
51     synchronize { @entries.member? id }
52   end
53
54   def source_for_id id
55     synchronize { @entries[id][:source_id] }
56   end
57
58   def delete id
59     synchronize { @xapian.delete_document @docids[id] }
60   end
61
62   def build_message id
63     entry = synchronize { @entries[id] }
64     return unless entry
65
66     source = SourceManager[entry[:source_id]]
67     raise "invalid source #{entry[:source_id]}" unless source
68
69     mk_addrs = lambda { |l| l.map { |e,n| "#{n} <#{e}>" } * ', ' }
70     mk_refs = lambda { |l| l.map { |r| "<#{r}>" } * ' ' }
71     fake_header = {
72       'message-id' => entry[:message_id],
73       'date' => Time.at(entry[:date]),
74       'subject' => entry[:subject],
75       'from' => mk_addrs[[entry[:from]]],
76       'to' => mk_addrs[entry[:to]],
77       'cc' => mk_addrs[entry[:cc]],
78       'bcc' => mk_addrs[entry[:bcc]],
79       'reply-tos' => mk_refs[entry[:replytos]],
80       'references' => mk_refs[entry[:refs]],
81      }
82
83       m = Message.new :source => source, :source_info => entry[:source_info],
84                   :labels => entry[:labels],
85                   :snippet => entry[:snippet]
86       m.parse_header fake_header
87       m
88   end
89
90   def sync_message m, opts={}
91     entry = synchronize { @entries[m.id] }
92     snippet = m.snippet
93     entry ||= {}
94     labels = m.labels
95     entry = {} if opts[:force_overwrite]
96
97     d = {
98       :message_id => m.id,
99       :source_id => m.source.id,
100       :source_info => m.source_info,
101       :date => (entry[:date] || m.date),
102       :snippet => snippet,
103       :labels => labels,
104       :from => (entry[:from] || [m.from.email, m.from.name]),
105       :to => (entry[:to] || m.to.map { |p| [p.email, p.name] }),
106       :cc => (entry[:cc] || m.cc.map { |p| [p.email, p.name] }),
107       :bcc => (entry[:bcc] || m.bcc.map { |p| [p.email, p.name] }),
108       :subject => m.subj,
109       :refs => (entry[:refs] || m.refs),
110       :replytos => (entry[:replytos] || m.replytos),
111     }
112
113     labels.each { |l| LabelManager << l }
114
115     synchronize do
116       index_message m, opts
117       union_threads([m.id] + m.refs + m.replytos)
118       @entries[m.id] = d
119     end
120     true
121   end
122
123   def num_results_for query={}
124     xapian_query = build_xapian_query query
125     matchset = run_query xapian_query, 0, 0, 100
126     matchset.matches_estimated
127   end
128
129   EACH_ID_PAGE = 100
130   def each_id query={}
131     offset = 0
132     page = EACH_ID_PAGE
133
134     xapian_query = build_xapian_query query
135     while true
136       ids = run_query_ids xapian_query, offset, (offset+page)
137       ids.each { |id| yield id }
138       break if ids.size < page
139       offset += page
140     end
141   end
142
143   def each_id_by_date query={}
144     each_id(query) { |id| yield id, lambda { build_message id } }
145   end
146
147   def each_message_in_thread_for m, opts={}
148     # TODO thread by subject
149     # TODO handle killed threads
150     ids = synchronize { @thread_members[@thread_ids[m.id]] } || []
151     ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } }
152     true
153   end
154
155   def load_contacts emails, opts={}
156     contacts = Set.new
157     num = opts[:num] || 20
158     each_id_by_date :participants => emails do |id,b|
159       break if contacts.size >= num
160       m = b.call
161       ([m.from]+m.to+m.cc+m.bcc).compact.each { |p| contacts << [p.name, p.email] }
162     end
163     contacts.to_a.compact.map { |n,e| Person.new n, e }[0...num]
164   end
165
166   # TODO share code with the Ferret index
167   def parse_query s
168     query = {}
169
170     subs = s.gsub(/\b(to|from):(\S+)\b/) do
171       field, name = $1, $2
172       if(p = ContactManager.contact_for(name))
173         [field, p.email]
174       elsif name == "me"
175         [field, "(" + AccountManager.user_emails.join("||") + ")"]
176       else
177         [field, name]
178       end.join(":")
179     end
180
181     ## if we see a label:deleted or a label:spam term anywhere in the query
182     ## string, we set the extra load_spam or load_deleted options to true.
183     ## bizarre? well, because the query allows arbitrary parenthesized boolean
184     ## expressions, without fully parsing the query, we can't tell whether
185     ## the user is explicitly directing us to search spam messages or not.
186     ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
187     ## search spam messages or not?
188     ##
189     ## so, we rely on the fact that turning these extra options ON turns OFF
190     ## the adding of "-label:deleted" or "-label:spam" terms at the very
191     ## final stage of query processing. if the user wants to search spam
192     ## messages, not adding that is the right thing; if he doesn't want to
193     ## search spam messages, then not adding it won't have any effect.
194     query[:load_spam] = true if subs =~ /\blabel:spam\b/
195     query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
196
197     ## gmail style "is" operator
198     subs = subs.gsub(/\b(is|has):(\S+)\b/) do
199       field, label = $1, $2
200       case label
201       when "read"
202         "-label:unread"
203       when "spam"
204         query[:load_spam] = true
205         "label:spam"
206       when "deleted"
207         query[:load_deleted] = true
208         "label:deleted"
209       else
210         "label:#{$2}"
211       end
212     end
213
214     ## gmail style attachments "filename" and "filetype" searches
215     subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
216       field, name = $1, ($3 || $4)
217       case field
218       when "filename"
219         Redwood::log "filename - translated #{field}:#{name} to attachment:\"#{name.downcase}\""
220         "attachment:\"#{name.downcase}\""
221       when "filetype"
222         Redwood::log "filetype - translated #{field}:#{name} to attachment_extension:#{name.downcase}"
223         "attachment_extension:#{name.downcase}"
224       end
225     end
226
227     if $have_chronic
228       lastdate = 2<<32 - 1
229       firstdate = 0
230       subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
231         field, datestr = $1, ($3 || $4)
232         realdate = Chronic.parse datestr, :guess => false, :context => :past
233         if realdate
234           case field
235           when "after"
236             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.end}"
237             "date:#{realdate.end.to_i}..#{lastdate}"
238           when "before"
239             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
240             "date:#{firstdate}..#{realdate.end.to_i}"
241           else
242             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate}"
243             "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
244           end
245         else
246           raise ParseError, "can't understand date #{datestr.inspect}"
247         end
248       end
249     end
250
251     ## limit:42 restrict the search to 42 results
252     subs = subs.gsub(/\blimit:(\S+)\b/) do
253       lim = $1
254       if lim =~ /^\d+$/
255         query[:limit] = lim.to_i
256         ''
257       else
258         raise ParseError, "non-numeric limit #{lim.inspect}"
259       end
260     end
261
262     qp = Xapian::QueryParser.new
263     qp.database = @xapian
264     qp.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
265     qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
266     qp.default_op = Xapian::Query::OP_AND
267     qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
268     NORMAL_PREFIX.each { |k,v| qp.add_prefix k, v }
269     BOOLEAN_PREFIX.each { |k,v| qp.add_boolean_prefix k, v }
270     xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD, PREFIX['body'])
271
272     raise ParseError if xapian_query.nil? or xapian_query.empty?
273     query[:qobj] = xapian_query
274     query[:text] = s
275     query
276   end
277
278   private
279
280   # Stemmed
281   NORMAL_PREFIX = {
282     'subject' => 'S',
283     'body' => 'B',
284     'from_name' => 'FN',
285     'to_name' => 'TN',
286     'name' => 'N',
287     'attachment' => 'A',
288   }
289
290   # Unstemmed
291   BOOLEAN_PREFIX = {
292     'type' => 'K',
293     'from_email' => 'FE',
294     'to_email' => 'TE',
295     'email' => 'E',
296     'date' => 'D',
297     'label' => 'L',
298     'source_id' => 'I',
299     'attachment_extension' => 'O',
300   }
301
302   PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX
303
304   DATE_VALUENO = 0
305
306   MAX_TERM_LENGTH = 245
307
308   # Xapian can very efficiently sort in ascending docid order. Sup always wants
309   # to sort by descending date, so this method maps between them. In order to
310   # handle multiple messages per second, we use a logistic curve centered
311   # around MIDDLE_DATE so that the slope (docid/s) is greatest in this time
312   # period. A docid collision is not an error - the code will pick the next
313   # smallest unused one.
314   DOCID_SCALE = 2.0**32
315   TIME_SCALE = 2.0**27
316   MIDDLE_DATE = Time.gm(2011)
317   def assign_docid m, truncated_date
318     t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f
319     docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i
320     begin
321       while @assigned_docids.member? [docid].pack("N")
322         docid -= 1
323       end
324     rescue
325     end
326     @assigned_docids[[docid].pack("N")] = ''
327     docid
328   end
329
330   def synchronize &b
331     @index_mutex.synchronize &b
332   end
333
334   def run_query xapian_query, offset, limit, checkatleast=0
335     synchronize do
336       @enquire.query = xapian_query
337       @enquire.mset(offset, limit-offset, checkatleast)
338     end
339   end
340
341   def run_query_ids xapian_query, offset, limit
342     matchset = run_query xapian_query, offset, limit
343     matchset.matches.map { |r| r.document.data }
344   end
345
346   Q = Xapian::Query
347   def build_xapian_query opts
348     labels = ([opts[:label]] + (opts[:labels] || [])).compact
349     neglabels = [:spam, :deleted, :killed].reject { |l| (labels.include? l) || opts.member?("load_#{l}".intern) }
350     pos_terms, neg_terms = [], []
351
352     pos_terms << mkterm(:type, 'mail')
353     pos_terms.concat(labels.map { |l| mkterm(:label,l) })
354     pos_terms << opts[:qobj] if opts[:qobj]
355     pos_terms << mkterm(:source_id, opts[:source_id]) if opts[:source_id]
356
357     if opts[:participants]
358       participant_terms = opts[:participants].map { |p| mkterm(:email,:any, (Redwood::Person === p) ? p.email : p) }
359       pos_terms << Q.new(Q::OP_OR, participant_terms)
360     end
361
362     neg_terms.concat(neglabels.map { |l| mkterm(:label,l) })
363
364     pos_query = Q.new(Q::OP_AND, pos_terms)
365     neg_query = Q.new(Q::OP_OR, neg_terms)
366
367     if neg_query.empty?
368       pos_query
369     else
370       Q.new(Q::OP_AND_NOT, [pos_query, neg_query])
371     end
372   end
373
374   def index_message m, opts
375     terms = []
376     text = []
377
378     subject_text = m.indexable_subject
379     body_text = m.indexable_body
380
381     # Person names are indexed with several prefixes
382     person_termer = lambda do |d|
383       lambda do |p|
384         ["#{d}_name", "name", "body"].each do |x|
385           text << [p.name, PREFIX[x]]
386         end if p.name
387         [d, :any].each { |x| terms << mkterm(:email, x, p.email) }
388       end
389     end
390
391     person_termer[:from][m.from] if m.from
392     (m.to+m.cc+m.bcc).each(&(person_termer[:to]))
393
394     terms << mkterm(:date,m.date) if m.date
395     m.labels.each { |t| terms << mkterm(:label,t) }
396     terms << mkterm(:type, 'mail')
397     terms << mkterm(:source_id, m.source.id)
398     m.attachments.each do |a|
399       a =~ /\.(\w+)$/ or next
400       t = mkterm(:attachment_extension, $1)
401       terms << t
402     end
403
404     # Full text search content
405     text << [subject_text, PREFIX['subject']]
406     text << [subject_text, PREFIX['body']]
407     text << [body_text, PREFIX['body']]
408     m.attachments.each { |a| text << [a, PREFIX['attachment']] }
409
410     truncated_date = if m.date < MIN_DATE
411       Redwood::log "warning: adjusting too-low date #{m.date} for indexing"
412       MIN_DATE
413     elsif m.date > MAX_DATE
414       Redwood::log "warning: adjusting too-high date #{m.date} for indexing"
415       MAX_DATE
416     else
417       m.date
418     end
419
420     # Date value for range queries
421     date_value = begin
422       Xapian.sortable_serialise truncated_date.to_i
423     rescue TypeError
424       Xapian.sortable_serialise 0
425     end
426
427     doc = Xapian::Document.new
428     docid = @docids[m.id] || assign_docid(m, truncated_date)
429
430     @term_generator.document = doc
431     text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
432     terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH }
433     doc.add_value DATE_VALUENO, date_value
434     doc.data = m.id
435
436     @xapian.replace_document docid, doc
437     @docids[m.id] = docid
438   end
439
440   # Construct a Xapian term
441   def mkterm type, *args
442     case type
443     when :label
444       PREFIX['label'] + args[0].to_s.downcase
445     when :type
446       PREFIX['type'] + args[0].to_s.downcase
447     when :date
448       PREFIX['date'] + args[0].getutc.strftime("%Y%m%d%H%M%S")
449     when :email
450       case args[0]
451       when :from then PREFIX['from_email']
452       when :to then PREFIX['to_email']
453       when :any then PREFIX['email']
454       else raise "Invalid email term type #{args[0]}"
455       end + args[1].to_s.downcase
456     when :source_id
457       PREFIX['source_id'] + args[0].to_s.downcase
458     when :attachment_extension
459       PREFIX['attachment_extension'] + args[0].to_s.downcase
460     else
461       raise "Invalid term type #{type}"
462     end
463   end
464
465   # Join all the given message-ids into a single thread
466   def union_threads ids
467     seen_threads = Set.new
468     related = Set.new
469
470     # Get all the ids that will be in the new thread
471     ids.each do |id|
472       related << id
473       thread_id = @thread_ids[id]
474       if thread_id && !seen_threads.member?(thread_id)
475         thread_members = @thread_members[thread_id]
476         related.merge thread_members
477         seen_threads << thread_id
478       end
479     end
480
481     # Pick a leader and move all the others to its thread
482     a = related.to_a
483     best, *rest = a.sort_by { |x| x.hash }
484     @thread_members[best] = a
485     @thread_ids[best] = best
486     rest.each do |x|
487       @thread_members.delete x
488       @thread_ids[x] = best
489     end
490   end
491 end
492
493 end
494
495 class MarshalledGDBM < GDBM
496   def []= k, v
497     super k, Marshal.dump(v)
498   end
499
500   def [] k
501     v = super k
502     v ? Marshal.load(v) : nil
503   end
504 end