mirror of
https://github.com/Mercury-Language/mercury.git
synced 2025-12-16 14:25:56 +00:00
Make MercuryThreadPool (java) notice when a thread blocks on a semaphore.
If threads are blocked while there is work in the queue extra threads may be
spawned to keep the processors busy.
Beginning now, tasks created with thread.spawn are use the thread pool.
(thread.spawn_native does not use the thread pool.)
java/runtime/Semaphore.java:
Wrap Java's Semaphore class which call the current thread's blocked()
and running() methods when a thread blocks and then runs after being
blocked.
library/thread.semaphore.m:
Use our own Semaphore class.
java/runtime/MercuryThread.java:
java/runtime/MercuryWorkerThread.java:
Define blocked() and running() on our threads.
java/runtime/NativeThread.java:
This class is used by spawn_native/4 and is required to define blocked()
and running(), however it implements them as no-ops as it isn't included
in the thread pool.
java/runtime/ThreadStatus.java:
Define the BLOCKED status.
java/runtime/MercuryThreadPool.java:
Count blocked threads seperatly and allow the creation of new threads
when existing threads become blocked.
Add some tracing code to help debug the thread management code. This is
disabled by default.
library/thread.m:
Implement spawn for Java using the thread pool. This was not enabled
earlier because without using java/runtime/Semaphore.java it was
possible to deadlock the system.
java/runtime/Task.java:
Add some tracing code to debug thread state changes, this is disabled by
default.
This commit is contained in:
@@ -14,7 +14,7 @@ public abstract class MercuryThread extends Thread
|
||||
private int id;
|
||||
|
||||
/**
|
||||
* Construct a new MercuryThread with the given ID and runnable.
|
||||
* Construct a new MercuryThread with the given ID.
|
||||
* @param name A string that identifies the type of thread.
|
||||
* @param id A numeric identifier (should be unique).
|
||||
*/
|
||||
@@ -24,5 +24,31 @@ public abstract class MercuryThread extends Thread
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread has become blocked.
|
||||
*/
|
||||
public abstract void blocked();
|
||||
|
||||
/**
|
||||
* The thread is unblocked and is now running again.
|
||||
*/
|
||||
public abstract void running();
|
||||
|
||||
/**
|
||||
* If the current thread is a MercuryThread then return a reference to
|
||||
* it.
|
||||
*/
|
||||
public static MercuryThread currentThread()
|
||||
{
|
||||
Thread thread;
|
||||
|
||||
thread = Thread.currentThread();
|
||||
if (thread instanceof MercuryThread) {
|
||||
return (MercuryThread)thread;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ import java.util.*;
|
||||
* new threads may be created to avoid deadlocks and attempt to keep all
|
||||
* processors busy.
|
||||
*
|
||||
* TODO: Currently the thread pool does not know when a thread is blocked by
|
||||
* a barrier, channel, mvar or semaphore.
|
||||
*
|
||||
* TODO: Currently the thread pool does not know when a thread is blocked in
|
||||
* foreign code or performing IO.
|
||||
*
|
||||
@@ -50,6 +47,8 @@ import java.util.*;
|
||||
public class MercuryThreadPool
|
||||
implements Runnable
|
||||
{
|
||||
public static final boolean debug = false;
|
||||
|
||||
private static MercuryThreadPool instance;
|
||||
|
||||
private MercuryThreadFactory thread_factory;
|
||||
@@ -80,6 +79,7 @@ public class MercuryThreadPool
|
||||
*/
|
||||
private int num_threads_working;
|
||||
private volatile int num_threads_waiting;
|
||||
private int num_threads_blocked;
|
||||
private int num_threads_other;
|
||||
private LinkedList<MercuryThread> threads;
|
||||
private Lock threads_lock;
|
||||
@@ -111,6 +111,7 @@ public class MercuryThreadPool
|
||||
thread_pool_size = size;
|
||||
num_threads_working = 0;
|
||||
num_threads_waiting = 0;
|
||||
num_threads_blocked = 0;
|
||||
num_threads_other = 0;
|
||||
threads = new LinkedList<MercuryThread>();
|
||||
threads_lock = new ReentrantLock();
|
||||
@@ -236,6 +237,9 @@ public class MercuryThreadPool
|
||||
case IDLE:
|
||||
num_threads_waiting--;
|
||||
break;
|
||||
case BLOCKED:
|
||||
num_threads_blocked--;
|
||||
break;
|
||||
case OTHER:
|
||||
num_threads_other--;
|
||||
break;
|
||||
@@ -250,6 +254,9 @@ public class MercuryThreadPool
|
||||
case IDLE:
|
||||
num_threads_waiting++;
|
||||
break;
|
||||
case BLOCKED:
|
||||
num_threads_blocked++;
|
||||
break;
|
||||
case OTHER:
|
||||
num_threads_other++;
|
||||
break;
|
||||
@@ -260,10 +267,12 @@ public class MercuryThreadPool
|
||||
threads_lock.unlock();
|
||||
}
|
||||
|
||||
if ((new_ == ThreadStatus.IDLE) ||
|
||||
(new_ == ThreadStatus.OTHER))
|
||||
{
|
||||
switch (new_) {
|
||||
case BLOCKED:
|
||||
signalMainLoop();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,6 +288,9 @@ public class MercuryThreadPool
|
||||
case IDLE:
|
||||
num_threads_waiting--;
|
||||
break;
|
||||
case BLOCKED:
|
||||
num_threads_blocked--;
|
||||
break;
|
||||
case OTHER:
|
||||
num_threads_other--;
|
||||
break;
|
||||
@@ -330,16 +342,34 @@ public class MercuryThreadPool
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Warm up the thread pool by starting some initial threads (currently
|
||||
* one).
|
||||
*/
|
||||
protected void startupInitialThreads()
|
||||
{
|
||||
MercuryWorkerThread t = thread_factory.newWorkerThread();
|
||||
|
||||
threads_lock.lock();
|
||||
try {
|
||||
threads.add(t);
|
||||
num_threads_other++;
|
||||
} finally {
|
||||
threads_lock.unlock();
|
||||
}
|
||||
|
||||
t.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check threads.
|
||||
* Checks the numbers and status of the worker threads and starts more
|
||||
* threads if required.
|
||||
* @return The number of currently working/blocked threads.
|
||||
*/
|
||||
protected int checkThreads()
|
||||
protected void checkThreads()
|
||||
{
|
||||
int num_new_threads;
|
||||
int num_working_blocked_threads;
|
||||
int num_tasks_waiting;
|
||||
List<MercuryWorkerThread> new_threads =
|
||||
new LinkedList<MercuryWorkerThread>();
|
||||
|
||||
@@ -352,26 +382,50 @@ public class MercuryThreadPool
|
||||
thread_pool_size = (user_specified_size > 0) ? user_specified_size :
|
||||
Runtime.getRuntime().availableProcessors();
|
||||
|
||||
tasks_lock.lock();
|
||||
try {
|
||||
num_tasks_waiting = tasks.size();
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
|
||||
if (num_tasks_waiting > 0) {
|
||||
/*
|
||||
* If we have fewer than the default number of threads then start
|
||||
* some new threads.
|
||||
* If we have fewer than the default number of threads then
|
||||
* start some new threads.
|
||||
*/
|
||||
threads_lock.lock();
|
||||
try {
|
||||
int num_threads = num_threads_working + num_threads_waiting +
|
||||
num_threads_other;
|
||||
num_working_blocked_threads = num_threads_working +
|
||||
num_threads_other;
|
||||
num_new_threads = thread_pool_size - num_threads;
|
||||
if (num_new_threads > 0) {
|
||||
num_threads_blocked + num_threads_other;
|
||||
int num_threads_limit = thread_pool_size +
|
||||
num_threads_blocked;
|
||||
// Determine the number of new threads that we want.
|
||||
num_new_threads = num_tasks_waiting - num_threads_other -
|
||||
num_threads_waiting;
|
||||
if (num_new_threads + num_threads > num_threads_limit) {
|
||||
/*
|
||||
* The number of threads that we want, plus the number
|
||||
* we already have, exceeds the number that we're
|
||||
* allowed to have.
|
||||
*/
|
||||
num_new_threads = num_threads_limit - num_threads;
|
||||
}
|
||||
if (debug) {
|
||||
System.err.println("Pool has " +
|
||||
num_threads_working + " working threads, " +
|
||||
num_threads_blocked + " blocked threads, " +
|
||||
num_threads_waiting + " idle threads, " +
|
||||
num_threads_other + " other (starting up) threads. " +
|
||||
"will create " + num_new_threads + " new threads.");
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_new_threads; i++) {
|
||||
MercuryWorkerThread t = thread_factory.newWorkerThread();
|
||||
new_threads.add(t);
|
||||
threads.add(t);
|
||||
num_threads_other++;
|
||||
}
|
||||
num_threads = thread_pool_size;
|
||||
}
|
||||
num_threads_other += num_new_threads;
|
||||
} finally {
|
||||
threads_lock.unlock();
|
||||
}
|
||||
@@ -383,12 +437,7 @@ public class MercuryThreadPool
|
||||
for (MercuryWorkerThread t : new_threads) {
|
||||
t.start();
|
||||
}
|
||||
|
||||
/*
|
||||
* If there are too many threads then superfluous threads will
|
||||
* shut down when they try to get a new task.
|
||||
*/
|
||||
return num_working_blocked_threads;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -420,7 +469,6 @@ public class MercuryThreadPool
|
||||
public void run()
|
||||
{
|
||||
boolean done = false;
|
||||
int num_working_blocked_threads;
|
||||
long num_tasks_submitted;
|
||||
long num_tasks_completed;
|
||||
boolean tasks_locked = false;
|
||||
@@ -442,7 +490,7 @@ public class MercuryThreadPool
|
||||
* Start new threads if we have fewer than the
|
||||
* thread_pool_size
|
||||
*/
|
||||
num_working_blocked_threads = checkThreads();
|
||||
checkThreads();
|
||||
|
||||
/*
|
||||
* Acquire the main loop lock while we're still
|
||||
@@ -517,6 +565,7 @@ public class MercuryThreadPool
|
||||
public void runMain(Runnable run_main)
|
||||
{
|
||||
Task main_task = new Task(run_main);
|
||||
startupInitialThreads();
|
||||
submit(main_task);
|
||||
try {
|
||||
/*
|
||||
@@ -553,12 +602,7 @@ public class MercuryThreadPool
|
||||
* @param runnable The task the new thread should execute.
|
||||
*/
|
||||
public MercuryThread newThread(final Runnable runnable) {
|
||||
return new MercuryThread("Mercury Thread", allocateThreadId())
|
||||
{
|
||||
public void run() {
|
||||
runnable.run();
|
||||
}
|
||||
};
|
||||
return new NativeThread(allocateThreadId(), runnable);
|
||||
}
|
||||
|
||||
public MercuryWorkerThread newWorkerThread() {
|
||||
|
||||
@@ -78,5 +78,15 @@ public class MercuryWorkerThread extends MercuryThread
|
||||
pool.updateThreadCounts(status, new_status);
|
||||
status = new_status;
|
||||
}
|
||||
|
||||
public void blocked() {
|
||||
pool.updateThreadCounts(status, ThreadStatus.BLOCKED);
|
||||
status = ThreadStatus.BLOCKED;
|
||||
}
|
||||
|
||||
public void running() {
|
||||
pool.updateThreadCounts(status, ThreadStatus.WORKING);
|
||||
status = ThreadStatus.WORKING;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
42
java/runtime/NativeThread.java
Normal file
42
java/runtime/NativeThread.java
Normal file
@@ -0,0 +1,42 @@
|
||||
|
||||
package jmercury.runtime;
|
||||
|
||||
/**
|
||||
* Native thread.
|
||||
*
|
||||
* A Native thread is created by Mercury to tasks created with
|
||||
* spawn_native/4.
|
||||
*/
|
||||
public class NativeThread extends MercuryThread
|
||||
{
|
||||
private Runnable task;
|
||||
|
||||
/**
|
||||
* Create a new native thread.
|
||||
* @param id The id for the new thread.
|
||||
* @param task The task to execute.
|
||||
*/
|
||||
public NativeThread(int id, Runnable task) {
|
||||
super("Mercury Native Thread", id);
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
task.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* no-op
|
||||
*/
|
||||
public void blocked() {
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* no-op
|
||||
*/
|
||||
public void running() {
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
150
java/runtime/Semaphore.java
Normal file
150
java/runtime/Semaphore.java
Normal file
@@ -0,0 +1,150 @@
|
||||
//
|
||||
// Copyright (C) 2014 The Mercury Team
|
||||
// This file may only be copied under the terms of the GNU Library General
|
||||
// Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
//
|
||||
|
||||
package jmercury.runtime;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public class Semaphore
|
||||
extends java.util.concurrent.Semaphore
|
||||
{
|
||||
public Semaphore(int permits, boolean fair)
|
||||
{
|
||||
super(permits, fair);
|
||||
}
|
||||
|
||||
public Semaphore(int permits)
|
||||
{
|
||||
super(permits);
|
||||
}
|
||||
|
||||
public void acquire()
|
||||
throws InterruptedException
|
||||
{
|
||||
acquire(1);
|
||||
}
|
||||
|
||||
public void acquire(int permits)
|
||||
throws InterruptedException
|
||||
{
|
||||
boolean blocked = false;
|
||||
|
||||
try {
|
||||
if (tryAcquire(permits, 0, TimeUnit.SECONDS)) {
|
||||
return;
|
||||
} else {
|
||||
// This thread will (probably) block.
|
||||
blocked();
|
||||
blocked = true;
|
||||
super.acquire(permits);
|
||||
}
|
||||
} finally {
|
||||
if (blocked) {
|
||||
running();
|
||||
// The thread isn't blocked anymore.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void acquireUninterruptibly()
|
||||
{
|
||||
acquireUninterruptibly(1);
|
||||
}
|
||||
|
||||
public void acquireUninterruptibly(int permits)
|
||||
{
|
||||
boolean interrupted_once = false;
|
||||
boolean keep_trying;
|
||||
boolean success;
|
||||
|
||||
// Avoid a warning with the loop below
|
||||
success = false;
|
||||
|
||||
/*
|
||||
* tryAcquire is interruptable so we keep trying until we've
|
||||
* executed it at least once and it was not interrupted. We also
|
||||
* track if we were interrupted so we can raise this condition
|
||||
* again.
|
||||
*/
|
||||
do {
|
||||
keep_trying = true;
|
||||
try {
|
||||
success = tryAcquire(permits, 0, TimeUnit.SECONDS);
|
||||
keep_trying = false;
|
||||
} catch (InterruptedException e) {
|
||||
interrupted_once = true;
|
||||
}
|
||||
} while (keep_trying);
|
||||
|
||||
if (!success) {
|
||||
// This thread will (probably) block because tryAcquire failed.
|
||||
blocked();
|
||||
super.acquireUninterruptibly(permits);
|
||||
running();
|
||||
}
|
||||
|
||||
if (interrupted_once) {
|
||||
/*
|
||||
** Because this was supposed to be uninterruptable we need to
|
||||
** raise the interrupt that we received earlier.
|
||||
*/
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean tryAcquire(long timeout, TimeUnit unit)
|
||||
throws InterruptedException
|
||||
{
|
||||
return tryAcquire(1, timeout, unit);
|
||||
}
|
||||
|
||||
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
|
||||
throws InterruptedException
|
||||
{
|
||||
if (timeout < 1) {
|
||||
return super.tryAcquire(permits, timeout, unit);
|
||||
} else {
|
||||
boolean result;
|
||||
|
||||
result = tryAcquire(permits, 0, unit);
|
||||
if (result) {
|
||||
// Blocking wasn't necessary
|
||||
return true;
|
||||
} else {
|
||||
// Blocking required, notify thread.
|
||||
blocked();
|
||||
try {
|
||||
// Block.
|
||||
return tryAcquire(permits, timeout, unit);
|
||||
} finally {
|
||||
running();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void blocked()
|
||||
{
|
||||
MercuryThread mer_thread;
|
||||
|
||||
mer_thread = MercuryThread.currentThread();
|
||||
if (mer_thread != null) {
|
||||
mer_thread.blocked();
|
||||
}
|
||||
}
|
||||
|
||||
protected void running()
|
||||
{
|
||||
MercuryThread mer_thread;
|
||||
|
||||
mer_thread = MercuryThread.currentThread();
|
||||
if (mer_thread != null) {
|
||||
mer_thread.running();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,10 @@ public class Task implements Runnable
|
||||
* status change.
|
||||
*/
|
||||
protected synchronized void updateStatus(Status status) {
|
||||
if (MercuryThreadPool.debug) {
|
||||
System.err.println("Thread: " + Thread.currentThread() +
|
||||
"task: " + this + " status " + status);
|
||||
}
|
||||
this.status = status;
|
||||
notifyAll();
|
||||
}
|
||||
@@ -73,6 +77,10 @@ public class Task implements Runnable
|
||||
wait();
|
||||
}
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "Task " + id;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ package jmercury.runtime;
|
||||
public enum ThreadStatus {
|
||||
WORKING,
|
||||
IDLE,
|
||||
BLOCKED,
|
||||
|
||||
/*
|
||||
* A thread in any other state, currently the only possibility is after
|
||||
|
||||
@@ -287,7 +287,7 @@ spawn_context_2(_, Res, "", !IO) :-
|
||||
Task task = new Task(rg);
|
||||
ThreadId = String.valueOf(task.getId());
|
||||
rg.setId(ThreadId);
|
||||
JavaInternal.getThreadPool().submitExclusiveThread(task);
|
||||
JavaInternal.getThreadPool().submit(task);
|
||||
Success = bool.YES;
|
||||
").
|
||||
|
||||
|
||||
@@ -97,7 +97,7 @@ public class ML_Semaphore {
|
||||
"class [mercury]mercury.thread.semaphore__csharp_code.mercury_code.ML_Semaphore").
|
||||
:- pragma foreign_type("C#", semaphore, "thread__semaphore.ML_Semaphore").
|
||||
:- pragma foreign_type("Erlang", semaphore, "").
|
||||
:- pragma foreign_type("Java", semaphore, "java.util.concurrent.Semaphore").
|
||||
:- pragma foreign_type("Java", semaphore, "jmercury.runtime.Semaphore").
|
||||
|
||||
:- pragma foreign_decl("C", "
|
||||
extern void
|
||||
@@ -153,7 +153,7 @@ init(Semaphore, !IO) :-
|
||||
init(Count::in) = (Semaphore::uo),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
Semaphore = new java.util.concurrent.Semaphore(Count);
|
||||
Semaphore = new jmercury.runtime.Semaphore(Count);
|
||||
").
|
||||
|
||||
:- pragma foreign_code("C", "
|
||||
|
||||
Reference in New Issue
Block a user