前言

在现代软件开发中,多线程编程是提高程序性能和响应能力的重要手段。Java作为一门天生支持多线程的编程语言,提供了丰富的多线程编程API和同步机制。掌握Java多线程编程不仅能够充分利用多核处理器的计算能力,还能构建高并发、高性能的应用程序。本文将深入探讨Java多线程的核心概念、创建方式、同步机制以及实际应用场景。

多线程基础概念

进程与线程的区别

理解进程和线程的概念是学习多线程编程的基础:

graph TB
    A[计算机系统] --> B[进程 Process]
    A --> C[线程 Thread]
    
    B --> D[独立的内存空间]
    B --> E[系统资源分配单位]
    B --> F[进程间通信成本高]
    
    C --> G[共享进程内存空间]
    C --> H[CPU调度执行单位]
    C --> I[线程间通信成本低]
    
    subgraph "进程内部结构"
        J[主线程]
        K[工作线程1]
        L[工作线程2]
        M[共享内存区域]
        N[各线程栈空间]
    end

Java线程的生命周期

Java线程在整个生命周期中会经历多个状态:

stateDiagram-v2
    [*] --> NEW: 创建线程对象
    NEW --> RUNNABLE: 调用start()方法
    RUNNABLE --> BLOCKED: 等待获取锁
    RUNNABLE --> WAITING: 调用wait()/join()
    RUNNABLE --> TIMED_WAITING: 调用sleep()/wait(timeout)
    BLOCKED --> RUNNABLE: 获取到锁
    WAITING --> RUNNABLE: 被notify()/notifyAll()唤醒
    TIMED_WAITING --> RUNNABLE: 超时或被唤醒
    RUNNABLE --> TERMINATED: 线程执行完毕
    TERMINATED --> [*]

线程状态详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class ThreadStateDemo {

public static void main(String[] args) throws InterruptedException {
// 演示线程各种状态
demonstrateThreadStates();
}

public static void demonstrateThreadStates() throws InterruptedException {
Object lock = new Object();

// NEW状态:线程已创建但未启动
Thread newThread = new Thread(() -> {
System.out.println("新线程开始执行");
});
System.out.println("新线程状态: " + newThread.getState()); // NEW

// RUNNABLE状态:线程正在JVM中执行
Thread runnableThread = new Thread(() -> {
for (int i = 0; i < 1000000; i++) {
// 持续计算,保持RUNNABLE状态
Math.random();
}
});
runnableThread.start();
Thread.sleep(10); // 确保线程开始执行
System.out.println("运行线程状态: " + runnableThread.getState()); // RUNNABLE

// BLOCKED状态:线程等待获取锁
Thread blockedThread1 = new Thread(() -> {
synchronized (lock) {
try {
Thread.sleep(2000); // 持有锁2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread blockedThread2 = new Thread(() -> {
synchronized (lock) {
System.out.println("获取到锁");
}
});

blockedThread1.start();
Thread.sleep(100); // 确保blockedThread1先获取锁
blockedThread2.start();
Thread.sleep(100); // 确保blockedThread2在等待锁
System.out.println("阻塞线程状态: " + blockedThread2.getState()); // BLOCKED

// WAITING状态:线程无限期等待
Thread waitingThread = new Thread(() -> {
synchronized (lock) {
try {
lock.wait(); // 无限期等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
waitingThread.start();
Thread.sleep(100);
System.out.println("等待线程状态: " + waitingThread.getState()); // WAITING

// TIMED_WAITING状态:有限期等待
Thread timedWaitingThread = new Thread(() -> {
try {
Thread.sleep(5000); // 睡眠5秒
} catch (InterruptedException e) {
e.printStackTrace();
}
});
timedWaitingThread.start();
Thread.sleep(100);
System.out.println("限时等待线程状态: " + timedWaitingThread.getState()); // TIMED_WAITING

// 清理资源
synchronized (lock) {
lock.notifyAll(); // 唤醒等待的线程
}

// 等待所有线程结束
runnableThread.join();
blockedThread1.join();
blockedThread2.join();
waitingThread.join();
timedWaitingThread.join();

System.out.println("所有线程已结束");
}
}

线程的创建与使用

Java提供了多种创建线程的方式,每种方式都有其适用场景。

继承Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 方式一:继承Thread类
public class MyThread extends Thread {
private String threadName;
private int count;

public MyThread(String name) {
this.threadName = name;
this.count = 0;
}

@Override
public void run() {
System.out.println(threadName + " 开始执行");

for (int i = 0; i < 5; i++) {
try {
count++;
System.out.println(threadName + " 执行第 " + count + " 次,当前时间: " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
Thread.sleep(1000); // 暂停1秒
} catch (InterruptedException e) {
System.out.println(threadName + " 被中断");
break;
}
}

System.out.println(threadName + " 执行完毕");
}

// 获取执行次数
public int getCount() {
return count;
}
}

// 使用示例
public class ThreadExample {
public static void demonstrateThreadClass() {
System.out.println("=== 继承Thread类示例 ===");

// 创建线程对象
MyThread thread1 = new MyThread("工作线程-1");
MyThread thread2 = new MyThread("工作线程-2");

// 启动线程
thread1.start();
thread2.start();

try {
// 等待线程完成
thread1.join();
thread2.join();

System.out.println("线程1执行了 " + thread1.getCount() + " 次");
System.out.println("线程2执行了 " + thread2.getCount() + " 次");

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

实现Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 方式二:实现Runnable接口
public class MyRunnable implements Runnable {
private String taskName;
private volatile boolean running = true;

public MyRunnable(String taskName) {
this.taskName = taskName;
}

@Override
public void run() {
System.out.println(taskName + " 开始执行,线程: " + Thread.currentThread().getName());

int counter = 0;
while (running && counter < 10) {
try {
counter++;
System.out.println(taskName + " 正在处理任务 " + counter);
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(taskName + " 被中断,停止执行");
Thread.currentThread().interrupt(); // 重新设置中断标志
break;
}
}

System.out.println(taskName + " 执行完毕,共处理 " + counter + " 个任务");
}

// 停止任务
public void stop() {
running = false;
}
}

// 使用示例
public class RunnableExample {
public static void demonstrateRunnableInterface() {
System.out.println("=== 实现Runnable接口示例 ===");

// 创建Runnable对象
MyRunnable task1 = new MyRunnable("数据处理任务");
MyRunnable task2 = new MyRunnable("文件备份任务");

// 创建Thread对象并启动
Thread thread1 = new Thread(task1, "DataProcessor");
Thread thread2 = new Thread(task2, "FileBackup");

thread1.start();
thread2.start();

try {
// 让任务运行3秒后停止
Thread.sleep(3000);
task1.stop();
task2.stop();

// 等待线程完成
thread1.join();
thread2.join();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

实现Callable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

// 方式三:实现Callable接口(有返回值)
public class MyCallable implements Callable<String> {
private String taskName;
private int workTime;

public MyCallable(String taskName, int workTime) {
this.taskName = taskName;
this.workTime = workTime;
}

@Override
public String call() throws Exception {
System.out.println(taskName + " 开始执行,预计耗时 " + workTime + " 秒");

// 模拟任务执行
for (int i = 1; i <= workTime; i++) {
Thread.sleep(1000);
System.out.println(taskName + " 进度: " + (i * 100 / workTime) + "%");

// 模拟可能出现的异常
if (i == 3 && taskName.contains("异常")) {
throw new RuntimeException(taskName + " 执行过程中发生异常");
}
}

String result = taskName + " 执行完毕,结果: SUCCESS";
System.out.println(result);
return result;
}
}

// 使用示例
public class CallableExample {
public static void demonstrateCallableInterface() {
System.out.println("=== 实现Callable接口示例 ===");

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 创建Callable任务
List<Callable<String>> tasks = new ArrayList<>();
tasks.add(new MyCallable("计算任务", 3));
tasks.add(new MyCallable("下载任务", 2));
tasks.add(new MyCallable("异常任务", 4));

try {
// 提交任务并获取Future对象
List<Future<String>> futures = new ArrayList<>();
for (Callable<String> task : tasks) {
Future<String> future = executor.submit(task);
futures.add(future);
}

// 获取任务结果
for (int i = 0; i < futures.size(); i++) {
try {
Future<String> future = futures.get(i);
String result = future.get(10, TimeUnit.SECONDS); // 设置超时时间
System.out.println("任务 " + (i + 1) + " 结果: " + result);
} catch (ExecutionException e) {
System.err.println("任务 " + (i + 1) + " 执行异常: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.err.println("任务 " + (i + 1) + " 执行超时");
}
}

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}

线程同步机制

在多线程环境中,当多个线程访问共享资源时,必须使用同步机制来保证线程安全。

synchronized关键字

graph TD
    A[synchronized同步机制] --> B[同步方法]
    A --> C[同步代码块]
    A --> D[静态同步方法]
    
    B --> E[实例方法同步
锁定当前对象] C --> F[灵活指定锁对象
精确控制同步范围] D --> G[类级别同步
锁定Class对象] E --> H[自动获取和释放锁] F --> H G --> H H --> I[保证原子性] H --> J[保证可见性] H --> K[保证有序性]

synchronized的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
public class SynchronizedExample {
private int count = 0;
private final Object lock = new Object();

// 同步方法
public synchronized void synchronizedMethod() {
count++;
System.out.println(Thread.currentThread().getName() +
" 同步方法执行,count = " + count);
}

// 同步代码块
public void synchronizedBlock() {
synchronized (lock) {
count++;
System.out.println(Thread.currentThread().getName() +
" 同步代码块执行,count = " + count);
}
}

// 静态同步方法
public static synchronized void staticSynchronizedMethod() {
System.out.println(Thread.currentThread().getName() +
" 静态同步方法执行");
}

// 演示银行账户转账的线程安全问题
static class BankAccount {
private double balance;
private String accountNumber;

public BankAccount(String accountNumber, double initialBalance) {
this.accountNumber = accountNumber;
this.balance = initialBalance;
}

// 不安全的转账方法
public void unsafeTransfer(BankAccount target, double amount) {
if (this.balance >= amount) {
System.out.println(Thread.currentThread().getName() +
" 开始转账 " + amount + " 从 " + this.accountNumber +
" 到 " + target.accountNumber);

// 模拟转账延迟
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

this.balance -= amount;
target.balance += amount;

System.out.println(Thread.currentThread().getName() +
" 转账完成,余额: " + this.accountNumber +
"=" + this.balance + ", " +
target.accountNumber + "=" + target.balance);
} else {
System.out.println(Thread.currentThread().getName() +
" 余额不足,无法转账");
}
}

// 安全的转账方法
public synchronized void safeTransfer(BankAccount target, double amount) {
// 使用对象hashCode排序来避免死锁
BankAccount firstLock = this.hashCode() < target.hashCode() ? this : target;
BankAccount secondLock = this.hashCode() < target.hashCode() ? target : this;

synchronized (firstLock) {
synchronized (secondLock) {
if (this.balance >= amount) {
System.out.println(Thread.currentThread().getName() +
" 安全转账 " + amount + " 从 " + this.accountNumber +
" 到 " + target.accountNumber);

try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

this.balance -= amount;
target.balance += amount;

System.out.println(Thread.currentThread().getName() +
" 安全转账完成,余额: " + this.accountNumber +
"=" + this.balance + ", " +
target.accountNumber + "=" + target.balance);
} else {
System.out.println(Thread.currentThread().getName() +
" 余额不足,无法转账");
}
}
}
}

public synchronized double getBalance() {
return balance;
}
}

// 演示同步机制
public static void demonstrateSynchronization() {
SynchronizedExample example = new SynchronizedExample();

// 创建多个线程测试同步
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
example.synchronizedMethod();
example.synchronizedBlock();
staticSynchronizedMethod();
}
}, "Thread-" + i);
}

// 启动所有线程
for (Thread thread : threads) {
thread.start();
}

// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("最终count值: " + example.count);
}

// 演示银行转账场景
public static void demonstrateBankTransfer() {
BankAccount account1 = new BankAccount("ACC001", 1000);
BankAccount account2 = new BankAccount("ACC002", 1000);

// 创建多个转账线程
Thread t1 = new Thread(() -> {
account1.safeTransfer(account2, 300);
}, "Transfer-1");

Thread t2 = new Thread(() -> {
account2.safeTransfer(account1, 200);
}, "Transfer-2");

Thread t3 = new Thread(() -> {
account1.safeTransfer(account2, 100);
}, "Transfer-3");

t1.start();
t2.start();
t3.start();

try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("最终余额: ACC001=" + account1.getBalance() +
", ACC002=" + account2.getBalance());
}
}

wait()、notify()和notifyAll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public class WaitNotifyExample {

// 生产者消费者模式示例
static class SharedBuffer {
private final Queue<Integer> buffer = new LinkedList<>();
private final int capacity;

public SharedBuffer(int capacity) {
this.capacity = capacity;
}

// 生产者方法
public synchronized void produce(int item) throws InterruptedException {
while (buffer.size() == capacity) {
System.out.println("缓冲区已满,生产者等待...");
wait(); // 等待消费者消费
}

buffer.offer(item);
System.out.println("生产者生产了: " + item + ", 缓冲区大小: " + buffer.size());
notifyAll(); // 通知等待的消费者
}

// 消费者方法
public synchronized int consume() throws InterruptedException {
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
wait(); // 等待生产者生产
}

int item = buffer.poll();
System.out.println("消费者消费了: " + item + ", 缓冲区大小: " + buffer.size());
notifyAll(); // 通知等待的生产者
return item;
}

public synchronized int size() {
return buffer.size();
}
}

// 生产者线程
static class Producer implements Runnable {
private final SharedBuffer buffer;
private final String name;

public Producer(SharedBuffer buffer, String name) {
this.buffer = buffer;
this.name = name;
}

@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
int item = Integer.parseInt(name.substring(name.length() - 1)) * 10 + i;
buffer.produce(item);
Thread.sleep(1000 + (int)(Math.random() * 1000)); // 随机延迟
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(name + " 被中断");
}
System.out.println(name + " 生产完毕");
}
}

// 消费者线程
static class Consumer implements Runnable {
private final SharedBuffer buffer;
private final String name;

public Consumer(SharedBuffer buffer, String name) {
this.buffer = buffer;
this.name = name;
}

@Override
public void run() {
try {
for (int i = 1; i <= 3; i++) {
int item = buffer.consume();
Thread.sleep(1500 + (int)(Math.random() * 1000)); // 随机延迟
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(name + " 被中断");
}
System.out.println(name + " 消费完毕");
}
}

// 演示生产者消费者模式
public static void demonstrateProducerConsumer() {
System.out.println("=== 生产者消费者模式演示 ===");

SharedBuffer buffer = new SharedBuffer(3); // 容量为3的缓冲区

// 创建生产者和消费者线程
Thread producer1 = new Thread(new Producer(buffer, "Producer-1"));
Thread producer2 = new Thread(new Producer(buffer, "Producer-2"));
Thread consumer1 = new Thread(new Consumer(buffer, "Consumer-1"));
Thread consumer2 = new Thread(new Consumer(buffer, "Consumer-2"));
Thread consumer3 = new Thread(new Consumer(buffer, "Consumer-3"));

// 启动线程
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();

try {
// 等待所有线程完成
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("最终缓冲区大小: " + buffer.size());
}
}

Lock接口和ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import java.util.concurrent.locks.*;

public class LockExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private int count = 0;

// 使用ReentrantLock的基本示例
public void lockMethod() {
lock.lock();
try {
count++;
System.out.println(Thread.currentThread().getName() +
" 获取锁,count = " + count);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 必须在finally块中释放锁
}
}

// 尝试获取锁的示例
public void tryLockMethod() {
if (lock.tryLock()) {
try {
count++;
System.out.println(Thread.currentThread().getName() +
" 尝试获取锁成功,count = " + count);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() +
" 尝试获取锁失败");
}
}

// 使用Condition的等待/通知机制
public void await() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 开始等待");
condition.await();
System.out.println(Thread.currentThread().getName() + " 等待结束");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void signal() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 发送信号");
condition.signalAll();
} finally {
lock.unlock();
}
}

// 读写锁示例
static class ReadWriteLockExample {
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String data = "初始数据";

// 读操作
public String read() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() +
" 开始读取数据: " + data);
Thread.sleep(1000); // 模拟读取耗时
System.out.println(Thread.currentThread().getName() +
" 读取数据完成");
return data;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} finally {
readLock.unlock();
}
}

// 写操作
public void write(String newData) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() +
" 开始写入数据: " + newData);
Thread.sleep(2000); // 模拟写入耗时
this.data = newData;
System.out.println(Thread.currentThread().getName() +
" 写入数据完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
}

// 演示Lock的使用
public static void demonstrateLock() {
System.out.println("=== ReentrantLock演示 ===");

LockExample example = new LockExample();

// 创建多个线程测试锁
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
example.lockMethod();
example.tryLockMethod();
}, "LockThread-" + i);
}

// 启动线程
for (Thread thread : threads) {
thread.start();
}

// 等待线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 演示Condition的使用
public static void demonstrateCondition() {
System.out.println("=== Condition演示 ===");

LockExample example = new LockExample();

// 创建等待线程
Thread waitThread1 = new Thread(example::await, "WaitThread-1");
Thread waitThread2 = new Thread(example::await, "WaitThread-2");

// 创建信号线程
Thread signalThread = new Thread(() -> {
try {
Thread.sleep(2000); // 等待2秒
example.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "SignalThread");

waitThread1.start();
waitThread2.start();
signalThread.start();

try {
waitThread1.join();
waitThread2.join();
signalThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 演示读写锁
public static void demonstrateReadWriteLock() {
System.out.println("=== 读写锁演示 ===");

ReadWriteLockExample example = new ReadWriteLockExample();

// 创建多个读线程
Thread[] readers = new Thread[3];
for (int i = 0; i < readers.length; i++) {
readers[i] = new Thread(() -> {
for (int j = 0; j < 2; j++) {
example.read();
}
}, "Reader-" + i);
}

// 创建写线程
Thread writer = new Thread(() -> {
example.write("更新后的数据-" + System.currentTimeMillis());
}, "Writer");

// 启动线程
for (Thread reader : readers) {
reader.start();
}
writer.start();

try {
for (Thread reader : readers) {
reader.join();
}
writer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程池

线程池是管理和复用线程的重要机制,能够有效控制系统资源的使用。

线程池的核心概念

graph TB
    A[线程池 ThreadPool] --> B[核心组件]
    A --> C[工作流程]
    A --> D[优势特点]
    
    B --> E[CorePoolSize
核心线程数] B --> F[MaximumPoolSize
最大线程数] B --> G[WorkQueue
工作队列] B --> H[ThreadFactory
线程工厂] B --> I[RejectedExecutionHandler
拒绝策略] C --> J[提交任务到队列] C --> K[核心线程处理任务] C --> L[队列满时创建新线程] C --> M[达到最大线程数时拒绝] D --> N[复用线程降低开销] D --> O[控制并发数量] D --> P[管理线程生命周期]

ThreadPoolExecutor详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExample {

// 自定义线程工厂
static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public NamedThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
}

// 自定义拒绝策略
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("任务被拒绝: " + r.toString() +
", 线程池状态: 活跃线程=" + executor.getActiveCount() +
", 队列大小=" + executor.getQueue().size());
}
}

// 模拟任务
static class Task implements Runnable {
private final int taskId;
private final int executionTime;

public Task(int taskId, int executionTime) {
this.taskId = taskId;
this.executionTime = executionTime;
}

@Override
public void run() {
System.out.println("任务 " + taskId + " 开始执行,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(executionTime * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("任务 " + taskId + " 被中断");
return;
}
System.out.println("任务 " + taskId + " 执行完毕,耗时: " + executionTime + "秒");
}

@Override
public String toString() {
return "Task{id=" + taskId + ", time=" + executionTime + "s}";
}
}

// 演示ThreadPoolExecutor的使用
public static void demonstrateThreadPoolExecutor() {
System.out.println("=== ThreadPoolExecutor演示 ===");

// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(3), // 工作队列,容量为3
new NamedThreadFactory("Worker"), // 自定义线程工厂
new CustomRejectedExecutionHandler() // 自定义拒绝策略
);

// 提交任务
for (int i = 1; i <= 10; i++) {
Task task = new Task(i, 2);
try {
executor.execute(task);
System.out.println("提交任务 " + i + ", 活跃线程: " +
executor.getActiveCount() +
", 队列大小: " + executor.getQueue().size());
} catch (RejectedExecutionException e) {
System.err.println("任务 " + i + " 被拒绝");
}

// 稍微延迟,观察线程池状态变化
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

// 监控线程池状态
monitorThreadPool(executor);

// 关闭线程池
shutdownThreadPool(executor);
}

// 监控线程池状态
private static void monitorThreadPool(ThreadPoolExecutor executor) {
Thread monitor = new Thread(() -> {
while (!executor.isTerminated()) {
try {
System.out.println("=== 线程池状态 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列中任务数: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("================");

Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Monitor");

monitor.setDaemon(true);
monitor.start();
}

// 优雅关闭线程池
private static void shutdownThreadPool(ThreadPoolExecutor executor) {
System.out.println("开始关闭线程池...");

executor.shutdown(); // 不再接受新任务,等待已提交任务完成

try {
// 等待所有任务完成,最多等待30秒
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("线程池未能在30秒内完成所有任务,强制关闭");
executor.shutdownNow(); // 强制关闭

// 等待强制关闭完成
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("线程池无法完全关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}

System.out.println("线程池已关闭");
}

// 演示Executors工厂方法
public static void demonstrateExecutors() {
System.out.println("=== Executors工厂方法演示 ===");

// 1. 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
System.out.println("固定大小线程池 (大小=3):");
for (int i = 1; i <= 5; i++) {
fixedPool.execute(new Task(i, 1));
}
fixedPool.shutdown();

// 2. 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
System.out.println("缓存线程池:");
for (int i = 1; i <= 5; i++) {
cachedPool.execute(new Task(i, 1));
}
cachedPool.shutdown();

// 3. 单线程线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
System.out.println("单线程线程池:");
for (int i = 1; i <= 3; i++) {
singlePool.execute(new Task(i, 1));
}
singlePool.shutdown();

// 4. 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
System.out.println("定时任务线程池:");

// 延迟执行
scheduledPool.schedule(new Task(1, 1), 2, TimeUnit.SECONDS);

// 固定频率执行
AtomicInteger counter = new AtomicInteger(0);
ScheduledFuture<?> future = scheduledPool.scheduleAtFixedRate(() -> {
int count = counter.incrementAndGet();
System.out.println("定时任务执行第 " + count + " 次");
if (count >= 3) {
scheduledPool.shutdown();
}
}, 1, 2, TimeUnit.SECONDS);

try {
// 等待所有线程池完成
fixedPool.awaitTermination(10, TimeUnit.SECONDS);
cachedPool.awaitTermination(10, TimeUnit.SECONDS);
singlePool.awaitTermination(10, TimeUnit.SECONDS);
scheduledPool.awaitTermination(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

并发集合

Java提供了线程安全的并发集合类,用于在多线程环境中安全地操作数据。

ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import java.util.concurrent.*;
import java.util.Map;

public class ConcurrentCollectionExample {

// 演示ConcurrentHashMap的使用
public static void demonstrateConcurrentHashMap() {
System.out.println("=== ConcurrentHashMap演示 ===");

ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();

// 创建多个线程并发操作Map
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadIndex = i;
threads[i] = new Thread(() -> {
String threadName = "Thread-" + threadIndex;

// 并发写入操作
for (int j = 0; j < 10; j++) {
String key = threadName + "-key-" + j;
concurrentMap.put(key, j);
System.out.println(threadName + " 写入: " + key + " = " + j);
}

// 并发读取操作
for (int j = 0; j < 5; j++) {
String key = threadName + "-key-" + j;
Integer value = concurrentMap.get(key);
System.out.println(threadName + " 读取: " + key + " = " + value);
}

// 原子操作
concurrentMap.compute("counter", (k, v) -> v == null ? 1 : v + 1);

}, "ConcurrentMapThread-" + i);
}

// 启动所有线程
for (Thread thread : threads) {
thread.start();
}

// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("最终Map大小: " + concurrentMap.size());
System.out.println("计数器值: " + concurrentMap.get("counter"));
}

// 演示BlockingQueue的使用
public static void demonstrateBlockingQueue() {
System.out.println("=== BlockingQueue演示 ===");

BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
String item = "Item-" + i;
queue.put(item); // 阻塞式添加
System.out.println("生产者添加: " + item +
", 队列大小: " + queue.size());
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("生产者完成");
}, "Producer");

// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
String item = queue.take(); // 阻塞式获取
System.out.println("消费者获取: " + item +
", 队列大小: " + queue.size());
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("消费者完成");
}, "Consumer");

producer.start();
consumer.start();

try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 演示CopyOnWriteArrayList的使用
public static void demonstrateCopyOnWriteArrayList() {
System.out.println("=== CopyOnWriteArrayList演示 ===");

CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();

// 写线程
Thread writer = new Thread(() -> {
for (int i = 1; i <= 5; i++) {
String item = "Item-" + i;
cowList.add(item);
System.out.println("写入: " + item + ", 当前大小: " + cowList.size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Writer");

// 读线程
Thread reader = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("读取列表: " + cowList);
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Reader");

writer.start();
reader.start();

try {
writer.join();
Thread.sleep(2000); // 让读线程再读取几次
reader.interrupt();
reader.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("最终列表: " + cowList);
}
}

实际应用案例

多线程文件下载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class MultiThreadDownloader {

static class DownloadTask implements Callable<String> {
private final String url;
private final int taskId;

public DownloadTask(String url, int taskId) {
this.url = url;
this.taskId = taskId;
}

@Override
public String call() throws Exception {
System.out.println("开始下载任务 " + taskId + ": " + url);

// 模拟下载过程
int totalSize = 1000 + (int)(Math.random() * 5000); // 随机文件大小
int downloaded = 0;

while (downloaded < totalSize) {
Thread.sleep(100); // 模拟网络延迟
downloaded += 50 + (int)(Math.random() * 100);

if (downloaded > totalSize) {
downloaded = totalSize;
}

int progress = (downloaded * 100) / totalSize;
System.out.println("任务 " + taskId + " 下载进度: " + progress +
"% (" + downloaded + "/" + totalSize + ")");
}

String result = "任务 " + taskId + " 下载完成: " + url +
", 大小: " + totalSize + " bytes";
System.out.println(result);
return result;
}
}

public static void demonstrateMultiThreadDownload() {
System.out.println("=== 多线程文件下载器演示 ===");

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 创建下载任务
List<String> urls = List.of(
"http://example.com/file1.zip",
"http://example.com/file2.pdf",
"http://example.com/file3.mp4",
"http://example.com/file4.jpg",
"http://example.com/file5.txt"
);

List<Future<String>> futures = new ArrayList<>();

// 提交下载任务
for (int i = 0; i < urls.size(); i++) {
DownloadTask task = new DownloadTask(urls.get(i), i + 1);
Future<String> future = executor.submit(task);
futures.add(future);
}

// 获取下载结果
System.out.println("等待所有下载任务完成...");
for (int i = 0; i < futures.size(); i++) {
try {
String result = futures.get(i).get(30, TimeUnit.SECONDS);
System.out.println("收到结果: " + result);
} catch (Exception e) {
System.err.println("下载任务 " + (i + 1) + " 失败: " + e.getMessage());
}
}

executor.shutdown();
System.out.println("所有下载任务处理完毕");
}
}

总结

Java多线程编程是现代软件开发的重要技能,通过本文的学习,我们深入了解了:

  1. 多线程基础:理解了进程与线程的区别、线程生命周期等核心概念
  2. 线程创建方式:掌握了继承Thread类、实现Runnable接口、实现Callable接口等创建方式
  3. 同步机制:学会了使用synchronized关键字、wait/notify机制、Lock接口等同步工具
  4. 线程池技术:理解了线程池的工作原理和使用方法,提高程序性能
  5. 并发集合:掌握了线程安全的集合类,避免并发问题
  6. 实际应用:通过案例学习了多线程在实际项目中的应用

在实际开发中,正确使用多线程能够:

  • 充分利用多核处理器性能
  • 提高程序响应能力和吞吐量
  • 实现异步处理和并行计算
  • 构建高并发的服务器应用

建议开发者在使用多线程时注意:

  • 避免线程安全问题,正确使用同步机制
  • 合理设计线程池参数,避免资源浪费
  • 注意死锁和活锁问题
  • 使用并发工具类简化开发
  • 进行充分的测试,特别是并发场景下的测试

掌握Java多线程编程将为构建高性能、高并发的Java应用程序提供强有力的技术支撑。

参考资料