From 4d82ef881de074268183a5534e66ffa0922af698 Mon Sep 17 00:00:00 2001 From: Rich Lane Date: Sun, 16 Aug 2009 12:35:30 -0700 Subject: [PATCH] move all GDBM data into Xapian Keeping everything in Xapian means much better consistency in case of a crash. Thread killing is now supported. --- lib/sup/xapian_index.rb | 177 +++++++++++++++++++++++++--------------- 1 file changed, 110 insertions(+), 67 deletions(-) diff --git a/lib/sup/xapian_index.rb b/lib/sup/xapian_index.rb index 861c2a3..1753f96 100644 --- a/lib/sup/xapian_index.rb +++ b/lib/sup/xapian_index.rb @@ -1,5 +1,4 @@ require 'xapian' -require 'gdbm' require 'set' module Redwood @@ -23,12 +22,6 @@ class XapianIndex < BaseIndex end def load_index - @entries = MarshalledGDBM.new File.join(@dir, "entries.db") - @docids = MarshalledGDBM.new File.join(@dir, "docids.db") - @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db") - @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db") - @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db") - @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN) @term_generator = Xapian::TermGenerator.new() @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE) @@ -48,19 +41,19 @@ class XapianIndex < BaseIndex end def contains_id? id - synchronize { @entries.member? id } + synchronize { find_docid(id) && true } end def source_for_id id - synchronize { @entries[id][:source_id] } + synchronize { get_entry(id)[:source_id] } end def delete id - synchronize { @xapian.delete_document @docids[id] } + synchronize { @xapian.delete_document mkterm(:msgid, id) } end def build_message id - entry = synchronize { @entries[id] } + entry = synchronize { get_entry id } return unless entry source = SourceManager[entry[:source_id]] @@ -88,7 +81,7 @@ class XapianIndex < BaseIndex end def sync_message m, opts={} - entry = synchronize { @entries[m.id] } + entry = synchronize { get_entry m.id } snippet = m.snippet entry ||= {} labels = m.labels @@ -113,9 +106,7 @@ class XapianIndex < BaseIndex m.labels.each { |l| LabelManager << l } synchronize do - index_message m, opts - union_threads([m.id] + m.refs + m.replytos) - @entries[m.id] = d + index_message m, d, opts end true end @@ -147,8 +138,26 @@ class XapianIndex < BaseIndex def each_message_in_thread_for m, opts={} # TODO thread by subject # TODO handle killed threads - ids = synchronize { @thread_members[@thread_ids[m.id]] } || [] - ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } } + return unless doc = find_doc(m.id) + queue = doc.value(THREAD_VALUENO).split(',') + msgids = [m.id] + seen_threads = Set.new + seen_messages = Set.new [m.id] + while not queue.empty? + thread_id = queue.pop + next if seen_threads.member? thread_id + return false if thread_killed? thread_id + seen_threads << thread_id + docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x } + docs.each do |doc| + msgid = doc.value MSGID_VALUENO + next if seen_messages.member? msgid + msgids << msgid + seen_messages << msgid + queue.concat doc.value(THREAD_VALUENO).split(',') + end + end + msgids.each { |id| yield id, lambda { build_message id } } true end @@ -297,11 +306,16 @@ class XapianIndex < BaseIndex 'label' => 'L', 'source_id' => 'I', 'attachment_extension' => 'O', + 'msgid' => 'Q', + 'thread' => 'H', + 'ref' => 'R', } PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX - DATE_VALUENO = 0 + MSGID_VALUENO = 0 + THREAD_VALUENO = 1 + DATE_VALUENO = 2 MAX_TERM_LENGTH = 245 @@ -317,14 +331,48 @@ class XapianIndex < BaseIndex def assign_docid m, truncated_date t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i + while docid > 0 and docid_exists? docid + docid -= 1 + end + docid > 0 ? docid : nil + end + + # XXX is there a better way? + def docid_exists? docid begin - while @assigned_docids.member? [docid].pack("N") - docid -= 1 - end - rescue + @xapian.doclength docid + true + rescue RuntimeError #Xapian::DocNotFoundError + raise unless $!.message =~ /DocNotFoundError/ + false end - @assigned_docids[[docid].pack("N")] = '' - docid + end + + def term_docids term + @xapian.postlist(term).map { |x| x.docid } + end + + def find_docid id + term_docids(mkterm(:msgid,id)).tap { |x| fail unless x.size <= 1 }.first + end + + def find_doc id + return unless docid = find_docid(id) + @xapian.document docid + end + + def get_id docid + return unless doc = @xapian.document(docid) + doc.value MSGID_VALUENO + end + + def get_entry id + return unless doc = find_doc(id) + Marshal.load doc.data + end + + def thread_killed? thread_id + not run_query(Q.new(Q::OP_AND, mkterm(:thread, thread_id), mkterm(:label, :Killed)), 0, 1).empty? end def synchronize &b @@ -340,7 +388,7 @@ class XapianIndex < BaseIndex def run_query_ids xapian_query, offset, limit matchset = run_query xapian_query, offset, limit - matchset.matches.map { |r| r.document.data } + matchset.matches.map { |r| r.document.value MSGID_VALUENO } end Q = Xapian::Query @@ -371,7 +419,7 @@ class XapianIndex < BaseIndex end end - def index_message m, opts + def index_message m, entry, opts terms = [] text = [] @@ -394,6 +442,7 @@ class XapianIndex < BaseIndex terms << mkterm(:date,m.date) if m.date m.labels.each { |t| terms << mkterm(:label,t) } terms << mkterm(:type, 'mail') + terms << mkterm(:msgid, m.id) terms << mkterm(:source_id, m.source.id) m.attachments.each do |a| a =~ /\.(\w+)$/ or next @@ -401,6 +450,23 @@ class XapianIndex < BaseIndex terms << t end + ## Thread membership + children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid } + parent_ids = m.refs + m.replytos + parents = parent_ids.map { |id| find_doc id }.compact + thread_members = SavingHash.new { [] } + (children + parents).each do |doc2| + thread_ids = doc2.value(THREAD_VALUENO).split ',' + thread_ids.each { |thread_id| thread_members[thread_id] << doc2 } + end + + thread_ids = thread_members.empty? ? [m.id] : thread_members.keys + + thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) } + parent_ids.each do |ref| + terms << mkterm(:ref, ref) + end + # Full text search content text << [subject_text, PREFIX['subject']] text << [subject_text, PREFIX['body']] @@ -424,17 +490,29 @@ class XapianIndex < BaseIndex Xapian.sortable_serialise 0 end - doc = Xapian::Document.new - docid = @docids[m.id] || assign_docid(m, truncated_date) + docid = nil + unless doc = find_doc(m.id) + doc = Xapian::Document.new + if not docid = assign_docid(m, truncated_date) + # Could be triggered by spam + Redwood::log "warning: docid underflow, dropping #{m.id.inspect}" + return + end + else + doc.clear_terms + doc.clear_values + docid = doc.docid + end @term_generator.document = doc text.each { |text,prefix| @term_generator.index_text text, 1, prefix } terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH } + doc.add_value MSGID_VALUENO, m.id + doc.add_value THREAD_VALUENO, (thread_ids * ',') doc.add_value DATE_VALUENO, date_value - doc.data = m.id + doc.data = Marshal.dump entry @xapian.replace_document docid, doc - @docids[m.id] = docid end # Construct a Xapian term @@ -457,48 +535,13 @@ class XapianIndex < BaseIndex PREFIX['source_id'] + args[0].to_s.downcase when :attachment_extension PREFIX['attachment_extension'] + args[0].to_s.downcase + when :msgid, :ref, :thread + PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)] else raise "Invalid term type #{type}" end end - # Join all the given message-ids into a single thread - def union_threads ids - seen_threads = Set.new - related = Set.new - - # Get all the ids that will be in the new thread - ids.each do |id| - related << id - thread_id = @thread_ids[id] - if thread_id && !seen_threads.member?(thread_id) - thread_members = @thread_members[thread_id] - related.merge thread_members - seen_threads << thread_id - end - end - - # Pick a leader and move all the others to its thread - a = related.to_a - best, *rest = a.sort_by { |x| x.hash } - @thread_members[best] = a - @thread_ids[best] = best - rest.each do |x| - @thread_members.delete x - @thread_ids[x] = best - end - end end end - -class MarshalledGDBM < GDBM - def []= k, v - super k, Marshal.dump(v) - end - - def [] k - v = super k - v ? Marshal.load(v) : nil - end -end -- 2.43.0