A Brief Look at Java Threads (Part 2)

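Blocking queue

Many threading problems can be formulated elegantly and safely with one or more queues.

Producer threads insert elements into the queue, and consumer threads retrieve them.

A blocking queue causes a thread to block when it tries to add an element to a full queue or to remove an element from an empty one.

The following program demonstrates a blocking queue.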

import java.io.File;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    private static final int FILE_QUEUE_SIZE = 10;
    private static final int SEARCH_THREADS = 100;
    // Sentinel that tells the searchers no more files will arrive.
    private static final File DUMMY = new File("");
    private static final BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

    public static void main(String[] args) {
        try (Scanner in = new Scanner(System.in)) {
            System.out.print("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
            String directory = in.nextLine();
            System.out.print("Enter keyword (e.g. volatile): ");
            String keyword = in.nextLine();

            // Producer: enumerates the files, then appends the sentinel.
            Runnable enumerator = () -> {
                try {
                    enumerate(new File(directory));
                    queue.put(DUMMY);
                } catch (InterruptedException e) {
                    // ignore
                }
            };

            new Thread(enumerator).start();
            for (int i = 0; i < SEARCH_THREADS; i++) {
                // Consumer: takes files until it sees the sentinel.
                Runnable searcher = () -> {
                    try {
                        boolean done = false;
                        while (!done) {
                            File file = queue.take();
                            if (file == DUMMY) {
                                // Put the sentinel back so the other searchers terminate too.
                                queue.put(file);
                                done = true;
                            } else {
                                search(file, keyword);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        // ignore
                    }
                };
                // The original listing never started the searcher threads.
                new Thread(searcher).start();
            }
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its subdirectories.
     * @param directory the directory in which to start
     * @throws InterruptedException
     */
    public static void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        if (files == null) {
            // Not a directory, or it could not be read.
            return;
        }
        for (File file : files) {
            if (file.isDirectory()) {
                enumerate(file);
            } else {
                queue.put(file);
            }
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     * @param file the file to search
     * @param keyword the keyword to search for
     * @throws IOException
     */
    public static void search(File file, String keyword) throws IOException {
        try (Scanner in = new Scanner(file, "UTF-8")) {
            int lineNumber = 0;
            while (in.hasNextLine()) {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword)) {
                    System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
                }
            }
        }
    }
}

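The producer thread enumerates all files in all subdirectories and places them in a blocking queue.
We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file.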

Sample input and output

Enter base directory (e.g. /opt/jdk1.8.0/src): \ideaIC-2019.3.1.win\lib
Enter keyword (e.g. volatile): .java
/* Partial output:
\ideaIC-2019.3.1.win\lib\ant\lib\ant-apache-resolver.pom:58: <include>org/apache/tools/ant/types/resolver/*.java</include>
\ideaIC-2019.3.1.win\lib\ant\lib\ant-launcher.pom:45: <include>org/apache/tools/ant/launch/*.java</include>
\ideaIC-2019.3.1.win\lib\ant\lib\ant.pom:145: <exclude>org/apache/tools/ant/util/ScriptRunner.java</exclude>
\ideaIC-2019.3.1.win\lib\ant\lib\ant.pom:147: <exclude>org/apache/tools/ant/util/optional/ScriptRunner.java</exclude>
\ideaIC-2019.3.1.win\lib\ant\WHATSNEW:3089:* <javac> handles package-info.java files, there were repeatedly compiled.
\ideaIC-2019.3.1.win\lib\ant\WHATSNEW:3220:* Avoid possible NPE in Jar.java.
*/

API java.util.concurrent.BlockingQueue interface

  • void put(E e)
  • E take()
  • boolean offer(E e)
  • E poll(long timeout, TimeUnit unit)
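The difference between these four methods is easiest to see on a queue with a capacity of one. The following is a minimal sketch, not part of the original post; the class name QueueMethodsSketch and the 100 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsSketch {

    // Runs the four calls against a queue of capacity 1 and records what each one did.
    static String trace() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        try {
            queue.put("a");                                  // there is room, so put returns at once
            log.append("offer=").append(queue.offer("b"));   // queue is full: returns false, does not block
            log.append(" take=").append(queue.take());       // removes "a"
            // The queue is now empty: poll waits up to 100 ms, then gives up and returns null.
            log.append(" poll=").append(queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();              // restore the interrupt flag
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());  // offer=false take=a poll=null
    }
}
```

In short, put and take wait indefinitely, offer never waits, and the timed poll waits for a bounded time.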

API java.util.concurrent.BlockingDeque

API java.util.concurrent.ArrayBlockingQueue

  • extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
  • public ArrayBlockingQueue(int capacity)
  • public ArrayBlockingQueue(int capacity, boolean fair)

    Constructs a blocking queue with the given capacity and fairness setting. The queue is implemented as a circular array

API java.util.concurrent.LinkedBlockingQueue

API java.util.concurrent.LinkedBlockingDeque

API java.util.concurrent.Delayed interface

  • long getDelay(TimeUnit unit)

    Returns the remaining delay associated with this object, in the given time unit
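Delayed is normally used together with java.util.concurrent.DelayQueue, which releases an element only after its delay has expired. The sketch below assumes a hypothetical element type DelayedMessage; neither it nor DelayQueue appears in the original post.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedSketch {

    // Hypothetical element type: a message that becomes available after a delay.
    static final class DelayedMessage implements Delayed {
        final String text;
        private final long readyAtNanos;

        DelayedMessage(String text, long delayMillis) {
            this.text = text;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay, converted into the unit the caller asked for.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Delayed extends Comparable<Delayed>; order by remaining delay.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Adds two messages out of order and returns them in the order the queue releases them.
    static String drainInOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 200));
        queue.put(new DelayedMessage("fast", 50));
        try {
            String first = queue.take().text;   // blocks until the shortest delay has expired
            String second = queue.take().text;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder());  // fast,slow
    }
}
```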

API java.util.concurrent.PriorityBlockingQueue

  • public PriorityBlockingQueue()
  • public PriorityBlockingQueue(int initialCapacity)
  • public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

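    Constructs an unbounded blocking priority queue, implemented as a heap
    initialCapacity: the initial capacity of the priority queue
    comparator: the comparator used to order the elements; if none is given, the elements must implement the Comparable interface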
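As a small illustration of the comparator constructor, the sketch below orders strings by length. The class name and the sample values are made up for this example.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityQueueSketch {

    // Inserts three strings in arbitrary order and removes them in priority order.
    static String drain() {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);
        queue.offer("ccc");   // the queue is unbounded, so insertion never blocks
        queue.offer("a");
        queue.offer("bb");

        StringBuilder out = new StringBuilder();
        String next;
        while ((next = queue.poll()) != null) {   // poll returns null once the queue is empty
            out.append(next).append(' ');
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drain());  // a bb ccc
    }
}
```

Only removal follows the priority order; iterating over the queue does not guarantee any particular order.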

API java.util.concurrent.TransferQueue interface

  • void transfer(E e)

    Transfers the element to a consumer, waiting if necessary

  • boolean tryTransfer(E e, long timeout, TimeUnit unit)

    Tries to transfer a value within the given timeout
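The following sketch shows both methods using LinkedTransferQueue, the standard implementation of this interface. The class name TransferSketch and the 50 ms timeout are assumptions made for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class TransferSketch {

    static String demo() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        StringBuilder log = new StringBuilder();
        try {
            // No consumer is waiting, so the timed attempt gives up; the element is not left in the queue.
            boolean handedOver = queue.tryTransfer("unclaimed", 50, TimeUnit.MILLISECONDS);
            log.append(handedOver).append(',').append(queue.size()).append(',');

            String[] received = new String[1];
            Thread consumer = new Thread(() -> {
                try {
                    received[0] = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            queue.transfer("hello");   // returns only after the consumer has received the element
            consumer.join();           // join makes the consumer's write to received[0] visible
            log.append(received[0]);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());  // false,0,hello
    }
}
```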

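Callable and Future

  • A Runnable encapsulates a task that runs asynchronously. A Callable is similar to a Runnable, but it has a type parameter <V>, and its single method, call(), returns a value
  • Callable<V> represents an asynchronous computation that eventually returns an object of type V
  • A Future holds the result of an asynchronous computation
  • The FutureTask wrapper turns a Callable into both a Future and a Runnable; it implements both interfaces

Example code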

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * @author Cay Horstmann
 */
public class FutureDemo {
    public static void main(String[] args) {
        try (Scanner in = new Scanner(System.in)) {
            System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
            String directory = in.nextLine();
            System.out.print("Enter keyword (e.g. volatile): ");
            String keyword = in.nextLine();

            MatchCounter counter = new MatchCounter(new File(directory), keyword);
            FutureTask<Integer> task = new FutureTask<>(counter);
            Thread t = new Thread(task);
            t.start();
            try {
                // get() blocks until the whole directory tree has been counted.
                System.out.println(task.get() + " matching files.");
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}

/**
 * This task counts files in a directory and its subdirectories that contain a given keyword.
 */
class MatchCounter implements Callable<Integer> {

    private final File directory;
    private final String keyword;

    public MatchCounter(File directory, String keyword) {
        this.directory = directory;
        this.keyword = keyword;
    }

    @Override
    public Integer call() throws Exception {
        int count = 0;
        try {
            File[] files = directory.listFiles();
            if (files == null) {
                // Not a directory, or it could not be read.
                return 0;
            }
            List<Future<Integer>> results = new ArrayList<>();

            for (File file : files) {
                if (file.isDirectory()) {
                    // Count each subdirectory in its own thread.
                    MatchCounter counter = new MatchCounter(file, keyword);
                    FutureTask<Integer> task = new FutureTask<>(counter);
                    results.add(task);
                    Thread t = new Thread(task);
                    t.start();
                } else {
                    if (search(file)) {
                        count++;
                    }
                }
            }

            // Wait for the subdirectory tasks and add up their counts.
            for (Future<Integer> result : results) {
                try {
                    count += result.get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // ignore
        }
        return count;
    }

    /**
     * Searches a file for a given keyword.
     * @param file the file to search
     * @return true if the keyword is contained in the file
     */
    public boolean search(File file) {
        try {
            try (Scanner in = new Scanner(file, "UTF-8")) {
                boolean found = false;
                while (!found && in.hasNextLine()) {
                    String line = in.nextLine();
                    if (line.contains(keyword)) {
                        found = true;
                    }
                }
                return found;
            }
        } catch (IOException e) {
            return false;
        }
    }
}
/*
Enter base directory (e.g. /usr/local/jdk5.0/src): \ideaIC-2019.3.1.win\lib
Enter keyword (e.g. volatile): include
28 matching files.
*/

API java.util.concurrent.Callable<V> interface

  • V call() throws Exception

    Computes a result, or throws an exception if unable to do so

API java.util.concurrent.Future<V> interface

  • V get()
  • V get(long timeout, TimeUnit unit)
  • boolean cancel(boolean mayInterruptIfRunning)

    Attempts to cancel the task

  • boolean isCancelled()
  • boolean isDone()

API java.util.concurrent.FutureTask<V>

  • public FutureTask(Callable<V> callable)
  • public FutureTask(Runnable runnable, V result)

    Constructs an object that is both a Future<V> and a Runnable
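The second constructor is the less obvious one: because a Runnable has no return value, get() simply hands back the fixed result passed to the constructor once run() has finished. A minimal sketch, with a made-up class name and values:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class FutureTaskRunnableSketch {

    static String runAndReport() {
        AtomicInteger sideEffect = new AtomicInteger();
        // The Runnable only has a side effect; "done" is the value get() will return.
        FutureTask<String> task = new FutureTask<>(() -> { sideEffect.set(42); }, "done");
        new Thread(task).start();                        // used as a Runnable
        try {
            return task.get() + ":" + sideEffect.get();  // used as a Future
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport());  // done:42
    }
}
```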

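Executors

Constructing a new thread is somewhat expensive because it involves interaction with the operating system.
If your program creates a large number of short-lived threads, it should use a thread pool instead.
A thread pool also lets you limit the number of concurrent threads; creating a huge number of threads can greatly degrade performance and even crash the virtual machine.
The Executors class has a number of static factory methods for constructing thread pools.

Method                            Description
newCachedThreadPool               New threads are created as needed; idle threads are kept for 60 seconds
newFixedThreadPool                The pool contains a fixed number of threads; idle threads are kept indefinitely
newSingleThreadExecutor           A pool with a single thread that executes the submitted tasks sequentially
newScheduledThreadPool            A fixed thread pool for scheduled execution
newSingleThreadScheduledExecutor  A single-thread pool for scheduled execution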
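To see how a fixed pool caps the number of threads, the sketch below submits many small tasks and counts how many distinct worker threads ran them. The class and method names are hypothetical, and the 10 second wait is an arbitrary upper bound.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolSketch {

    // Runs taskCount tasks on a fixed pool and returns how many distinct threads executed them.
    static int distinctWorkerThreads(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();   // thread-safe set
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> { workerNames.add(Thread.currentThread().getName()); });
        }
        pool.shutdown();                                   // accept no new tasks, finish the submitted ones
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);   // wait for the submitted tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames.size();
    }

    public static void main(String[] args) {
        // Never more than 3, however many tasks are submitted.
        System.out.println(distinctWorkerThreads(3, 50));
    }
}
```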

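Thread pools

You can submit a Runnable or Callable object to an ExecutorService with one of the following methods:

  • submit(Callable<T> task)
  • submit(Runnable task)
  • submit(Runnable task, T result)

As soon as you are done with a thread pool, call shutdown.

Usage example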

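import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Demo {

    public static void main(String[] args) {

        ExecutorService pool = Executors.newCachedThreadPool();

        // A Callable returns a value, here the name of the anonymous class.
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return this.getClass().getName();
            }
        };

        Future<String> result = pool.submit(callable);

        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });

        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.print(Thread.currentThread().getName() + ": ");
                try {
                    // Blocks until the Callable submitted above has finished.
                    System.out.println(result.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        });

        // Already submitted tasks still run to completion.
        pool.shutdown();
    }
}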
/* Output:
pool-1-thread-2
pool-1-thread-2: cn.nopech.thread.Demo$1
*/

API java.util.concurrent.ExecutorService interface

  • <T> Future<T> submit(Callable<T> task)
  • Future<?> submit(Runnable task)
  • <T> Future<T> submit(Runnable task, T result)
  • void shutdown()

    Shuts down the service, completing the already submitted tasks but not accepting new submissions

API java.util.concurrent.Executors

  • public static ExecutorService newFixedThreadPool(int nThreads)
  • public static ExecutorService newSingleThreadExecutor()
  • public static ExecutorService newCachedThreadPool()
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
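A scheduled pool runs a task after a delay instead of immediately. The sketch below schedules a Callable and measures how long it actually waited; the class name, method name, and delay are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledSketch {

    // Schedules a task after delayMillis and returns the elapsed time (ms) observed when it ran.
    static long runAfterDelay(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        try {
            // The lambda returns a value, so it is treated as a Callable<Long>.
            ScheduledFuture<Long> elapsed = scheduler.schedule(
                    () -> System.nanoTime() - start, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(elapsed.get());   // get() blocks until the task has run
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints a value of at least 100; the exact number depends on the machine.
        System.out.println(runAfterDelay(100) + " ms");
    }
}
```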

API java.util.concurrent.ThreadPoolExecutor

  • public int getLargestPoolSize()
  • public long getTaskCount()
  • public long getCompletedTaskCount()
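These statistics methods are only available on the concrete ThreadPoolExecutor class, so the sketch below constructs one directly. The pool is configured, by assumption, like a fixed pool of two threads; the class name PoolStatsSketch is made up.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsSketch {

    // Returns { task count, completed task count, largest pool size } after all tasks have finished.
    static long[] runAndMeasure(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { });   // trivial task; only the bookkeeping matters here
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {
            pool.getTaskCount(), pool.getCompletedTaskCount(), pool.getLargestPoolSize()
        };
    }

    public static void main(String[] args) {
        long[] stats = runAndMeasure(5);
        System.out.println(stats[0] + " " + stats[1] + " " + stats[2]);  // 5 5 2
    }
}
```

While the pool is still running, the two counts are only approximations because tasks keep changing state; they are exact here because the pool has terminated.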

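Fork-Join framework

Some applications use a large number of threads, most of which are idle. For example, a web server might use one thread per connection.
Other applications use one thread per processor core to carry out computationally intensive tasks. Java SE 7 introduced the fork-join framework specifically to support this latter kind of application.

Behind the scenes, the fork-join framework uses an effective heuristic, called work stealing, to balance the workload among the available threads.

Each worker thread has a double-ended queue (deque) that holds its tasks. A worker thread pushes subtasks onto the head of its own deque (only that one thread accesses the head, so no locking is needed). When a worker thread is idle, it "steals" a task from the tail of another deque. Because the large subtasks are at the tail, such stealing is rare.

Example

Given an array numbers, count how many of its elements are greater than 0.5. We can split the array in half, count each half separately, and add the results.

To carry out such a recursive computation in a form the framework can use, supply a class that extends RecursiveTask<T> (if the computation produces a result of type T) or a class that extends RecursiveAction (if it produces no result).
Then override the compute() method to generate and invoke the subtasks and to combine their results.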

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;

/**
 * This program demonstrates the fork-join framework.
 * @author Cay Horstmann
 */
public class ForkJoinDemo {
    public static void main(String[] args) {
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }
        // Math.random() returns values in [0, 1), so the threshold must be 0.5, not 5.
        Counter counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(counter);
        System.out.println(counter.join());
    }
}

class Counter extends RecursiveTask<Integer> {

    public static final int THRESHOLD = 100;
    private double[] values;
    private int from;
    private int to;
    private DoublePredicate filter;

    public Counter(double[] values, int from, int to, DoublePredicate filter) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    @Override
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            // Small enough: count directly.
            int count = 0;
            for (int i = from; i < to; i++) {
                if (filter.test(values[i])) {
                    count++;
                }
            }
            return count;
        } else {
            // Split the range in half and count both halves as subtasks.
            int mid = (from + to) / 2;
            Counter first = new Counter(values, from, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
}
/* Output:
5000073
*/

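Here, the invokeAll() method receives a number of tasks and blocks until all of them have completed. The join() method yields the result; we call join() on each subtask and return the sum.
The output, 5000073, is roughly half of SIZE, which is what we expect from uniformly distributed random numbers.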
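When the computation produces no result, RecursiveAction takes the place of RecursiveTask. The following sketch multiplies every element of an array in place; the class ScaleAction, its threshold, and the use of the common pool are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ScaleAction extends RecursiveAction {

    private static final int THRESHOLD = 1_000;
    private final double[] values;
    private final int from;
    private final int to;
    private final double factor;

    ScaleAction(double[] values, int from, int to, double factor) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.factor = factor;
    }

    @Override
    protected void compute() {
        if (to - from < THRESHOLD) {
            // Small enough: update the range directly.
            for (int i = from; i < to; i++) {
                values[i] *= factor;
            }
        } else {
            // The two halves touch disjoint index ranges, so they can run in parallel.
            int mid = (from + to) >>> 1;
            invokeAll(new ScaleAction(values, from, mid, factor),
                      new ScaleAction(values, mid, to, factor));
        }
    }

    // Convenience wrapper: scales the whole array on the common pool and returns it.
    static double[] scale(double[] values, double factor) {
        ForkJoinPool.commonPool().invoke(new ScaleAction(values, 0, values.length, factor));
        return values;
    }

    public static void main(String[] args) {
        double[] data = {1.0, 2.0, 3.0};
        scale(data, 2.0);
        System.out.println(data[0] + " " + data[1] + " " + data[2]);  // 2.0 4.0 6.0
    }
}
```

There is nothing to combine after invokeAll() returns, since compute() is void and the work is visible in the array itself.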

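CompletableFuture

Suppose you want to implement a control flow across a set of event handlers: you register a handler for the action that should occur after a task completes, the next action is also asynchronous, and the actions end up in different event handlers.
The CompletableFuture class in Java SE 8 offers an alternative approach: completable futures can be composed.

For example:

Suppose we want to extract all links from a web page in order to build a web crawler, and that we have the following method: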

public CompletableFuture<String> readPage(URL url)

It yields the text of the web page when the page becomes available.
If the method:

public static List<URL> getLinks(String page)

yields the URLs in an HTML page, you can arrange for it to be called once the page is available:

CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URL>> links = contents.thenApply(Parser::getLinks);

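The thenApply() method does not block. It returns another future. When the first future completes, its result is fed to the getLinks() method, and the return value of that method becomes the final result.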
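The snippet above can be made runnable with stand-ins for the two methods. In this sketch readPage performs no network access and simply completes asynchronously with a fixed piece of HTML, getLinks is a naive regular-expression scan, and URLs are plain strings; all of these are simplifying assumptions, not the author's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LinkPipelineSketch {

    // Stand-in for readPage: completes asynchronously with canned HTML instead of fetching the URL.
    static CompletableFuture<String> readPage(String url) {
        return CompletableFuture.supplyAsync(() ->
                "<a href=\"" + url + "/a\">A</a> <a href=\"" + url + "/b\">B</a>");
    }

    // Stand-in for getLinks: collects the values of all href attributes.
    static List<String> getLinks(String page) {
        List<String> links = new ArrayList<>();
        Matcher m = Pattern.compile("href=\"([^\"]*)\"").matcher(page);
        while (m.find()) {
            links.add(m.group(1));
        }
        return links;
    }

    static List<String> crawl(String url) {
        CompletableFuture<String> contents = readPage(url);
        // thenApply returns immediately; getLinks runs once contents has completed.
        CompletableFuture<List<String>> links = contents.thenApply(LinkPipelineSketch::getLinks);
        return links.join();   // block here only to obtain the result for printing
    }

    public static void main(String[] args) {
        System.out.println(crawl("http://example.com"));
        // [http://example.com/a, http://example.com/b]
    }
}
```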

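With completable futures, you specify what you want to do and in what order. Of course, it won't all happen right away, but what matters is that all the code is in one place.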
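When the next step is itself asynchronous, thenCompose chains it without nesting futures, and exceptionally supplies a fallback if any earlier step fails. A small sketch with hypothetical steps parse and square:

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // First asynchronous step: parse the input (throws NumberFormatException on bad input).
    static CompletableFuture<Integer> parse(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    // Second asynchronous step, which depends on the result of the first.
    static CompletableFuture<Integer> square(int n) {
        return CompletableFuture.supplyAsync(() -> n * n);
    }

    static int pipeline(String input) {
        return parse(input)
                .thenCompose(ComposeSketch::square)   // async step after async step, no nested future
                .thenApply(n -> n + 1)                // plain synchronous transformation
                .exceptionally(ex -> -1)              // fallback if any earlier stage failed
                .join();
    }

    public static void main(String[] args) {
        System.out.println(pipeline("7"));   // 50
        System.out.println(pipeline("x"));   // -1
    }
}
```

The whole control flow, including the error path, is written in one place and in the order in which it happens.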

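The remaining features are not covered here; consult the API documentation for further usage. This section is only meant to point out that the class exists.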