Couple big changes in this commit, but they're all tied together.
Index.sync_message is refactored into three separate methods: add_message,
update_message and update_message_state. The intention is that add_message is
called for new messages only, update_message is called for changing the message
body on disk (e.g. when we see multiple copies of the same message, or by
DraftManager when the text of a draft is changed), and update_message_state is
called when the labels on a message change. So indexes that differentiate those
operations can exhibit more natural performance characteristics.
Also, PollManager.add_messages_from has been renamed to each_message_from and
changed significantly. It now *only* yields successive messages; it does not
load the index version of the message, and it does not auto-add the message to
the index. (In fact, it ignores the result of the block.) There's also a new
method called add_new_message that calls Index.add_message and then relays the
update to other GUI elements.
There was a lot of refactoring of sup-sync that was part of this. Probably
not strictly necessary but it was too hard to untangle the changes.
end
end
+class Set
+ def to_s; to_a * ',' end
+end
+
def time
startt = Time.now
yield
Options controlling WHICH messages sup-sync operates on:
EOS
opt :new, "Operate on new messages only. Don't scan over the entire source. (Default.)", :short => :none
- opt :changed, "Scan over the entire source for messages that have been deleted, altered, or moved from another source. (In the case of mbox sources, this includes all messages AFTER an altered message.)"
+ opt :changed, "Scan over the entire source for messages that have been deleted, altered, or moved from another source."
opt :restored, "Operate only on those messages included in a dump file as specified by --restore which have changed state."
opt :all, "Operate on all messages in the source, regardless of newness or changedness."
opt :start_at, "For --changed, --restored and --all, start at a particular offset.", :type => :int
opt :discard, "Discard any message state in the index and use the default source state. Dangerous!", :short => :none
opt :archive, "When using the default source state, mark messages as archived.", :short => "-x"
opt :read, "When using the default source state, mark messages as read."
- opt :extra_labels, "When using the default source state, also apply these user-defined labels. Should be a comma-separated list.", :type => String, :short => :none
+ opt :extra_labels, "When using the default source state, also apply these user-defined labels (a comma-separated list)", :default => "", :short => :none
text <<EOS
Redwood::start
index = Redwood::Index.new
-restored_state =
- if opts[:restore]
- dump = {}
- $stderr.puts "Loading state dump from #{opts[:restore]}..."
- IO.foreach opts[:restore] do |l|
- l =~ /^(\S+) \((.*?)\)$/ or raise "Can't read dump line: #{l.inspect}"
- mid, labels = $1, $2
- dump[mid] = labels.to_set_of_symbols
- end
- $stderr.puts "Read #{dump.size} entries from dump file."
- dump
- else
- {}
+restored_state = if opts[:restore]
+ dump = {}
+ $stderr.puts "Loading state dump from #{opts[:restore]}..."
+ IO.foreach opts[:restore] do |l|
+ l =~ /^(\S+) \((.*?)\)$/ or raise "Can't read dump line: #{l.inspect}"
+ mid, labels = $1, $2
+ dump[mid] = labels.to_set_of_symbols
end
+ $stderr.puts "Read #{dump.size} entries from dump file."
+ dump
+else
+ {}
+end
seen = {}
index.lock_or_die
begin
index.load
- sources = ARGV.map do |uri|
- Redwood::SourceManager.source_for uri or Trollop::die "Unknown source: #{uri}. Did you add it with sup-add first?"
+ sources = if opts[:all_sources]
+ Redwood::SourceManager.sources
+ elsif ARGV.empty?
+ Redwood::SourceManager.usual_sources
+ else
+ ARGV.map do |uri|
+ Redwood::SourceManager.source_for uri or Trollop::die "Unknown source: #{uri}. Did you add it with sup-add first?"
+ end
end
-
- sources = Redwood::SourceManager.usual_sources if sources.empty?
- sources = Redwood::SourceManager.sources if opts[:all_sources]
+ ## for all target specifications except for only-new messages, reset the
+ ## source to the beginning (or to the user-specified starting point.)
unless target == :new
if opts[:start_at]
Trollop::die :start_at, "can only be used on one source" unless sources.size == 1
sources.each { |s| s.reset! }
end
end
-
+
sources.each do |source|
$stderr.puts "Scanning #{source}..."
num_added = num_updated = num_scanned = num_restored = 0
last_info_time = start_time = Time.now
- Redwood::PollManager.add_messages_from source, :force_overwrite => true do |m_old, m, offset|
+ Redwood::PollManager.each_message_from source do |m|
num_scanned += 1
seen[m.id] = true
-
- if Time.now - last_info_time > PROGRESS_UPDATE_INTERVAL
- last_info_time = Time.now
- elapsed = last_info_time - start_time
- start = opts[:start_at] || source.start_offset
- pctdone = 100.0 * (source.cur_offset - start).to_f / (source.end_offset - start).to_f
- remaining = (100.0 - pctdone) * (elapsed.to_f / pctdone)
- $stderr.printf "## read %dm (about %.0f%%) @ %.1fm/s. %s elapsed, about %s remaining\n", num_scanned, pctdone, num_scanned / elapsed, elapsed.to_time_s, remaining.to_time_s
+ old_m = index.build_message m.id
+
+ case target
+ when :changed
+ ## skip this message if we're operating only on changed messages, the
+ ## message is in the index, and it's unchanged from what the source is
+ ## reporting.
+ next if old_m && old_m.source.id == m.source.id && old_m.source_info == m.source_info
+ when :restored
+ ## skip if we're operating on restored messages, and this one
+ ## ain't (or we wouldn't be making a change)
+ next unless old_m && restored_state[m.id] && restored_state[m.id] != old_m.labels
+ when :new
+ ## nothing to do; we'll consider all messages starting at the start offset, which
+ ## hasn't been changed.
+ when :all
+ ## nothing to do; we'll consider all messages starting at the start offset, which
+ ## was reset to the beginning above.
end
- ## skip if we're operating only on changed messages, the message
- ## is in the index, and it's unchanged from what the source is
- ## reporting.
- next if target == :changed && m_old && m_old.source.id == source.id && m_old.source_info == offset
-
- ## get the state currently in the index
- index_state = m_old.labels.dup if m_old
-
- ## skip if we're operating on restored messages, and this one
- ## ain't (or we wouldn't be making a change)
- next if target == :restored && (!restored_state[m.id] || !index_state || restored_state[m.id] == index_state)
-
- ## m.labels is the default source labels. tweak these according
- ## to default source state modification flags.
+ ## tweak source labels according to commandline arguments if necessary
m.labels.delete :inbox if opts[:archive]
m.labels.delete :unread if opts[:read]
- m.labels += opts[:extra_labels].to_set_of_symbols(",") if opts[:extra_labels]
-
- ## assign message labels based on the operation we're performing
- case op
- when :asis
+ m.labels += opts[:extra_labels].to_set_of_symbols(",")
+
+ ## decide what to do based on message labels and the operation we're performing
+ dothis, new_labels = case
+ when (op == :restore) && restored_state[m.id] && old_m && (old_m.labels != restored_state[m.id])
+ [:update_message_state, restored_state[m.id]]
+ when op == :discard
+ if old_m && (old_m.labels != m.labels)
+ [:update_message_state, m.labels]
+ else
+ # don't do anything
+ end
+ else
## duplicate behavior of poll mode: if index_state is non-nil, this is a newer
## version of an older message, so merge in any new labels except :unread and
## :inbox.
- m.labels = ((m.labels - [:unread, :inbox]) + index_state) if index_state
- when :restore
- ## if the entry exists on disk
- if restored_state[m.id]
- m.labels = restored_state[m.id]
- num_restored += 1
- elsif index_state
- m.labels = index_state
+ ##
+ ## TODO: refactor such that this isn't duplicated
+ if old_m
+ m.labels = old_m.labels + (m.labels - [:unread, :inbox])
+ :update_message
+ else
+ :add_message
end
- when :discard
- ## nothin! use default source labels
+ end
+
+ ## now, actually do the operation
+ case dothis
+ when :add_message
+ $stderr.puts "Adding new message #{source}###{m.source_info} with labels #{m.labels}" if opts[:verbose]
+ index.add_message m unless opts[:dry_run]
+ num_added += 1
+ when :update_message
+ $stderr.puts "Updating message #{source}###{m.source_info}; labels #{old_m.labels} => #{m.labels}; offset #{old_m.source_info} => #{m.source_info}" if opts[:verbose]
+ index.update_message m unless opts[:dry_run]
+ num_updated += 1
+ when :update_message_state
+ $stderr.puts "Changing flags for #{source}##{m.source_info} from #{m.labels} to #{new_labels}"
+ m.labels = new_labels
+ index.update_message_state m unless opts[:dry_run]
+ num_updated += 1
end
if Time.now - last_info_time > PROGRESS_UPDATE_INTERVAL
remaining = (100.0 - pctdone) * (elapsed.to_f / pctdone)
$stderr.printf "## read %dm (about %.0f%%) @ %.1fm/s. %s elapsed, about %s remaining\n", num_scanned, pctdone, num_scanned / elapsed, elapsed.to_time_s, remaining.to_time_s
end
-
- if index_state.nil?
- puts "Adding message #{source}##{offset} from #{m.from} with state {#{m.labels.to_a * ', '}}" if opts[:verbose]
- num_added += 1
- else
- puts "Updating message #{source}##{offset}, source #{m_old.source.id} => #{source.id}, offset #{m_old.source_info} => #{offset}, state {#{index_state.to_a * ', '}} => {#{m.labels.to_a * ', '}}" if opts[:verbose]
- num_updated += 1
- end
-
- opts[:dry_run] ? nil : m
end
+
$stderr.puts "Scanned #{num_scanned}, added #{num_added}, updated #{num_updated} messages from #{source}."
$stderr.puts "Restored state on #{num_restored} (#{100.0 * num_restored / num_scanned}%) messages." if num_restored > 0
end
num_dropped = num_moved = num_scanned = 0
out_fp = Tempfile.new "sup-sync-back-#{source.id}"
- Redwood::PollManager.add_messages_from source do |m_old, m, offset|
+ Redwood::PollManager.each_message_from source do |m|
num_scanned += 1
- if m_old
+ if(m_old = index.build_message(m.id))
labels = m_old.labels
if labels.member? :deleted
if opts[:drop_deleted]
- puts "Dropping deleted message #{source}##{offset}" if opts[:verbose]
+ puts "Dropping deleted message #{source}##{m.source_info}" if opts[:verbose]
num_dropped += 1
elsif opts[:move_deleted] && labels.member?(:deleted)
- puts "Moving deleted message #{source}##{offset}" if opts[:verbose]
+ puts "Moving deleted message #{source}##{m.source_info}" if opts[:verbose]
save m, deleted_fp unless opts[:dry_run]
num_moved += 1
end
elsif labels.member? :spam
if opts[:drop_spam]
- puts "Dropping spam message #{source}##{offset}" if opts[:verbose]
+ puts "Dropping spam message #{source}##{m.source_info}" if opts[:verbose]
num_dropped += 1
elsif opts[:move_spam] && labels.member?(:spam)
- puts "Moving spam message #{source}##{offset}" if opts[:verbose]
+ puts "Moving spam message #{source}##{m.source_info}" if opts[:verbose]
save m, spam_fp unless opts[:dry_run]
num_moved += 1
end
else
save m, out_fp unless opts[:dry_run]
end
-
- nil # don't actually add anything!
end
$stderr.puts "Scanned #{num_scanned}, dropped #{num_dropped}, moved #{num_moved} messages from #{source}."
modified_sources << source if num_dropped > 0 || num_moved > 0
puts "From #{m.from}, subject: #{m.subj}" if opts[:very_verbose]
puts "#{m.id}: {#{old_labels.to_a.join ','}} => {#{m.labels.to_a.join ','}}" if opts[:verbose]
puts if opts[:very_verbose]
- index.sync_message m unless opts[:dry_run]
+ index.update_message_state m unless opts[:dry_run]
end
if Time.now - last_info_time > 60
File.open(fn, "w") { |f| yield f }
my_message = nil
- @source.each do |thisoffset, theselabels|
- m = Message.build_from_source @source, thisoffset
- m.labels = theselabels
- Index.sync_message m
- UpdateManager.relay self, :added, m
- my_message = m if thisoffset == offset
+ PollManager.each_message_from(@source) do |m|
+ PollManager.add_new_message m
+ my_message = m
end
my_message
end
end
+ def add_message m; sync_message m end
+ def update_message m; sync_message m end
+ def update_message_state m; sync_message m end
+
def sync_message m, opts={}
entry = @index[m.id]
@index.add_document d
end
end
+ private :sync_message
def save_index fn=File.join(@dir, "ferret")
# don't have to do anything, apparently
unimplemented
end
- ## Syncs the message to the index, replacing any previous version. adding
- ## either way. Index state will be determined by the message's #labels
- ## accessor.
- def sync_message m, opts={}
- unimplemented
- end
+ def add_message m; unimplemented end
+ def update_message m; unimplemented end
+ def update_message_state m; unimplemented end
def save_index fn
unimplemented
## don't tempt me.
def sanitize_message_id mid; mid.gsub(/(\s|[^\000-\177])+/, "")[0..254] end
- def save index
+ def save_state index
return unless @dirty
- index.sync_message self
+ index.update_message_state self
@dirty = false
true
end
BufferManager.say("Saving threads...") do |say_id|
dirty_threads.each_with_index do |t, i|
BufferManager.say "Saving modified thread #{i + 1} of #{dirty_threads.length}...", say_id
- t.save Index
+ t.save_state Index
end
end
end
num = 0
numi = 0
- add_messages_from source do |m_old, m, offset|
- ## always preserve the labels on disk.
- m.labels = (m.labels - [:unread, :inbox]) + m_old.labels if m_old
- yield "Found message at #{offset} with labels {#{m.labels.to_a * ', '}}"
- unless m_old
+ each_message_from source do |m|
+ yield "Found message at #{m.source_info} with labels {#{m.labels.to_a * ', '}}"
+ old_m = Index.build_message m.id
+ if old_m
+ if old_m.source.id != source.id || old_m.source_info != m.source_info
+ ## here we merge labels between new and old versions, but we don't let the new
+ ## message add :unread or :inbox labels. (they can exist in the old version,
+ ## just not be added.)
+ new_labels = old_m.labels + (m.labels - [:unread, :inbox])
+ yield "Message at #{m.source_info} is an updated of an old message. Updating labels from #{m.labels.to_a * ','} => #{new_labels.to_a * ','}"
+ m.labels = new_labels
+ Index.update_message m
+ else
+ yield "Skipping already-imported message at #{m.source_info}"
+ end
+ else
+ yield "Found new message at #{m.source_info} with labels #{m.labels.to_a * ','}"
+ Index.add_message m
num += 1
from_and_subj << [m.from && m.from.longname, m.subj]
if (m.labels & [:inbox, :spam, :deleted, :killed]) == Set.new([:inbox])
[total_num, total_numi, from_and_subj, from_and_subj_inbox]
end
- ## this is the main mechanism for adding new messages to the
- ## index. it's called both by sup-sync and by PollMode.
- ##
- ## for each message in the source, starting from the source's
- ## starting offset, this methods yields the message, the source
- ## offset, and the index entry on disk (if any). it expects the
- ## yield to return the message (possibly altered in some way), and
- ## then adds it (if new) or updates it (if previously seen).
+ ## like Source#each, but yields successive Message objects, which have their
+ ## labels and offsets set correctly.
##
- ## the labels of the yielded message are the default source
- ## labels. it is likely that callers will want to replace these with
- ## the index labels, if they exist, so that state is not lost when
- ## e.g. a new version of a message from a mailing list comes in.
- def add_messages_from source, opts={}
+ ## this is the primary mechanism for iterating over messages from a source.
+ def each_message_from source, opts={}
begin
return if source.done? || source.has_errors?
- source.each do |offset, default_labels|
+ source.each do |offset, source_labels|
if source.has_errors?
Redwood::log "error loading messages from #{source}: #{source.error.message}"
return
end
- m_new = Message.build_from_source source, offset
- m_old = Index.build_message m_new.id
-
- m_new.labels += default_labels + (source.archived? ? [] : [:inbox])
- m_new.labels << :sent if source.uri.eql?(SentManager.source_uri)
- m_new.labels.delete :unread if m_new.source_marked_read?
- m_new.labels.each { |l| LabelManager << l }
+ m = Message.build_from_source source, offset
+ m.labels += source_labels + (source.archived? ? [] : [:inbox])
+ m.labels.delete :unread if m.source_marked_read? # preserve read status if possible
+ m.labels.each { |l| LabelManager << l }
- HookManager.run "before-add-message", :message => m_new
- m_ret = yield(m_old, m_new, offset) or next if block_given?
- Index.sync_message m_ret, opts
- UpdateManager.relay self, :added, m_ret unless m_old
+ HookManager.run "before-add-message", :message => m
+ yield m
end
rescue SourceError => e
Redwood::log "problem getting messages from #{source}: #{e.message}"
Redwood::report_broken_sources :force_to_top => true
end
end
+
+ ## TODO: see if we can do this within PollMode rather than by calling this
+ ## method.
+ ##
+ ## a wrapper around Index.add_message that calls the proper hooks,
+ ## does the gui callback stuff, etc.
+ def add_new_message m
+ Index.add_message m
+ UpdateManager.relay self, :added, m
+ end
end
end
def write_sent_message date, from_email, &block
@source.store_message date, from_email, &block
- PollManager.add_messages_from(@source) do |m_old, m, offset|
+ PollManager.each_message_from(@source) do |m|
m.remove_label :unread
- m
+ PollManager.add_new_message m
end
end
end
def uri; 'sup://sent' end
def id; 9998; end
- def labels; [:inbox]; end
+ def labels; [:inbox, :sent]; end
end
end
def set_labels l; each { |m, *o| m && m.labels = l }; end
def has_label? t; any? { |m, *o| m && m.has_label?(t) }; end
- def save index; each { |m, *o| m && m.save(index) }; end
+ def save_state index; each { |m, *o| m && m.save_state(index) }; end
def direct_participants
map { |m, *o| [m.from] + m.to if m }.flatten.compact.uniq
m
end
+ def add_message m; sync_message m end
+ def update_message m; sync_message m end
+ def update_message_state m; sync_message m end
+
def sync_message m, opts={}
entry = synchronize { @entries[m.id] }
snippet = m.snippet
end
true
end
+ private :sync_message
def num_results_for query={}
xapian_query = build_xapian_query query