From c5fc66ee58f2c60f2d226868bb1cf5b91badaf53 Mon Sep 17 00:00:00 2001 From: sanine Date: Sat, 1 Oct 2022 20:59:36 -0500 Subject: add ode --- libs/ode-0.16.1/ode/src/threading_impl_templates.h | 1265 ++++++++++++++++++++ 1 file changed, 1265 insertions(+) create mode 100644 libs/ode-0.16.1/ode/src/threading_impl_templates.h (limited to 'libs/ode-0.16.1/ode/src/threading_impl_templates.h') diff --git a/libs/ode-0.16.1/ode/src/threading_impl_templates.h b/libs/ode-0.16.1/ode/src/threading_impl_templates.h new file mode 100644 index 0000000..acecbc3 --- /dev/null +++ b/libs/ode-0.16.1/ode/src/threading_impl_templates.h @@ -0,0 +1,1265 @@ +/************************************************************************* + * * + * Open Dynamics Engine, Copyright (C) 2001-2003 Russell L. Smith. * + * All rights reserved. Email: russ@q12.org Web: www.q12.org * + * * + * Threading implementation templates file. * + * Copyright (C) 2011-2019 Oleh Derevenko. All rights reserved. * + * e-mail: odar@eleks.com (change all "a" to "e") * + * * + * This library is free software; you can redistribute it and/or * + * modify it under the terms of EITHER: * + * (1) The GNU Lesser General Public License as published by the Free * + * Software Foundation; either version 2.1 of the License, or (at * + * your option) any later version. The text of the GNU Lesser * + * General Public License is included with this library in the * + * file LICENSE.TXT. * + * (2) The BSD-style license that is included with this library in * + * the file LICENSE-BSD.TXT. * + * * + * This library is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files * + * LICENSE.TXT and LICENSE-BSD.TXT for more details. * + * * + *************************************************************************/ + +/* + * Job list and Mutex group implementation templates for built-in threading + * support provider. + */ + + +#ifndef _ODE_THREADING_IMPL_TEMPLATES_H_ +#define _ODE_THREADING_IMPL_TEMPLATES_H_ + + +#include +#include + +#include + +#include "objects.h" + +#include + + +#define dMAKE_JOBINSTANCE_RELEASEE(job_instance) ((dCallReleaseeID)(job_instance)) +#define dMAKE_RELEASEE_JOBINSTANCE(releasee) ((dxThreadedJobInfo *)(releasee)) + + +template +class dxtemplateMutexGroup +{ +private: + dxtemplateMutexGroup() {} + ~dxtemplateMutexGroup() {} + +public: + static dxtemplateMutexGroup *AllocateInstance(dmutexindex_t Mutex_count); + static void FreeInstance(dxtemplateMutexGroup *mutex_group); + +private: + bool InitializeMutexArray(dmutexindex_t Mutex_count); + void FinalizeMutexArray(dmutexindex_t Mutex_count); + +public: + void LockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); m_Mutex_array[mutex_index].LockMutex(); } + bool TryLockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); return m_Mutex_array[mutex_index].TryLockMutex(); } + void UnlockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); m_Mutex_array[mutex_index].UnlockMutex(); } + +private: + union + { + dmutexindex_t m_mutex_count; + unsigned long m_reserved_for_allignment[2]; + + } m_un; + + tThreadMutex m_Mutex_array[1]; +}; + +template +class dxtemplateCallWait: + public dBase +{ +public: + dxtemplateCallWait() {} + ~dxtemplateCallWait() { DoFinalizeObject(); } + + bool InitializeObject() { return DoInitializeObject(); } + +private: + bool DoInitializeObject() { return m_wait_wakeup.InitializeObject(); } + void DoFinalizeObject() { /* Do nothing */ } + +public: + typedef dxtemplateCallWait dxCallWait; + +public: + void ResetTheWait() { m_wait_wakeup.ResetWakeup(); } + void SignalTheWait() { m_wait_wakeup.WakeupAllThreads(); } + bool PerformWaiting(const dThreadedWaitTime *timeout_time_ptr/*=NULL*/) { return m_wait_wakeup.WaitWakeup(timeout_time_ptr); } + +public: + static void AbstractSignalTheWait(void *wait_wakeup_ptr) { ((dxCallWait *)wait_wakeup_ptr)->SignalTheWait(); } + +private: + tThreadWakeup m_wait_wakeup; +}; + + +#if dBUILTIN_THREADING_IMPL_ENABLED + +template +class dxtemplateThreadedLull +{ +public: + dxtemplateThreadedLull(): m_registrant_count(0), m_alarm_wakeup() {} + ~dxtemplateThreadedLull() { dIASSERT(m_registrant_count == 0); DoFinalizeObject(); } + + bool InitializeObject() { return DoInitializeObject(); } + +private: + bool DoInitializeObject() { return m_alarm_wakeup.InitializeObject(); } + void DoFinalizeObject() { /* Do nothing */ } + +private: + typedef typename tAtomicsProvider::atomicord_t atomicord_t; + +public: + void RegisterToLull() { tAtomicsProvider::IncrementTargetNoRet(&m_registrant_count); } + void WaitForLullAlarm() { dIASSERT(m_registrant_count != 0); m_alarm_wakeup.WaitWakeup(NULL); } + void UnregisterFromLull() { tAtomicsProvider::DecrementTargetNoRet(&m_registrant_count); } + + void SignalLullAlarmIfAnyRegistrants() + { + if (tatomic_test_required ? (tAtomicsProvider::QueryTargetValue(&m_registrant_count) != 0) : (m_registrant_count != 0)) + { + m_alarm_wakeup.WakeupAThread(); + } + } + +private: + atomicord_t m_registrant_count; + tThreadWakeup m_alarm_wakeup; +}; + + +#endif // #if dBUILTIN_THREADING_IMPL_ENABLED + + +struct dxThreadedJobInfo: + public dBase +{ + dxThreadedJobInfo() {} + explicit dxThreadedJobInfo(void *): m_next_job(NULL) {} + + void AssignJobData(ddependencycount_t dependencies_count, dxThreadedJobInfo *dependent_job, void *call_wait, + int *fault_accumulator_ptr, dThreadedCallFunction *call_function, void *call_context, dcallindex_t call_index) + { + m_dependencies_count = dependencies_count; + m_dependent_job = dependent_job; + m_call_wait = call_wait; + m_fault_accumulator_ptr = fault_accumulator_ptr; + + m_call_fault = 0; + m_call_function = call_function; + m_call_context = call_context; + m_call_index = call_index; + } + + bool InvokeCallFunction() + { + int call_result = m_call_function(m_call_context, m_call_index, dMAKE_JOBINSTANCE_RELEASEE(this)); + return call_result != 0; + } + + dxThreadedJobInfo *m_next_job; + dxThreadedJobInfo **m_prev_job_next_ptr; + + ddependencycount_t m_dependencies_count; + dxThreadedJobInfo *m_dependent_job; + void *m_call_wait; + int *m_fault_accumulator_ptr; + + int m_call_fault; + dThreadedCallFunction *m_call_function; + void *m_call_context; + dcallindex_t m_call_index; +}; + + +template +class dxtemplateThreadingLockHelper +{ +public: + dxtemplateThreadingLockHelper(tThreadMutex &mutex_instance): m_mutex_instance(mutex_instance), m_lock_indicator_flag(false) { LockMutex(); } + ~dxtemplateThreadingLockHelper() { if (m_lock_indicator_flag) { UnlockMutex(); } } + + void LockMutex() { dIASSERT(!m_lock_indicator_flag); m_mutex_instance.LockMutex(); m_lock_indicator_flag = true; } + void UnlockMutex() { dIASSERT(m_lock_indicator_flag); m_mutex_instance.UnlockMutex(); m_lock_indicator_flag = false; } + +private: + tThreadMutex &m_mutex_instance; + bool m_lock_indicator_flag; +}; + +template +class dxtemplateJobListContainer +{ +public: + dxtemplateJobListContainer(): + m_job_list(NULL), + m_info_pool((atomicptr_t)NULL), + m_pool_access_lock(), + m_list_access_lock(), + m_info_wait_lull(), + m_info_count_known_to_be_preallocated(0) + { + } + + ~dxtemplateJobListContainer() + { + dIASSERT(m_job_list == NULL); // Would not it be nice to wait for jobs to complete before deleting the list? + + FreeJobInfoPoolInfos(); + DoFinalizeObject(); + } + + bool InitializeObject() { return DoInitializeObject(); } + +private: + bool DoInitializeObject() { return m_pool_access_lock.InitializeObject() && m_list_access_lock.InitializeObject() && m_info_wait_lull.InitializeObject(); } + void DoFinalizeObject() { /* Do nothing */ } + +public: + typedef tAtomicsProvider dxAtomicsProvider; + typedef typename tAtomicsProvider::atomicord_t atomicord_t; + typedef typename tAtomicsProvider::atomicptr_t atomicptr_t; + typedef tThreadMutex dxThreadMutex; + typedef dxtemplateThreadingLockHelper dxMutexLockHelper; + typedef void dWaitSignallingFunction(void *job_call_wait); + +public: + dxThreadedJobInfo *ReleaseAJobAndPickNextPendingOne( + dxThreadedJobInfo *job_to_release, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr, + bool &out_last_job_flag); + +private: + dxThreadedJobInfo *PickNextPendingJob(bool &out_last_job_flag); + void ReleaseAJob(dxThreadedJobInfo *job_instance, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr); + +public: + inline dxThreadedJobInfo *AllocateJobInfoFromPool(); + void QueueJobForProcessing(dxThreadedJobInfo *job_instance); + + void AlterJobProcessingDependencies(dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change, + bool &out_job_has_become_ready); + +private: + inline ddependencycount_t SmartAddJobDependenciesCount(dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change); + + inline void InsertJobInfoIntoListHead(dxThreadedJobInfo *job_instance); + inline void RemoveJobInfoFromList(dxThreadedJobInfo *job_instance); + + dxThreadedJobInfo *ExtractJobInfoFromPoolOrAllocate(); + inline void ReleaseJobInfoIntoPool(dxThreadedJobInfo *job_instance); + +private: + void FreeJobInfoPoolInfos(); + +public: + bool EnsureNumberOfJobInfosIsPreallocated(ddependencycount_t required_info_count); + +private: + bool DoPreallocateJobInfos(ddependencycount_t required_info_count); + +public: + bool IsJobListReadyForShutdown() const { return m_job_list == NULL; } + +private: + dxThreadedJobInfo *m_job_list; + volatile atomicptr_t m_info_pool; // dxThreadedJobInfo * + tThreadMutex m_pool_access_lock; + tThreadMutex m_list_access_lock; + tThreadLull m_info_wait_lull; + ddependencycount_t m_info_count_known_to_be_preallocated; +}; + + +typedef void (dxThreadReadyToServeCallback)(void *callback_context); + + +#if dBUILTIN_THREADING_IMPL_ENABLED + +template +class dxtemplateJobListThreadedHandler +{ +public: + dxtemplateJobListThreadedHandler(tJobListContainer *list_container_ptr): + m_job_list_ptr(list_container_ptr), + m_processing_wakeup(), + m_active_thread_count(0), + m_shutdown_requested(0) + { + } + + ~dxtemplateJobListThreadedHandler() + { + dIASSERT(m_active_thread_count == 0); + + DoFinalizeObject(); + } + + bool InitializeObject() { return DoInitializeObject(); } + +private: + bool DoInitializeObject() { return m_processing_wakeup.InitializeObject(); } + void DoFinalizeObject() { /* Do nothing */ } + +public: + typedef dxtemplateCallWait dxCallWait; + +public: + inline void ProcessActiveJobAddition(); + inline void PrepareForWaitingAJobCompletion(); + +public: + inline unsigned RetrieveActiveThreadsCount(); + inline void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/); + +private: + void PerformJobProcessingUntilShutdown(); + void PerformJobProcessingSession(); + + void BlockAsIdleThread(); + void ActivateAnIdleThread(); + +public: + inline void ShutdownProcessing(); + inline void CleanupForRestart(); + +private: + bool IsShutdownRequested() const { return m_shutdown_requested != 0; } + +private: + typedef typename tJobListContainer::dxAtomicsProvider dxAtomicsProvider; + typedef typename tJobListContainer::atomicord_t atomicord_t; + + atomicord_t GetActiveThreadsCount() const { return m_active_thread_count; } + void RegisterAsActiveThread() { dxAtomicsProvider::template AddValueToTarget((volatile void *)&m_active_thread_count, 1); } + void UnregisterAsActiveThread() { dxAtomicsProvider::template AddValueToTarget((volatile void *)&m_active_thread_count, -1); } + +private: + tJobListContainer *m_job_list_ptr; + tThreadWakeup m_processing_wakeup; + volatile atomicord_t m_active_thread_count; + int m_shutdown_requested; +}; + + +#endif // #if dBUILTIN_THREADING_IMPL_ENABLED + + +template +class dxtemplateJobListSelfHandler +{ +public: + dxtemplateJobListSelfHandler(tJobListContainer *list_container_ptr): + m_job_list_ptr(list_container_ptr) + { + } + + ~dxtemplateJobListSelfHandler() + { + // Do nothing + } + + bool InitializeObject() { return true; } + +public: + typedef dxtemplateCallWait dxCallWait; + +public: + inline void ProcessActiveJobAddition(); + inline void PrepareForWaitingAJobCompletion(); + +public: + inline unsigned RetrieveActiveThreadsCount(); + inline void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/); + +private: + void PerformJobProcessingUntilExhaustion(); + void PerformJobProcessingSession(); + +public: + inline void ShutdownProcessing(); + inline void CleanupForRestart(); + +private: + tJobListContainer *m_job_list_ptr; +}; + + +struct dIMutexGroup; +struct dxICallWait; + +class dxIThreadingImplementation +{ +public: + virtual void FreeInstance() = 0; + +public: + virtual dIMutexGroup *AllocMutexGroup(dmutexindex_t Mutex_count) = 0; + virtual void FreeMutexGroup(dIMutexGroup *mutex_group) = 0; + virtual void LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0; + // virtual bool TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0; + virtual void UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0; + +public: + virtual dxICallWait *AllocACallWait() = 0; + virtual void ResetACallWait(dxICallWait *call_wait) = 0; + virtual void FreeACallWait(dxICallWait *call_wait) = 0; + +public: + virtual bool PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate) = 0; + virtual void ScheduleNewJob(int *fault_accumulator_ptr/*=NULL*/, + dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/, + dxICallWait *call_wait/*=NULL*/, + dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index) = 0; + virtual void AlterJobDependenciesCount(dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change) = 0; + virtual void WaitJobCompletion(int *out_wait_status_ptr/*=NULL*/, + dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/) = 0; + +public: + virtual unsigned RetrieveActiveThreadsCount() = 0; + virtual void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/) = 0; + virtual void ShutdownProcessing() = 0; + virtual void CleanupForRestart() = 0; +}; + + +template +class dxtemplateThreadingImplementation: + public dBase, + public dxIThreadingImplementation +{ +public: + dxtemplateThreadingImplementation(): + dBase(), + m_list_container(), + m_list_handler(&m_list_container) + { + } + + virtual ~dxtemplateThreadingImplementation() + { + DoFinalizeObject(); + } + + bool InitializeObject() { return DoInitializeObject(); } + +private: + bool DoInitializeObject() { return m_list_container.InitializeObject() && m_list_handler.InitializeObject(); } + void DoFinalizeObject() { /* Do nothing */ } + +protected: + virtual void FreeInstance(); + +private: + typedef dxtemplateMutexGroup dxMutexGroup; + typedef typename tJobListHandler::dxCallWait dxCallWait; + +protected: + virtual dIMutexGroup *AllocMutexGroup(dmutexindex_t Mutex_count); + virtual void FreeMutexGroup(dIMutexGroup *mutex_group); + virtual void LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index); + // virtual bool TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index); + virtual void UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index); + +protected: + virtual dxICallWait *AllocACallWait(); + virtual void ResetACallWait(dxICallWait *call_wait); + virtual void FreeACallWait(dxICallWait *call_wait); + +protected: + virtual bool PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate); + virtual void ScheduleNewJob(int *fault_accumulator_ptr/*=NULL*/, + dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/, + dxICallWait *call_wait/*=NULL*/, + dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index); + virtual void AlterJobDependenciesCount(dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change); + virtual void WaitJobCompletion(int *out_wait_status_ptr/*=NULL*/, + dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/); + +protected: + virtual unsigned RetrieveActiveThreadsCount(); + virtual void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/); + virtual void ShutdownProcessing(); + virtual void CleanupForRestart(); + +private: + tJobListContainer m_list_container; + tJobListHandler m_list_handler; +}; + + +/************************************************************************/ +/* Implementation of dxtemplateMutexGroup */ +/************************************************************************/ + +template +/*static */dxtemplateMutexGroup *dxtemplateMutexGroup::AllocateInstance(dmutexindex_t Mutex_count) +{ + dAASSERT(Mutex_count != 0); + + const dxtemplateMutexGroup *const dummy_group = (dxtemplateMutexGroup *)(sizeint)8; + const sizeint size_requited = ((sizeint)(&dummy_group->m_Mutex_array) - (sizeint)dummy_group) + Mutex_count * sizeof(tThreadMutex); + dxtemplateMutexGroup *mutex_group = (dxtemplateMutexGroup *)dAlloc(size_requited); + + if (mutex_group != NULL) + { + mutex_group->m_un.m_mutex_count = Mutex_count; + + if (!mutex_group->InitializeMutexArray(Mutex_count)) + { + dFree((void *)mutex_group, size_requited); + mutex_group = NULL; + } + } + + return mutex_group; +} + +template +/*static */void dxtemplateMutexGroup::FreeInstance(dxtemplateMutexGroup *mutex_group) +{ + if (mutex_group != NULL) + { + dmutexindex_t Mutex_count = mutex_group->m_un.m_mutex_count; + mutex_group->FinalizeMutexArray(Mutex_count); + + const sizeint anyting_not_zero = 2 * sizeof(sizeint); + const dxtemplateMutexGroup *const dummy_group = (dxtemplateMutexGroup *)anyting_not_zero; + const sizeint size_requited = ((sizeint)(&dummy_group->m_Mutex_array) - (sizeint)dummy_group) + Mutex_count * sizeof(tThreadMutex); + dFree((void *)mutex_group, size_requited); + } +} + +template +bool dxtemplateMutexGroup::InitializeMutexArray(dmutexindex_t Mutex_count) +{ + bool any_fault = false; + + dmutexindex_t mutex_index = 0; + for (; mutex_index != Mutex_count; ++mutex_index) + { + tThreadMutex *mutex_storage = m_Mutex_array + mutex_index; + + new(mutex_storage) tThreadMutex; + + if (!mutex_storage->InitializeObject()) + { + mutex_storage->tThreadMutex::~tThreadMutex(); + + any_fault = true; + break; + } + } + + if (any_fault) + { + FinalizeMutexArray(mutex_index); + } + + bool init_result = !any_fault; + return init_result; +} + +template +void dxtemplateMutexGroup::FinalizeMutexArray(dmutexindex_t Mutex_count) +{ + for (dmutexindex_t mutex_index = 0; mutex_index != Mutex_count; ++mutex_index) + { + tThreadMutex *mutex_storage = m_Mutex_array + mutex_index; + + mutex_storage->tThreadMutex::~tThreadMutex(); + } +} + +/************************************************************************/ +/* Implementation of dxtemplateJobListContainer */ +/************************************************************************/ + +template +dxThreadedJobInfo *dxtemplateJobListContainer::ReleaseAJobAndPickNextPendingOne( + dxThreadedJobInfo *job_to_release, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr, bool &out_last_job_flag) +{ + if (job_to_release != NULL) + { + ReleaseAJob(job_to_release, job_result, wait_signal_proc_ptr); + } + + dxMutexLockHelper list_access(m_list_access_lock); + + dxThreadedJobInfo *picked_job = PickNextPendingJob(out_last_job_flag); + return picked_job; +} + +template +dxThreadedJobInfo *dxtemplateJobListContainer::PickNextPendingJob( + bool &out_last_job_flag) +{ + dxThreadedJobInfo *current_job = m_job_list; + bool last_job_flag = false; + + while (current_job != NULL) + { + if (current_job->m_dependencies_count == 0) + { + // It is OK to assign in unsafe manner - dependencies count should not be changed + // after the job has become ready for execution + current_job->m_dependencies_count = 1; + last_job_flag = current_job->m_next_job == NULL; + + RemoveJobInfoFromList(current_job); + break; + } + + current_job = current_job->m_next_job; + } + + out_last_job_flag = last_job_flag; + return current_job; +} + +template +void dxtemplateJobListContainer::ReleaseAJob( + dxThreadedJobInfo *job_instance, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr) +{ + dxThreadedJobInfo *current_job = job_instance; + + if (!job_result) + { + // Accumulate call fault (be careful to not reset it!!!) + current_job->m_call_fault = 1; + } + + bool job_dequeued = true; + dIASSERT(current_job->m_prev_job_next_ptr == NULL); + + while (true) + { + dIASSERT(current_job->m_dependencies_count != 0); + + ddependencycount_t new_dependencies_count = SmartAddJobDependenciesCount(current_job, -1); + + if (new_dependencies_count != 0 || !job_dequeued) + { + break; + } + + void *job_call_wait = current_job->m_call_wait; + + if (job_call_wait != NULL) + { + wait_signal_proc_ptr(job_call_wait); + } + + int call_fault = current_job->m_call_fault; + + if (current_job->m_fault_accumulator_ptr) + { + *current_job->m_fault_accumulator_ptr = call_fault; + } + + dxThreadedJobInfo *dependent_job = current_job->m_dependent_job; + ReleaseJobInfoIntoPool(current_job); + + if (dependent_job == NULL) + { + break; + } + + if (call_fault) + { + // Accumulate call fault (be careful to not reset it!!!) + dependent_job->m_call_fault = 1; + } + + current_job = dependent_job; + job_dequeued = dependent_job->m_prev_job_next_ptr == NULL; + } +} + +template +dxThreadedJobInfo *dxtemplateJobListContainer::AllocateJobInfoFromPool() +{ + // No locking is necessary + dxThreadedJobInfo *job_instance = ExtractJobInfoFromPoolOrAllocate(); + return job_instance; +} + +template +void dxtemplateJobListContainer::QueueJobForProcessing(dxThreadedJobInfo *job_instance) +{ + dxMutexLockHelper list_access(m_list_access_lock); + + InsertJobInfoIntoListHead(job_instance); +} + + +template +void dxtemplateJobListContainer::AlterJobProcessingDependencies(dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change, + bool &out_job_has_become_ready) +{ + // Dependencies should not be changed when job has already become ready for execution + dIASSERT(job_instance->m_dependencies_count != 0); + // It's OK that access is not atomic - that is to be handled by external logic + dIASSERT(dependencies_count_change < 0 ? (job_instance->m_dependencies_count >= (ddependencycount_t)(-dependencies_count_change)) : ((ddependencycount_t)(-(ddependencychange_t)job_instance->m_dependencies_count) > (ddependencycount_t)dependencies_count_change)); + + ddependencycount_t new_dependencies_count = SmartAddJobDependenciesCount(job_instance, dependencies_count_change); + out_job_has_become_ready = new_dependencies_count == 0; +} + + +template +ddependencycount_t dxtemplateJobListContainer::SmartAddJobDependenciesCount( + dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change) +{ + ddependencycount_t new_dependencies_count = tAtomicsProvider::template AddValueToTarget((volatile void *)&job_instance->m_dependencies_count, dependencies_count_change) + dependencies_count_change; + return new_dependencies_count; +} + + +template +void dxtemplateJobListContainer::InsertJobInfoIntoListHead( + dxThreadedJobInfo *job_instance) +{ + dxThreadedJobInfo *job_list_head = m_job_list; + job_instance->m_next_job = job_list_head; + + if (job_list_head != NULL) + { + job_list_head->m_prev_job_next_ptr = &job_instance->m_next_job; + } + + job_instance->m_prev_job_next_ptr = &m_job_list; + m_job_list = job_instance; +} + +template +void dxtemplateJobListContainer::RemoveJobInfoFromList( + dxThreadedJobInfo *job_instance) +{ + if (job_instance->m_next_job) + { + job_instance->m_next_job->m_prev_job_next_ptr = job_instance->m_prev_job_next_ptr; + } + + *job_instance->m_prev_job_next_ptr = job_instance->m_next_job; + // Assign NULL to m_prev_job_next_ptr as an indicator that instance has been dequeued + job_instance->m_prev_job_next_ptr = NULL; +} + +template +dxThreadedJobInfo *dxtemplateJobListContainer::ExtractJobInfoFromPoolOrAllocate() +{ + dxThreadedJobInfo *result_info; + + bool waited_lull = false; + m_info_wait_lull.RegisterToLull(); + + while (true) + { + dxThreadedJobInfo *raw_head_info = (dxThreadedJobInfo *)m_info_pool; + + if (raw_head_info == NULL) + { + result_info = new dxThreadedJobInfo(); + + if (result_info != NULL) + { + break; + } + + m_info_wait_lull.WaitForLullAlarm(); + waited_lull = true; + } + + // Extraction must be locked so that other thread does not "steal" head info, + // use it and then reinsert back with a different "next" + dxMutexLockHelper pool_access(m_pool_access_lock); + + dxThreadedJobInfo *head_info = (dxThreadedJobInfo *)m_info_pool; // Head info must be re-read after mutex had been locked + + if (head_info != NULL) + { + dxThreadedJobInfo *next_info = head_info->m_next_job; + if (tAtomicsProvider::CompareExchangeTargetPtr(&m_info_pool, (atomicptr_t)head_info, (atomicptr_t)next_info)) + { + result_info = head_info; + break; + } + } + } + + m_info_wait_lull.UnregisterFromLull(); + + if (waited_lull) + { + // It is necessary to re-signal lull alarm if current thread was waiting as + // there might be other threads waiting which might have not received alarm signal. + m_info_wait_lull.SignalLullAlarmIfAnyRegistrants(); + } + + return result_info; +} + +template +void dxtemplateJobListContainer::ReleaseJobInfoIntoPool( + dxThreadedJobInfo *job_instance) +{ + while (true) + { + dxThreadedJobInfo *next_info = (dxThreadedJobInfo *)m_info_pool; + job_instance->m_next_job = next_info; + + if (tAtomicsProvider::CompareExchangeTargetPtr(&m_info_pool, (atomicptr_t)next_info, (atomicptr_t)job_instance)) + { + break; + } + } + + m_info_wait_lull.SignalLullAlarmIfAnyRegistrants(); +} + +template +void dxtemplateJobListContainer::FreeJobInfoPoolInfos() +{ + dxThreadedJobInfo *current_info = (dxThreadedJobInfo *)m_info_pool; + + while (current_info != NULL) + { + dxThreadedJobInfo *info_save = current_info; + current_info = current_info->m_next_job; + + delete info_save; + } + + m_info_pool = (atomicptr_t)NULL; +} + +template +bool dxtemplateJobListContainer::EnsureNumberOfJobInfosIsPreallocated(ddependencycount_t required_info_count) +{ + bool result = required_info_count <= m_info_count_known_to_be_preallocated + || DoPreallocateJobInfos(required_info_count); + return result; +} + +template +bool dxtemplateJobListContainer::DoPreallocateJobInfos(ddependencycount_t required_info_count) +{ + dIASSERT(required_info_count > m_info_count_known_to_be_preallocated); // Also ensures required_info_count > 0 + + bool allocation_failure = false; + + dxThreadedJobInfo *info_pool = (dxThreadedJobInfo *)m_info_pool; + + ddependencycount_t info_index = 0; + for (dxThreadedJobInfo **current_info_ptr = &info_pool; ; ) + { + dxThreadedJobInfo *current_info = *current_info_ptr; + + if (current_info == NULL) + { + current_info = new dxThreadedJobInfo(NULL); + + if (current_info == NULL) + { + allocation_failure = true; + break; + } + + *current_info_ptr = current_info; + } + + if (++info_index == required_info_count) + { + m_info_count_known_to_be_preallocated = info_index; + break; + } + + current_info_ptr = ¤t_info->m_next_job; + } + + // Make sure m_info_pool was not changed + dIASSERT(m_info_pool == NULL || m_info_pool == (atomicptr_t)info_pool); + + m_info_pool = (atomicptr_t)info_pool; + + bool result = !allocation_failure; + return result; +} + + +#if dBUILTIN_THREADING_IMPL_ENABLED + +/************************************************************************/ +/* Implementation of dxtemplateJobListThreadedHandler */ +/************************************************************************/ + +template +void dxtemplateJobListThreadedHandler::ProcessActiveJobAddition() +{ + ActivateAnIdleThread(); +} + +template +void dxtemplateJobListThreadedHandler::PrepareForWaitingAJobCompletion() +{ + // Do nothing +} + +template +unsigned dxtemplateJobListThreadedHandler::RetrieveActiveThreadsCount() +{ + return GetActiveThreadsCount(); +} + +template +void dxtemplateJobListThreadedHandler::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/) +{ + RegisterAsActiveThread(); + + if (readiness_callback != NULL) + { + (*readiness_callback)(callback_context); + } + + PerformJobProcessingUntilShutdown(); + + UnregisterAsActiveThread(); +} + + +template +void dxtemplateJobListThreadedHandler::PerformJobProcessingUntilShutdown() +{ + while (true) + { + // It is expected that new jobs will not be queued any longer after shutdown had been requested + if (IsShutdownRequested() && m_job_list_ptr->IsJobListReadyForShutdown()) + { + break; + } + + PerformJobProcessingSession(); + + // It is expected that new jobs will not be queued any longer after shutdown had been requested + if (IsShutdownRequested() && m_job_list_ptr->IsJobListReadyForShutdown()) + { + break; + } + + BlockAsIdleThread(); + } +} + +template +void dxtemplateJobListThreadedHandler::PerformJobProcessingSession() +{ + dxThreadedJobInfo *current_job = NULL; + bool job_result = false; + + while (true) + { + bool last_job_flag; + current_job = m_job_list_ptr->ReleaseAJobAndPickNextPendingOne(current_job, job_result, &dxCallWait::AbstractSignalTheWait, last_job_flag); + + if (!current_job) + { + break; + } + + if (!last_job_flag) + { + ActivateAnIdleThread(); + } + + job_result = current_job->InvokeCallFunction(); + } +} + + +template +void dxtemplateJobListThreadedHandler::BlockAsIdleThread() +{ + m_processing_wakeup.WaitWakeup(NULL); +} + +template +void dxtemplateJobListThreadedHandler::ActivateAnIdleThread() +{ + m_processing_wakeup.WakeupAThread(); +} + + +template +void dxtemplateJobListThreadedHandler::ShutdownProcessing() +{ + m_shutdown_requested = true; + m_processing_wakeup.WakeupAllThreads(); +} + +template +void dxtemplateJobListThreadedHandler::CleanupForRestart() +{ + m_shutdown_requested = false; + m_processing_wakeup.ResetWakeup(); +} + + +#endif // #if dBUILTIN_THREADING_IMPL_ENABLED + + +/************************************************************************/ +/* Implementation of dxtemplateJobListSelfHandler */ +/************************************************************************/ + +template +void dxtemplateJobListSelfHandler::ProcessActiveJobAddition() +{ + // Do nothing +} + +template +void dxtemplateJobListSelfHandler::PrepareForWaitingAJobCompletion() +{ + PerformJobProcessingUntilExhaustion(); +} + + +template +unsigned dxtemplateJobListSelfHandler::RetrieveActiveThreadsCount() +{ + return 0U; // Return zero to indicate that there are no actual active threads provided. +} + +template +void dxtemplateJobListSelfHandler::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/) +{ + (void)readiness_callback; // unused + (void)callback_context; // unused + dIASSERT(false); // This method is not expected to be called for Self-Handler +} + + +template +void dxtemplateJobListSelfHandler::PerformJobProcessingUntilExhaustion() +{ + PerformJobProcessingSession(); +} + +template +void dxtemplateJobListSelfHandler::PerformJobProcessingSession() +{ + dxThreadedJobInfo *current_job = NULL; + bool job_result = false; + + while (true) + { + bool dummy_last_job_flag; + current_job = m_job_list_ptr->ReleaseAJobAndPickNextPendingOne(current_job, job_result, &dxCallWait::AbstractSignalTheWait, dummy_last_job_flag); + + if (!current_job) + { + break; + } + + job_result = current_job->InvokeCallFunction(); + } +} + +template +void dxtemplateJobListSelfHandler::ShutdownProcessing() +{ + // Do nothing +} + +template +void dxtemplateJobListSelfHandler::CleanupForRestart() +{ + // Do nothing +} + + +/************************************************************************/ +/* Implementation of dxtemplateThreadingImplementation */ +/************************************************************************/ + +template +void dxtemplateThreadingImplementation::FreeInstance() +{ + delete this; +} + + +template +dIMutexGroup *dxtemplateThreadingImplementation::AllocMutexGroup(dmutexindex_t Mutex_count) +{ + dxMutexGroup *mutex_group = dxMutexGroup::AllocateInstance(Mutex_count); + return (dIMutexGroup *)mutex_group; +} + +template +void dxtemplateThreadingImplementation::FreeMutexGroup(dIMutexGroup *mutex_group) +{ + dxMutexGroup::FreeInstance((dxMutexGroup *)mutex_group); +} + +template +void dxtemplateThreadingImplementation::LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) +{ + ((dxMutexGroup *)mutex_group)->LockMutex(mutex_index); +} + +// template +// bool dxtemplateThreadingImplementation::TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) +// { +// return ((dxMutexGroup *)mutex_group)->TryLockMutex(mutex_index); +// } + +template +void dxtemplateThreadingImplementation::UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) +{ + ((dxMutexGroup *)mutex_group)->UnlockMutex(mutex_index); +} + + +template +dxICallWait *dxtemplateThreadingImplementation::AllocACallWait() +{ + dxCallWait *call_wait = new dxCallWait(); + + if (call_wait != NULL && !call_wait->InitializeObject()) + { + delete call_wait; + call_wait = NULL; + } + + return (dxICallWait *)call_wait; +} + +template +void dxtemplateThreadingImplementation::ResetACallWait(dxICallWait *call_wait) +{ + ((dxCallWait *)call_wait)->ResetTheWait(); +} + +template +void dxtemplateThreadingImplementation::FreeACallWait(dxICallWait *call_wait) +{ + delete ((dxCallWait *)call_wait); +} + + +template +bool dxtemplateThreadingImplementation::PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate) +{ + // No multithreading protection here! + // Resources are to be preallocated before jobs start to be scheduled + // as otherwise there is no way to implement the preallocation. + bool result = m_list_container.EnsureNumberOfJobInfosIsPreallocated(max_simultaneous_calls_estimate); + return result; +} + +template +void dxtemplateThreadingImplementation::ScheduleNewJob( + int *fault_accumulator_ptr/*=NULL*/, + dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/, + dxICallWait *call_wait/*=NULL*/, + dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index) +{ + dxThreadedJobInfo *new_job = m_list_container.AllocateJobInfoFromPool(); + dIASSERT(new_job != NULL); + + new_job->AssignJobData(dependencies_count, dMAKE_RELEASEE_JOBINSTANCE(dependent_releasee), (dxCallWait *)call_wait, fault_accumulator_ptr, call_func, call_context, instance_index); + + if (out_post_releasee_ptr != NULL) + { + *out_post_releasee_ptr = dMAKE_JOBINSTANCE_RELEASEE(new_job); + } + + m_list_container.QueueJobForProcessing(new_job); + + if (dependencies_count == 0) + { + m_list_handler.ProcessActiveJobAddition(); + } +} + +template +void dxtemplateThreadingImplementation::AlterJobDependenciesCount( + dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change) +{ + dIASSERT(dependencies_count_change != 0); + + dxThreadedJobInfo *job_instance = dMAKE_RELEASEE_JOBINSTANCE(target_releasee); + + bool job_has_become_ready; + m_list_container.AlterJobProcessingDependencies(job_instance, dependencies_count_change, job_has_become_ready); + + if (job_has_become_ready) + { + m_list_handler.ProcessActiveJobAddition(); + } +} + +template +void dxtemplateThreadingImplementation::WaitJobCompletion( + int *out_wait_status_ptr/*=NULL*/, + dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/) +{ + dIASSERT(call_wait != NULL); + + m_list_handler.PrepareForWaitingAJobCompletion(); + + bool wait_status = ((dxCallWait *)call_wait)->PerformWaiting(timeout_time_ptr); + dIASSERT(timeout_time_ptr != NULL || wait_status); + + if (out_wait_status_ptr) + { + *out_wait_status_ptr = wait_status; + } +} + + +template +unsigned dxtemplateThreadingImplementation::RetrieveActiveThreadsCount() +{ + return m_list_handler.RetrieveActiveThreadsCount(); +} + +template +void dxtemplateThreadingImplementation::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/) +{ + m_list_handler.StickToJobsProcessing(readiness_callback, callback_context); +} + +template +void dxtemplateThreadingImplementation::ShutdownProcessing() +{ + m_list_handler.ShutdownProcessing(); +} + +template +void dxtemplateThreadingImplementation::CleanupForRestart() +{ + m_list_handler.CleanupForRestart(); +} + + +#endif // #ifndef _ODE_THREADING_IMPL_TEMPLATES_H_ -- cgit v1.2.1