git.cworth.org Git - apitrace/commitdiff
More efficient synchronous multi-threading.
author José Fonseca <jose.r.fonseca@gmail.com>
Fri, 26 Oct 2012 17:13:10 +0000 (18:13 +0100)
committer José Fonseca <jose.r.fonseca@gmail.com>
Fri, 26 Oct 2012 17:13:10 +0000 (18:13 +0100)
Mimic a relay race, where only one thread is running at one time.

This gracefully degrades to the old behavior for single-threaded traces,
and is faster for multi-threaded traces, while still ensuring that the
recorded order is preserved.

CMakeLists.txt
common/os_thread.hpp
common/os_workqueue.hpp [deleted file]
common/workqueue.cpp [deleted file]
retrace/retrace_main.cpp

index f019049e3f561e9f395ee23cfc481621fa443da2..0e174e65834bbf9aca860aeb1037e2d8ea2175f9 100644 (file)
@@ -328,7 +328,6 @@ add_library (common STATIC
     common/image_pnm.cpp
     common/image_png.cpp
     common/${os}
-    common/workqueue.cpp
 )
 
 set_target_properties (common PROPERTIES
index 6c0b488c0152474a031a82839b0c767777714fdd..5fbdaabf274e0f51304b209f807b133bcb71d55e 100644 (file)
@@ -348,7 +348,7 @@ private:
             DWORD id = 0;
             _native_handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)f, (LPVOID)arg, 0, &id);
 #else
-            pthread_create(&_native_handle, NULL, f, arg);
+            pthread_create(&_native_handle, NULL, ( void *(*) (void *))f, arg);
 #endif
         }
 
diff --git a/common/os_workqueue.hpp b/common/os_workqueue.hpp
deleted file mode 100644 (file)
index 047d3db..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef _OS_WORKQUEUE_HPP_
-#define _OS_WORKQUEUE_HPP_
-
-#include <queue>
-
-#include "os_thread.hpp"
-
-namespace os {
-
-
-class WorkQueue;
-
-class WorkQueueWork {
-protected:
-    friend class WorkQueue;
-
-public:
-    virtual void run(void) = 0;
-    virtual ~WorkQueueWork(void) { }
-};
-
-class WorkQueue {
-    std::queue<WorkQueueWork *> work_queue;
-
-    bool busy;
-    bool exit_workqueue;
-    os::condition_variable wake_cond;
-    os::condition_variable complete_cond;
-
-    os::mutex mutex;
-
-    os::thread thread;
-
-    void wake_up_thread(void);
-    int run_tasks(void);
-public:
-    void thread_entry(void);
-    void queue_work(WorkQueueWork *work);
-    void flush(void);
-    void destroy(void);
-
-    WorkQueue(void);
-    ~WorkQueue();
-};
-
-}
-
-#endif
diff --git a/common/workqueue.cpp b/common/workqueue.cpp
deleted file mode 100644 (file)
index fcd697a..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-#include <queue>
-#include <assert.h>
-
-#include "os_workqueue.hpp"
-
-namespace os
-{
-
-/**
- * return 0 on batch complete, -1 on thread exit request.
- */
-int WorkQueue::run_tasks(void)
-{
-    os::unique_lock<os::mutex> lock(mutex);
-
-    while (work_queue.empty() && !exit_workqueue) {
-        wake_cond.wait(lock);
-    }
-
-    if (exit_workqueue) {
-        return -1;
-    }
-
-    std::queue<WorkQueueWork *> batch;
-    std::swap(work_queue, batch);
-    busy = true;
-
-    lock.unlock();
-
-    assert(!batch.empty());
-    while (!batch.empty()) {
-        WorkQueueWork *task;
-
-        task = batch.front();
-        task->run();
-        batch.pop();
-        delete task;
-    }
-
-    lock.lock();
-
-    busy = false;
-    complete_cond.signal();
-
-    return 0;
-}
-
-/* Must be called with WorkQueue::lock held */
-void WorkQueue::wake_up_thread(void)
-{
-    wake_cond.signal();
-}
-
-void WorkQueue::queue_work(WorkQueueWork *task)
-{
-    mutex.lock();
-    work_queue.push(task);
-    wake_up_thread();
-    mutex.unlock();
-}
-
-void WorkQueue::flush(void)
-{
-    os::unique_lock<os::mutex> lock(mutex);
-    while (!work_queue.empty() || busy) {
-        complete_cond.wait(lock);
-    }
-}
-
-void WorkQueue::thread_entry(void)
-{
-    int err;
-
-    do {
-        err = run_tasks();
-    } while (!err);
-}
-
-void WorkQueue::destroy(void)
-{
-    mutex.lock();
-    exit_workqueue = true;
-    wake_up_thread();
-    mutex.unlock();
-}
-
-static
-#ifdef _WIN32
-DWORD WINAPI
-#else
-void *
-#endif
-WorkQueue__entry_thunk(void *data)
-{
-    WorkQueue *thread = static_cast<WorkQueue *>(data);
-
-    thread->thread_entry();
-
-#ifdef _WIN32
-    return 0;
-#else
-    return NULL;
-#endif
-}
-
-WorkQueue::WorkQueue(void) :
-    busy(false),
-    exit_workqueue(false),
-    thread(WorkQueue__entry_thunk, this)
-{
-}
-
-WorkQueue::~WorkQueue(void)
-{
-    thread.join();
-}
-
-}
index 9b029c8b5a10e4c0d6c5c9794257b1f74d6b281a..f9eabcb3b5cfe26729f17e28ab3fbb39bad02ab7 100644 (file)
@@ -29,7 +29,7 @@
 
 #include "os_binary.hpp"
 #include "os_time.hpp"
-#include "os_workqueue.hpp"
+#include "os_thread.hpp"
 #include "image.hpp"
 #include "trace_callset.hpp"
 #include "trace_dump.hpp"
@@ -37,7 +37,6 @@
 
 
 static bool waitOnFinish = false;
-static bool use_threads;
 
 static const char *comparePrefix = NULL;
 static const char *snapshotPrefix = NULL;
@@ -55,7 +54,6 @@ namespace retrace {
 trace::Parser parser;
 trace::Profiler profiler;
 
-static std::map<unsigned long, os::WorkQueue *> thread_wq_map;
 
 int verbosity = 0;
 bool debug = true;
@@ -71,22 +69,7 @@ bool profilingPixelsDrawn = false;
 
 unsigned frameNo = 0;
 unsigned callNo = 0;
-static bool state_dumped;
 
-class RenderWork : public os::WorkQueueWork
-{
-       trace::Call *call;
-public:
-       void run(void);
-       RenderWork(trace::Call *_call) { call = _call; }
-       ~RenderWork(void) { delete call; }
-};
-
-class FlushGLWork : public os::WorkQueueWork
-{
-public:
-    void run(void) { flushRendering(); }
-};
 
 void
 frameComplete(trace::Call &call) {
@@ -139,16 +122,17 @@ takeSnapshot(unsigned call_no) {
     return;
 }
 
-void RenderWork::run(void)
-{
+
+class RelayRunner;
+
+
+static void
+retraceCall(trace::Call *call) {
     bool swapRenderTarget = call->flags &
         trace::CALL_FLAG_SWAP_RENDERTARGET;
     bool doSnapshot = snapshotFrequency.contains(*call) ||
         compareFrequency.contains(*call);
 
-    if (state_dumped)
-        return;
-
     // For calls which cause rendertargets to be swaped, we take the
     // snapshot _before_ swapping the rendertargets.
     if (doSnapshot && swapRenderTarget) {
@@ -169,73 +153,196 @@ void RenderWork::run(void)
     if (doSnapshot && !swapRenderTarget)
         takeSnapshot(call->no);
 
-    if (call->no >= dumpStateCallNo && dumpState(std::cout))
-        state_dumped = true;
+    if (call->no >= dumpStateCallNo &&
+        dumpState(std::cout)) {
+        exit(0);
+    }
 }
 
-static os::WorkQueue *get_work_queue(unsigned long thread_id)
+
+class RelayRace
 {
-    os::WorkQueue *thread;
-    std::map<unsigned long, os::WorkQueue *>::iterator it;
+public:
+    std::vector<RelayRunner*> runners;
 
-    it = thread_wq_map.find(thread_id);
-    if (it == thread_wq_map.end()) {
-        thread = new os::WorkQueue();
-        thread_wq_map[thread_id] = thread;
-    } else {
-        thread = it->second;
+    RelayRace();
+
+    RelayRunner *
+    getRunner(unsigned leg);
+
+    void
+    startRace(void);
+
+    void
+    passBaton(trace::Call *call);
+
+    void
+    finishRace();
+};
+
+
+class RelayRunner
+{
+public:
+    RelayRace *race;
+    unsigned leg;
+    os::mutex mutex;
+    os::condition_variable wake_cond;
+
+    bool finished;
+    trace::Call *baton;
+    os::thread *thread;
+
+    static void *
+    runnerThread(RelayRunner *_this);
+
+    RelayRunner(RelayRace *race, unsigned _leg) :
+        race(race),
+        leg(_leg),
+        finished(false),
+        baton(0),
+        thread(0)
+    {
+        if (leg) {
+            thread = new os::thread(runnerThread, this);
+        }
     }
 
-    return thread;
+    void
+    runRace(void) {
+        os::unique_lock<os::mutex> lock(mutex);
+
+        while (1) {
+            while (!finished && !baton) {
+                wake_cond.wait(lock);
+            }
+
+            if (finished) {
+                break;
+            }
+
+            assert(baton);
+            trace::Call *call = baton;
+            baton = 0;
+
+            runLeg(call);
+        }
+
+        if (0) std::cerr << "leg " << leg << " actually finishing\n";
+
+        if (leg == 0) {
+            std::vector<RelayRunner*>::iterator it;
+            for (it = race->runners.begin() + 1; it != race->runners.end(); ++it) {
+                RelayRunner* runner = *it;
+                runner->finishRace();
+            }
+        }
+    }
+
+    void runLeg(trace::Call *call) {
+        do {
+            assert(call);
+            assert(call->thread_id == leg);
+            retraceCall(call);
+            delete call;
+            call = parser.parse_call();
+        } while (call && call->thread_id == leg);
+
+        if (call) {
+            assert(call->thread_id != leg);
+            flushRendering();
+            race->passBaton(call);
+        } else {
+            if (0) std::cerr << "finished on leg " << leg << "\n";
+            if (leg) {
+                race->finishRace();
+            } else {
+                finished = true;
+            }
+        }
+    }
+
+    void receiveBaton(trace::Call *call) {
+        assert (call->thread_id == leg);
+
+        mutex.lock();
+        baton = call;
+        mutex.unlock();
+
+        wake_cond.signal();
+    }
+
+    void finishRace() {
+        if (0) std::cerr << "notify finish to leg " << leg << "\n";
+
+        mutex.lock();
+        finished = true;
+        mutex.unlock();
+
+        wake_cond.signal();
+    }
+};
+
+void *
+RelayRunner::runnerThread(RelayRunner *_this) {
+    _this->runRace();
+    return 0;
 }
 
-static void exit_work_queues(void)
-{
-    std::map<unsigned long, os::WorkQueue *>::iterator it;
 
-    it = thread_wq_map.begin();
-    while (it != thread_wq_map.end()) {
-        os::WorkQueue *thread_wq = it->second;
+RelayRace::RelayRace() {
+    runners.push_back(new RelayRunner(this, 0));
+}
+
+RelayRunner *
+RelayRace::getRunner(unsigned leg) {
+    RelayRunner *runner;
 
-        thread_wq->queue_work(new FlushGLWork);
-        thread_wq->flush();
-        thread_wq->destroy();
-        thread_wq_map.erase(it++);
+    if (leg >= runners.size()) {
+        runners.resize(leg + 1);
+        runner = 0;
+    } else {
+        runner = runners[leg];
+    }
+    if (!runner) {
+        runner = new RelayRunner(this, leg);
+        runners[leg] = runner;
     }
+    return runner;
 }
 
-static void do_all_calls(void)
-{
+void
+RelayRace::startRace(void) {
     trace::Call *call;
-    int prev_thread_id = -1;
-    os::WorkQueue *thread_wq = NULL;
-
-    while ((call = parser.parse_call())) {
-        RenderWork *render_work = new RenderWork(call);
-
-        if (use_threads) {
-            if (prev_thread_id != call->thread_id) {
-                if (thread_wq)
-                    thread_wq->flush();
-                thread_wq = get_work_queue(call->thread_id);
-                prev_thread_id = call->thread_id;
-            }
+    call = parser.parse_call();
 
-            thread_wq->queue_work(render_work);
+    if (!call) {
+        return;
+    }
 
-            // XXX: Flush immediately to avoid race conditions on unprotected
-            // static/global variables.
-            thread_wq->flush();
-        } else {
-            render_work->run();
-            delete render_work;
-        }
+    assert(call->thread_id == 0);
 
-        if (state_dumped)
-            break;
+    RelayRunner *foreRunner = getRunner(0);
+    if (call->thread_id == 0) {
+        foreRunner->baton = call;
+    } else {
+        passBaton(call);
     }
 
-    exit_work_queues();
+    foreRunner->runRace();
+}
+
+void
+RelayRace::passBaton(trace::Call *call) {
+    if (0) std::cerr << "switching to thread " << call->thread_id << "\n";
+    RelayRunner *runner = getRunner(call->thread_id);
+    runner->receiveBaton(call);
+}
+
+void
+RelayRace::finishRace(void) {
+    RelayRunner *runner = getRunner(0);
+    runner->finishRace();
 }
 
 
@@ -248,14 +355,8 @@ mainLoop() {
 
     startTime = os::getTime();
 
-    do_all_calls();
-
-    if (!use_threads)
-        /*
-         * Reached the end of trace; if using threads we do the flush
-         * when exiting the threads.
-         */
-        flushRendering();
+    RelayRace race;
+    race.startRace();
 
     long long endTime = os::getTime();
     float timeInterval = (endTime - startTime) * (1.0 / os::timeFrequency);
@@ -297,8 +398,7 @@ usage(const char *argv0) {
         "  -S CALLSET   calls to snapshot (default is every frame)\n"
         "  -v           increase output verbosity\n"
         "  -D CALLNO    dump state at specific call no\n"
-        "  -w           waitOnFinish on final frame\n"
-        "  -t           enable threading\n";
+        "  -w           waitOnFinish on final frame\n";
 }
 
 
@@ -376,8 +476,6 @@ int main(int argc, char **argv)
             } else if (!strcmp(arg, "-ppd")) {
                 retrace::profilingPixelsDrawn = true;
             }
-        } else if (!strcmp(arg, "-t")) {
-            use_threads = true;
         } else {
             std::cerr << "error: unknown option " << arg << "\n";
             usage(argv[0]);