From 58167d7abcd4159c5524ceca67f0ec58c96b0af7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Fonseca?= Date: Fri, 26 Oct 2012 18:13:10 +0100 Subject: [PATCH] More efficient synchronous multi-threading. Mimic a relay race, where only one thread is running at one time. This gracefully degrades to the old behavior for single-threaded traces, and is faster for multi-threaded traces, while still ensuring that the recorded order is preserved. --- CMakeLists.txt | 1 - common/os_thread.hpp | 2 +- common/os_workqueue.hpp | 48 ------- common/workqueue.cpp | 118 ----------------- retrace/retrace_main.cpp | 264 +++++++++++++++++++++++++++------------ 5 files changed, 182 insertions(+), 251 deletions(-) delete mode 100644 common/os_workqueue.hpp delete mode 100644 common/workqueue.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f019049..0e174e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -328,7 +328,6 @@ add_library (common STATIC common/image_pnm.cpp common/image_png.cpp common/${os} - common/workqueue.cpp ) set_target_properties (common PROPERTIES diff --git a/common/os_thread.hpp b/common/os_thread.hpp index 6c0b488..5fbdaab 100644 --- a/common/os_thread.hpp +++ b/common/os_thread.hpp @@ -348,7 +348,7 @@ private: DWORD id = 0; _native_handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)f, (LPVOID)arg, 0, &id); #else - pthread_create(&_native_handle, NULL, f, arg); + pthread_create(&_native_handle, NULL, ( void *(*) (void *))f, arg); #endif } diff --git a/common/os_workqueue.hpp b/common/os_workqueue.hpp deleted file mode 100644 index 047d3db..0000000 --- a/common/os_workqueue.hpp +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef _OS_WORKQUEUE_HPP_ -#define _OS_WORKQUEUE_HPP_ - -#include - -#include "os_thread.hpp" - -namespace os { - - -class WorkQueue; - -class WorkQueueWork { -protected: - friend class WorkQueue; - -public: - virtual void run(void) = 0; - virtual ~WorkQueueWork(void) { } -}; - -class WorkQueue { - std::queue work_queue; - - bool busy; - bool exit_workqueue; - os::condition_variable wake_cond; - os::condition_variable complete_cond; - - os::mutex mutex; - - os::thread thread; - - void wake_up_thread(void); - int run_tasks(void); -public: - void thread_entry(void); - void queue_work(WorkQueueWork *work); - void flush(void); - void destroy(void); - - WorkQueue(void); - ~WorkQueue(); -}; - -} - -#endif diff --git a/common/workqueue.cpp b/common/workqueue.cpp deleted file mode 100644 index fcd697a..0000000 --- a/common/workqueue.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include -#include - -#include "os_workqueue.hpp" - -namespace os -{ - -/** - * return 0 on batch complete, -1 on thread exit request. - */ -int WorkQueue::run_tasks(void) -{ - os::unique_lock lock(mutex); - - while (work_queue.empty() && !exit_workqueue) { - wake_cond.wait(lock); - } - - if (exit_workqueue) { - return -1; - } - - std::queue batch; - std::swap(work_queue, batch); - busy = true; - - lock.unlock(); - - assert(!batch.empty()); - while (!batch.empty()) { - WorkQueueWork *task; - - task = batch.front(); - task->run(); - batch.pop(); - delete task; - } - - lock.lock(); - - busy = false; - complete_cond.signal(); - - return 0; -} - -/* Must be called with WorkQueue::lock held */ -void WorkQueue::wake_up_thread(void) -{ - wake_cond.signal(); -} - -void WorkQueue::queue_work(WorkQueueWork *task) -{ - mutex.lock(); - work_queue.push(task); - wake_up_thread(); - mutex.unlock(); -} - -void WorkQueue::flush(void) -{ - os::unique_lock lock(mutex); - while (!work_queue.empty() || busy) { - complete_cond.wait(lock); - } -} - -void WorkQueue::thread_entry(void) -{ - int err; - - do { - err = run_tasks(); - } while (!err); -} - -void WorkQueue::destroy(void) -{ - mutex.lock(); - exit_workqueue = true; - wake_up_thread(); - mutex.unlock(); -} - -static -#ifdef _WIN32 -DWORD WINAPI -#else -void * -#endif -WorkQueue__entry_thunk(void *data) -{ - WorkQueue *thread = static_cast(data); - - thread->thread_entry(); - -#ifdef _WIN32 - return 0; -#else - return NULL; -#endif -} - -WorkQueue::WorkQueue(void) : - busy(false), - exit_workqueue(false), - thread(WorkQueue__entry_thunk, this) -{ -} - -WorkQueue::~WorkQueue(void) -{ - thread.join(); -} - -} diff --git a/retrace/retrace_main.cpp b/retrace/retrace_main.cpp index 9b029c8..f9eabcb 100644 --- a/retrace/retrace_main.cpp +++ b/retrace/retrace_main.cpp @@ -29,7 +29,7 @@ #include "os_binary.hpp" #include "os_time.hpp" -#include "os_workqueue.hpp" +#include "os_thread.hpp" #include "image.hpp" #include "trace_callset.hpp" #include "trace_dump.hpp" @@ -37,7 +37,6 @@ static bool waitOnFinish = false; -static bool use_threads; static const char *comparePrefix = NULL; static const char *snapshotPrefix = NULL; @@ -55,7 +54,6 @@ namespace retrace { trace::Parser parser; trace::Profiler profiler; -static std::map thread_wq_map; int verbosity = 0; bool debug = true; @@ -71,22 +69,7 @@ bool profilingPixelsDrawn = false; unsigned frameNo = 0; unsigned callNo = 0; -static bool state_dumped; -class RenderWork : public os::WorkQueueWork -{ - trace::Call *call; -public: - void run(void); - RenderWork(trace::Call *_call) { call = _call; } - ~RenderWork(void) { delete call; } -}; - -class FlushGLWork : public os::WorkQueueWork -{ -public: - void run(void) { flushRendering(); } -}; void frameComplete(trace::Call &call) { @@ -139,16 +122,17 @@ takeSnapshot(unsigned call_no) { return; } -void RenderWork::run(void) -{ + +class RelayRunner; + + +static void +retraceCall(trace::Call *call) { bool swapRenderTarget = call->flags & trace::CALL_FLAG_SWAP_RENDERTARGET; bool doSnapshot = snapshotFrequency.contains(*call) || compareFrequency.contains(*call); - if (state_dumped) - return; - // For calls which cause rendertargets to be swaped, we take the // snapshot _before_ swapping the rendertargets. if (doSnapshot && swapRenderTarget) { @@ -169,73 +153,196 @@ void RenderWork::run(void) if (doSnapshot && !swapRenderTarget) takeSnapshot(call->no); - if (call->no >= dumpStateCallNo && dumpState(std::cout)) - state_dumped = true; + if (call->no >= dumpStateCallNo && + dumpState(std::cout)) { + exit(0); + } } -static os::WorkQueue *get_work_queue(unsigned long thread_id) + +class RelayRace { - os::WorkQueue *thread; - std::map::iterator it; +public: + std::vector runners; - it = thread_wq_map.find(thread_id); - if (it == thread_wq_map.end()) { - thread = new os::WorkQueue(); - thread_wq_map[thread_id] = thread; - } else { - thread = it->second; + RelayRace(); + + RelayRunner * + getRunner(unsigned leg); + + void + startRace(void); + + void + passBaton(trace::Call *call); + + void + finishRace(); +}; + + +class RelayRunner +{ +public: + RelayRace *race; + unsigned leg; + os::mutex mutex; + os::condition_variable wake_cond; + + bool finished; + trace::Call *baton; + os::thread *thread; + + static void * + runnerThread(RelayRunner *_this); + + RelayRunner(RelayRace *race, unsigned _leg) : + race(race), + leg(_leg), + finished(false), + baton(0), + thread(0) + { + if (leg) { + thread = new os::thread(runnerThread, this); + } } - return thread; + void + runRace(void) { + os::unique_lock lock(mutex); + + while (1) { + while (!finished && !baton) { + wake_cond.wait(lock); + } + + if (finished) { + break; + } + + assert(baton); + trace::Call *call = baton; + baton = 0; + + runLeg(call); + } + + if (0) std::cerr << "leg " << leg << " actually finishing\n"; + + if (leg == 0) { + std::vector::iterator it; + for (it = race->runners.begin() + 1; it != race->runners.end(); ++it) { + RelayRunner* runner = *it; + runner->finishRace(); + } + } + } + + void runLeg(trace::Call *call) { + do { + assert(call); + assert(call->thread_id == leg); + retraceCall(call); + delete call; + call = parser.parse_call(); + } while (call && call->thread_id == leg); + + if (call) { + assert(call->thread_id != leg); + flushRendering(); + race->passBaton(call); + } else { + if (0) std::cerr << "finished on leg " << leg << "\n"; + if (leg) { + race->finishRace(); + } else { + finished = true; + } + } + } + + void receiveBaton(trace::Call *call) { + assert (call->thread_id == leg); + + mutex.lock(); + baton = call; + mutex.unlock(); + + wake_cond.signal(); + } + + void finishRace() { + if (0) std::cerr << "notify finish to leg " << leg << "\n"; + + mutex.lock(); + finished = true; + mutex.unlock(); + + wake_cond.signal(); + } +}; + +void * +RelayRunner::runnerThread(RelayRunner *_this) { + _this->runRace(); + return 0; } -static void exit_work_queues(void) -{ - std::map::iterator it; - it = thread_wq_map.begin(); - while (it != thread_wq_map.end()) { - os::WorkQueue *thread_wq = it->second; +RelayRace::RelayRace() { + runners.push_back(new RelayRunner(this, 0)); +} + +RelayRunner * +RelayRace::getRunner(unsigned leg) { + RelayRunner *runner; - thread_wq->queue_work(new FlushGLWork); - thread_wq->flush(); - thread_wq->destroy(); - thread_wq_map.erase(it++); + if (leg >= runners.size()) { + runners.resize(leg + 1); + runner = 0; + } else { + runner = runners[leg]; + } + if (!runner) { + runner = new RelayRunner(this, leg); + runners[leg] = runner; } + return runner; } -static void do_all_calls(void) -{ +void +RelayRace::startRace(void) { trace::Call *call; - int prev_thread_id = -1; - os::WorkQueue *thread_wq = NULL; - - while ((call = parser.parse_call())) { - RenderWork *render_work = new RenderWork(call); - - if (use_threads) { - if (prev_thread_id != call->thread_id) { - if (thread_wq) - thread_wq->flush(); - thread_wq = get_work_queue(call->thread_id); - prev_thread_id = call->thread_id; - } + call = parser.parse_call(); - thread_wq->queue_work(render_work); + if (!call) { + return; + } - // XXX: Flush immediately to avoid race conditions on unprotected - // static/global variables. - thread_wq->flush(); - } else { - render_work->run(); - delete render_work; - } + assert(call->thread_id == 0); - if (state_dumped) - break; + RelayRunner *foreRunner = getRunner(0); + if (call->thread_id == 0) { + foreRunner->baton = call; + } else { + passBaton(call); } - exit_work_queues(); + foreRunner->runRace(); +} + +void +RelayRace::passBaton(trace::Call *call) { + if (0) std::cerr << "switching to thread " << call->thread_id << "\n"; + RelayRunner *runner = getRunner(call->thread_id); + runner->receiveBaton(call); +} + +void +RelayRace::finishRace(void) { + RelayRunner *runner = getRunner(0); + runner->finishRace(); } @@ -248,14 +355,8 @@ mainLoop() { startTime = os::getTime(); - do_all_calls(); - - if (!use_threads) - /* - * Reached the end of trace; if using threads we do the flush - * when exiting the threads. - */ - flushRendering(); + RelayRace race; + race.startRace(); long long endTime = os::getTime(); float timeInterval = (endTime - startTime) * (1.0 / os::timeFrequency); @@ -297,8 +398,7 @@ usage(const char *argv0) { " -S CALLSET calls to snapshot (default is every frame)\n" " -v increase output verbosity\n" " -D CALLNO dump state at specific call no\n" - " -w waitOnFinish on final frame\n" - " -t enable threading\n"; + " -w waitOnFinish on final frame\n"; } @@ -376,8 +476,6 @@ int main(int argc, char **argv) } else if (!strcmp(arg, "-ppd")) { retrace::profilingPixelsDrawn = true; } - } else if (!strcmp(arg, "-t")) { - use_threads = true; } else { std::cerr << "error: unknown option " << arg << "\n"; usage(argv[0]); -- 2.43.0