X-Git-Url: https://git.cworth.org/git?a=blobdiff_plain;f=lib%2Fsup%2Fxapian_index.rb;fp=lib%2Fsup%2Fxapian_index.rb;h=1395601367ef49fdebb9ef5019a82a07a0c83ca3;hb=46f8e5116f38c8248fdc8553db18f8d2132a1f46;hp=dbf66431bc263ada5420b9a981f766c9ca5675d4;hpb=1cc47908f928359145badddbdbf154d741829965;p=sup diff --git a/lib/sup/xapian_index.rb b/lib/sup/xapian_index.rb index dbf6643..1395601 100644 --- a/lib/sup/xapian_index.rb +++ b/lib/sup/xapian_index.rb @@ -1,5 +1,4 @@ require 'xapian' -require 'gdbm' require 'set' module Redwood @@ -9,6 +8,7 @@ module Redwood # for searching due to precomputing thread membership. class XapianIndex < BaseIndex STEM_LANGUAGE = "english" + INDEX_VERSION = '1' ## dates are converted to integers for xapian, and are used for document ids, ## so we must ensure they're reasonably valid. this typically only affect @@ -23,13 +23,18 @@ class XapianIndex < BaseIndex end def load_index - @entries = MarshalledGDBM.new File.join(@dir, "entries.db") - @docids = MarshalledGDBM.new File.join(@dir, "docids.db") - @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db") - @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db") - @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db") - - @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN) + path = File.join(@dir, 'xapian') + if File.exists? path + @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN) + db_version = @xapian.get_metadata 'version' + db_version = '0' if db_version.empty? + if db_version != INDEX_VERSION + fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please downgrade to your previous version and dump your labels before upgrading to this version (then run sup-sync --restore)." + end + else + @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE) + @xapian.set_metadata 'version', INDEX_VERSION + end @term_generator = Xapian::TermGenerator.new() @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE) @enquire = Xapian::Enquire.new @xapian @@ -48,43 +53,35 @@ class XapianIndex < BaseIndex end def contains_id? id - synchronize { @entries.member? id } + synchronize { find_docid(id) && true } end def source_for_id id - synchronize { @entries[id][:source_id] } + synchronize { get_entry(id)[:source_id] } end def delete id - synchronize { @xapian.delete_document @docids[id] } + synchronize { @xapian.delete_document mkterm(:msgid, id) } end def build_message id - entry = synchronize { @entries[id] } + entry = synchronize { get_entry id } return unless entry source = SourceManager[entry[:source_id]] raise "invalid source #{entry[:source_id]}" unless source - mk_addrs = lambda { |l| l.map { |e,n| "#{n} <#{e}>" } * ', ' } - mk_refs = lambda { |l| l.map { |r| "<#{r}>" } * ' ' } - fake_header = { - 'message-id' => entry[:message_id], - 'date' => Time.at(entry[:date]), - 'subject' => entry[:subject], - 'from' => mk_addrs[[entry[:from]]], - 'to' => mk_addrs[entry[:to]], - 'cc' => mk_addrs[entry[:cc]], - 'bcc' => mk_addrs[entry[:bcc]], - 'reply-tos' => mk_refs[entry[:replytos]], - 'references' => mk_refs[entry[:refs]], - } - - m = Message.new :source => source, :source_info => entry[:source_info], - :labels => entry[:labels], - :snippet => entry[:snippet] - m.parse_header fake_header - m + m = Message.new :source => source, :source_info => entry[:source_info], + :labels => entry[:labels], :snippet => entry[:snippet] + + mk_person = lambda { |x| Person.new(*x.reverse!) } + entry[:from] = mk_person[entry[:from]] + entry[:to].map!(&mk_person) + entry[:cc].map!(&mk_person) + entry[:bcc].map!(&mk_person) + + m.load_from_index! entry + m end def add_message m; sync_message m end @@ -92,7 +89,7 @@ class XapianIndex < BaseIndex def update_message_state m; sync_message m end def sync_message m, opts={} - entry = synchronize { @entries[m.id] } + entry = synchronize { get_entry m.id } snippet = m.snippet entry ||= {} labels = m.labels @@ -117,9 +114,7 @@ class XapianIndex < BaseIndex labels.each { |l| LabelManager << l } synchronize do - index_message m, opts - union_threads([m.id] + m.refs + m.replytos) - @entries[m.id] = d + index_message m, d, opts end true end @@ -152,8 +147,26 @@ class XapianIndex < BaseIndex def each_message_in_thread_for m, opts={} # TODO thread by subject # TODO handle killed threads - ids = synchronize { @thread_members[@thread_ids[m.id]] } || [] - ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } } + return unless doc = find_doc(m.id) + queue = doc.value(THREAD_VALUENO).split(',') + msgids = [m.id] + seen_threads = Set.new + seen_messages = Set.new [m.id] + while not queue.empty? + thread_id = queue.pop + next if seen_threads.member? thread_id + return false if thread_killed? thread_id + seen_threads << thread_id + docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x } + docs.each do |doc| + msgid = doc.value MSGID_VALUENO + next if seen_messages.member? msgid + msgids << msgid + seen_messages << msgid + queue.concat doc.value(THREAD_VALUENO).split(',') + end + end + msgids.each { |id| yield id, lambda { build_message id } } true end @@ -302,11 +315,16 @@ class XapianIndex < BaseIndex 'label' => 'L', 'source_id' => 'I', 'attachment_extension' => 'O', + 'msgid' => 'Q', + 'thread' => 'H', + 'ref' => 'R', } PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX - DATE_VALUENO = 0 + MSGID_VALUENO = 0 + THREAD_VALUENO = 1 + DATE_VALUENO = 2 MAX_TERM_LENGTH = 245 @@ -322,14 +340,50 @@ class XapianIndex < BaseIndex def assign_docid m, truncated_date t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i + while docid > 0 and docid_exists? docid + docid -= 1 + end + docid > 0 ? docid : nil + end + + # XXX is there a better way? + def docid_exists? docid begin - while @assigned_docids.member? [docid].pack("N") - docid -= 1 - end - rescue + @xapian.doclength docid + true + rescue RuntimeError #Xapian::DocNotFoundError + raise unless $!.message =~ /DocNotFoundError/ + false end - @assigned_docids[[docid].pack("N")] = '' - docid + end + + def term_docids term + @xapian.postlist(term).map { |x| x.docid } + end + + def find_docid id + docids = term_docids(mkterm(:msgid,id)) + fail unless docids.size <= 1 + docids.first + end + + def find_doc id + return unless docid = find_docid(id) + @xapian.document docid + end + + def get_id docid + return unless doc = @xapian.document(docid) + doc.value MSGID_VALUENO + end + + def get_entry id + return unless doc = find_doc(id) + Marshal.load doc.data + end + + def thread_killed? thread_id + not run_query(Q.new(Q::OP_AND, mkterm(:thread, thread_id), mkterm(:label, :Killed)), 0, 1).empty? end def synchronize &b @@ -345,7 +399,7 @@ class XapianIndex < BaseIndex def run_query_ids xapian_query, offset, limit matchset = run_query xapian_query, offset, limit - matchset.matches.map { |r| r.document.data } + matchset.matches.map { |r| r.document.value MSGID_VALUENO } end Q = Xapian::Query @@ -376,7 +430,7 @@ class XapianIndex < BaseIndex end end - def index_message m, opts + def index_message m, entry, opts terms = [] text = [] @@ -399,6 +453,7 @@ class XapianIndex < BaseIndex terms << mkterm(:date,m.date) if m.date m.labels.each { |t| terms << mkterm(:label,t) } terms << mkterm(:type, 'mail') + terms << mkterm(:msgid, m.id) terms << mkterm(:source_id, m.source.id) m.attachments.each do |a| a =~ /\.(\w+)$/ or next @@ -406,6 +461,23 @@ class XapianIndex < BaseIndex terms << t end + ## Thread membership + children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid } + parent_ids = m.refs + m.replytos + parents = parent_ids.map { |id| find_doc id }.compact + thread_members = SavingHash.new { [] } + (children + parents).each do |doc2| + thread_ids = doc2.value(THREAD_VALUENO).split ',' + thread_ids.each { |thread_id| thread_members[thread_id] << doc2 } + end + + thread_ids = thread_members.empty? ? [m.id] : thread_members.keys + + thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) } + parent_ids.each do |ref| + terms << mkterm(:ref, ref) + end + # Full text search content text << [subject_text, PREFIX['subject']] text << [subject_text, PREFIX['body']] @@ -429,17 +501,29 @@ class XapianIndex < BaseIndex Xapian.sortable_serialise 0 end - doc = Xapian::Document.new - docid = @docids[m.id] || assign_docid(m, truncated_date) + docid = nil + unless doc = find_doc(m.id) + doc = Xapian::Document.new + if not docid = assign_docid(m, truncated_date) + # Could be triggered by spam + Redwood::log "warning: docid underflow, dropping #{m.id.inspect}" + return + end + else + doc.clear_terms + doc.clear_values + docid = doc.docid + end @term_generator.document = doc text.each { |text,prefix| @term_generator.index_text text, 1, prefix } terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH } + doc.add_value MSGID_VALUENO, m.id + doc.add_value THREAD_VALUENO, (thread_ids * ',') doc.add_value DATE_VALUENO, date_value - doc.data = m.id + doc.data = Marshal.dump entry @xapian.replace_document docid, doc - @docids[m.id] = docid end # Construct a Xapian term @@ -462,48 +546,13 @@ class XapianIndex < BaseIndex PREFIX['source_id'] + args[0].to_s.downcase when :attachment_extension PREFIX['attachment_extension'] + args[0].to_s.downcase + when :msgid, :ref, :thread + PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)] else raise "Invalid term type #{type}" end end - # Join all the given message-ids into a single thread - def union_threads ids - seen_threads = Set.new - related = Set.new - - # Get all the ids that will be in the new thread - ids.each do |id| - related << id - thread_id = @thread_ids[id] - if thread_id && !seen_threads.member?(thread_id) - thread_members = @thread_members[thread_id] - related.merge thread_members - seen_threads << thread_id - end - end - - # Pick a leader and move all the others to its thread - a = related.to_a - best, *rest = a.sort_by { |x| x.hash } - @thread_members[best] = a - @thread_ids[best] = best - rest.each do |x| - @thread_members.delete x - @thread_ids[x] = best - end - end end end - -class MarshalledGDBM < GDBM - def []= k, v - super k, Marshal.dump(v) - end - - def [] k - v = super k - v ? Marshal.load(v) : nil - end -end