DELAY = 300
def initialize
- @polling = false
+ @mutex = Mutex.new
@last_poll = nil
self.class.i_am_the_instance self
end
def buffer
- BufferManager.spawn_unless_exists("<poll for new messages>", :hidden => true) do
- PollMode.new
- end
+ BufferManager.spawn_unless_exists("<poll for new messages>", :hidden => true) { PollMode.new }
end
def poll
end
end
+ ## TODO: merge this with sup-import
def do_poll
- return [0, 0] if @polling
- @polling = true
- found = {}
- total_num = 0
- total_numi = 0
+ total_num = total_numi = 0
+ @mutex.synchronize do
+ found = {}
+ Index.usual_sources.each do |source|
+ next if source.broken? || source.done?
- Index.usual_sources.each do |source|
- next if source.done?
- yield "Loading from #{source}... "
+ yield "Loading from #{source}... "
+ start_offset = nil
+ num = 0
+ num_inbox = 0
- start_offset = nil
- num = 0
- num_inbox = 0
- begin
source.each do |offset, labels|
+ break if source.broken?
start_offset ||= offset
yield "Found message at #{offset} with labels #{labels * ', '}"
- m = Redwood::Message.new :source => source, :source_info => offset,
- :labels => labels
- if found[m.id]
- yield "Skipping duplicate message #{m.id}"
- next
- else
+
+ begin
+ begin
+ m = Redwood::Message.new :source => source, :source_info => offset, :labels => labels
+ rescue MessageFormatError => e
+ yield "Non-fatal error loading message #{source}##{offset}: #{e.message}"
+ next
+ end
+
+ if found[m.id]
+ yield "Skipping duplicate message #{m.id}"
+ next
+ end
found[m.id] = true
- end
- if Index.add_message m
- UpdateManager.relay :add, m
- num += 1
- total_num += 1
- total_numi += 1 if m.labels.include? :inbox
- end
- end
+ if Index.add_message m
+ UpdateManager.relay :add, m
+ num += 1
+ total_num += 1
+ total_numi += 1 if m.labels.include? :inbox
+ end
- if num % 1000 == 0 && num > 0
- elapsed = Time.now - start
- pctdone = (offset.to_f - start_offset) / (source.total.to_f - start_offset)
- remaining = (source.end_offset.to_f - offset.to_f) * (elapsed.to_f / (offset.to_f - start_offset))
- yield "## #{num} (#{(pctdone * 100.0)}% done) read; #{elapsed.to_time_s} elapsed; est. #{remaining.to_time_s} remaining"
+ if num % 1000 == 0 && num > 0
+ elapsed = Time.now - start
+ pctdone = source.pct_done
+ remaining = (100.0 - pctdone) * (elapsed.to_f / pctdone)
+ yield "## #{num} (#{pctdone}% done) read; #{elapsed.to_time_s} elapsed; est. #{remaining.to_time_s} remaining"
+ end
+ rescue SourceError => e
+ msg = "Fatal error loading from #{source}: #{e.message}"
+ Redwood::log msg
+ yield msg
+ break
+ end
end
- rescue SourceError, MessageFormatError => e
- msg = "#{source.broken? ? 'Fatal' : 'Non-fatal'} error loading from #{source}: #{e.message}"
- Redwood::log msg
- yield msg
+ yield "Found #{num} messages" unless num == 0
end
- yield "Found #{num} messages" unless num == 0
+ yield "Done polling; loaded #{total_num} new messages total"
+ @last_poll = Time.now
+ @polling = false
end
- yield "Done polling; loaded #{total_num} new messages total"
- @last_poll = Time.now
- @polling = false
[total_num, total_numi]
end
end