当前位置: 主页 > JAVA语言

java多线程框架有哪些-ForkJoin概念Fork/Join实现框架的基本用法定义任务

发布时间:2023-07-07 09:01   浏览次数:次   作者:佚名

ForkJoin 概念

Fork/Join是JDK 1.7加入的新的线程池实现java多线程框架有哪些,它体现的是一种分治思想,适用于能够进行任务拆分的cpu密集型运算

所谓的任务拆分java多线程框架有哪些,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。使用ForkJoin框架需要将逻辑操作进行分治. 比较适合可以使用动态规划、回溯、递归等算法实现的逻辑。比如快速排序、二分查找,可以使用ForkJoin来提高运算能力。

Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率

基本用法

定义任务类:创建一个继承自RecursiveTask或RecursiveAction的任务类。RecursiveTask用于有返回结果的任务,而RecursiveAction用于没有返回结果的任务。实现compute()方法:在任务类中重写compute()方法,定义任务的具体计算逻辑。在该方法中,可以进行任务的拆分、执行以及结果的合并。创建任务对象和ForkJoinPool:在主程序中,创建任务对象并将其提交给ForkJoinPool进行执行。

案例代码:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.logging.Logger;
/**
 * 计算从begin到end的和
 */
public class MyRecursivelyTask extends RecursiveTask<Integer> {
    private static Logger log = Logger.getLogger("zzz");
    private Integer begin;
    private Integer end;
    public MyRecursivelyTask(Integer begin, Integer end) {
        this.begin = begin;
        this.end = end;
    }
    /**
     * 实现compute方法,拆分任务执行
     **/
    @Override
    protected Integer compute() {
		
        if (begin.equals(end)) {
            log.info("begin:"+ begin + "end:"+  end);
            return begin;
        }
        if (end - begin == 1) {
            log.info("begin:"+ begin + "end:"+  end);
            return begin + end;
        }
        Integer mid = (begin+end)/2;
        // 拆分任务
        MyRecursivelyTask t1 = new MyRecursivelyTask(mid + 1, end);
        MyRecursivelyTask t2 = new MyRecursivelyTask(begin, mid);
        log.info("t1:"+t1.toString());
        log.info("t2:"+t2.toString());
        
        // 执行拆分任务
        t1.fork();
        t2.fork();
		
        // 获取拆分的任务的结果
        Integer res1 = t1.join();
        Integer res2 = t2.join();
        return res1 + res2;
    }
    @Override
    public String toString() {
        return "MyRecursivelyTask{" +
                "begin=" + begin +
                ", end=" + end +
                '}';
    }
}
class forkJoinTest{
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
		// 创建任务, 计算从1到5的和
        MyRecursivelyTask t = new MyRecursivelyTask(1, 5);
        // 创建fork/join线程池设置并行度为4
        ForkJoinPool pool = new ForkJoinPool(4);
        // 提交任务
        ForkJoinTask<Integer> res = pool.submit(t);
        // 获取任务结果
        System.out.println(res.get());
    }
}

ForkJoinPool中的API Future submit(ForkJoinTask task): T invoke(ForkJoinTask task): void execute(ForkJoinTask task): void shutdown(): List shutdownNow(): boolean awaitTermination(long timeout, TimeUnit unit): int getPoolSize(): int getActiveThreadCount(): int getParallelism(): long getStealCount(): long getQueuedTaskCount():