Java中的并发编程框架(一)
并行计算是计算机历史上的一个伤疤。
Java并行执行框架
线程可执行体的几种形式
-
Runnable
-
Callable
-
直接集成Thread
Runnable 和 Callable 接口的异同
-
都代表一个可执行任务(task)
-
Runnable 接口可以当做Thread的构造参数传入Thread实例,Callable不可以。
-
Runnable 和 Callable 都可以被ExecutorService调度管理
-
返回值
-
Runnable 接口的run函数没有参数,没有返回值。
public interface Runnable {
public void run();
}
被executor调度时,future没有值
public void executeTask() {
executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new EventLoggingTask());
executorService.shutdown();
}
Callable接口的定义
Callable定义中包含了泛型的返回值。
public interface Callable<V> {
V call() throws Exception;
}
调用的例子
public class MyTestTask implements Callable<Integer> {
int number;
// standard constructors
public Integer call() throws InvalidParamaterException {
int fact = 1;
// ...
for(int count = number; count > 1; count--) {
fact = fact * count;
}
return fact;
}
}
执行单体测试
@Test
public void runtest(){
MyTestTask task = new MyTestTask(5);
Future<Integer> future = executorService.submit(task);
assertEquals(120, future.get().intValue());
}
执行层面的问题是, 为什么future能捕捉到执行线程的返回值?
问题一
future 是定义在主栈上的局部变量, thread实例在另外一个栈上,是什么机制保证线程退出时把返回值交给调用方?
一个可解释的模型是, 在JIT中,在主线程调用wait元语,子线程退出后notifiy主线程。
总之 future会一对一mapping到thread堆栈,不然它无法确保取到值。
使用ExecutorService类
ExecutorService 提供了一个线程池的管理方法,用来对线程进行创建和销毁。
常用例子
Runnable tasktmp = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
// 直接运行lambda实例
tasktmp.run();
// 提交一个lambda格式的task
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName);
});
// 提交一个Callable接口的task
Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
} catch (InterruptedException e){
throw new InterruptedException("task interrupted");
}
};
ExecutorService executorService1 = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService1.submit(task);
// 使用feature获取返回值
Integer result = future.get();
System.out.println("future done?" + future.isDone());
System.out.println("result :" + result);
// 使用可调度的Service实例
ScheduledExecutorService executorService2 = Executors.newScheduledThreadPool(1);
Runnable task1 = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future1 = executorService2.schedule(task1, 3,TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future1.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
ScheduledExecutorService executor3 = Executors.newScheduledThreadPool(1);
Runnable task3 = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
// 反复执行的例子
executor3.scheduleAtFixedRate(task3, initialDelay, period, TimeUnit.SECONDS);