]> git.cworth.org Git - sup/blob - lib/sup/xapian_index.rb
Merge branch 'various-api-refactors' into next
[sup] / lib / sup / xapian_index.rb
1 require 'xapian'
2 require 'gdbm'
3 require 'set'
4
5 module Redwood
6
7 # This index implementation uses Xapian for searching and GDBM for storage. It
8 # tends to be slightly faster than Ferret for indexing and significantly faster
9 # for searching due to precomputing thread membership.
10 class XapianIndex < BaseIndex
11   STEM_LANGUAGE = "english"
12
13   ## dates are converted to integers for xapian, and are used for document ids,
14   ## so we must ensure they're reasonably valid. this typically only affect
15   ## spam.
16   MIN_DATE = Time.at 0
17   MAX_DATE = Time.at(2**31-1)
18
19   def initialize dir=BASE_DIR
20     super
21
22     @index_mutex = Monitor.new
23   end
24
25   def load_index
26     @entries = MarshalledGDBM.new File.join(@dir, "entries.db")
27     @docids = MarshalledGDBM.new File.join(@dir, "docids.db")
28     @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db")
29     @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db")
30     @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db")
31
32     @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN)
33     @term_generator = Xapian::TermGenerator.new()
34     @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
35     @enquire = Xapian::Enquire.new @xapian
36     @enquire.weighting_scheme = Xapian::BoolWeight.new
37     @enquire.docid_order = Xapian::Enquire::ASCENDING
38   end
39
40   def save_index
41   end
42
43   def optimize
44   end
45
46   def size
47     synchronize { @xapian.doccount }
48   end
49
50   def contains_id? id
51     synchronize { @entries.member? id }
52   end
53
54   def source_for_id id
55     synchronize { @entries[id][:source_id] }
56   end
57
58   def delete id
59     synchronize { @xapian.delete_document @docids[id] }
60   end
61
62   def build_message id
63     entry = synchronize { @entries[id] }
64     return unless entry
65
66     source = SourceManager[entry[:source_id]]
67     raise "invalid source #{entry[:source_id]}" unless source
68
69     mk_addrs = lambda { |l| l.map { |e,n| "#{n} <#{e}>" } * ', ' }
70     mk_refs = lambda { |l| l.map { |r| "<#{r}>" } * ' ' }
71     fake_header = {
72       'message-id' => entry[:message_id],
73       'date' => Time.at(entry[:date]),
74       'subject' => entry[:subject],
75       'from' => mk_addrs[[entry[:from]]],
76       'to' => mk_addrs[entry[:to]],
77       'cc' => mk_addrs[entry[:cc]],
78       'bcc' => mk_addrs[entry[:bcc]],
79       'reply-tos' => mk_refs[entry[:replytos]],
80       'references' => mk_refs[entry[:refs]],
81      }
82
83       m = Message.new :source => source, :source_info => entry[:source_info],
84                   :labels => entry[:labels],
85                   :snippet => entry[:snippet]
86       m.parse_header fake_header
87       m
88   end
89
90   def add_message m; sync_message m end
91   def update_message m; sync_message m end
92   def update_message_state m; sync_message m end
93
94   def sync_message m, opts={}
95     entry = synchronize { @entries[m.id] }
96     snippet = m.snippet
97     entry ||= {}
98     labels = m.labels
99     entry = {} if opts[:force_overwrite]
100
101     d = {
102       :message_id => m.id,
103       :source_id => m.source.id,
104       :source_info => m.source_info,
105       :date => (entry[:date] || m.date),
106       :snippet => snippet,
107       :labels => labels,
108       :from => (entry[:from] || [m.from.email, m.from.name]),
109       :to => (entry[:to] || m.to.map { |p| [p.email, p.name] }),
110       :cc => (entry[:cc] || m.cc.map { |p| [p.email, p.name] }),
111       :bcc => (entry[:bcc] || m.bcc.map { |p| [p.email, p.name] }),
112       :subject => m.subj,
113       :refs => (entry[:refs] || m.refs),
114       :replytos => (entry[:replytos] || m.replytos),
115     }
116
117     labels.each { |l| LabelManager << l }
118
119     synchronize do
120       index_message m, opts
121       union_threads([m.id] + m.refs + m.replytos)
122       @entries[m.id] = d
123     end
124     true
125   end
126   private :sync_message
127
128   def num_results_for query={}
129     xapian_query = build_xapian_query query
130     matchset = run_query xapian_query, 0, 0, 100
131     matchset.matches_estimated
132   end
133
134   EACH_ID_PAGE = 100
135   def each_id query={}
136     offset = 0
137     page = EACH_ID_PAGE
138
139     xapian_query = build_xapian_query query
140     while true
141       ids = run_query_ids xapian_query, offset, (offset+page)
142       ids.each { |id| yield id }
143       break if ids.size < page
144       offset += page
145     end
146   end
147
148   def each_id_by_date query={}
149     each_id(query) { |id| yield id, lambda { build_message id } }
150   end
151
152   def each_message_in_thread_for m, opts={}
153     # TODO thread by subject
154     # TODO handle killed threads
155     ids = synchronize { @thread_members[@thread_ids[m.id]] } || []
156     ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } }
157     true
158   end
159
160   def load_contacts emails, opts={}
161     contacts = Set.new
162     num = opts[:num] || 20
163     each_id_by_date :participants => emails do |id,b|
164       break if contacts.size >= num
165       m = b.call
166       ([m.from]+m.to+m.cc+m.bcc).compact.each { |p| contacts << [p.name, p.email] }
167     end
168     contacts.to_a.compact.map { |n,e| Person.new n, e }[0...num]
169   end
170
171   # TODO share code with the Ferret index
172   def parse_query s
173     query = {}
174
175     subs = s.gsub(/\b(to|from):(\S+)\b/) do
176       field, name = $1, $2
177       if(p = ContactManager.contact_for(name))
178         [field, p.email]
179       elsif name == "me"
180         [field, "(" + AccountManager.user_emails.join("||") + ")"]
181       else
182         [field, name]
183       end.join(":")
184     end
185
186     ## if we see a label:deleted or a label:spam term anywhere in the query
187     ## string, we set the extra load_spam or load_deleted options to true.
188     ## bizarre? well, because the query allows arbitrary parenthesized boolean
189     ## expressions, without fully parsing the query, we can't tell whether
190     ## the user is explicitly directing us to search spam messages or not.
191     ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
192     ## search spam messages or not?
193     ##
194     ## so, we rely on the fact that turning these extra options ON turns OFF
195     ## the adding of "-label:deleted" or "-label:spam" terms at the very
196     ## final stage of query processing. if the user wants to search spam
197     ## messages, not adding that is the right thing; if he doesn't want to
198     ## search spam messages, then not adding it won't have any effect.
199     query[:load_spam] = true if subs =~ /\blabel:spam\b/
200     query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
201
202     ## gmail style "is" operator
203     subs = subs.gsub(/\b(is|has):(\S+)\b/) do
204       field, label = $1, $2
205       case label
206       when "read"
207         "-label:unread"
208       when "spam"
209         query[:load_spam] = true
210         "label:spam"
211       when "deleted"
212         query[:load_deleted] = true
213         "label:deleted"
214       else
215         "label:#{$2}"
216       end
217     end
218
219     ## gmail style attachments "filename" and "filetype" searches
220     subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
221       field, name = $1, ($3 || $4)
222       case field
223       when "filename"
224         debug "filename: translated #{field}:#{name} to attachment:\"#{name.downcase}\""
225         "attachment:\"#{name.downcase}\""
226       when "filetype"
227         debug "filetype: translated #{field}:#{name} to attachment_extension:#{name.downcase}"
228         "attachment_extension:#{name.downcase}"
229       end
230     end
231
232     if $have_chronic
233       lastdate = 2<<32 - 1
234       firstdate = 0
235       subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
236         field, datestr = $1, ($3 || $4)
237         realdate = Chronic.parse datestr, :guess => false, :context => :past
238         if realdate
239           case field
240           when "after"
241             debug "chronic: translated #{field}:#{datestr} to #{realdate.end}"
242             "date:#{realdate.end.to_i}..#{lastdate}"
243           when "before"
244             debug "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
245             "date:#{firstdate}..#{realdate.end.to_i}"
246           else
247             debug "chronic: translated #{field}:#{datestr} to #{realdate}"
248             "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
249           end
250         else
251           raise ParseError, "can't understand date #{datestr.inspect}"
252         end
253       end
254     end
255
256     ## limit:42 restrict the search to 42 results
257     subs = subs.gsub(/\blimit:(\S+)\b/) do
258       lim = $1
259       if lim =~ /^\d+$/
260         query[:limit] = lim.to_i
261         ''
262       else
263         raise ParseError, "non-numeric limit #{lim.inspect}"
264       end
265     end
266
267     qp = Xapian::QueryParser.new
268     qp.database = @xapian
269     qp.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
270     qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
271     qp.default_op = Xapian::Query::OP_AND
272     qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
273     NORMAL_PREFIX.each { |k,v| qp.add_prefix k, v }
274     BOOLEAN_PREFIX.each { |k,v| qp.add_boolean_prefix k, v }
275     xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD, PREFIX['body'])
276
277     raise ParseError if xapian_query.nil? or xapian_query.empty?
278     query[:qobj] = xapian_query
279     query[:text] = s
280     query
281   end
282
283   private
284
285   # Stemmed
286   NORMAL_PREFIX = {
287     'subject' => 'S',
288     'body' => 'B',
289     'from_name' => 'FN',
290     'to_name' => 'TN',
291     'name' => 'N',
292     'attachment' => 'A',
293   }
294
295   # Unstemmed
296   BOOLEAN_PREFIX = {
297     'type' => 'K',
298     'from_email' => 'FE',
299     'to_email' => 'TE',
300     'email' => 'E',
301     'date' => 'D',
302     'label' => 'L',
303     'source_id' => 'I',
304     'attachment_extension' => 'O',
305   }
306
307   PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX
308
309   DATE_VALUENO = 0
310
311   MAX_TERM_LENGTH = 245
312
313   # Xapian can very efficiently sort in ascending docid order. Sup always wants
314   # to sort by descending date, so this method maps between them. In order to
315   # handle multiple messages per second, we use a logistic curve centered
316   # around MIDDLE_DATE so that the slope (docid/s) is greatest in this time
317   # period. A docid collision is not an error - the code will pick the next
318   # smallest unused one.
319   DOCID_SCALE = 2.0**32
320   TIME_SCALE = 2.0**27
321   MIDDLE_DATE = Time.gm(2011)
322   def assign_docid m, truncated_date
323     t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f
324     docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i
325     begin
326       while @assigned_docids.member? [docid].pack("N")
327         docid -= 1
328       end
329     rescue
330     end
331     @assigned_docids[[docid].pack("N")] = ''
332     docid
333   end
334
335   def synchronize &b
336     @index_mutex.synchronize &b
337   end
338
339   def run_query xapian_query, offset, limit, checkatleast=0
340     synchronize do
341       @enquire.query = xapian_query
342       @enquire.mset(offset, limit-offset, checkatleast)
343     end
344   end
345
346   def run_query_ids xapian_query, offset, limit
347     matchset = run_query xapian_query, offset, limit
348     matchset.matches.map { |r| r.document.data }
349   end
350
351   Q = Xapian::Query
352   def build_xapian_query opts
353     labels = ([opts[:label]] + (opts[:labels] || [])).compact
354     neglabels = [:spam, :deleted, :killed].reject { |l| (labels.include? l) || opts.member?("load_#{l}".intern) }
355     pos_terms, neg_terms = [], []
356
357     pos_terms << mkterm(:type, 'mail')
358     pos_terms.concat(labels.map { |l| mkterm(:label,l) })
359     pos_terms << opts[:qobj] if opts[:qobj]
360     pos_terms << mkterm(:source_id, opts[:source_id]) if opts[:source_id]
361
362     if opts[:participants]
363       participant_terms = opts[:participants].map { |p| mkterm(:email,:any, (Redwood::Person === p) ? p.email : p) }
364       pos_terms << Q.new(Q::OP_OR, participant_terms)
365     end
366
367     neg_terms.concat(neglabels.map { |l| mkterm(:label,l) })
368
369     pos_query = Q.new(Q::OP_AND, pos_terms)
370     neg_query = Q.new(Q::OP_OR, neg_terms)
371
372     if neg_query.empty?
373       pos_query
374     else
375       Q.new(Q::OP_AND_NOT, [pos_query, neg_query])
376     end
377   end
378
379   def index_message m, opts
380     terms = []
381     text = []
382
383     subject_text = m.indexable_subject
384     body_text = m.indexable_body
385
386     # Person names are indexed with several prefixes
387     person_termer = lambda do |d|
388       lambda do |p|
389         ["#{d}_name", "name", "body"].each do |x|
390           text << [p.name, PREFIX[x]]
391         end if p.name
392         [d, :any].each { |x| terms << mkterm(:email, x, p.email) }
393       end
394     end
395
396     person_termer[:from][m.from] if m.from
397     (m.to+m.cc+m.bcc).each(&(person_termer[:to]))
398
399     terms << mkterm(:date,m.date) if m.date
400     m.labels.each { |t| terms << mkterm(:label,t) }
401     terms << mkterm(:type, 'mail')
402     terms << mkterm(:source_id, m.source.id)
403     m.attachments.each do |a|
404       a =~ /\.(\w+)$/ or next
405       t = mkterm(:attachment_extension, $1)
406       terms << t
407     end
408
409     # Full text search content
410     text << [subject_text, PREFIX['subject']]
411     text << [subject_text, PREFIX['body']]
412     text << [body_text, PREFIX['body']]
413     m.attachments.each { |a| text << [a, PREFIX['attachment']] }
414
415     truncated_date = if m.date < MIN_DATE
416       debug "warning: adjusting too-low date #{m.date} for indexing"
417       MIN_DATE
418     elsif m.date > MAX_DATE
419       debug "warning: adjusting too-high date #{m.date} for indexing"
420       MAX_DATE
421     else
422       m.date
423     end
424
425     # Date value for range queries
426     date_value = begin
427       Xapian.sortable_serialise truncated_date.to_i
428     rescue TypeError
429       Xapian.sortable_serialise 0
430     end
431
432     doc = Xapian::Document.new
433     docid = @docids[m.id] || assign_docid(m, truncated_date)
434
435     @term_generator.document = doc
436     text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
437     terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH }
438     doc.add_value DATE_VALUENO, date_value
439     doc.data = m.id
440
441     @xapian.replace_document docid, doc
442     @docids[m.id] = docid
443   end
444
445   # Construct a Xapian term
446   def mkterm type, *args
447     case type
448     when :label
449       PREFIX['label'] + args[0].to_s.downcase
450     when :type
451       PREFIX['type'] + args[0].to_s.downcase
452     when :date
453       PREFIX['date'] + args[0].getutc.strftime("%Y%m%d%H%M%S")
454     when :email
455       case args[0]
456       when :from then PREFIX['from_email']
457       when :to then PREFIX['to_email']
458       when :any then PREFIX['email']
459       else raise "Invalid email term type #{args[0]}"
460       end + args[1].to_s.downcase
461     when :source_id
462       PREFIX['source_id'] + args[0].to_s.downcase
463     when :attachment_extension
464       PREFIX['attachment_extension'] + args[0].to_s.downcase
465     else
466       raise "Invalid term type #{type}"
467     end
468   end
469
470   # Join all the given message-ids into a single thread
471   def union_threads ids
472     seen_threads = Set.new
473     related = Set.new
474
475     # Get all the ids that will be in the new thread
476     ids.each do |id|
477       related << id
478       thread_id = @thread_ids[id]
479       if thread_id && !seen_threads.member?(thread_id)
480         thread_members = @thread_members[thread_id]
481         related.merge thread_members
482         seen_threads << thread_id
483       end
484     end
485
486     # Pick a leader and move all the others to its thread
487     a = related.to_a
488     best, *rest = a.sort_by { |x| x.hash }
489     @thread_members[best] = a
490     @thread_ids[best] = best
491     rest.each do |x|
492       @thread_members.delete x
493       @thread_ids[x] = best
494     end
495   end
496 end
497
498 end
499
500 class MarshalledGDBM < GDBM
501   def []= k, v
502     super k, Marshal.dump(v)
503   end
504
505   def [] k
506     v = super k
507     v ? Marshal.load(v) : nil
508   end
509 end