]> git.cworth.org Git - vogl/blob - src/voglcore/vogl_threading_pthreads.cpp
Initial vogl checkin
[vogl] / src / voglcore / vogl_threading_pthreads.cpp
1 /**************************************************************************
2  *
3  * Copyright 2013-2014 RAD Game Tools and Valve Software
4  * Copyright 2010-2014 Rich Geldreich and Tenacious Software LLC
5  * All Rights Reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  *
25  **************************************************************************/
26
27 // File: vogl_threading_pthreads.cpp
28 #include "vogl_core.h"
29 #include "vogl_threading_pthreads.h"
30 #include "vogl_timer.h"
31
32 #if VOGL_USE_PTHREADS_API
33
34 #ifdef WIN32
35 #pragma comment(lib, "../ext/libpthread/lib/pthreadVC2.lib")
36 #include "vogl_winhdr.h"
37 #endif
38
39 #ifdef __GNUC__
40 #include <sys/sysinfo.h>
41 #endif
42
43 #ifdef WIN32
44 #include <process.h>
45 #endif
46
47 namespace vogl
48 {
49     uint g_number_of_processors = 1;
50
51     void vogl_threading_init()
52     {
53 #ifdef WIN32
54         SYSTEM_INFO g_system_info;
55         GetSystemInfo(&g_system_info);
56         g_number_of_processors = math::maximum<uint>(1U, g_system_info.dwNumberOfProcessors);
57 #elif defined(__GNUC__)
58         g_number_of_processors = math::maximum<int>(1, get_nprocs());
59 #else
60         g_number_of_processors = 1;
61 #endif
62     }
63
64     vogl_thread_id_t vogl_get_current_thread_id()
65     {
66         // FIXME: Not portable
67         return static_cast<vogl_thread_id_t>(pthread_self());
68     }
69
70     void vogl_sleep(unsigned int milliseconds)
71     {
72 #ifdef WIN32
73         struct timespec interval;
74         interval.tv_sec = milliseconds / 1000;
75         interval.tv_nsec = (milliseconds % 1000) * 1000000L;
76         pthread_delay_np(&interval);
77 #else
78         while (milliseconds)
79         {
80             int msecs_to_sleep = VOGL_MIN(milliseconds, 1000);
81             usleep(msecs_to_sleep * 1000);
82             milliseconds -= msecs_to_sleep;
83         }
84 #endif
85     }
86
87     mutex::mutex(unsigned int spin_count, bool recursive)
88     {
89         VOGL_NOTE_UNUSED(spin_count);
90
91         pthread_mutexattr_t mta;
92
93         int status = pthread_mutexattr_init(&mta);
94         if (status)
95         {
96             dynamic_string msg(cVarArg, "%s: pthread_mutexattr_init() failed with status %i", VOGL_METHOD_NAME, status);
97             vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
98         }
99
100         status = pthread_mutexattr_settype(&mta, recursive ? PTHREAD_MUTEX_RECURSIVE : PTHREAD_MUTEX_NORMAL);
101         if (status)
102         {
103             dynamic_string msg(cVarArg, "%s: pthread_mutexattr_settype() failed with status %i", VOGL_METHOD_NAME, status);
104             vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
105         }
106
107         status = pthread_mutex_init(&m_mutex, &mta);
108         if (status)
109         {
110             dynamic_string msg(cVarArg, "%s: pthread_mutex_init() failed with status %i", VOGL_METHOD_NAME, status);
111             vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
112         }
113
114 #ifdef VOGL_BUILD_DEBUG
115         m_lock_count = 0;
116 #endif
117     }
118
119     mutex::~mutex()
120     {
121 #ifdef VOGL_BUILD_DEBUG
122         if (m_lock_count)
123             vogl_assert("mutex::~mutex: mutex is still locked", __FILE__, __LINE__);
124 #endif
125         if (pthread_mutex_destroy(&m_mutex))
126             vogl_assert("mutex::~mutex: pthread_mutex_destroy() failed", __FILE__, __LINE__);
127     }
128
129     void mutex::lock()
130     {
131         pthread_mutex_lock(&m_mutex);
132 #ifdef VOGL_BUILD_DEBUG
133         m_lock_count++;
134 #endif
135     }
136
137     void mutex::unlock()
138     {
139 #ifdef VOGL_BUILD_DEBUG
140         if (!m_lock_count)
141             vogl_assert("mutex::unlock: mutex is not locked", __FILE__, __LINE__);
142         m_lock_count--;
143 #endif
144         pthread_mutex_unlock(&m_mutex);
145     }
146
147     void mutex::set_spin_count(unsigned int count)
148     {
149         VOGL_NOTE_UNUSED(count);
150     }
151
152     semaphore::semaphore(uint32 initialCount, uint32 maximumCount, const char *pName)
153     {
154         VOGL_NOTE_UNUSED(maximumCount);
155         VOGL_NOTE_UNUSED(pName);
156
157         VOGL_ASSERT(maximumCount >= initialCount);
158         VOGL_ASSERT(maximumCount >= 1);
159
160         if (sem_init(&m_sem, 0, initialCount))
161         {
162             VOGL_FAIL("semaphore: sem_init() failed");
163         }
164     }
165
166     semaphore::~semaphore()
167     {
168         sem_destroy(&m_sem);
169     }
170
171     void semaphore::release(uint32 releaseCount)
172     {
173         VOGL_ASSERT(releaseCount >= 1);
174
175         int status = 0;
176 #ifdef WIN32
177         if (1 == releaseCount)
178             status = sem_post(&m_sem);
179         else
180             status = sem_post_multiple(&m_sem, releaseCount);
181 #else
182         while (releaseCount > 0)
183         {
184             status = sem_post(&m_sem);
185             if (status)
186                 break;
187             releaseCount--;
188         }
189 #endif
190
191         if (status)
192         {
193             VOGL_FAIL("semaphore: sem_post() or sem_post_multiple() failed");
194         }
195     }
196
197     void semaphore::try_release(uint32 releaseCount)
198     {
199         VOGL_ASSERT(releaseCount >= 1);
200
201 #ifdef WIN32
202         if (1 == releaseCount)
203             sem_post(&m_sem);
204         else
205             sem_post_multiple(&m_sem, releaseCount);
206 #else
207         while (releaseCount > 0)
208         {
209             sem_post(&m_sem);
210             releaseCount--;
211         }
212 #endif
213     }
214
215     bool semaphore::wait(uint32 milliseconds)
216     {
217         int status;
218         if (milliseconds == cUINT32_MAX)
219         {
220             status = sem_wait(&m_sem);
221         }
222         else
223         {
224             struct timespec interval;
225             interval.tv_sec = milliseconds / 1000;
226             interval.tv_nsec = (milliseconds % 1000) * 1000000L;
227             status = sem_timedwait(&m_sem, &interval);
228         }
229
230         if (status)
231         {
232             if (errno != ETIMEDOUT)
233             {
234                 VOGL_FAIL("semaphore: sem_wait() or sem_timedwait() failed");
235             }
236             return false;
237         }
238
239         return true;
240     }
241
242     spinlock::spinlock()
243     {
244 #ifdef VOGL_BUILD_DEBUG
245         m_in_lock = false;
246 #endif
247
248         if (pthread_spin_init(&m_spinlock, 0))
249         {
250             VOGL_FAIL("spinlock: pthread_spin_init() failed");
251         }
252     }
253
254     spinlock::~spinlock()
255     {
256 #ifdef VOGL_BUILD_DEBUG
257         VOGL_ASSERT(!m_in_lock);
258 #endif
259
260         pthread_spin_destroy(&m_spinlock);
261     }
262
263     void spinlock::lock()
264     {
265         if (pthread_spin_lock(&m_spinlock))
266         {
267             VOGL_FAIL("spinlock: pthread_spin_lock() failed");
268         }
269
270 #ifdef VOGL_BUILD_DEBUG
271         VOGL_ASSERT(!m_in_lock);
272         m_in_lock = true;
273 #endif
274     }
275
276     void spinlock::unlock()
277     {
278 #ifdef VOGL_BUILD_DEBUG
279         VOGL_ASSERT(m_in_lock);
280         m_in_lock = false;
281 #endif
282
283         if (pthread_spin_unlock(&m_spinlock))
284         {
285             VOGL_FAIL("spinlock: pthread_spin_unlock() failed");
286         }
287     }
288
289     task_pool::task_pool()
290         : m_num_threads(0),
291           m_tasks_available(0, 32767),
292           m_all_tasks_completed(0, 1),
293           m_total_submitted_tasks(0),
294           m_total_completed_tasks(0),
295           m_exit_flag(false)
296     {
297         utils::zero_object(m_threads);
298     }
299
300     task_pool::task_pool(uint num_threads)
301         : m_num_threads(0),
302           m_tasks_available(0, 32767),
303           m_all_tasks_completed(0, 1),
304           m_total_submitted_tasks(0),
305           m_total_completed_tasks(0),
306           m_exit_flag(false)
307     {
308         utils::zero_object(m_threads);
309
310         bool status = init(num_threads);
311         VOGL_VERIFY(status);
312     }
313
314     task_pool::~task_pool()
315     {
316         deinit();
317     }
318
319     bool task_pool::init(uint num_threads)
320     {
321         VOGL_ASSERT(num_threads <= cMaxThreads);
322         num_threads = math::minimum<uint>(num_threads, cMaxThreads);
323
324         deinit();
325
326         bool succeeded = true;
327
328         m_num_threads = 0;
329         while (m_num_threads < num_threads)
330         {
331             int status = pthread_create(&m_threads[m_num_threads], NULL, thread_func, this);
332             if (status)
333             {
334                 succeeded = false;
335                 break;
336             }
337
338             m_num_threads++;
339         }
340
341         if (!succeeded)
342         {
343             deinit();
344             return false;
345         }
346
347         return true;
348     }
349
350     void task_pool::deinit()
351     {
352         if (m_num_threads)
353         {
354             join();
355
356             atomic_exchange32(&m_exit_flag, true);
357
358             m_tasks_available.release(m_num_threads);
359
360             for (uint i = 0; i < m_num_threads; i++)
361                 pthread_join(m_threads[i], NULL);
362
363             m_num_threads = 0;
364
365             atomic_exchange32(&m_exit_flag, false);
366         }
367
368         m_task_stack.clear();
369         m_total_submitted_tasks = 0;
370         m_total_completed_tasks = 0;
371     }
372
373     bool task_pool::queue_task(task_callback_func pFunc, uint64_t data, void *pData_ptr)
374     {
375         VOGL_ASSERT(pFunc);
376
377         task tsk;
378         tsk.m_callback = pFunc;
379         tsk.m_data = data;
380         tsk.m_pData_ptr = pData_ptr;
381         tsk.m_flags = 0;
382
383         atomic_increment32(&m_total_submitted_tasks);
384         if (!m_task_stack.try_push(tsk))
385         {
386             atomic_increment32(&m_total_completed_tasks);
387             return false;
388         }
389
390         m_tasks_available.release(1);
391
392         return true;
393     }
394
395     // It's the object's responsibility to delete pObj within the execute_task() method, if needed!
396     bool task_pool::queue_task(executable_task *pObj, uint64_t data, void *pData_ptr)
397     {
398         VOGL_ASSERT(pObj);
399
400         task tsk;
401         tsk.m_pObj = pObj;
402         tsk.m_data = data;
403         tsk.m_pData_ptr = pData_ptr;
404         tsk.m_flags = cTaskFlagObject;
405
406         atomic_increment32(&m_total_submitted_tasks);
407         if (!m_task_stack.try_push(tsk))
408         {
409             atomic_increment32(&m_total_completed_tasks);
410             return false;
411         }
412
413         m_tasks_available.release(1);
414
415         return true;
416     }
417
418     void task_pool::process_task(task &tsk)
419     {
420         if (tsk.m_flags & cTaskFlagObject)
421             tsk.m_pObj->execute_task(tsk.m_data, tsk.m_pData_ptr);
422         else
423             tsk.m_callback(tsk.m_data, tsk.m_pData_ptr);
424
425         if (atomic_increment32(&m_total_completed_tasks) == m_total_submitted_tasks)
426         {
427             // Try to signal the semaphore (the max count is 1 so this may actually fail).
428             m_all_tasks_completed.try_release();
429         }
430     }
431
432     void task_pool::join()
433     {
434         // Try to steal any outstanding tasks. This could cause one or more worker threads to wake up and immediately go back to sleep, which is wasteful but should be harmless.
435         task tsk;
436         while (m_task_stack.pop(tsk))
437             process_task(tsk);
438
439         // At this point the task stack is empty.
440         // Now wait for all concurrent tasks to complete. The m_all_tasks_completed semaphore has a max count of 1, so it's possible it could have saturated to 1 as the tasks
441         // where issued and asynchronously completed, so this loop may iterate a few times.
442         const int total_submitted_tasks = static_cast<int>(atomic_add32(&m_total_submitted_tasks, 0));
443         while (m_total_completed_tasks != total_submitted_tasks)
444         {
445             // If the previous (m_total_completed_tasks != total_submitted_tasks) check failed the semaphore MUST be eventually signalled once the last task completes.
446             // So I think this can actually be an INFINITE delay, but it shouldn't really matter if it's 1ms.
447             m_all_tasks_completed.wait(1);
448         }
449     }
450
451     void *task_pool::thread_func(void *pContext)
452     {
453         task_pool *pPool = static_cast<task_pool *>(pContext);
454         task tsk;
455
456         for (;;)
457         {
458             if (!pPool->m_tasks_available.wait())
459                 break;
460
461             if (pPool->m_exit_flag)
462                 break;
463
464             if (pPool->m_task_stack.pop(tsk))
465             {
466                 pPool->process_task(tsk);
467             }
468         }
469
470         return NULL;
471     }
472
473 } // namespace vogl
474
475 #endif // VOGL_USE_PTHREADS_API