]> git.cworth.org Git - sup/blobdiff - lib/sup/xapian_index.rb
Merge branch 'xapian-updates'
[sup] / lib / sup / xapian_index.rb
index dbf66431bc263ada5420b9a981f766c9ca5675d4..1395601367ef49fdebb9ef5019a82a07a0c83ca3 100644 (file)
@@ -1,5 +1,4 @@
 require 'xapian'
-require 'gdbm'
 require 'set'
 
 module Redwood
@@ -9,6 +8,7 @@ module Redwood
 # for searching due to precomputing thread membership.
 class XapianIndex < BaseIndex
   STEM_LANGUAGE = "english"
+  INDEX_VERSION = '1'
 
   ## dates are converted to integers for xapian, and are used for document ids,
   ## so we must ensure they're reasonably valid. this typically only affect
@@ -23,13 +23,18 @@ class XapianIndex < BaseIndex
   end
 
   def load_index
-    @entries = MarshalledGDBM.new File.join(@dir, "entries.db")
-    @docids = MarshalledGDBM.new File.join(@dir, "docids.db")
-    @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db")
-    @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db")
-    @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db")
-
-    @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN)
+    path = File.join(@dir, 'xapian')
+    if File.exists? path
+      @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN)
+      db_version = @xapian.get_metadata 'version'
+      db_version = '0' if db_version.empty?
+      if db_version != INDEX_VERSION
+        fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please downgrade to your previous version and dump your labels before upgrading to this version (then run sup-sync --restore)."
+      end
+    else
+      @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE)
+      @xapian.set_metadata 'version', INDEX_VERSION
+    end
     @term_generator = Xapian::TermGenerator.new()
     @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
     @enquire = Xapian::Enquire.new @xapian
@@ -48,43 +53,35 @@ class XapianIndex < BaseIndex
   end
 
   def contains_id? id
-    synchronize { @entries.member? id }
+    synchronize { find_docid(id) && true }
   end
 
   def source_for_id id
-    synchronize { @entries[id][:source_id] }
+    synchronize { get_entry(id)[:source_id] }
   end
 
   def delete id
-    synchronize { @xapian.delete_document @docids[id] }
+    synchronize { @xapian.delete_document mkterm(:msgid, id) }
   end
 
   def build_message id
-    entry = synchronize { @entries[id] }
+    entry = synchronize { get_entry id }
     return unless entry
 
     source = SourceManager[entry[:source_id]]
     raise "invalid source #{entry[:source_id]}" unless source
 
-    mk_addrs = lambda { |l| l.map { |e,n| "#{n} <#{e}>" } * ', ' }
-    mk_refs = lambda { |l| l.map { |r| "<#{r}>" } * ' ' }
-    fake_header = {
-      'message-id' => entry[:message_id],
-      'date' => Time.at(entry[:date]),
-      'subject' => entry[:subject],
-      'from' => mk_addrs[[entry[:from]]],
-      'to' => mk_addrs[entry[:to]],
-      'cc' => mk_addrs[entry[:cc]],
-      'bcc' => mk_addrs[entry[:bcc]],
-      'reply-tos' => mk_refs[entry[:replytos]],
-      'references' => mk_refs[entry[:refs]],
-     }
-
-      m = Message.new :source => source, :source_info => entry[:source_info],
-                  :labels => entry[:labels],
-                  :snippet => entry[:snippet]
-      m.parse_header fake_header
-      m
+    m = Message.new :source => source, :source_info => entry[:source_info],
+                    :labels => entry[:labels], :snippet => entry[:snippet]
+
+    mk_person = lambda { |x| Person.new(*x.reverse!) }
+    entry[:from] = mk_person[entry[:from]]
+    entry[:to].map!(&mk_person)
+    entry[:cc].map!(&mk_person)
+    entry[:bcc].map!(&mk_person)
+
+    m.load_from_index! entry
+    m
   end
 
   def add_message m; sync_message m end
@@ -92,7 +89,7 @@ class XapianIndex < BaseIndex
   def update_message_state m; sync_message m end
 
   def sync_message m, opts={}
-    entry = synchronize { @entries[m.id] }
+    entry = synchronize { get_entry m.id }
     snippet = m.snippet
     entry ||= {}
     labels = m.labels
@@ -117,9 +114,7 @@ class XapianIndex < BaseIndex
     labels.each { |l| LabelManager << l }
 
     synchronize do
-      index_message m, opts
-      union_threads([m.id] + m.refs + m.replytos)
-      @entries[m.id] = d
+      index_message m, d, opts
     end
     true
   end
@@ -152,8 +147,26 @@ class XapianIndex < BaseIndex
   def each_message_in_thread_for m, opts={}
     # TODO thread by subject
     # TODO handle killed threads
-    ids = synchronize { @thread_members[@thread_ids[m.id]] } || []
-    ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } }
+    return unless doc = find_doc(m.id)
+    queue = doc.value(THREAD_VALUENO).split(',')
+    msgids = [m.id]
+    seen_threads = Set.new
+    seen_messages = Set.new [m.id]
+    while not queue.empty?
+      thread_id = queue.pop
+      next if seen_threads.member? thread_id
+      return false if thread_killed? thread_id
+      seen_threads << thread_id
+      docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
+      docs.each do |doc|
+        msgid = doc.value MSGID_VALUENO
+        next if seen_messages.member? msgid
+        msgids << msgid
+        seen_messages << msgid
+        queue.concat doc.value(THREAD_VALUENO).split(',')
+      end
+    end
+    msgids.each { |id| yield id, lambda { build_message id } }
     true
   end
 
@@ -302,11 +315,16 @@ class XapianIndex < BaseIndex
     'label' => 'L',
     'source_id' => 'I',
     'attachment_extension' => 'O',
+    'msgid' => 'Q',
+    'thread' => 'H',
+    'ref' => 'R',
   }
 
   PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX
 
-  DATE_VALUENO = 0
+  MSGID_VALUENO = 0
+  THREAD_VALUENO = 1
+  DATE_VALUENO = 2
 
   MAX_TERM_LENGTH = 245
 
@@ -322,14 +340,50 @@ class XapianIndex < BaseIndex
   def assign_docid m, truncated_date
     t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f
     docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i
+    while docid > 0 and docid_exists? docid
+      docid -= 1
+    end
+    docid > 0 ? docid : nil
+  end
+
+  # XXX is there a better way?
+  def docid_exists? docid
     begin
-      while @assigned_docids.member? [docid].pack("N")
-        docid -= 1
-      end
-    rescue
+      @xapian.doclength docid
+      true
+    rescue RuntimeError #Xapian::DocNotFoundError
+      raise unless $!.message =~ /DocNotFoundError/
+      false
     end
-    @assigned_docids[[docid].pack("N")] = ''
-    docid
+  end
+
+  def term_docids term
+    @xapian.postlist(term).map { |x| x.docid }
+  end
+
+  def find_docid id
+    docids = term_docids(mkterm(:msgid,id))
+    fail unless docids.size <= 1
+    docids.first
+  end
+
+  def find_doc id
+    return unless docid = find_docid(id)
+    @xapian.document docid
+  end
+
+  def get_id docid
+    return unless doc = @xapian.document(docid)
+    doc.value MSGID_VALUENO
+  end
+
+  def get_entry id
+    return unless doc = find_doc(id)
+    Marshal.load doc.data
+  end
+
+  def thread_killed? thread_id
+    not run_query(Q.new(Q::OP_AND, mkterm(:thread, thread_id), mkterm(:label, :Killed)), 0, 1).empty?
   end
 
   def synchronize &b
@@ -345,7 +399,7 @@ class XapianIndex < BaseIndex
 
   def run_query_ids xapian_query, offset, limit
     matchset = run_query xapian_query, offset, limit
-    matchset.matches.map { |r| r.document.data }
+    matchset.matches.map { |r| r.document.value MSGID_VALUENO }
   end
 
   Q = Xapian::Query
@@ -376,7 +430,7 @@ class XapianIndex < BaseIndex
     end
   end
 
-  def index_message m, opts
+  def index_message m, entry, opts
     terms = []
     text = []
 
@@ -399,6 +453,7 @@ class XapianIndex < BaseIndex
     terms << mkterm(:date,m.date) if m.date
     m.labels.each { |t| terms << mkterm(:label,t) }
     terms << mkterm(:type, 'mail')
+    terms << mkterm(:msgid, m.id)
     terms << mkterm(:source_id, m.source.id)
     m.attachments.each do |a|
       a =~ /\.(\w+)$/ or next
@@ -406,6 +461,23 @@ class XapianIndex < BaseIndex
       terms << t
     end
 
+    ## Thread membership
+    children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid }
+    parent_ids = m.refs + m.replytos
+    parents = parent_ids.map { |id| find_doc id }.compact
+    thread_members = SavingHash.new { [] }
+    (children + parents).each do |doc2|
+      thread_ids = doc2.value(THREAD_VALUENO).split ','
+      thread_ids.each { |thread_id| thread_members[thread_id] << doc2 }
+    end
+
+    thread_ids = thread_members.empty? ? [m.id] : thread_members.keys
+
+    thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) }
+    parent_ids.each do |ref|
+      terms << mkterm(:ref, ref)
+    end
+
     # Full text search content
     text << [subject_text, PREFIX['subject']]
     text << [subject_text, PREFIX['body']]
@@ -429,17 +501,29 @@ class XapianIndex < BaseIndex
       Xapian.sortable_serialise 0
     end
 
-    doc = Xapian::Document.new
-    docid = @docids[m.id] || assign_docid(m, truncated_date)
+    docid = nil
+    unless doc = find_doc(m.id)
+      doc = Xapian::Document.new
+      if not docid = assign_docid(m, truncated_date)
+        # Could be triggered by spam
+        Redwood::log "warning: docid underflow, dropping #{m.id.inspect}"
+        return
+      end
+    else
+      doc.clear_terms
+      doc.clear_values
+      docid = doc.docid
+    end
 
     @term_generator.document = doc
     text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
     terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH }
+    doc.add_value MSGID_VALUENO, m.id
+    doc.add_value THREAD_VALUENO, (thread_ids * ',')
     doc.add_value DATE_VALUENO, date_value
-    doc.data = m.id
+    doc.data = Marshal.dump entry
 
     @xapian.replace_document docid, doc
-    @docids[m.id] = docid
   end
 
   # Construct a Xapian term
@@ -462,48 +546,13 @@ class XapianIndex < BaseIndex
       PREFIX['source_id'] + args[0].to_s.downcase
     when :attachment_extension
       PREFIX['attachment_extension'] + args[0].to_s.downcase
+    when :msgid, :ref, :thread
+      PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)]
     else
       raise "Invalid term type #{type}"
     end
   end
 
-  # Join all the given message-ids into a single thread
-  def union_threads ids
-    seen_threads = Set.new
-    related = Set.new
-
-    # Get all the ids that will be in the new thread
-    ids.each do |id|
-      related << id
-      thread_id = @thread_ids[id]
-      if thread_id && !seen_threads.member?(thread_id)
-        thread_members = @thread_members[thread_id]
-        related.merge thread_members
-        seen_threads << thread_id
-      end
-    end
-
-    # Pick a leader and move all the others to its thread
-    a = related.to_a
-    best, *rest = a.sort_by { |x| x.hash }
-    @thread_members[best] = a
-    @thread_ids[best] = best
-    rest.each do |x|
-      @thread_members.delete x
-      @thread_ids[x] = best
-    end
-  end
 end
 
 end
-
-class MarshalledGDBM < GDBM
-  def []= k, v
-    super k, Marshal.dump(v)
-  end
-
-  def [] k
-    v = super k
-    v ? Marshal.load(v) : nil
-  end
-end