From: William Morgan Date: Sat, 22 Aug 2009 14:17:02 +0000 (-0400) Subject: Merge branch 'xapian-updates' into next X-Git-Url: https://git.cworth.org/git?a=commitdiff_plain;h=f5d9f2ce214a3f90e9716a8b2dc35dc74ef5a9de;hp=-c;p=sup Merge branch 'xapian-updates' into next --- f5d9f2ce214a3f90e9716a8b2dc35dc74ef5a9de diff --combined lib/sup/xapian_index.rb index dbf6643,87a5441..85f6ef0 --- a/lib/sup/xapian_index.rb +++ b/lib/sup/xapian_index.rb @@@ -1,5 -1,4 +1,4 @@@ require 'xapian' - require 'gdbm' require 'set' module Redwood @@@ -9,6 -8,7 +8,7 @@@ # for searching due to precomputing thread membership. class XapianIndex < BaseIndex STEM_LANGUAGE = "english" + INDEX_VERSION = '1' ## dates are converted to integers for xapian, and are used for document ids, ## so we must ensure they're reasonably valid. this typically only affect @@@ -23,13 -23,18 +23,18 @@@ end def load_index - @entries = MarshalledGDBM.new File.join(@dir, "entries.db") - @docids = MarshalledGDBM.new File.join(@dir, "docids.db") - @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db") - @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db") - @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db") - - @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN) + path = File.join(@dir, 'xapian') + if File.exists? path + @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN) + db_version = @xapian.get_metadata 'version' + db_version = '0' if db_version.empty? + if db_version != INDEX_VERSION + fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please downgrade to your previous version and dump your labels before upgrading to this version (then run sup-sync --restore)." + end + else + @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE) + @xapian.set_metadata 'version', INDEX_VERSION + end @term_generator = Xapian::TermGenerator.new() @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE) @enquire = Xapian::Enquire.new @xapian @@@ -48,19 -53,19 +53,19 @@@ end def contains_id? id - synchronize { @entries.member? id } + synchronize { find_docid(id) && true } end def source_for_id id - synchronize { @entries[id][:source_id] } + synchronize { get_entry(id)[:source_id] } end def delete id - synchronize { @xapian.delete_document @docids[id] } + synchronize { @xapian.delete_document mkterm(:msgid, id) } end def build_message id - entry = synchronize { @entries[id] } + entry = synchronize { get_entry id } return unless entry source = SourceManager[entry[:source_id]] @@@ -87,12 -92,8 +92,12 @@@ m end + def add_message m; sync_message m end + def update_message m; sync_message m end + def update_message_state m; sync_message m end + def sync_message m, opts={} - entry = synchronize { @entries[m.id] } + entry = synchronize { get_entry m.id } snippet = m.snippet entry ||= {} labels = m.labels @@@ -104,7 -105,7 +109,7 @@@ :source_info => m.source_info, :date => (entry[:date] || m.date), :snippet => snippet, - :labels => labels.uniq, + :labels => labels, :from => (entry[:from] || [m.from.email, m.from.name]), :to => (entry[:to] || m.to.map { |p| [p.email, p.name] }), :cc => (entry[:cc] || m.cc.map { |p| [p.email, p.name] }), @@@ -114,16 -115,13 +119,14 @@@ :replytos => (entry[:replytos] || m.replytos), } - m.labels.each { |l| LabelManager << l } + labels.each { |l| LabelManager << l } synchronize do - index_message m, opts - union_threads([m.id] + m.refs + m.replytos) - @entries[m.id] = d + index_message m, d, opts end true end + private :sync_message def num_results_for query={} xapian_query = build_xapian_query query @@@ -152,8 -150,26 +155,26 @@@ def each_message_in_thread_for m, opts={} # TODO thread by subject # TODO handle killed threads - ids = synchronize { @thread_members[@thread_ids[m.id]] } || [] - ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } } + return unless doc = find_doc(m.id) + queue = doc.value(THREAD_VALUENO).split(',') + msgids = [m.id] + seen_threads = Set.new + seen_messages = Set.new [m.id] + while not queue.empty? + thread_id = queue.pop + next if seen_threads.member? thread_id + return false if thread_killed? thread_id + seen_threads << thread_id + docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x } + docs.each do |doc| + msgid = doc.value MSGID_VALUENO + next if seen_messages.member? msgid + msgids << msgid + seen_messages << msgid + queue.concat doc.value(THREAD_VALUENO).split(',') + end + end + msgids.each { |id| yield id, lambda { build_message id } } true end @@@ -221,10 -237,10 +242,10 @@@ field, name = $1, ($3 || $4) case field when "filename" - Redwood::log "filename - translated #{field}:#{name} to attachment:\"#{name.downcase}\"" + debug "filename: translated #{field}:#{name} to attachment:\"#{name.downcase}\"" "attachment:\"#{name.downcase}\"" when "filetype" - Redwood::log "filetype - translated #{field}:#{name} to attachment_extension:#{name.downcase}" + debug "filetype: translated #{field}:#{name} to attachment_extension:#{name.downcase}" "attachment_extension:#{name.downcase}" end end @@@ -238,13 -254,13 +259,13 @@@ if realdate case field when "after" - Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.end}" + debug "chronic: translated #{field}:#{datestr} to #{realdate.end}" "date:#{realdate.end.to_i}..#{lastdate}" when "before" - Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate.begin}" + debug "chronic: translated #{field}:#{datestr} to #{realdate.begin}" "date:#{firstdate}..#{realdate.end.to_i}" else - Redwood::log "chronic: translated #{field}:#{datestr} to #{realdate}" + debug "chronic: translated #{field}:#{datestr} to #{realdate}" "date:#{realdate.begin.to_i}..#{realdate.end.to_i}" end else @@@ -302,11 -318,16 +323,16 @@@ 'label' => 'L', 'source_id' => 'I', 'attachment_extension' => 'O', + 'msgid' => 'Q', + 'thread' => 'H', + 'ref' => 'R', } PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX - DATE_VALUENO = 0 + MSGID_VALUENO = 0 + THREAD_VALUENO = 1 + DATE_VALUENO = 2 MAX_TERM_LENGTH = 245 @@@ -322,14 -343,48 +348,48 @@@ def assign_docid m, truncated_date t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i + while docid > 0 and docid_exists? docid + docid -= 1 + end + docid > 0 ? docid : nil + end + + # XXX is there a better way? + def docid_exists? docid begin - while @assigned_docids.member? [docid].pack("N") - docid -= 1 - end - rescue + @xapian.doclength docid + true + rescue RuntimeError #Xapian::DocNotFoundError + raise unless $!.message =~ /DocNotFoundError/ + false end - @assigned_docids[[docid].pack("N")] = '' - docid + end + + def term_docids term + @xapian.postlist(term).map { |x| x.docid } + end + + def find_docid id + term_docids(mkterm(:msgid,id)).tap { |x| fail unless x.size <= 1 }.first + end + + def find_doc id + return unless docid = find_docid(id) + @xapian.document docid + end + + def get_id docid + return unless doc = @xapian.document(docid) + doc.value MSGID_VALUENO + end + + def get_entry id + return unless doc = find_doc(id) + Marshal.load doc.data + end + + def thread_killed? thread_id + not run_query(Q.new(Q::OP_AND, mkterm(:thread, thread_id), mkterm(:label, :Killed)), 0, 1).empty? end def synchronize &b @@@ -345,7 -400,7 +405,7 @@@ def run_query_ids xapian_query, offset, limit matchset = run_query xapian_query, offset, limit - matchset.matches.map { |r| r.document.data } + matchset.matches.map { |r| r.document.value MSGID_VALUENO } end Q = Xapian::Query @@@ -376,7 -431,7 +436,7 @@@ end end - def index_message m, opts + def index_message m, entry, opts terms = [] text = [] @@@ -399,6 -454,7 +459,7 @@@ terms << mkterm(:date,m.date) if m.date m.labels.each { |t| terms << mkterm(:label,t) } terms << mkterm(:type, 'mail') + terms << mkterm(:msgid, m.id) terms << mkterm(:source_id, m.source.id) m.attachments.each do |a| a =~ /\.(\w+)$/ or next @@@ -406,6 -462,23 +467,23 @@@ terms << t end + ## Thread membership + children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid } + parent_ids = m.refs + m.replytos + parents = parent_ids.map { |id| find_doc id }.compact + thread_members = SavingHash.new { [] } + (children + parents).each do |doc2| + thread_ids = doc2.value(THREAD_VALUENO).split ',' + thread_ids.each { |thread_id| thread_members[thread_id] << doc2 } + end + + thread_ids = thread_members.empty? ? [m.id] : thread_members.keys + + thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) } + parent_ids.each do |ref| + terms << mkterm(:ref, ref) + end + # Full text search content text << [subject_text, PREFIX['subject']] text << [subject_text, PREFIX['body']] @@@ -413,10 -486,10 +491,10 @@@ m.attachments.each { |a| text << [a, PREFIX['attachment']] } truncated_date = if m.date < MIN_DATE - Redwood::log "warning: adjusting too-low date #{m.date} for indexing" + debug "warning: adjusting too-low date #{m.date} for indexing" MIN_DATE elsif m.date > MAX_DATE - Redwood::log "warning: adjusting too-high date #{m.date} for indexing" + debug "warning: adjusting too-high date #{m.date} for indexing" MAX_DATE else m.date @@@ -429,17 -502,29 +507,29 @@@ Xapian.sortable_serialise 0 end - doc = Xapian::Document.new - docid = @docids[m.id] || assign_docid(m, truncated_date) + docid = nil + unless doc = find_doc(m.id) + doc = Xapian::Document.new + if not docid = assign_docid(m, truncated_date) + # Could be triggered by spam + Redwood::log "warning: docid underflow, dropping #{m.id.inspect}" + return + end + else + doc.clear_terms + doc.clear_values + docid = doc.docid + end @term_generator.document = doc text.each { |text,prefix| @term_generator.index_text text, 1, prefix } terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH } + doc.add_value MSGID_VALUENO, m.id + doc.add_value THREAD_VALUENO, (thread_ids * ',') doc.add_value DATE_VALUENO, date_value - doc.data = m.id + doc.data = Marshal.dump entry @xapian.replace_document docid, doc - @docids[m.id] = docid end # Construct a Xapian term @@@ -462,48 -547,13 +552,13 @@@ PREFIX['source_id'] + args[0].to_s.downcase when :attachment_extension PREFIX['attachment_extension'] + args[0].to_s.downcase + when :msgid, :ref, :thread + PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)] else raise "Invalid term type #{type}" end end - # Join all the given message-ids into a single thread - def union_threads ids - seen_threads = Set.new - related = Set.new - - # Get all the ids that will be in the new thread - ids.each do |id| - related << id - thread_id = @thread_ids[id] - if thread_id && !seen_threads.member?(thread_id) - thread_members = @thread_members[thread_id] - related.merge thread_members - seen_threads << thread_id - end - end - - # Pick a leader and move all the others to its thread - a = related.to_a - best, *rest = a.sort_by { |x| x.hash } - @thread_members[best] = a - @thread_ids[best] = best - rest.each do |x| - @thread_members.delete x - @thread_ids[x] = best - end - end end end - - class MarshalledGDBM < GDBM - def []= k, v - super k, Marshal.dump(v) - end - - def [] k - v = super k - v ? Marshal.load(v) : nil - end - end