mirror of
https://github.com/Mercury-Language/mercury.git
synced 2026-04-15 09:23:44 +00:00
[java] The thread pool now works when Mercury is used as a library
The thread pool code used in the Java backend was tied the execution of
main/2. However if Mercury is used as a library the thread pool won't have
been started and threads created with thread.spawn would not be executed.
This patch makes it possible to start and stop the thread pool independently of
main/2 by calling startup() and shutdown(). These calls are called
implicitly by calling runMain(). The thread pool can also be started on
demand.
This patch also adds the MercuryRuntime class, which now contains methods
that may be called by users' Java code to interact with the Mercury runtime
system, including a new finalise() method.
java/runtime/MercuryThreadPool.java:
Add startup() method.
shutdown() method is now public and it's meaning has changed, it now
requests the shutdown rather than performing it.
Renamed some variables to make their meanings clearer.
java/runtime/JavaInternal.java:
Initialise the ThreadPool and MercuryOptions objects on demand.
Make all members of this class static to avoid confusion.
Add a private constructor.
java/runtime/MercuryRuntime.java:
Add methods that can be called by Mercury users to interact with the
runtime system. Including a convenient finalise() method that does all
the finalisation.
samples/java_interface/standalone_java/mercury_lib.m:
samples/java_interface/standalone_java/JavaMain.java:
Extend the standalone Java example so that it makes use of threads: Add
a fibs function in Mercury that uses concurrency and therefore starts
the thread pool; call it from the Java code.
Use the new finalise() method from the MercuryRuntime class inside of a
finally block.
samples/java_interface/standalone_java/Makefile:
Fix a minor error.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
//
|
||||
// Copyright (C) 2001-2003, 2009 The University of Melbourne.
|
||||
// Copyright (C) 2014 The Mercury Team.
|
||||
// This file may only be copied under the terms of the GNU Library General
|
||||
// Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
//
|
||||
@@ -9,33 +10,46 @@
|
||||
package jmercury.runtime;
|
||||
|
||||
/**
|
||||
* Internals for Mercury's runtime system on the Java backend.
|
||||
* At the moment this class is used to store the main module's name (progname),
|
||||
* command line arguments and the exit status. We can't put them in one of the
|
||||
* Internals and static objects for Mercury's runtime system on the Java
|
||||
* backend.
|
||||
* This class is used to store the main module's name (progname), command
|
||||
* line arguments and the exit status. We can't put them in one of the
|
||||
* library modules because we need to hold them in a class variable in a top
|
||||
* level class.
|
||||
*
|
||||
* The class also contains utility methods.
|
||||
* The class also contains utility methods and other objects such as a
|
||||
* reference to the thread pool.
|
||||
*
|
||||
* No instance of this class is ever created, all it's members and methods
|
||||
* are static.
|
||||
*/
|
||||
public class JavaInternal {
|
||||
|
||||
private static JavaInternal instance;
|
||||
|
||||
/**
|
||||
* Private constructor.
|
||||
* This private constructor doesn't do anything and isn't called by
|
||||
* anyone. It exists only to prevent people from creating an instance.
|
||||
*/
|
||||
private JavaInternal() {
|
||||
options = new MercuryOptions();
|
||||
options.process();
|
||||
thread_pool = new MercuryThreadPool(options.getNumProcessors());
|
||||
}
|
||||
|
||||
private MercuryThreadPool thread_pool;
|
||||
private MercuryOptions options;
|
||||
private static MercuryThreadPool thread_pool = null;
|
||||
private static MercuryOptions options = null;
|
||||
|
||||
public static MercuryThreadPool getThreadPool() {
|
||||
return instance.thread_pool;
|
||||
public static synchronized MercuryThreadPool getThreadPool() {
|
||||
if (thread_pool == null) {
|
||||
thread_pool = new MercuryThreadPool(
|
||||
getOptions().getNumProcessors());
|
||||
}
|
||||
return thread_pool;
|
||||
}
|
||||
|
||||
public static MercuryOptions getOptions() {
|
||||
return instance.options;
|
||||
public static synchronized MercuryOptions getOptions() {
|
||||
if (options == null) {
|
||||
options = new MercuryOptions();
|
||||
options.process();
|
||||
}
|
||||
return options;
|
||||
}
|
||||
|
||||
public static java.lang.String progname;
|
||||
@@ -57,11 +71,12 @@ public class JavaInternal {
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the main task using the thread pool.
|
||||
* Run the main task.
|
||||
* The maun task is executed by the thread pool so that when it blocks
|
||||
* the thread pool is notified correctly.
|
||||
*/
|
||||
public static void runMain(Runnable main)
|
||||
{
|
||||
instance = new JavaInternal();
|
||||
getThreadPool().runMain(main);
|
||||
}
|
||||
|
||||
|
||||
59
java/runtime/MercuryRuntime.java
Normal file
59
java/runtime/MercuryRuntime.java
Normal file
@@ -0,0 +1,59 @@
|
||||
//
|
||||
// Copyright (C) 2014 The Mercury Team
|
||||
// This file may only be copied under the terms of the GNU Library General
|
||||
// Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
//
|
||||
|
||||
package jmercury.runtime;
|
||||
|
||||
/**
|
||||
* Interface to the Mercury Runtime System for Java code.
|
||||
*
|
||||
* No instance of this class is ever created, all it's members and methods
|
||||
* are static.
|
||||
*/
|
||||
public class MercuryRuntime
|
||||
{
|
||||
/**
|
||||
* Private constructor.
|
||||
* This private constructor doesn't do anything and isn't called by
|
||||
* anyone. It exists only to prevent people from creating an instance.
|
||||
*/
|
||||
private MercuryRuntime() {
|
||||
}
|
||||
|
||||
private static MercuryThreadPool thread_pool = null;
|
||||
|
||||
/**
|
||||
* Return the thread pool, initalising it if required.
|
||||
* This does not start the thread pool. It is started either when
|
||||
* startup() is called or automatically when the first task is
|
||||
* submitted.
|
||||
*/
|
||||
public static synchronized MercuryThreadPool getThreadPool()
|
||||
{
|
||||
if (thread_pool == null) {
|
||||
thread_pool = new MercuryThreadPool(
|
||||
JavaInternal.getOptions().getNumProcessors());
|
||||
}
|
||||
return thread_pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrive the exit status stored in the IO state.
|
||||
*/
|
||||
public static int getExitStatus() {
|
||||
return JavaInternal.exit_status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalise the runtime system.
|
||||
* This _must_ be cAlled at the normal end of any program. Currently
|
||||
* it runs finalisers and stops the thread pool.
|
||||
*/
|
||||
public static void finalise() {
|
||||
JavaInternal.run_finalisers();
|
||||
getThreadPool().shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,12 +86,19 @@ public class MercuryThreadPool
|
||||
* Tasks
|
||||
*/
|
||||
private Deque<Task> tasks;
|
||||
private boolean should_shutdown;
|
||||
private long num_tasks_submitted;
|
||||
private long num_tasks_completed;
|
||||
private Lock tasks_lock;
|
||||
private Condition thread_wait_for_task_condition;
|
||||
|
||||
// Has a shutdown request been received (protected by tasks_lock)
|
||||
private boolean shutdown_request;
|
||||
// True if worker threads should exit (the pool is shutting down).
|
||||
private boolean shutdown_now;
|
||||
// True if the thread pool is running (including starting up and
|
||||
// shutting down).
|
||||
private boolean running;
|
||||
|
||||
/*
|
||||
* Main loop condition.
|
||||
*/
|
||||
@@ -121,7 +128,9 @@ public class MercuryThreadPool
|
||||
* ArrayDeque task will grow as needed.
|
||||
*/
|
||||
tasks = new ArrayDeque<Task>(size*4);
|
||||
should_shutdown = false;
|
||||
shutdown_request = false;
|
||||
shutdown_now = false;
|
||||
running = false;
|
||||
num_tasks_submitted = 0;
|
||||
num_tasks_completed = 0;
|
||||
tasks_lock = new ReentrantLock();
|
||||
@@ -155,6 +164,9 @@ public class MercuryThreadPool
|
||||
task.scheduled();
|
||||
num_tasks_submitted++;
|
||||
thread_wait_for_task_condition.signal();
|
||||
if (!running) {
|
||||
startup();
|
||||
}
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
@@ -194,7 +206,7 @@ public class MercuryThreadPool
|
||||
* there's a possibility that this could deadlock as we
|
||||
* don't check that here.
|
||||
*/
|
||||
if (should_shutdown) {
|
||||
if (shutdown_now) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -341,8 +353,9 @@ public class MercuryThreadPool
|
||||
}
|
||||
|
||||
/**
|
||||
* Warm up the thread pool by starting some initial threads (currently
|
||||
* one).
|
||||
* Warm up the thread pool by starting some initial threads.
|
||||
* Currently starts a single thread, other threads are started on
|
||||
* demand.
|
||||
*/
|
||||
protected void startupInitialThreads()
|
||||
{
|
||||
@@ -439,13 +452,16 @@ public class MercuryThreadPool
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the thread pool. This is usually called by runMain()
|
||||
* Run the thread pool.
|
||||
* The calling thread is used to "run" the thread pool. It's main job
|
||||
* is to keep the correct number of worker threads alive. It does not
|
||||
* return until the thread pool is stopped (with a call to shutdown()).
|
||||
* run() is usually called by runMain(), and shutdown() is usually
|
||||
* called by the main task itself.
|
||||
*/
|
||||
public void run()
|
||||
{
|
||||
boolean done = false;
|
||||
long num_tasks_submitted;
|
||||
long num_tasks_completed;
|
||||
boolean will_shutdown = false;
|
||||
boolean tasks_locked = false;
|
||||
boolean main_loop_locked = false;
|
||||
|
||||
@@ -457,11 +473,17 @@ public class MercuryThreadPool
|
||||
tasks_lock.lock();
|
||||
tasks_locked = true;
|
||||
try {
|
||||
boolean okay_to_shutdown;
|
||||
long num_tasks_submitted;
|
||||
long num_tasks_completed;
|
||||
|
||||
num_tasks_submitted = this.num_tasks_submitted;
|
||||
num_tasks_completed = this.num_tasks_completed;
|
||||
done = (num_tasks_submitted == num_tasks_completed);
|
||||
okay_to_shutdown =
|
||||
(num_tasks_submitted == num_tasks_completed);
|
||||
will_shutdown = okay_to_shutdown && shutdown_request;
|
||||
|
||||
if (!done) {
|
||||
if (!will_shutdown) {
|
||||
/*
|
||||
* Start new threads if we have fewer than the
|
||||
* thread_pool_size
|
||||
@@ -489,7 +511,7 @@ public class MercuryThreadPool
|
||||
tasks_locked = false;
|
||||
}
|
||||
|
||||
if (!done) {
|
||||
if (!will_shutdown) {
|
||||
if (!main_loop_locked) {
|
||||
main_loop_lock.lock();
|
||||
main_loop_locked = true;
|
||||
@@ -510,7 +532,7 @@ public class MercuryThreadPool
|
||||
* places where it could have been acquired would be
|
||||
* executed because done == true.
|
||||
*/
|
||||
} while (!done);
|
||||
} while (!will_shutdown);
|
||||
} finally {
|
||||
if (main_loop_locked) {
|
||||
main_loop_lock.unlock();
|
||||
@@ -520,27 +542,104 @@ public class MercuryThreadPool
|
||||
}
|
||||
}
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
protected void shutdown()
|
||||
{
|
||||
/*
|
||||
* Shutdown
|
||||
*/
|
||||
tasks_lock.lock();
|
||||
try {
|
||||
should_shutdown = true;
|
||||
shutdown_now = true;
|
||||
thread_wait_for_task_condition.signalAll();
|
||||
running = false;
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the thread pool in it's own thread.
|
||||
* Normally the thread pool ie executed directly by the main thread.
|
||||
* However, when Mercury is used as a library by a native Java
|
||||
* application this is not true, and the thread pool runs in a thread of
|
||||
* it's own.
|
||||
*/
|
||||
public MercuryThread startup()
|
||||
{
|
||||
MercuryThread thread;
|
||||
|
||||
tasks_lock.lock();
|
||||
try {
|
||||
if (running) {
|
||||
return null;
|
||||
}
|
||||
running = true;
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
|
||||
startupInitialThreads();
|
||||
thread = thread_factory.newThread(new Runnable() {
|
||||
public void run() {
|
||||
MercuryThreadPool.this.startupInitialThreads();
|
||||
MercuryThreadPool.this.run();
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
return thread;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request that the thread pool shutdown.
|
||||
* This method does not wait for the thread pool to shutdown, it's an
|
||||
* asychronous signal. The thread pool will shutdown if: shutdown() has
|
||||
* been called (implicitly when running as an application) and there are
|
||||
* no remaining tasks either queued or running (spawn_native tasks are
|
||||
* not included). The requirement that the process does not exit until
|
||||
* all tasks have finish is maintained by the JVM.
|
||||
*/
|
||||
public boolean shutdown()
|
||||
{
|
||||
tasks_lock.lock();
|
||||
try {
|
||||
if (running && !shutdown_request) {
|
||||
shutdown_request = true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
|
||||
signalMainLoop();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the main/2 predicate and wait for its completion.
|
||||
* This method implicitly starts and stops the thread pool.
|
||||
*/
|
||||
public void runMain(Runnable run_main)
|
||||
public void runMain(final Runnable run_main)
|
||||
{
|
||||
Task main_task = new Task(run_main);
|
||||
Task main_task;
|
||||
Runnable run_main_and_shutdown;
|
||||
|
||||
run_main_and_shutdown = new Runnable() {
|
||||
public void run() {
|
||||
run_main.run();
|
||||
shutdown();
|
||||
}
|
||||
};
|
||||
main_task = new Task(run_main_and_shutdown);
|
||||
|
||||
tasks_lock.lock();
|
||||
try {
|
||||
if (running) {
|
||||
throw new ThreadPoolStateError("ThreadPool is already running");
|
||||
}
|
||||
running = true;
|
||||
} finally {
|
||||
tasks_lock.unlock();
|
||||
}
|
||||
|
||||
startupInitialThreads();
|
||||
submit(main_task);
|
||||
try {
|
||||
@@ -592,5 +691,15 @@ public class MercuryThreadPool
|
||||
return next_thread_id++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread pool is in the wrong state for the action the caller tried
|
||||
* to perform.
|
||||
*/
|
||||
public static class ThreadPoolStateError extends Exception {
|
||||
public ThreadPoolStateError(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user