diff --git a/source/core/ITask.h b/source/core/ITask.h index cc860f315a62a104b1f8ebe3cf09949e08489c34..2de1e95a78c72365011fce9e8a3fffef2ea8f9aa 100644 --- a/source/core/ITask.h +++ b/source/core/ITask.h @@ -21,6 +21,9 @@ public: virtual void stopRepeating() = 0; virtual UINT getFlags() = 0; + + // in microseconds + virtual uint64_t getRepeatInterval() = 0; }; template<class T> @@ -42,6 +45,11 @@ public: return(m_uFlags); } + uint64_t getRepeatInterval() override + { + return(0); + } + private: UINT m_uFlags; }; diff --git a/source/core/TaskManager.cpp b/source/core/TaskManager.cpp index de83a56655da5a128829a6bd5109128be0d60fde..0581fac28f72fd17da7a92dd1620f1eb46d3df8e 100644 --- a/source/core/TaskManager.cpp +++ b/source/core/TaskManager.cpp @@ -112,6 +112,17 @@ void CTaskManager::forceSinglethreaded() m_iNumThreads = 1; } +void CTaskManager::addTask(TaskPtr task, std::chrono::steady_clock::time_point tpWhen) +{ + { + ScopedSpinLock lock(m_slShedulerArray); + m_aScheduled.insert({task, tpWhen}, [](const ScheduledTask &a, const ScheduledTask &b){ + return(a.tpRunAt > b.tpRunAt); + }); + } + m_ConditionSchedulerThread.notify_one(); +} + void CTaskManager::addTask(TaskPtr task) { unsigned int flags = task->getFlags(); @@ -175,11 +186,16 @@ void CTaskManager::start() #endif m_aThreads.push_back(t); } - m_pIOThread = new std::thread(std::bind(&CTaskManager::workerIOMain, this)); + m_pIOThread = new std::thread(std::bind(&CTaskManager::workerIOMain, this)); #if defined(_WINDOWS) sprintf(name, "Worker IO"); SetThreadName(m_pIOThread->native_handle(), name); #endif + m_pShedulerThread = new std::thread(std::bind(&CTaskManager::schedulerMain, this)); +#if defined(_WINDOWS) + sprintf(name, "Scheduler"); + SetThreadName(m_pShedulerThread->native_handle(), name); +#endif sheduleNextBunch(); @@ -268,6 +284,7 @@ void CTaskManager::stop() m_isRunning = false; m_ConditionIOThread.notify_all(); m_ConditionWorker.notify_all(); + m_ConditionSchedulerThread.notify_all(); ITask *pStubTask = new CTaskStub(); for(int i = 0, l = m_aThreads.size() - 1; i < l; ++i) @@ -290,17 +307,30 @@ void CTaskManager::stop() m_pIOThread->join(); mem_delete(m_pIOThread); } + if(m_pShedulerThread) + { + m_pShedulerThread->join(); + mem_delete(m_pShedulerThread); + } } void CTaskManager::execute(TaskPtr t) { - // save time task started + std::chrono::steady_clock::time_point tpStarted(std::chrono::steady_clock::now()); t->run(); - // save time task ended + // std::chrono::steady_clock::time_point tpFinished(std::chrono::steady_clock::now()); if(t->getFlags() & CORE_TASK_FLAG_REPEATING) { - addTask(t); + uint64_t uRepeatInterval = t->getRepeatInterval(); + if(uRepeatInterval) + { + addTask(t, std::chrono::steady_clock::time_point(tpStarted + std::chrono::microseconds(uRepeatInterval))); + } + else + { + addTask(t); + } } else { @@ -320,6 +350,47 @@ void CTaskManager::workerIOMain() workerIO(); } +void CTaskManager::schedulerMain() +{ + srand((UINT)time(0)); + //TaskPtr task; + + ScheduledTask *pScheduledTask = NULL; + std::chrono::steady_clock::time_point tpNow, tpWaitUntil; + while(m_isRunning) + { + tpNow = tpWaitUntil = std::chrono::steady_clock::now(); + { + ScopedSpinLock lock(m_slShedulerArray); + for(UINT i = 0, l = m_aScheduled.size(); i < l; ++i) + { + pScheduledTask = &m_aScheduled[i]; + + if(pScheduledTask->tpRunAt <= tpNow) + { + addTask(pScheduledTask->pTask); + m_aScheduled.erase(i); + --i; --l; + } + else + { + tpWaitUntil = pScheduledTask->tpRunAt; + break; + } + } + } + std::unique_lock<std::mutex> lock(m_mutexSchedulerThread); + if(tpWaitUntil != tpNow) + { + m_ConditionSchedulerThread.wait_until(lock, tpWaitUntil); + } + else + { + m_ConditionSchedulerThread.wait(lock); + } + } +} + void CTaskManager::worker(bool bOneRun) { TaskPtr task; diff --git a/source/core/TaskManager.h b/source/core/TaskManager.h index a94efdda85429deeabcf782b4545772a7fc05628..1f4e98d378290581f2831c4ed26942ba0cb64943 100644 --- a/source/core/TaskManager.h +++ b/source/core/TaskManager.h @@ -11,6 +11,7 @@ See the license in LICENSE #include <thread> #include <algorithm> #include <atomic> +#include <chrono> #include "task.h" #include "common/ConcurrentQueue.h" #include <common/array.h> @@ -40,6 +41,7 @@ public: ~CTaskManager(); void addTask(TaskPtr task); //< Добавляет задачу в планировщик + void addTask(TaskPtr task, std::chrono::steady_clock::time_point tpWhen); //< Добавляет задачу в планировщик void addTaskIO(TaskPtr task); //< Добавляет задачу ввода/вывода void add(THREAD_UPDATE_FUNCTION fnFunc, DWORD dwFlag = CORE_TASK_FLAG_MAINTHREAD_REPEATING); //< Добавляет задачу в планировщик @@ -60,6 +62,7 @@ public: private: void workerMain(); void workerIOMain(); + void schedulerMain(); void worker(bool bOneRun); void workerIO(); void execute(TaskPtr task); @@ -69,6 +72,7 @@ private: Array<std::thread*> m_aThreads; std::thread* m_pIOThread; + std::thread* m_pShedulerThread; unsigned int m_iNumThreads; volatile bool m_isRunning = false; @@ -88,8 +92,11 @@ private: mutable mutex m_mutexSync; mutable mutex m_mutexFor; mutable mutex m_mutexIOThread; + mutable mutex m_mutexSchedulerThread; + mutable SpinLock m_slShedulerArray; Condition m_Condition; Condition m_ConditionIOThread; + Condition m_ConditionSchedulerThread; mutable SpinLock m_mutexWorker; Condition m_ConditionWorker; Condition m_ConditionFor; @@ -98,6 +105,13 @@ private: bool m_isSingleThreaded = false; Array<int> m_aiNumWaitFor; + + struct ScheduledTask + { + TaskPtr pTask; + std::chrono::steady_clock::time_point tpRunAt; + }; + Array<ScheduledTask> m_aScheduled; }; #endif