mirror of
https://github.com/Mercury-Language/mercury.git
synced 2026-04-16 01:43:35 +00:00
Use spaces, not tabs, in the Java and C# runtimes.
Add modelines to keep it that way. Fix formatting, and english in comments.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
// vim: ts=4 sw=4 expandtab ft=java
|
||||
//
|
||||
// Copyright (C) 2014, 2016, 2018 The Mercury Team
|
||||
// This file is distributed under the terms specified in COPYING.LIB.
|
||||
@@ -16,30 +17,30 @@ import java.util.*;
|
||||
*
|
||||
* The pool attempts to reduce overheads, especially in embarrassingly
|
||||
* parallel workloads, by re-using threads and restricting the overall
|
||||
* number of threads. By default a thread is created for each hardware
|
||||
* thread available. If all threads are making progress then no new threads
|
||||
* will be created even if tasks are waiting, reducing overheads. But if
|
||||
* number of threads. By default a thread is created for each hardware
|
||||
* thread available. If all threads are making progress then no new threads
|
||||
* will be created even if tasks are waiting, reducing overheads. But if
|
||||
* one or more threads block on a barrier, channel, mvar or semaphore, then
|
||||
* new threads may be created to avoid deadlocks and attempt to keep all
|
||||
* processors busy.
|
||||
*
|
||||
* TODO: Currently the thread pool does not know when a thread is blocked in
|
||||
* foreign code or performing IO.
|
||||
* TODO: Currently the thread pool does not know when a thread is blocked
|
||||
* in foreign code or performing IO.
|
||||
*
|
||||
* Java's API provides several different thread pools, see
|
||||
* java.util.concurrent.Executors, none of which are suitable for our needs
|
||||
* so we have implemented our own in this class. Specifically the fixed
|
||||
* so we have implemented our own in this class. Specifically the fixed
|
||||
* thread pool is unsuitable as we want to be able to temporarily exceed the
|
||||
* normal number of threads to overcome deadlocks (and to keep processors
|
||||
* busy); and the cached thread pools, which are also very similar to the
|
||||
* ThreadPoolExecutor class, create threads (up to a maximum number) even if
|
||||
* all the processors are busy. Additionally we cannot instrument this code
|
||||
* all the processors are busy. Additionally we cannot instrument this code
|
||||
* as easily for parallel profiling.
|
||||
*
|
||||
* The JavaInternal class holds the reference to the thread pool. The pool
|
||||
* should be executed by calling run() from the primordial thread. run()
|
||||
* The JavaInternal class holds the reference to the thread pool. The pool
|
||||
* should be executed by calling run() from the primordial thread. run()
|
||||
* will exit once all the work has been completed; it will shutdown the
|
||||
* worker threads as it exits. The runMain() method provides a convenient
|
||||
* worker threads as it exits. The runMain() method provides a convenient
|
||||
* wrapper for run which also executes a task, such as the main/2 predicate
|
||||
* of the application.
|
||||
*/
|
||||
@@ -50,30 +51,23 @@ public class MercuryThreadPool
|
||||
|
||||
private MercuryThreadFactory thread_factory;
|
||||
|
||||
/*
|
||||
* Locking:
|
||||
* Rather than synchronize on the 'this' pointer we create explicit
|
||||
* locks. The numbers of threads and list of threads are protected by
|
||||
* the threads_lock lock. The queue of tasks is protected by the
|
||||
* tasks_lock. Separate condition variables are also used, This avoids
|
||||
* threads waiting on conditions to not be woken up by a different
|
||||
* condition, for example for example a thread can wait for the pool to
|
||||
* shutdown without being woken when new work arrives.
|
||||
*
|
||||
* Safety: If you acquire more than one lock you must acquire locks in
|
||||
* this order: tasks_lock, threads_lock then main_loop_lock. If
|
||||
* you wait on any condition you must only hold that condition's
|
||||
* lock.
|
||||
*/
|
||||
// Locking:
|
||||
// Rather than synchronize on the 'this' pointer we create explicit locks.
|
||||
// The numbers of threads and list of threads are protected by the
|
||||
// threads_lock lock. The queue of tasks is protected by the tasks_lock.
|
||||
// Separate condition variables are also used. This avoids threads waiting
|
||||
// on conditions to not be woken up by a different condition, for example
|
||||
// for example a thread can wait for the pool to shutdown without
|
||||
// being woken when new work arrives.
|
||||
//
|
||||
// Safety: If you acquire more than one lock you must acquire locks in
|
||||
// this order: tasks_lock, threads_lock then main_loop_lock. If you wait
|
||||
// on any condition, you must only hold that condition's lock.
|
||||
|
||||
/*
|
||||
* Worker threads
|
||||
*/
|
||||
// Worker threads.
|
||||
private int thread_pool_size;
|
||||
private int user_specified_size;
|
||||
/*
|
||||
* The sum of num_threads_* is the total number of threads in the pool
|
||||
*/
|
||||
// The sum of num_threads_* is the total number of threads in the pool.
|
||||
private int num_threads_working;
|
||||
private volatile int num_threads_waiting;
|
||||
private int num_threads_blocked;
|
||||
@@ -81,18 +75,16 @@ public class MercuryThreadPool
|
||||
private LinkedList<MercuryThread> threads;
|
||||
private Lock threads_lock;
|
||||
|
||||
/*
|
||||
* Tasks
|
||||
*/
|
||||
// Tasks.
|
||||
private Deque<Task> tasks;
|
||||
private long num_tasks_submitted;
|
||||
private long num_tasks_completed;
|
||||
private Lock tasks_lock;
|
||||
private Condition thread_wait_for_task_condition;
|
||||
|
||||
// Has a shutdown request been received (protected by tasks_lock)
|
||||
// Has a shutdown request been received (protected by tasks_lock)?
|
||||
private boolean shutdown_request;
|
||||
// Shutdown because the program is aborting (if true then don't run
|
||||
// Shutdown because the program is aborting (if true, then don't run
|
||||
// finalisers).
|
||||
private boolean shutdown_abort;
|
||||
// True if worker threads should exit (the pool is shutting down).
|
||||
@@ -101,9 +93,7 @@ public class MercuryThreadPool
|
||||
// shutting down).
|
||||
private boolean running;
|
||||
|
||||
/*
|
||||
* Main loop condition.
|
||||
*/
|
||||
// Main loop condition.
|
||||
private Lock main_loop_lock;
|
||||
private Condition main_loop_condition;
|
||||
|
||||
@@ -123,12 +113,10 @@ public class MercuryThreadPool
|
||||
threads = new LinkedList<MercuryThread>();
|
||||
threads_lock = new ReentrantLock();
|
||||
|
||||
/*
|
||||
* The task queue is four times longer than the number of threads.
|
||||
* This decision is arbitrary and should be revised after doing some
|
||||
* benchmarking. This capacity is just the initial size. The
|
||||
* ArrayDeque task will grow as needed.
|
||||
*/
|
||||
// The task queue is four times longer than the number of threads.
|
||||
// This decision is arbitrary and should be revised after doing some
|
||||
// benchmarking. This capacity is just the initial size.
|
||||
// The ArrayDeque task will grow as needed.
|
||||
tasks = new ArrayDeque<Task>(size*4);
|
||||
shutdown_request = false;
|
||||
shutdown_abort = false;
|
||||
@@ -191,10 +179,8 @@ public class MercuryThreadPool
|
||||
try {
|
||||
do {
|
||||
if (tooManyThreadsWaiting()) {
|
||||
/*
|
||||
* We already have plenty of threads waiting for
|
||||
* work. Ask this one to shutdown.
|
||||
*/
|
||||
// We already have plenty of threads waiting for work.
|
||||
// Ask this one to shutdown.
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -203,12 +189,10 @@ public class MercuryThreadPool
|
||||
return task;
|
||||
}
|
||||
|
||||
/*
|
||||
* XXX: If there are tasks currently being executed that
|
||||
* spawn other tasks while should_shutdown is true, then
|
||||
* there's a possibility that this could deadlock as we
|
||||
* don't check that here.
|
||||
*/
|
||||
// XXX: If there are tasks currently being executed that
|
||||
// spawn other tasks while should_shutdown is true, then
|
||||
// there is a possibility that this could deadlock as we
|
||||
// don't check that here.
|
||||
if (shutdown_now) {
|
||||
return null;
|
||||
}
|
||||
@@ -222,10 +206,8 @@ public class MercuryThreadPool
|
||||
|
||||
protected boolean tooManyThreadsWaiting()
|
||||
{
|
||||
/*
|
||||
* num_threads_waiting is volatile because we use it to perform a
|
||||
* double checked lock optimisation.
|
||||
*/
|
||||
// num_threads_waiting is volatile because we use it to perform
|
||||
// a double checked lock optimisation.
|
||||
if (num_threads_waiting > thread_pool_size) {
|
||||
threads_lock.lock();
|
||||
// Recheck with lock.
|
||||
@@ -351,11 +333,9 @@ public class MercuryThreadPool
|
||||
{
|
||||
main_loop_lock.lock();
|
||||
try {
|
||||
/*
|
||||
* There may be more than one thread waiting on this condition
|
||||
* such as when more than one thread calls waitForShutdown().
|
||||
* I can't imagine this happening, but it is allowed.
|
||||
*/
|
||||
// There may be more than one thread waiting on this condition
|
||||
// such as when more than one thread calls waitForShutdown().
|
||||
// I can't imagine this happening, but it is allowed.
|
||||
main_loop_condition.signalAll();
|
||||
} finally {
|
||||
main_loop_lock.unlock();
|
||||
@@ -364,8 +344,7 @@ public class MercuryThreadPool
|
||||
|
||||
/**
|
||||
* Warm up the thread pool by starting some initial threads.
|
||||
* Currently starts a single thread, other threads are started on
|
||||
* demand.
|
||||
* Currently starts a single thread, other threads are started on demand.
|
||||
*/
|
||||
protected void startupInitialThreads()
|
||||
{
|
||||
@@ -394,12 +373,10 @@ public class MercuryThreadPool
|
||||
List<MercuryWorkerThread> new_threads =
|
||||
new LinkedList<MercuryWorkerThread>();
|
||||
|
||||
/*
|
||||
* If necessary poll the Java runtime to see if the number of
|
||||
* available processors has changed. I don't know if this actually
|
||||
* changes in practice however the Java API says that one can and
|
||||
* should poll it.
|
||||
*/
|
||||
// If necessary, poll the Java runtime to see if the number of
|
||||
// available processors has changed. I don't know if this actually
|
||||
// changes in practice, however the Java API says that one can and
|
||||
// should poll it.
|
||||
thread_pool_size = (user_specified_size > 0) ? user_specified_size :
|
||||
Runtime.getRuntime().availableProcessors();
|
||||
|
||||
@@ -411,10 +388,8 @@ public class MercuryThreadPool
|
||||
}
|
||||
|
||||
if (num_tasks_waiting > 0) {
|
||||
/*
|
||||
* If we have fewer than the default number of threads then
|
||||
* start some new threads.
|
||||
*/
|
||||
// If we have fewer than the default number of threads,
|
||||
// then start some new threads.
|
||||
threads_lock.lock();
|
||||
try {
|
||||
int num_threads = numThreads();
|
||||
@@ -424,11 +399,9 @@ public class MercuryThreadPool
|
||||
num_new_threads = num_tasks_waiting - num_threads_other -
|
||||
num_threads_waiting;
|
||||
if (num_new_threads + num_threads > num_threads_limit) {
|
||||
/*
|
||||
* The number of threads that we want, plus the number
|
||||
* we already have, exceeds the number that we're
|
||||
* allowed to have.
|
||||
*/
|
||||
// The number of threads that we want, plus the number
|
||||
// we already have, exceeds the number that we are
|
||||
// allowed to have.
|
||||
num_new_threads = num_threads_limit - num_threads;
|
||||
}
|
||||
if (debug) {
|
||||
@@ -450,10 +423,8 @@ public class MercuryThreadPool
|
||||
threads_lock.unlock();
|
||||
}
|
||||
|
||||
/*
|
||||
* Start the threads while we're not holding the lock, this makes
|
||||
* the above critical section smaller.
|
||||
*/
|
||||
// Start the threads while we are not holding the lock;
|
||||
// this makes the above critical section smaller.
|
||||
for (MercuryWorkerThread t : new_threads) {
|
||||
t.start();
|
||||
}
|
||||
@@ -471,9 +442,9 @@ public class MercuryThreadPool
|
||||
|
||||
/**
|
||||
* Run the thread pool.
|
||||
* The calling thread is used to "run" the thread pool. Its main job
|
||||
* is to keep the correct number of worker threads alive. It does not
|
||||
* return until the thread pool is stopped (with a call to shutdown()).
|
||||
* The calling thread is used to "run" the thread pool. Its main job is
|
||||
* to keep the correct number of worker threads alive. It does not return
|
||||
* until the thread pool is stopped (with a call to shutdown()).
|
||||
* run() is usually called by runMain(), and shutdown() is usually
|
||||
* called by the main task itself.
|
||||
*/
|
||||
@@ -485,9 +456,7 @@ public class MercuryThreadPool
|
||||
|
||||
try {
|
||||
do {
|
||||
/*
|
||||
* Have all the tasks been completed?
|
||||
*/
|
||||
// Have all the tasks been completed?
|
||||
tasks_lock.lock();
|
||||
tasks_locked = true;
|
||||
try {
|
||||
@@ -503,25 +472,21 @@ public class MercuryThreadPool
|
||||
will_shutdown = okay_to_shutdown && shutdown_request;
|
||||
|
||||
if (!will_shutdown) {
|
||||
/*
|
||||
* Start new threads if we have fewer than the
|
||||
* thread_pool_size
|
||||
*/
|
||||
// Start new threads if we have fewer than the
|
||||
// thread_pool_size
|
||||
checkThreads();
|
||||
|
||||
/*
|
||||
* Acquire the main loop lock while we're still
|
||||
* holding tasks_lock. This prevents a race whereby
|
||||
* we release the locks below and then the last task
|
||||
* finishes but we don't get its signal on
|
||||
* main_loop_condition because we weren't holding
|
||||
* the lock.
|
||||
*
|
||||
* To preserve the locking order we must NOT
|
||||
* reacquire tasks_lock after releasing them while
|
||||
* still holding the main loop lock. This must also
|
||||
* be executed after checkThreads();
|
||||
*/
|
||||
// Acquire the main loop lock while we are still
|
||||
// holding tasks_lock. This prevents a race whereby
|
||||
// we release the locks below and then the last task
|
||||
// finishes but we don't get its signal on
|
||||
// main_loop_condition because we weren't holding
|
||||
// the lock.
|
||||
//
|
||||
// To preserve the locking order we must NOT
|
||||
// reacquire tasks_lock after releasing them while
|
||||
// still holding the main loop lock. This must also
|
||||
// be executed after checkThreads();
|
||||
main_loop_lock.lock();
|
||||
main_loop_locked = true;
|
||||
}
|
||||
@@ -545,12 +510,10 @@ public class MercuryThreadPool
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If we don't execute the if branch above, then the
|
||||
* main_lock cannot be held because neither of the two
|
||||
* places where it could have been acquired would be
|
||||
* executed because done == true.
|
||||
*/
|
||||
// If we don't execute the if branch above, then the
|
||||
// main_lock cannot be held because neither of the two places
|
||||
// where it could have been acquired would be executed
|
||||
// because done == true.
|
||||
} while (!will_shutdown);
|
||||
} finally {
|
||||
if (main_loop_locked) {
|
||||
@@ -582,8 +545,8 @@ public class MercuryThreadPool
|
||||
/**
|
||||
* Wait for all the worker threads to exit.
|
||||
* Even though the JVM is not supposed to exit until all the non-daemon
|
||||
* threads have exited the effects of some threads may get lost. I
|
||||
* suspect this may be because the main thread closes stdout and stderr.
|
||||
* threads have exited the effects of some threads may get lost.
|
||||
* I suspect this may be because the main thread closes stdout and stderr.
|
||||
* Waiting for the worker threads fixes the problem - pbone.
|
||||
*/
|
||||
public boolean waitForShutdown()
|
||||
@@ -613,9 +576,8 @@ public class MercuryThreadPool
|
||||
/**
|
||||
* Start the thread pool in its own thread.
|
||||
* Normally the thread pool ie executed directly by the main thread.
|
||||
* However, when Mercury is used as a library by a native Java
|
||||
* application this is not true, and the thread pool runs in a thread of
|
||||
* its own.
|
||||
* However, when Mercury is used as a library by a native Java application,
|
||||
* this is not true, and the thread pool runs in a thread of its own.
|
||||
*/
|
||||
public MercuryThread startup()
|
||||
{
|
||||
@@ -706,19 +668,15 @@ public class MercuryThreadPool
|
||||
startupInitialThreads();
|
||||
submit(main_task);
|
||||
try {
|
||||
/*
|
||||
* This thread (the primordial thread) operates the thread pool
|
||||
* until the program is finished
|
||||
*/
|
||||
// This thread (the primordial thread) operates the thread pool
|
||||
// until the program is finished
|
||||
run();
|
||||
if (!shutdown_abort) {
|
||||
jmercury.runtime.JavaInternal.run_finalisers();
|
||||
}
|
||||
/*
|
||||
* We always wait for the thread pool to shutdown as worker
|
||||
* threads may either be completing work or reporting the reason
|
||||
* why the runtime is aborting.
|
||||
*/
|
||||
// We always wait for the thread pool to shutdown as worker threads
|
||||
// may either be completing work or reporting the reason why
|
||||
// the runtime is aborting.
|
||||
waitForShutdown();
|
||||
} catch (jmercury.runtime.Exception e) {
|
||||
JavaInternal.reportUncaughtException(e);
|
||||
@@ -727,7 +685,7 @@ public class MercuryThreadPool
|
||||
|
||||
/**
|
||||
* This class creates and names Mercury threads.
|
||||
* The factory is responsible for creating threads with unique IDs,
|
||||
* The factory is responsible for creating threads with unique IDs.
|
||||
*/
|
||||
private static class MercuryThreadFactory implements ThreadFactory
|
||||
{
|
||||
@@ -773,4 +731,3 @@ public class MercuryThreadPool
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user