Java中的并发编程框架(一)

分类: JVM 发布于:

并行计算是计算机历史上的一个伤疤。

Java并行执行框架

线程可执行体的几种形式

  • Runnable

  • Callable

  • 直接集成Thread

Runnable 和 Callable 接口的异同

  • 都代表一个可执行任务(task)

  • Runnable 接口可以当做Thread的构造参数传入Thread实例,Callable不可以。

  • Runnable 和 Callable 都可以被ExecutorService调度管理

  • 返回值

  • Runnable 接口的run函数没有参数,没有返回值。

public interface Runnable {
    public void run();
}

被executor调度时,future没有值

public void executeTask() {
    executorService = Executors.newSingleThreadExecutor();
    Future future = executorService.submit(new EventLoggingTask());
    executorService.shutdown();
}

Callable接口的定义

Callable定义中包含了泛型的返回值。

public interface Callable<V> {
    V call() throws Exception;
}

调用的例子

public class MyTestTask implements Callable<Integer> {
    int number;
 
    // standard constructors
 
    public Integer call() throws InvalidParamaterException {
        int fact = 1;
        // ...
        for(int count = number; count > 1; count--) {
            fact = fact * count;
        }
 
        return fact;
    }
}

执行单体测试

@Test
public void runtest(){
    MyTestTask task = new MyTestTask(5);
    Future<Integer> future = executorService.submit(task);
    assertEquals(120, future.get().intValue());
}

执行层面的问题是, 为什么future能捕捉到执行线程的返回值?

问题一

future 是定义在主栈上的局部变量, thread实例在另外一个栈上,是什么机制保证线程退出时把返回值交给调用方?

一个可解释的模型是, 在JIT中,在主线程调用wait元语,子线程退出后notifiy主线程。

总之 future会一对一mapping到thread堆栈,不然它无法确保取到值。

使用ExecutorService类

ExecutorService 提供了一个线程池的管理方法,用来对线程进行创建和销毁。

常用例子

Runnable tasktmp = () -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Hello " + threadName);
        };
        // 直接运行lambda实例
        tasktmp.run();

        // 提交一个lambda格式的task
        ExecutorService  executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName);
        });

        // 提交一个Callable接口的task
        Callable<Integer> task = () -> {
          try {
              TimeUnit.SECONDS.sleep(1);
              return  123;

          } catch (InterruptedException e){
              throw new InterruptedException("task interrupted");
          }
        };

        ExecutorService executorService1 = Executors.newFixedThreadPool(1);
        Future<Integer> future = executorService1.submit(task);

        // 使用feature获取返回值
        Integer result = future.get();

        System.out.println("future done?" + future.isDone());
        System.out.println("result :" + result);

        // 使用可调度的Service实例
        ScheduledExecutorService executorService2 = Executors.newScheduledThreadPool(1);
        Runnable task1 = () -> System.out.println("Scheduling: " + System.nanoTime());
        ScheduledFuture<?> future1 = executorService2.schedule(task1, 3,TimeUnit.SECONDS);
        TimeUnit.MILLISECONDS.sleep(1337);

        long remainingDelay = future1.getDelay(TimeUnit.MILLISECONDS);

        System.out.printf("Remaining Delay: %sms", remainingDelay);

        ScheduledExecutorService executor3 = Executors.newScheduledThreadPool(1);

        Runnable task3 = () -> System.out.println("Scheduling: " + System.nanoTime());

        int initialDelay = 0;
        int period = 1;
        // 反复执行的例子
        executor3.scheduleAtFixedRate(task3, initialDelay, period, TimeUnit.SECONDS);