LMMS
Loading...
Searching...
No Matches
juce_ThreadPool.cpp
Go to the documentation of this file.
1/*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2022 - Raw Material Software Limited
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
27{
29 : Thread ("Pool", stackSize), pool (p)
30 {
31 }
32
33 void run() override
34 {
35 while (! threadShouldExit())
36 if (! pool.runNextJob (*this))
37 wait (500);
38 }
39
40 std::atomic<ThreadPoolJob*> currentJob { nullptr };
42
44};
45
46//==============================================================================
50
52{
53 // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54 // to remove it first!
55 jassert (pool == nullptr || ! pool->contains (this));
56}
57
59{
60 return jobName;
61}
62
63void ThreadPoolJob::setJobName (const String& newName)
64{
65 jobName = newName;
66}
67
69{
70 shouldStop = true;
71 listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72}
73
75{
76 listeners.add (listener);
77}
78
80{
81 listeners.remove (listener);
82}
83
85{
86 if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87 return t->currentJob.load();
88
89 return nullptr;
90}
91
92//==============================================================================
93ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94{
95 jassert (numThreads > 0); // not much point having a pool without any threads!
96
97 createThreads (numThreads, threadStackSize);
98}
99
104
106{
107 removeAllJobs (true, 5000);
108 stopThreads();
109}
110
111void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112{
113 for (int i = jmax (1, numThreads); --i >= 0;)
114 threads.add (new ThreadPoolThread (*this, threadStackSize));
115
116 for (auto* t : threads)
117 t->startThread();
118}
119
121{
122 for (auto* t : threads)
123 t->signalThreadShouldExit();
124
125 for (auto* t : threads)
126 t->stopThread (500);
127}
128
129void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
130{
131 jassert (job != nullptr);
132 jassert (job->pool == nullptr);
133
134 if (job->pool == nullptr)
135 {
136 job->pool = this;
137 job->shouldStop = false;
138 job->isActive = false;
139 job->shouldBeDeleted = deleteJobWhenFinished;
140
141 {
142 const ScopedLock sl (lock);
143 jobs.add (job);
144 }
145
146 for (auto* t : threads)
147 t->notify();
148 }
149}
150
151void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
152{
153 struct LambdaJobWrapper : public ThreadPoolJob
154 {
155 LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156 JobStatus runJob() override { return job(); }
157
158 std::function<ThreadPoolJob::JobStatus()> job;
159 };
160
161 addJob (new LambdaJobWrapper (jobToRun), true);
162}
163
164void ThreadPool::addJob (std::function<void()> jobToRun)
165{
166 struct LambdaJobWrapper : public ThreadPoolJob
167 {
168 LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169 JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
170
171 std::function<void()> job;
172 };
173
174 addJob (new LambdaJobWrapper (jobToRun), true);
175}
176
178{
179 const ScopedLock sl (lock);
180 return jobs.size();
181}
182
184{
185 return threads.size();
186}
187
188ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
189{
190 const ScopedLock sl (lock);
191 return jobs [index];
192}
193
194bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
195{
196 const ScopedLock sl (lock);
197 return jobs.contains (const_cast<ThreadPoolJob*> (job));
198}
199
200bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
201{
202 const ScopedLock sl (lock);
203 return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
204}
205
207{
208 const ScopedLock sl (lock);
209
210 auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
211
212 if (index > 0 && ! job->isActive)
213 jobs.move (index, 0);
214}
215
216bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
217{
218 if (job != nullptr)
219 {
221
222 while (contains (job))
223 {
224 if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
225 return false;
226
227 jobFinishedSignal.wait (2);
228 }
229 }
230
231 return true;
232}
233
234bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
235{
236 bool dontWait = true;
237 OwnedArray<ThreadPoolJob> deletionList;
238
239 if (job != nullptr)
240 {
241 const ScopedLock sl (lock);
242
243 if (jobs.contains (job))
244 {
245 if (job->isActive)
246 {
247 if (interruptIfRunning)
248 job->signalJobShouldExit();
249
250 dontWait = false;
251 }
252 else
253 {
254 jobs.removeFirstMatchingValue (job);
255 addToDeleteList (deletionList, job);
256 }
257 }
258 }
259
260 return dontWait || waitForJobToFinish (job, timeOutMs);
261}
262
263bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
264 ThreadPool::JobSelector* selectedJobsToRemove)
265{
266 Array<ThreadPoolJob*> jobsToWaitFor;
267
268 {
269 OwnedArray<ThreadPoolJob> deletionList;
270
271 {
272 const ScopedLock sl (lock);
273
274 for (int i = jobs.size(); --i >= 0;)
275 {
276 auto* job = jobs.getUnchecked(i);
277
278 if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
279 {
280 if (job->isActive)
281 {
282 jobsToWaitFor.add (job);
283
284 if (interruptRunningJobs)
285 job->signalJobShouldExit();
286 }
287 else
288 {
289 jobs.remove (i);
290 addToDeleteList (deletionList, job);
291 }
292 }
293 }
294 }
295 }
296
298
299 for (;;)
300 {
301 for (int i = jobsToWaitFor.size(); --i >= 0;)
302 {
303 auto* job = jobsToWaitFor.getUnchecked (i);
304
305 if (! isJobRunning (job))
306 jobsToWaitFor.remove (i);
307 }
308
309 if (jobsToWaitFor.size() == 0)
310 break;
311
312 if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
313 return false;
314
315 jobFinishedSignal.wait (20);
316 }
317
318 return true;
319}
320
321StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
322{
324 const ScopedLock sl (lock);
325
326 for (auto* job : jobs)
327 if (job->isActive || ! onlyReturnActiveJobs)
328 s.add (job->getJobName());
329
330 return s;
331}
332
334{
335 bool ok = true;
336
337 for (auto* t : threads)
338 if (! t->setPriority (newPriority))
339 ok = false;
340
341 return ok;
342}
343
345{
346 OwnedArray<ThreadPoolJob> deletionList;
347
348 {
349 const ScopedLock sl (lock);
350
351 for (int i = 0; i < jobs.size(); ++i)
352 {
353 if (auto* job = jobs[i])
354 {
355 if (! job->isActive)
356 {
357 if (job->shouldStop)
358 {
359 jobs.remove (i);
360 addToDeleteList (deletionList, job);
361 --i;
362 continue;
363 }
364
365 job->isActive = true;
366 return job;
367 }
368 }
369 }
370 }
371
372 return nullptr;
373}
374
376{
377 if (auto* job = pickNextJobToRun())
378 {
380 thread.currentJob = job;
381
382 try
383 {
384 result = job->runJob();
385 }
386 catch (...)
387 {
388 jassertfalse; // Your runJob() method mustn't throw any exceptions!
389 }
390
391 thread.currentJob = nullptr;
392
393 OwnedArray<ThreadPoolJob> deletionList;
394
395 {
396 const ScopedLock sl (lock);
397
398 if (jobs.contains (job))
399 {
400 job->isActive = false;
401
402 if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
403 {
404 jobs.removeFirstMatchingValue (job);
405 addToDeleteList (deletionList, job);
406
407 jobFinishedSignal.signal();
408 }
409 else
410 {
411 // move the job to the end of the queue if it wants another go
412 jobs.move (jobs.indexOf (job), -1);
413 }
414 }
415 }
416
417 return true;
418 }
419
420 return false;
421}
422
424{
425 job->shouldStop = true;
426 job->pool = nullptr;
427
428 if (job->shouldBeDeleted)
429 deletionList.add (job);
430}
431
432} // namespace juce
#define noexcept
Definition DistrhoDefines.h:72
Definition juce_Array.h:56
ElementType getUnchecked(int index) const
Definition juce_Array.h:252
int size() const noexcept
Definition juce_Array.h:215
void remove(int indexToRemove)
Definition juce_Array.h:767
void add(const ElementType &newElement)
Definition juce_Array.h:418
Definition juce_OwnedArray.h:51
ObjectClass * add(ObjectClass *newObject)
Definition juce_OwnedArray.h:294
Definition juce_StringArray.h:35
Definition juce_String.h:53
static int getNumCpus() noexcept
Definition juce_SystemStats.cpp:104
Definition juce_Thread.h:181
static Thread *JUCE_CALLTYPE getCurrentThread()
Definition juce_Thread.cpp:160
bool wait(int timeOutMilliseconds) const
Definition juce_Thread.cpp:299
Thread(const String &threadName, size_t threadStackSize=0)
Definition juce_Thread.cpp:26
bool threadShouldExit() const
Definition juce_Thread.cpp:177
Definition juce_ThreadPool.h:192
virtual bool isJobSuitable(ThreadPoolJob *job)=0
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
Definition juce_ThreadPool.cpp:206
void createThreads(int numThreads, size_t threadStackSize=0)
Definition juce_ThreadPool.cpp:111
void addToDeleteList(OwnedArray< ThreadPoolJob > &, ThreadPoolJob *) const
Definition juce_ThreadPool.cpp:423
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Definition juce_ThreadPool.cpp:129
OwnedArray< ThreadPoolThread > threads
Definition juce_ThreadPool.h:325
friend class ThreadPoolJob
Definition juce_ThreadPool.h:324
ThreadPoolJob * pickNextJobToRun()
Definition juce_ThreadPool.cpp:344
void stopThreads()
Definition juce_ThreadPool.cpp:120
CriticalSection lock
Definition juce_ThreadPool.h:327
int getNumThreads() const noexcept
Definition juce_ThreadPool.cpp:183
ThreadPoolJob * getJob(int index) const noexcept
Definition juce_ThreadPool.cpp:188
int getNumJobs() const noexcept
Definition juce_ThreadPool.cpp:177
bool runNextJob(ThreadPoolThread &)
Definition juce_ThreadPool.cpp:375
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Definition juce_ThreadPool.cpp:263
WaitableEvent jobFinishedSignal
Definition juce_ThreadPool.h:328
ThreadPool(int numberOfThreads, size_t threadStackSize=0)
Definition juce_ThreadPool.cpp:93
~ThreadPool()
Definition juce_ThreadPool.cpp:105
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Definition juce_ThreadPool.cpp:321
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Definition juce_ThreadPool.cpp:200
bool setThreadPriorities(int newPriority)
Definition juce_ThreadPool.cpp:333
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Definition juce_ThreadPool.cpp:234
bool contains(const ThreadPoolJob *job) const noexcept
Definition juce_ThreadPool.cpp:194
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Definition juce_ThreadPool.cpp:216
Array< ThreadPoolJob * > jobs
Definition juce_ThreadPool.h:321
void signalJobShouldExit()
Definition juce_ThreadPool.cpp:68
ListenerList< Thread::Listener, Array< Thread::Listener *, CriticalSection > > listeners
Definition juce_ThreadPool.h:138
std::atomic< bool > isActive
Definition juce_ThreadPool.h:137
std::atomic< bool > shouldBeDeleted
Definition juce_ThreadPool.h:137
std::atomic< bool > shouldStop
Definition juce_ThreadPool.h:137
JobStatus
Definition juce_ThreadPool.h:71
@ jobHasFinished
Definition juce_ThreadPool.h:72
@ jobNeedsRunningAgain
Definition juce_ThreadPool.h:75
ThreadPool * pool
Definition juce_ThreadPool.h:136
void setJobName(const String &newName)
Definition juce_ThreadPool.cpp:63
String getJobName() const
Definition juce_ThreadPool.cpp:58
void addListener(Thread::Listener *)
Definition juce_ThreadPool.cpp:74
virtual ~ThreadPoolJob()
Definition juce_ThreadPool.cpp:51
String jobName
Definition juce_ThreadPool.h:135
static ThreadPoolJob * getCurrentThreadPoolJob()
Definition juce_ThreadPool.cpp:84
void removeListener(Thread::Listener *)
Definition juce_ThreadPool.cpp:79
ThreadPoolJob(const String &name)
Definition juce_ThreadPool.cpp:47
static uint32 getMillisecondCounter() noexcept
Definition juce_Time.cpp:241
int * l
Definition inflate.c:1579
struct huft * t
Definition inflate.c:943
register unsigned j
Definition inflate.c:1576
register unsigned i
Definition inflate.c:1575
unsigned s
Definition inflate.c:1555
static const char * name
Definition pugl.h:1582
virtual ASIOError start()=0
#define jassert(expression)
#define JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(className)
#define jassertfalse
Definition carla_juce.cpp:31
CriticalSection::ScopedLockType ScopedLock
Definition juce_CriticalSection.h:186
unsigned int uint32
Definition juce_MathsFunctions.h:45
constexpr Type jmax(Type a, Type b)
Definition juce_MathsFunctions.h:94
Definition juce_ThreadPool.cpp:27
std::atomic< ThreadPoolJob * > currentJob
Definition juce_ThreadPool.cpp:40
void run() override
Definition juce_ThreadPool.cpp:33
ThreadPool & pool
Definition juce_ThreadPool.cpp:41
ThreadPoolThread(ThreadPool &p, size_t stackSize)
Definition juce_ThreadPool.cpp:28
void DWORD stackSize
Definition swell-functions.h:807
uch * p
Definition crypt.c:594
int result
Definition process.c:1455
#define void
Definition unzip.h:396
#define const
Definition zconf.h:137