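Preface

In modern software development, multithreading is a key technique for improving program performance and responsiveness. Java has supported threads since its first release and offers a rich set of threading APIs and synchronization mechanisms. Mastering them lets you make full use of multi-core processors and build high-concurrency, high-performance applications. This article covers the core concepts of Java multithreading, the ways to create threads, the synchronization mechanisms, and practical application scenarios.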
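Multithreading fundamentals

Processes versus threads

Understanding processes and threads is the foundation for learning multithreaded programming: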
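graph TB
A[Computer system] --> B[Process]
A --> C[Thread]
B --> D[Separate memory space]
B --> E[Unit of system resource allocation]
B --> F[High inter-process communication cost]
C --> G[Shares the process memory space]
C --> H[Unit of CPU scheduling]
C --> I[Low inter-thread communication cost]
subgraph "Inside a process"
J[Main thread]
K[Worker thread 1]
L[Worker thread 2]
M[Shared memory region]
N[Per-thread stacks]
end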
The Java thread lifecycle

A Java thread passes through several states during its lifetime:
stateDiagram-v2
[*] --> NEW: Thread object created
NEW --> RUNNABLE: start() called
RUNNABLE --> BLOCKED: Waiting to acquire a lock
RUNNABLE --> WAITING: wait()/join() called
RUNNABLE --> TIMED_WAITING: sleep()/wait(timeout) called
BLOCKED --> RUNNABLE: Lock acquired
WAITING --> RUNNABLE: Woken by notify()/notifyAll() or joined thread ends
TIMED_WAITING --> RUNNABLE: Timeout elapsed or woken
RUNNABLE --> TERMINATED: run() completes
TERMINATED --> [*]
Thread states in detail

    public class ThreadStateDemo {

        // Guarded by waitLock. The flag lets the waiting thread notice a
        // notification even if it was sent before the thread began to wait.
        private static boolean released = false;

        public static void main(String[] args) throws InterruptedException {
            demonstrateThreadStates();
        }

        public static void demonstrateThreadStates() throws InterruptedException {
            Object lock = new Object();      // contended monitor for the BLOCKED demo
            Object waitLock = new Object();  // separate monitor for the WAITING demo

            // NEW: created but not yet started
            Thread newThread = new Thread(() -> System.out.println("New thread running"));
            System.out.println("New thread state: " + newThread.getState());

            // RUNNABLE: started and busy (on a fast machine the loop may
            // already have finished, in which case TERMINATED is printed)
            Thread runnableThread = new Thread(() -> {
                for (int i = 0; i < 1000000; i++) {
                    Math.random();
                }
            });
            runnableThread.start();
            Thread.sleep(10);
            System.out.println("Running thread state: " + runnableThread.getState());

            // BLOCKED: blockedThread2 waits for the monitor held by blockedThread1
            Thread blockedThread1 = new Thread(() -> {
                synchronized (lock) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            Thread blockedThread2 = new Thread(() -> {
                synchronized (lock) {
                    System.out.println("Lock acquired");
                }
            });
            blockedThread1.start();
            Thread.sleep(100);
            blockedThread2.start();
            Thread.sleep(100);
            System.out.println("Blocked thread state: " + blockedThread2.getState());

            // WAITING: uses its own monitor, so it is not itself blocked on
            // the lock that blockedThread1 is still holding
            Thread waitingThread = new Thread(() -> {
                synchronized (waitLock) {
                    try {
                        while (!released) {
                            waitLock.wait();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            waitingThread.start();
            Thread.sleep(100);
            System.out.println("Waiting thread state: " + waitingThread.getState());

            // TIMED_WAITING: sleeping with a timeout
            Thread timedWaitingThread = new Thread(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            timedWaitingThread.start();
            Thread.sleep(100);
            System.out.println("Timed-waiting thread state: " + timedWaitingThread.getState());

            // Wake the waiting thread
            synchronized (waitLock) {
                released = true;
                waitLock.notifyAll();
            }

            // TERMINATED: wait for every started thread to finish
            runnableThread.join();
            blockedThread1.join();
            blockedThread2.join();
            waitingThread.join();
            timedWaitingThread.join();
            System.out.println("All threads have finished");
        }
    }
Creating and using threads

Java offers several ways to create threads, each suited to different scenarios.
Extending the Thread class

    // MyThread.java
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class MyThread extends Thread {
        private final String threadName;
        private int count;

        public MyThread(String name) {
            this.threadName = name;
            this.count = 0;
        }

        @Override
        public void run() {
            System.out.println(threadName + " started");
            for (int i = 0; i < 5; i++) {
                try {
                    count++;
                    System.out.println(threadName + " iteration " + count + ", current time: "
                            + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println(threadName + " was interrupted");
                    break;
                }
            }
            System.out.println(threadName + " finished");
        }

        public int getCount() {
            return count;
        }
    }

    // ThreadExample.java
    public class ThreadExample {
        public static void demonstrateThreadClass() {
            System.out.println("=== Extending Thread ===");
            MyThread thread1 = new MyThread("Worker-1");
            MyThread thread2 = new MyThread("Worker-2");

            // start() runs run() on a new thread; calling run() directly would not
            thread1.start();
            thread2.start();

            try {
                // join() also makes the final count values visible to this thread
                thread1.join();
                thread2.join();
                System.out.println("Thread 1 ran " + thread1.getCount() + " iterations");
                System.out.println("Thread 2 ran " + thread2.getCount() + " iterations");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
Implementing the Runnable interface

    // MyRunnable.java
    public class MyRunnable implements Runnable {
        private final String taskName;
        // volatile so that a stop() call from another thread becomes visible promptly
        private volatile boolean running = true;

        public MyRunnable(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            System.out.println(taskName + " started on thread: " + Thread.currentThread().getName());
            int counter = 0;
            while (running && counter < 10) {
                try {
                    counter++;
                    System.out.println(taskName + " processing item " + counter);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    System.out.println(taskName + " was interrupted, stopping");
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    break;
                }
            }
            System.out.println(taskName + " finished after processing " + counter + " items");
        }

        public void stop() {
            running = false;
        }
    }

    // RunnableExample.java
    public class RunnableExample {
        public static void demonstrateRunnableInterface() {
            System.out.println("=== Implementing Runnable ===");
            MyRunnable task1 = new MyRunnable("Data processing task");
            MyRunnable task2 = new MyRunnable("File backup task");

            Thread thread1 = new Thread(task1, "DataProcessor");
            Thread thread2 = new Thread(task2, "FileBackup");
            thread1.start();
            thread2.start();

            try {
                // Let the tasks run for 3 seconds, then ask them to stop
                Thread.sleep(3000);
                task1.stop();
                task2.stop();
                thread1.join();
                thread2.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
Implementing the Callable interface

    // MyCallable.java
    import java.util.concurrent.Callable;

    public class MyCallable implements Callable<String> {
        private final String taskName;
        private final int workTime;

        public MyCallable(String taskName, int workTime) {
            this.taskName = taskName;
            this.workTime = workTime;
        }

        @Override
        public String call() throws Exception {
            System.out.println(taskName + " started, expected duration " + workTime + " s");
            for (int i = 1; i <= workTime; i++) {
                Thread.sleep(1000);
                System.out.println(taskName + " progress: " + (i * 100 / workTime) + "%");
                // Simulated failure for any task whose name contains "failing"
                if (i == 3 && taskName.contains("failing")) {
                    throw new RuntimeException(taskName + " failed during execution");
                }
            }
            String result = taskName + " finished, result: SUCCESS";
            System.out.println(result);
            return result;
        }
    }

    // CallableExample.java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class CallableExample {
        public static void demonstrateCallableInterface() {
            System.out.println("=== Implementing Callable ===");
            ExecutorService executor = Executors.newFixedThreadPool(3);

            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(new MyCallable("compute task", 3));
            tasks.add(new MyCallable("download task", 2));
            tasks.add(new MyCallable("failing task", 4));

            try {
                // Submit every task and keep the Future handles
                List<Future<String>> futures = new ArrayList<>();
                for (Callable<String> task : tasks) {
                    futures.add(executor.submit(task));
                }

                // Collect the results; a task failure surfaces as ExecutionException
                for (int i = 0; i < futures.size(); i++) {
                    try {
                        String result = futures.get(i).get(10, TimeUnit.SECONDS);
                        System.out.println("Task " + (i + 1) + " result: " + result);
                    } catch (ExecutionException e) {
                        System.err.println("Task " + (i + 1) + " failed: " + e.getCause().getMessage());
                    } catch (TimeoutException e) {
                        System.err.println("Task " + (i + 1) + " timed out");
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdown();
            }
        }
    }
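Thread synchronization

In a multithreaded environment, whenever several threads access a shared mutable resource, a synchronization mechanism is required to keep the program thread-safe.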
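To make the need for synchronization concrete, here is a minimal sketch that increments two counters from several threads, one without any lock and one through synchronized methods. The class name `SharedCounterSketch` and its methods are illustrative assumptions, not part of the original article.

```java
// Hypothetical demo class: compares an unsynchronized counter with a synchronized one.
final class SharedCounterSketch {
    private int unsafeCount = 0; // no lock: concurrent increments can be lost
    private int safeCount = 0;   // guarded by this

    void incrementUnsafe() {
        unsafeCount++;           // read-modify-write, not atomic
    }

    synchronized void incrementSafe() {
        safeCount++;
    }

    int unsafeCount() {
        return unsafeCount;
    }

    synchronized int safeCount() {
        return safeCount;
    }

    // Runs `threads` workers that each increment both counters `perThread` times.
    static SharedCounterSketch run(int threads, int perThread) {
        SharedCounterSketch counter = new SharedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementUnsafe();
                    counter.incrementSafe();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // join makes the workers' writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter;
    }

    public static void main(String[] args) {
        SharedCounterSketch counter = run(4, 100_000);
        System.out.println("expected:       400000");
        System.out.println("synchronized:   " + counter.safeCount());
        System.out.println("unsynchronized: " + counter.unsafeCount());
    }
}
```

The synchronized counter always reaches the expected total. The unsynchronized one can fall short by a different amount on each run, because two threads may read the same old value and both write back the same incremented result.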
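The synchronized keyword

graph TD
A[synchronized mechanism] --> B[Synchronized method]
A --> C[Synchronized block]
A --> D[Static synchronized method]
B --> E[Instance-level sync locking the current object]
C --> F[Explicit lock object with a precisely scoped critical section]
D --> G[Class-level sync locking the Class object]
E --> H[Lock acquired and released automatically]
F --> H
G --> H
H --> I[Guarantees atomicity]
H --> J[Guarantees visibility]
H --> K[Guarantees ordering]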
Using synchronized

    public class SynchronizedExample {
        // Each counter is guarded by exactly one monitor. Updating a single
        // counter under two different monitors would not be mutually exclusive.
        private int methodCount = 0;              // guarded by this
        private int blockCount = 0;               // guarded by lock
        private final Object lock = new Object();

        // Synchronized instance method: locks the current object
        public synchronized void synchronizedMethod() {
            methodCount++;
            System.out.println(Thread.currentThread().getName()
                    + " in synchronized method, methodCount = " + methodCount);
        }

        // Synchronized block: locks an explicitly chosen object
        public void synchronizedBlock() {
            synchronized (lock) {
                blockCount++;
                System.out.println(Thread.currentThread().getName()
                        + " in synchronized block, blockCount = " + blockCount);
            }
        }

        // Static synchronized method: locks SynchronizedExample.class
        public static synchronized void staticSynchronizedMethod() {
            System.out.println(Thread.currentThread().getName() + " in static synchronized method");
        }

        static class BankAccount {
            private double balance;
            private final String accountNumber;

            public BankAccount(String accountNumber, double initialBalance) {
                this.accountNumber = accountNumber;
                this.balance = initialBalance;
            }

            // Not thread-safe: the balance check and the update are not atomic
            public void unsafeTransfer(BankAccount target, double amount) {
                if (this.balance >= amount) {
                    System.out.println(Thread.currentThread().getName() + " transferring " + amount
                            + " from " + this.accountNumber + " to " + target.accountNumber);
                    try {
                        Thread.sleep(100); // simulate processing time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    this.balance -= amount;
                    target.balance += amount;
                    System.out.println(Thread.currentThread().getName() + " transfer done, balances: "
                            + this.accountNumber + "=" + this.balance + ", "
                            + target.accountNumber + "=" + target.balance);
                } else {
                    System.out.println(Thread.currentThread().getName() + " insufficient funds, transfer refused");
                }
            }

            // Thread-safe: both monitors are always taken in account-number order.
            // The method itself must NOT be synchronized: locking `this` first
            // would break the global ordering and allow a deadlock between two
            // opposite transfers.
            public void safeTransfer(BankAccount target, double amount) {
                boolean thisFirst = this.accountNumber.compareTo(target.accountNumber) < 0;
                BankAccount firstLock = thisFirst ? this : target;
                BankAccount secondLock = thisFirst ? target : this;

                synchronized (firstLock) {
                    synchronized (secondLock) {
                        if (this.balance >= amount) {
                            System.out.println(Thread.currentThread().getName() + " safely transferring " + amount
                                    + " from " + this.accountNumber + " to " + target.accountNumber);
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            this.balance -= amount;
                            target.balance += amount;
                            System.out.println(Thread.currentThread().getName() + " safe transfer done, balances: "
                                    + this.accountNumber + "=" + this.balance + ", "
                                    + target.accountNumber + "=" + target.balance);
                        } else {
                            System.out.println(Thread.currentThread().getName() + " insufficient funds, transfer refused");
                        }
                    }
                }
            }

            public synchronized double getBalance() {
                return balance;
            }
        }

        public static void demonstrateSynchronization() {
            SynchronizedExample example = new SynchronizedExample();

            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < 3; j++) {
                        example.synchronizedMethod();
                        example.synchronizedBlock();
                        staticSynchronizedMethod();
                    }
                }, "Thread-" + i);
            }
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Final counts: methodCount=" + example.methodCount
                    + ", blockCount=" + example.blockCount);
        }

        public static void demonstrateBankTransfer() {
            BankAccount account1 = new BankAccount("ACC001", 1000);
            BankAccount account2 = new BankAccount("ACC002", 1000);

            // Transfers in opposite directions would risk deadlock without lock ordering
            Thread t1 = new Thread(() -> account1.safeTransfer(account2, 300), "Transfer-1");
            Thread t2 = new Thread(() -> account2.safeTransfer(account1, 200), "Transfer-2");
            Thread t3 = new Thread(() -> account1.safeTransfer(account2, 100), "Transfer-3");

            t1.start();
            t2.start();
            t3.start();
            try {
                t1.join();
                t2.join();
                t3.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Final balances: ACC001=" + account1.getBalance()
                    + ", ACC002=" + account2.getBalance());
        }
    }
wait(), notify(), and notifyAll()

    import java.util.LinkedList;
    import java.util.Queue;

    public class WaitNotifyExample {

        // Bounded buffer for the producer-consumer pattern
        static class SharedBuffer {
            private final Queue<Integer> buffer = new LinkedList<>();
            private final int capacity;

            public SharedBuffer(int capacity) {
                this.capacity = capacity;
            }

            public synchronized void produce(int item) throws InterruptedException {
                // Always wait in a loop: the condition must be rechecked after every wake-up
                while (buffer.size() == capacity) {
                    System.out.println("Buffer full, producer waiting...");
                    wait();
                }
                buffer.offer(item);
                System.out.println("Produced: " + item + ", buffer size: " + buffer.size());
                notifyAll(); // wake waiting consumers
            }

            public synchronized int consume() throws InterruptedException {
                while (buffer.isEmpty()) {
                    System.out.println("Buffer empty, consumer waiting...");
                    wait();
                }
                int item = buffer.poll();
                System.out.println("Consumed: " + item + ", buffer size: " + buffer.size());
                notifyAll(); // wake waiting producers
                return item;
            }

            public synchronized int size() {
                return buffer.size();
            }
        }

        static class Producer implements Runnable {
            private final SharedBuffer buffer;
            private final String name;

            public Producer(SharedBuffer buffer, String name) {
                this.buffer = buffer;
                this.name = name;
            }

            @Override
            public void run() {
                try {
                    for (int i = 1; i <= 5; i++) {
                        // The last character of the name is the producer id, e.g. "Producer-1" -> 1
                        int item = Integer.parseInt(name.substring(name.length() - 1)) * 10 + i;
                        buffer.produce(item);
                        Thread.sleep(1000 + (int) (Math.random() * 1000));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println(name + " was interrupted");
                }
                System.out.println(name + " finished producing");
            }
        }

        static class Consumer implements Runnable {
            private final SharedBuffer buffer;
            private final String name;

            public Consumer(SharedBuffer buffer, String name) {
                this.buffer = buffer;
                this.name = name;
            }

            @Override
            public void run() {
                try {
                    for (int i = 1; i <= 3; i++) {
                        buffer.consume();
                        Thread.sleep(1500 + (int) (Math.random() * 1000));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println(name + " was interrupted");
                }
                System.out.println(name + " finished consuming");
            }
        }

        public static void demonstrateProducerConsumer() {
            System.out.println("=== Producer-consumer demo ===");
            SharedBuffer buffer = new SharedBuffer(3);

            // 2 producers x 5 items = 10 produced; 3 consumers x 3 items = 9 consumed
            Thread producer1 = new Thread(new Producer(buffer, "Producer-1"));
            Thread producer2 = new Thread(new Producer(buffer, "Producer-2"));
            Thread consumer1 = new Thread(new Consumer(buffer, "Consumer-1"));
            Thread consumer2 = new Thread(new Consumer(buffer, "Consumer-2"));
            Thread consumer3 = new Thread(new Consumer(buffer, "Consumer-3"));

            producer1.start();
            producer2.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();

            try {
                producer1.join();
                producer2.join();
                consumer1.join();
                consumer2.join();
                consumer3.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Final buffer size: " + buffer.size());
        }
    }
The Lock interface and ReentrantLock

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockExample {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
        private boolean signalled = false; // guarded by lock
        private int count = 0;             // guarded by lock

        // Basic usage: always unlock in finally
        public void lockMethod() {
            lock.lock();
            try {
                count++;
                System.out.println(Thread.currentThread().getName() + " acquired lock, count = " + count);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        // Non-blocking attempt: returns immediately if the lock is taken
        public void tryLockMethod() {
            if (lock.tryLock()) {
                try {
                    count++;
                    System.out.println(Thread.currentThread().getName() + " tryLock succeeded, count = " + count);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println(Thread.currentThread().getName() + " tryLock failed");
            }
        }

        // Condition wait: loop on a flag to handle spurious wake-ups and
        // a signal that arrives before the wait starts
        public void await() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " waiting");
                while (!signalled) {
                    condition.await();
                }
                System.out.println(Thread.currentThread().getName() + " done waiting");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        public void signal() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " signalling");
                signalled = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Read-write lock: many concurrent readers, one exclusive writer
        static class ReadWriteLockExample {
            private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
            private final Lock readLock = rwLock.readLock();
            private final Lock writeLock = rwLock.writeLock();
            private String data = "initial data";

            public String read() {
                readLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " reading: " + data);
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " finished reading");
                    return data;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                } finally {
                    readLock.unlock();
                }
            }

            public void write(String newData) {
                writeLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " writing: " + newData);
                    Thread.sleep(2000);
                    this.data = newData;
                    System.out.println(Thread.currentThread().getName() + " finished writing");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    writeLock.unlock();
                }
            }
        }

        public static void demonstrateLock() {
            System.out.println("=== ReentrantLock demo ===");
            LockExample example = new LockExample();

            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(() -> {
                    example.lockMethod();
                    example.tryLockMethod();
                }, "LockThread-" + i);
            }
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public static void demonstrateCondition() {
            System.out.println("=== Condition demo ===");
            LockExample example = new LockExample();

            Thread waitThread1 = new Thread(example::await, "WaitThread-1");
            Thread waitThread2 = new Thread(example::await, "WaitThread-2");
            Thread signalThread = new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    example.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "SignalThread");

            waitThread1.start();
            waitThread2.start();
            signalThread.start();
            try {
                waitThread1.join();
                waitThread2.join();
                signalThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public static void demonstrateReadWriteLock() {
            System.out.println("=== Read-write lock demo ===");
            ReadWriteLockExample example = new ReadWriteLockExample();

            Thread[] readers = new Thread[3];
            for (int i = 0; i < readers.length; i++) {
                readers[i] = new Thread(() -> {
                    for (int j = 0; j < 2; j++) {
                        example.read();
                    }
                }, "Reader-" + i);
            }
            Thread writer = new Thread(
                    () -> example.write("updated data-" + System.currentTimeMillis()), "Writer");

            for (Thread reader : readers) {
                reader.start();
            }
            writer.start();
            try {
                for (Thread reader : readers) {
                    reader.join();
                }
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
Thread pools

A thread pool manages and reuses threads, which keeps the consumption of system resources under control.
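Core concepts of thread pools

graph TB
A[Thread pool] --> B[Core components]
A --> C[Workflow]
A --> D[Advantages]
B --> E[CorePoolSize core thread count]
B --> F[MaximumPoolSize maximum thread count]
B --> G[WorkQueue work queue]
B --> H[ThreadFactory thread factory]
B --> I[RejectedExecutionHandler rejection policy]
C --> J[Core threads run new tasks first]
C --> K[Tasks are queued once core threads are busy]
C --> L[Extra threads are created when the queue is full]
C --> M[Tasks are rejected once the maximum thread count is reached]
D --> N[Reuses threads to reduce overhead]
D --> O[Limits the level of concurrency]
D --> P[Manages thread lifecycles]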
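The workflow in the diagram can be observed directly on a deliberately tiny pool. The sketch below assumes a pool with one core thread, a maximum of two threads, a queue of capacity one, and the default `AbortPolicy`; the class name `PoolSaturationSketch` and the `observe` method are hypothetical. Every task blocks on a latch, so the pool state after each submission is stable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows core thread -> queue -> extra thread -> rejection.
final class PoolSaturationSketch {

    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);            // below core size: starts the core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);            // core thread busy: task goes to the queue
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);            // queue full: starts a non-core thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);        // queue full and max threads reached
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                  // default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = observe();
        System.out.println("pool size after task 1:  " + seen[0]);
        System.out.println("queue size after task 2: " + seen[1]);
        System.out.println("pool size after task 3:  " + seen[2]);
        System.out.println("task 4 rejected:         " + (seen[3] == 1));
    }
}
```

Note the order: a second thread is created only after the queue has filled up, not as soon as the core thread is busy. With an unbounded queue the pool would therefore never grow beyond its core size.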
ThreadPoolExecutor in detail

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ThreadPoolExample {

        // Thread factory that gives pool threads readable names
        static class NamedThreadFactory implements ThreadFactory {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            public NamedThreadFactory(String namePrefix) {
                this.namePrefix = namePrefix;
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
                t.setDaemon(false);
                return t;
            }
        }

        // Rejection policy that logs the task instead of throwing
        static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("Task rejected: " + r + ", pool state: active threads="
                        + executor.getActiveCount() + ", queue size=" + executor.getQueue().size());
            }
        }

        static class Task implements Runnable {
            private final int taskId;
            private final int executionTime; // seconds

            public Task(int taskId, int executionTime) {
                this.taskId = taskId;
                this.executionTime = executionTime;
            }

            @Override
            public void run() {
                System.out.println("Task " + taskId + " started on thread: "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(executionTime * 1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("Task " + taskId + " was interrupted");
                    return;
                }
                System.out.println("Task " + taskId + " finished, took " + executionTime + " s");
            }

            @Override
            public String toString() {
                return "Task{id=" + taskId + ", time=" + executionTime + "s}";
            }
        }

        public static void demonstrateThreadPoolExecutor() {
            System.out.println("=== ThreadPoolExecutor demo ===");
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    2,                                  // core pool size
                    4,                                  // maximum pool size
                    60L, TimeUnit.SECONDS,              // keep-alive time for idle non-core threads
                    new LinkedBlockingQueue<>(3),       // bounded work queue
                    new NamedThreadFactory("Worker"),
                    new CustomRejectedExecutionHandler()
            );

            for (int i = 1; i <= 10; i++) {
                Task task = new Task(i, 2);
                try {
                    executor.execute(task);
                    System.out.println("Submitted task " + i + ", active threads: "
                            + executor.getActiveCount() + ", queue size: " + executor.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Only reached with a throwing policy; the custom handler above just logs
                    System.err.println("Task " + i + " was rejected");
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            monitorThreadPool(executor);
            shutdownThreadPool(executor);
        }

        // Prints pool statistics every 3 seconds from a daemon thread
        private static void monitorThreadPool(ThreadPoolExecutor executor) {
            Thread monitor = new Thread(() -> {
                while (!executor.isTerminated()) {
                    try {
                        System.out.println("=== Pool status ===");
                        System.out.println("Core pool size: " + executor.getCorePoolSize());
                        System.out.println("Maximum pool size: " + executor.getMaximumPoolSize());
                        System.out.println("Current pool size: " + executor.getPoolSize());
                        System.out.println("Active threads: " + executor.getActiveCount());
                        System.out.println("Queued tasks: " + executor.getQueue().size());
                        System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
                        System.out.println("Total tasks: " + executor.getTaskCount());
                        System.out.println("===================");
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Monitor");
            monitor.setDaemon(true);
            monitor.start();
        }

        // Graceful shutdown first, forced shutdown as a fallback
        private static void shutdownThreadPool(ThreadPoolExecutor executor) {
            System.out.println("Shutting down the pool...");
            executor.shutdown(); // stop accepting new tasks
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    System.out.println("Pool did not finish within 30 s, forcing shutdown");
                    executor.shutdownNow();
                    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.err.println("Pool could not be shut down completely");
                    }
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            System.out.println("Pool shut down");
        }

        public static void demonstrateExecutors() {
            System.out.println("=== Executors factory methods demo ===");

            // 1. Fixed-size pool
            ExecutorService fixedPool = Executors.newFixedThreadPool(3);
            System.out.println("Fixed thread pool (size=3):");
            for (int i = 1; i <= 5; i++) {
                fixedPool.execute(new Task(i, 1));
            }
            fixedPool.shutdown();

            // 2. Cached pool: creates threads on demand and reuses idle ones
            ExecutorService cachedPool = Executors.newCachedThreadPool();
            System.out.println("Cached thread pool:");
            for (int i = 1; i <= 5; i++) {
                cachedPool.execute(new Task(i, 1));
            }
            cachedPool.shutdown();

            // 3. Single-thread executor: tasks run sequentially
            ExecutorService singlePool = Executors.newSingleThreadExecutor();
            System.out.println("Single-thread executor:");
            for (int i = 1; i <= 3; i++) {
                singlePool.execute(new Task(i, 1));
            }
            singlePool.shutdown();

            // 4. Scheduled pool: delayed and periodic tasks
            ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
            System.out.println("Scheduled thread pool:");
            scheduledPool.schedule(new Task(1, 1), 2, TimeUnit.SECONDS);

            AtomicInteger counter = new AtomicInteger(0);
            scheduledPool.scheduleAtFixedRate(() -> {
                int count = counter.incrementAndGet();
                System.out.println("Periodic task run " + count);
                if (count >= 3) {
                    scheduledPool.shutdown(); // also cancels further periodic runs
                }
            }, 1, 2, TimeUnit.SECONDS);

            try {
                fixedPool.awaitTermination(10, TimeUnit.SECONDS);
                cachedPool.awaitTermination(10, TimeUnit.SECONDS);
                singlePool.awaitTermination(10, TimeUnit.SECONDS);
                scheduledPool.awaitTermination(15, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
Concurrent collections

Java provides thread-safe concurrent collection classes for manipulating shared data safely from multiple threads.
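A thread-safe collection makes each individual call safe, but a separate `get` followed by a `put` is still a race between threads. As a minimal sketch, the hypothetical `MergeCounterSketch` below counts events with `ConcurrentHashMap.merge`, which performs the read-modify-write as one atomic step per key. The class and key names are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: atomic per-key counting with ConcurrentHashMap.merge.
final class MergeCounterSketch {

    // Each of `threads` workers records `perThread` events, alternating between two keys.
    static Map<String, Integer> count(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    String key = (j % 2 == 0) ? "even" : "odd";
                    // Atomic: inserts 1 if absent, otherwise adds 1 to the current value
                    hits.merge(key, 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return hits;
    }

    public static void main(String[] args) {
        // 4 threads x 1000 events, half on each key
        System.out.println(count(4, 1000));
    }
}
```

With 4 threads and 1000 events each, both keys end at exactly 2000. Writing `hits.put(key, hits.get(key) + 1)` instead would lose updates under contention, even though `get` and `put` are each thread-safe.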
ConcurrentHashMap

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class ConcurrentCollectionExample {

        public static void demonstrateConcurrentHashMap() {
            System.out.println("=== ConcurrentHashMap demo ===");
            ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();

            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                final int threadIndex = i;
                threads[i] = new Thread(() -> {
                    String threadName = "Thread-" + threadIndex;

                    // Writes
                    for (int j = 0; j < 10; j++) {
                        String key = threadName + "-key-" + j;
                        concurrentMap.put(key, j);
                        System.out.println(threadName + " wrote: " + key + " = " + j);
                    }

                    // Reads
                    for (int j = 0; j < 5; j++) {
                        String key = threadName + "-key-" + j;
                        Integer value = concurrentMap.get(key);
                        System.out.println(threadName + " read: " + key + " = " + value);
                    }

                    // Atomic read-modify-write on a shared key
                    concurrentMap.compute("counter", (k, v) -> v == null ? 1 : v + 1);
                }, "ConcurrentMapThread-" + i);
            }

            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            System.out.println("Final map size: " + concurrentMap.size());
            System.out.println("Counter value: " + concurrentMap.get("counter"));
        }

        public static void demonstrateBlockingQueue() {
            System.out.println("=== BlockingQueue demo ===");
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

            Thread producer = new Thread(() -> {
                try {
                    for (int i = 1; i <= 10; i++) {
                        String item = "Item-" + i;
                        queue.put(item); // blocks while the queue is full
                        System.out.println("Producer added: " + item + ", queue size: " + queue.size());
                        Thread.sleep(500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Producer done");
            }, "Producer");

            Thread consumer = new Thread(() -> {
                try {
                    for (int i = 1; i <= 10; i++) {
                        String item = queue.take(); // blocks while the queue is empty
                        System.out.println("Consumer took: " + item + ", queue size: " + queue.size());
                        Thread.sleep(800);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Consumer done");
            }, "Consumer");

            producer.start();
            consumer.start();
            try {
                producer.join();
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public static void demonstrateCopyOnWriteArrayList() {
            System.out.println("=== CopyOnWriteArrayList demo ===");
            CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();

            Thread writer = new Thread(() -> {
                for (int i = 1; i <= 5; i++) {
                    String item = "Item-" + i;
                    cowList.add(item); // each write copies the backing array
                    System.out.println("Wrote: " + item + ", current size: " + cowList.size());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Writer");

            Thread reader = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    // Reads work on a snapshot, so they never see a partial write
                    System.out.println("Read list: " + cowList);
                    try {
                        Thread.sleep(800);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Reader");

            writer.start();
            reader.start();
            try {
                writer.join();
                Thread.sleep(2000);
                reader.interrupt();
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Final list: " + cowList);
        }
    }
Practical application

A multithreaded file downloader

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class MultiThreadDownloader {

        // Simulated download: no network access, progress is driven by sleep and random increments
        static class DownloadTask implements Callable<String> {
            private final String url;
            private final int taskId;

            public DownloadTask(String url, int taskId) {
                this.url = url;
                this.taskId = taskId;
            }

            @Override
            public String call() throws Exception {
                System.out.println("Starting download task " + taskId + ": " + url);

                int totalSize = 1000 + (int) (Math.random() * 5000);
                int downloaded = 0;
                while (downloaded < totalSize) {
                    Thread.sleep(100);
                    downloaded += 50 + (int) (Math.random() * 100);
                    if (downloaded > totalSize) {
                        downloaded = totalSize;
                    }
                    int progress = (downloaded * 100) / totalSize;
                    System.out.println("Task " + taskId + " progress: " + progress + "% ("
                            + downloaded + "/" + totalSize + ")");
                }

                String result = "Task " + taskId + " finished: " + url + ", size: " + totalSize + " bytes";
                System.out.println(result);
                return result;
            }
        }

        public static void demonstrateMultiThreadDownload() {
            System.out.println("=== Multithreaded file downloader demo ===");
            ExecutorService executor = Executors.newFixedThreadPool(3);

            // List.of requires Java 9 or later
            List<String> urls = List.of(
                    "http://example.com/file1.zip",
                    "http://example.com/file2.pdf",
                    "http://example.com/file3.mp4",
                    "http://example.com/file4.jpg",
                    "http://example.com/file5.txt"
            );

            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < urls.size(); i++) {
                futures.add(executor.submit(new DownloadTask(urls.get(i), i + 1)));
            }

            System.out.println("Waiting for all download tasks to finish...");
            for (int i = 0; i < futures.size(); i++) {
                try {
                    String result = futures.get(i).get(30, TimeUnit.SECONDS);
                    System.out.println("Received result: " + result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException | TimeoutException e) {
                    System.err.println("Download task " + (i + 1) + " failed: " + e.getMessage());
                }
            }

            executor.shutdown();
            System.out.println("All download tasks processed");
        }
    }
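Summary

Multithreaded programming in Java is an essential skill in modern software development. This article covered:
Multithreading fundamentals: the difference between processes and threads, and the thread lifecycle
Creating threads: extending Thread, implementing Runnable, and implementing Callable
Synchronization: the synchronized keyword, the wait/notify mechanism, and the Lock interface
Thread pools: how a thread pool works and how to use one to improve performance
Concurrent collections: thread-safe collection classes that avoid concurrency bugs
Practical application: a worked example of multithreading in a realistic scenario
Used correctly, multithreading lets you:
Make full use of multi-core processors
Improve responsiveness and throughput
Implement asynchronous processing and parallel computation
Build highly concurrent server applications
When working with threads, keep the following in mind:
Avoid thread-safety problems by using synchronization correctly
Size thread pool parameters sensibly to avoid wasting resources
Watch for deadlock and livelock
Use the concurrency utility classes to simplify development
Test thoroughly, especially under concurrent load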
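As one illustration of the deadlock advice above, a thread can give up on a lock after a bounded wait instead of blocking forever. The sketch below uses `ReentrantLock.tryLock(long, TimeUnit)`; the class `TimedLockSketch` and the helper `runIfAcquired` are hypothetical names, and the 50 ms timeout is an arbitrary choice for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: bounded lock acquisition as a deadlock safeguard.
final class TimedLockSketch {

    // Runs `action` under `lock` if the lock is obtained within `millis`; returns whether it ran.
    static boolean runIfAcquired(ReentrantLock lock, long millis, Runnable action) {
        try {
            if (!lock.tryLock(millis, TimeUnit.MILLISECONDS)) {
                return false; // gave up: the caller can retry, back off, or report
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns {result when the lock is free, result while another thread holds it}.
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean whenFree = runIfAcquired(lock, 50, () -> { });

        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();  // tell the main thread the lock is taken
                release.await();   // keep holding until told to release
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();

        boolean whenHeld = false;
        try {
            held.await();
            whenHeld = runIfAcquired(lock, 50, () -> { });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
        }
        try {
            holder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { whenFree, whenHeld };
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("acquired when free: " + results[0]);
        System.out.println("acquired when held: " + results[1]);
    }
}
```

A timed attempt does not remove the cause of a deadlock, and naive immediate retries can turn it into a livelock, so consistent lock ordering remains the primary defense where it is possible.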
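A solid grasp of Java multithreading provides a strong technical foundation for building high-performance, high-concurrency Java applications.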