]> git.cworth.org Git - sup/blob - lib/sup/xapian_index.rb
bugfix: dates need to be truncated for xapian to index
[sup] / lib / sup / xapian_index.rb
1 require 'xapian'
2 require 'gdbm'
3 require 'set'
4
5 module Redwood
6
7 # This index implementation uses Xapian for searching and GDBM for storage. It
8 # tends to be slightly faster than Ferret for indexing and significantly faster
9 # for searching due to precomputing thread membership.
10 class XapianIndex < BaseIndex
11   STEM_LANGUAGE = "english"
12
13   ## dates are converted to integers for xapian, and are used for document ids,
14   ## so we must ensure they're reasonably valid. this typically only affect
15   ## spam.
16   MIN_DATE = Time.at 0
17   MAX_DATE = Time.at(2**31)
18
19   def initialize dir=BASE_DIR
20     super
21
22     @index_mutex = Monitor.new
23
24     @entries = MarshalledGDBM.new File.join(dir, "entries.db")
25     @docids = MarshalledGDBM.new File.join(dir, "docids.db")
26     @thread_members = MarshalledGDBM.new File.join(dir, "thread_members.db")
27     @thread_ids = MarshalledGDBM.new File.join(dir, "thread_ids.db")
28     @assigned_docids = GDBM.new File.join(dir, "assigned_docids.db")
29
30     @xapian = Xapian::WritableDatabase.new(File.join(dir, "xapian"), Xapian::DB_CREATE_OR_OPEN)
31     @term_generator = Xapian::TermGenerator.new()
32     @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
33     @enquire = Xapian::Enquire.new @xapian
34     @enquire.weighting_scheme = Xapian::BoolWeight.new
35     @enquire.docid_order = Xapian::Enquire::ASCENDING
36   end
37
38   def load_index
39   end
40
41   def save_index
42   end
43
44   def optimize
45   end
46
47   def size
48     synchronize { @xapian.doccount }
49   end
50
51   def contains_id? id
52     synchronize { @entries.member? id }
53   end
54
55   def source_for_id id
56     synchronize { @entries[id][:source_id] }
57   end
58
59   def delete id
60     synchronize { @xapian.delete_document @docids[id] }
61   end
62
63   def build_message id
64     entry = synchronize { @entries[id] }
65     return unless entry
66
67     source = SourceManager[entry[:source_id]]
68     raise "invalid source #{entry[:source_id]}" unless source
69
70     mk_addrs = lambda { |l| l.map { |e,n| "#{n} <#{e}>" } * ', ' }
71     mk_refs = lambda { |l| l.map { |r| "<#{r}>" } * ' ' }
72     fake_header = {
73       'message-id' => entry[:message_id],
74       'date' => Time.at(entry[:date]),
75       'subject' => entry[:subject],
76       'from' => mk_addrs[[entry[:from]]],
77       'to' => mk_addrs[[entry[:to]]],
78       'cc' => mk_addrs[[entry[:cc]]],
79       'bcc' => mk_addrs[[entry[:bcc]]],
80       'reply-tos' => mk_refs[entry[:replytos]],
81       'references' => mk_refs[entry[:refs]],
82      }
83
84       m = Message.new :source => source, :source_info => entry[:source_info],
85                   :labels => entry[:labels],
86                   :snippet => entry[:snippet]
87       m.parse_header fake_header
88       m
89   end
90
91   def sync_message m, opts={}
92     entry = synchronize { @entries[m.id] }
93     snippet = m.snippet
94     entry ||= {}
95     labels = m.labels
96     entry = {} if opts[:force_overwrite]
97
98     d = {
99       :message_id => m.id,
100       :source_id => m.source.id,
101       :source_info => m.source_info,
102       :date => (entry[:date] || m.date),
103       :snippet => snippet,
104       :labels => labels.uniq,
105       :from => (entry[:from] || [m.from.email, m.from.name]),
106       :to => (entry[:to] || m.to.map { |p| [p.email, p.name] }),
107       :cc => (entry[:cc] || m.cc.map { |p| [p.email, p.name] }),
108       :bcc => (entry[:bcc] || m.bcc.map { |p| [p.email, p.name] }),
109       :subject => m.subj,
110       :refs => (entry[:refs] || m.refs),
111       :replytos => (entry[:replytos] || m.replytos),
112     }
113
114     m.labels.each { |l| LabelManager << l }
115
116     synchronize do
117       index_message m, opts
118       union_threads([m.id] + m.refs + m.replytos)
119       @entries[m.id] = d
120     end
121     true
122   end
123
124   def num_results_for query={}
125     xapian_query = build_xapian_query query
126     matchset = run_query xapian_query, 0, 0, 100
127     matchset.matches_estimated
128   end
129
130   EACH_ID_PAGE = 100
131   def each_id query={}
132     offset = 0
133     page = EACH_ID_PAGE
134
135     xapian_query = build_xapian_query query
136     while true
137       ids = run_query_ids xapian_query, offset, (offset+page)
138       ids.each { |id| yield id }
139       break if ids.size < page
140       offset += page
141     end
142   end
143
144   def each_id_by_date query={}
145     each_id(query) { |id| yield id, lambda { build_message id } }
146   end
147
148   def each_message_in_thread_for m, opts={}
149     # TODO thread by subject
150     # TODO handle killed threads
151     ids = synchronize { @thread_members[@thread_ids[m.id]] } || []
152     ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } }
153     true
154   end
155
156   def load_contacts emails, opts={}
157     contacts = Set.new
158     num = opts[:num] || 20
159     each_id_by_date :participants => emails do |id,b|
160       break if contacts.size >= num
161       m = b.call
162       ([m.from]+m.to+m.cc+m.bcc).compact.each { |p| contacts << [p.name, p.email] }
163     end
164     contacts.to_a.compact.map { |n,e| Person.new n, e }[0...num]
165   end
166
167   # TODO share code with the Ferret index
168   def parse_query s
169     query = {}
170
171     subs = s.gsub(/\b(to|from):(\S+)\b/) do
172       field, name = $1, $2
173       if(p = ContactManager.contact_for(name))
174         [field, p.email]
175       elsif name == "me"
176         [field, "(" + AccountManager.user_emails.join("||") + ")"]
177       else
178         [field, name]
179       end.join(":")
180     end
181
182     ## if we see a label:deleted or a label:spam term anywhere in the query
183     ## string, we set the extra load_spam or load_deleted options to true.
184     ## bizarre? well, because the query allows arbitrary parenthesized boolean
185     ## expressions, without fully parsing the query, we can't tell whether
186     ## the user is explicitly directing us to search spam messages or not.
187     ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
188     ## search spam messages or not?
189     ##
190     ## so, we rely on the fact that turning these extra options ON turns OFF
191     ## the adding of "-label:deleted" or "-label:spam" terms at the very
192     ## final stage of query processing. if the user wants to search spam
193     ## messages, not adding that is the right thing; if he doesn't want to
194     ## search spam messages, then not adding it won't have any effect.
195     query[:load_spam] = true if subs =~ /\blabel:spam\b/
196     query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
197
198     ## gmail style "is" operator
199     subs = subs.gsub(/\b(is|has):(\S+)\b/) do
200       field, label = $1, $2
201       case label
202       when "read"
203         "-label:unread"
204       when "spam"
205         query[:load_spam] = true
206         "label:spam"
207       when "deleted"
208         query[:load_deleted] = true
209         "label:deleted"
210       else
211         "label:#{$2}"
212       end
213     end
214
215     ## gmail style attachments "filename" and "filetype" searches
216     subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
217       field, name = $1, ($3 || $4)
218       case field
219       when "filename"
220         Redwood::log "filename - translated #{field}:#{name} to attachment:\"#{name.downcase}\""
221         "attachment:\"#{name.downcase}\""
222       when "filetype"
223         Redwood::log "filetype - translated #{field}:#{name} to attachment_extension:#{name.downcase}"
224         "attachment_extension:#{name.downcase}"
225       end
226     end
227
228     if $have_chronic
229       lastdate = 2<<32 - 1
230       firstdate = 0
231       subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
232         field, datestr = $1, ($3 || $4)
233         realdate = Chronic.parse datestr, :guess => false, :context => :past
234         if realdate
235           case field
236           when "after"
237             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.end}"
238             "date:#{realdate.end.to_i}..#{lastdate}"
239           when "before"
240             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
241             "date:#{firstdate}..#{realdate.end.to_i}"
242           else
243             Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate}"
244             "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
245           end
246         else
247           raise ParseError, "can't understand date #{datestr.inspect}"
248         end
249       end
250     end
251
252     ## limit:42 restrict the search to 42 results
253     subs = subs.gsub(/\blimit:(\S+)\b/) do
254       lim = $1
255       if lim =~ /^\d+$/
256         query[:limit] = lim.to_i
257         ''
258       else
259         raise ParseError, "non-numeric limit #{lim.inspect}"
260       end
261     end
262
263     qp = Xapian::QueryParser.new
264     qp.database = @xapian
265     qp.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
266     qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
267     qp.default_op = Xapian::Query::OP_AND
268     qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
269     NORMAL_PREFIX.each { |k,v| qp.add_prefix k, v }
270     BOOLEAN_PREFIX.each { |k,v| qp.add_boolean_prefix k, v }
271     xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD, PREFIX['body'])
272
273     raise ParseError if xapian_query.nil? or xapian_query.empty?
274     query[:qobj] = xapian_query
275     query[:text] = s
276     query
277   end
278
279   private
280
281   # Stemmed
282   NORMAL_PREFIX = {
283     'subject' => 'S',
284     'body' => 'B',
285     'from_name' => 'FN',
286     'to_name' => 'TN',
287     'name' => 'N',
288     'attachment' => 'A',
289   }
290
291   # Unstemmed
292   BOOLEAN_PREFIX = {
293     'type' => 'K',
294     'from_email' => 'FE',
295     'to_email' => 'TE',
296     'email' => 'E',
297     'date' => 'D',
298     'label' => 'L',
299     'source_id' => 'I',
300     'attachment_extension' => 'O',
301   }
302
303   PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX
304
305   DATE_VALUENO = 0
306
307   # Xapian can very efficiently sort in ascending docid order. Sup always wants
308   # to sort by descending date, so this method maps between them. In order to
309   # handle multiple messages per second, we use a logistic curve centered
310   # around MIDDLE_DATE so that the slope (docid/s) is greatest in this time
311   # period. A docid collision is not an error - the code will pick the next
312   # smallest unused one.
313   DOCID_SCALE = 2.0**32
314   TIME_SCALE = 2.0**27
315   MIDDLE_DATE = Time.gm(2011)
316   def assign_docid m, truncated_date
317     t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f
318     docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i
319     begin
320       while @assigned_docids.member? [docid].pack("N")
321         docid -= 1
322       end
323     rescue
324     end
325     @assigned_docids[[docid].pack("N")] = ''
326     docid
327   end
328
329   def synchronize &b
330     @index_mutex.synchronize &b
331   end
332
333   def run_query xapian_query, offset, limit, checkatleast=0
334     synchronize do
335       @enquire.query = xapian_query
336       @enquire.mset(offset, limit-offset, checkatleast)
337     end
338   end
339
340   def run_query_ids xapian_query, offset, limit
341     matchset = run_query xapian_query, offset, limit
342     matchset.matches.map { |r| r.document.data }
343   end
344
345   Q = Xapian::Query
346   def build_xapian_query opts
347     labels = ([opts[:label]] + (opts[:labels] || [])).compact
348     neglabels = [:spam, :deleted, :killed].reject { |l| (labels.include? l) || opts.member?("load_#{l}".intern) }
349     pos_terms, neg_terms = [], []
350
351     pos_terms << mkterm(:type, 'mail')
352     pos_terms.concat(labels.map { |l| mkterm(:label,l) })
353     pos_terms << opts[:qobj] if opts[:qobj]
354     pos_terms << mkterm(:source_id, opts[:source_id]) if opts[:source_id]
355
356     if opts[:participants]
357       participant_terms = opts[:participants].map { |p| mkterm(:email,:any, (Redwood::Person === p) ? p.email : p) }
358       pos_terms << Q.new(Q::OP_OR, participant_terms)
359     end
360
361     neg_terms.concat(neglabels.map { |l| mkterm(:label,l) })
362
363     pos_query = Q.new(Q::OP_AND, pos_terms)
364     neg_query = Q.new(Q::OP_OR, neg_terms)
365
366     if neg_query.empty?
367       pos_query
368     else
369       Q.new(Q::OP_AND_NOT, [pos_query, neg_query])
370     end
371   end
372
373   def index_message m, opts
374     terms = []
375     text = []
376
377     subject_text = m.indexable_subject
378     body_text = m.indexable_body
379
380     # Person names are indexed with several prefixes
381     person_termer = lambda do |d|
382       lambda do |p|
383         ["#{d}_name", "name", "body"].each do |x|
384           text << [p.name, PREFIX[x]]
385         end if p.name
386         [d, :any].each { |x| terms << mkterm(:email, x, p.email) }
387       end
388     end
389
390     person_termer[:from][m.from] if m.from
391     (m.to+m.cc+m.bcc).each(&(person_termer[:to]))
392
393     terms << mkterm(:date,m.date) if m.date
394     m.labels.each { |t| terms << mkterm(:label,t) }
395     terms << mkterm(:type, 'mail')
396     terms << mkterm(:source_id, m.source.id)
397     m.attachments.each do |a|
398       a =~ /\.(\w+)$/ or next
399       t = mkterm(:attachment_extension, $1)
400       terms << t
401     end
402
403     # Full text search content
404     text << [subject_text, PREFIX['subject']]
405     text << [subject_text, PREFIX['body']]
406     text << [body_text, PREFIX['body']]
407     m.attachments.each { |a| text << [a, PREFIX['attachment']] }
408
409     truncated_date = if m.date < MIN_DATE
410       Redwood::log "warning: adjusting too-low date #{m.date} for indexing"
411       MIN_DATE
412     elsif m.date > MAX_DATE
413       Redwood::log "warning: adjusting too-high date #{m.date} for indexing"
414       MAX_DATE
415     else
416       m.date
417     end
418
419     # Date value for range queries
420     date_value = begin
421       Xapian.sortable_serialise truncated_date.to_i
422     rescue TypeError
423       Xapian.sortable_serialise 0
424     end
425
426     doc = Xapian::Document.new
427     docid = @docids[m.id] || assign_docid(m, truncated_date)
428
429     @term_generator.document = doc
430     text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
431     terms.each { |term| doc.add_term term }
432     doc.add_value DATE_VALUENO, date_value
433     doc.data = m.id
434
435     @xapian.replace_document docid, doc
436     @docids[m.id] = docid
437   end
438
439   # Construct a Xapian term
440   def mkterm type, *args
441     case type
442     when :label
443       PREFIX['label'] + args[0].to_s.downcase
444     when :type
445       PREFIX['type'] + args[0].to_s.downcase
446     when :date
447       PREFIX['date'] + args[0].getutc.strftime("%Y%m%d%H%M%S")
448     when :email
449       case args[0]
450       when :from then PREFIX['from_email']
451       when :to then PREFIX['to_email']
452       when :any then PREFIX['email']
453       else raise "Invalid email term type #{args[0]}"
454       end + args[1].to_s.downcase
455     when :source_id
456       PREFIX['source_id'] + args[0].to_s.downcase
457     when :attachment_extension
458       PREFIX['attachment_extension'] + args[0].to_s.downcase
459     else
460       raise "Invalid term type #{type}"
461     end
462   end
463
464   # Join all the given message-ids into a single thread
465   def union_threads ids
466     seen_threads = Set.new
467     related = Set.new
468
469     # Get all the ids that will be in the new thread
470     ids.each do |id|
471       related << id
472       thread_id = @thread_ids[id]
473       if thread_id && !seen_threads.member?(thread_id)
474         thread_members = @thread_members[thread_id]
475         related.merge thread_members
476         seen_threads << thread_id
477       end
478     end
479
480     # Pick a leader and move all the others to its thread
481     a = related.to_a
482     best, *rest = a.sort_by { |x| x.hash }
483     @thread_members[best] = a
484     @thread_ids[best] = best
485     rest.each do |x|
486       @thread_members.delete x
487       @thread_ids[x] = best
488     end
489   end
490 end
491
492 end
493
494 class MarshalledGDBM < GDBM
495   def []= k, v
496     super k, Marshal.dump(v)
497   end
498
499   def [] k
500     v = super k
501     v ? Marshal.load(v) : nil
502   end
503 end