并发,在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。本文转载自Java并发编程详解

线程

线程基础

线程的本质

线程是轻量级进程:在同一个进程中,多个线程共享内存空间(堆、方法区),但每个线程拥有独立的栈和程序计数器(PC)。
并发 vs 并行:

  • 并发:线程在单核CPU上交替执行(时间片轮转)。
  • 并行:多核CPU上线程真正同时运行

创建线程的两种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 方式1:继承Thread类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running by extending Thread");
}
}

// 方式2:实现Runnable接口(推荐,避免单继承限制,在Java中,类只能继承一个父类(单继承),但可以实现多个接口)
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Thread running by implementing Runnable");
}
}

public class Main {
public static void main(String[] args) {
// 方式1 启动线程(调用start(),而非run()!直接调用run()只是普通方法调用,不会创建新线程)
new MyThread().start();
// 方式2
new Thread(new MyRunnable()).start();
}
}

Runnable优势总结:

  • 解耦任务与线程:Runnable表示任务逻辑,Thread表示线程载体,符合面向对象的职责分离原则。
  • 资源共享:多个线程可共享同一个Runnable实例(例如售票系统的共享票池)。
  • 灵活组合:可与线程池(ExecutorService)、Lambda表达式等现代特性无缝结合。

线程的生命周期

  1. 新建(New):线程对象已创建,但未调用start()
  2. 就绪(Runnable):调用start()后,等待CPU调度。
  3. 运行(Running):获得CPU时间片,执行run()方法。
  4. 阻塞(Blocked):因等待锁、I/O操作或sleep()等暂停执行。
  5. 终止(Terminated):run()执行完毕或发生未捕获异常。

线程同步锁机制

1、竞态条件(Race Condition)

当多个线程同时访问共享资源且未正确同步时,结果依赖于线程执行顺序。

1
2
3
4
5
6
7
8
// 典型竞态条件示例:不安全的计数器
class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 非原子操作(实际为 read-modify-write)
}
public int getCount() { return count; }
}

2、同步解决方案

  • synchronized 关键字:

    • 同步方法:锁对象为当前实例(this)或类对象(静态方法)。
    • 同步代码块:显式指定锁对象(任意对象)。
  • Lock 接口(更灵活,支持超时、公平锁):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class SafeCounterWithLock {
    private int count = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
    lock.lock();
    try {
    count++;
    } finally {
    lock.unlock(); // 确保释放锁
    }
    }
    }

3、volatile 关键字

  • 保证可见性:对volatile变量的修改立即对其他线程可见。
  • 禁止指令重排序:防止编译器和CPU优化导致的执行顺序错乱。
1
2
3
4
5
6
7
class VolatileExample {
private volatile boolean flag = false;

public void toggleFlag() {
flag = !flag; // 非原子操作,volatile仅保证可见性
}
}

线程间的协作

1、 wait()、notify()、notifyAll()

  • Object类的核心方法:

    • wait():释放锁并进入等待状态。

    • notify():随机唤醒一个等待线程。

    • notifyAll():唤醒所有等待线程。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      import java.util.LinkedList;
      import java.util.Queue;

      class SharedBuffer {
      private final Queue<Integer> queue = new LinkedList<>();
      private final int capacity = 5;

      // 生产者线程逻辑
      public synchronized void produce(int item) throws InterruptedException {
      while (queue.size() == capacity) {
      wait(); // 缓冲区满,等待消费者消费
      }
      queue.add(item);
      System.out.println("生产: " + item + ",当前队列大小: " + queue.size());
      notifyAll(); // 通知消费者可以消费
      }

      // 消费者线程逻辑
      public synchronized int consume() throws InterruptedException {
      while (queue.isEmpty()) {
      wait(); // 缓冲区空,等待生产者生产
      }
      int item = queue.poll();
      System.out.println("消费: " + item + ",剩余队列大小: " + queue.size());
      notifyAll(); // 通知生产者可以生产
      return item;
      }
      }

      public class Main {
      public static void main(String[] args) {
      SharedBuffer buffer = new SharedBuffer();

      // 创建生产者线程
      Thread producer = new Thread(() -> {
      for (int i = 1; i <= 10; i++) {
      try {
      buffer.produce(i);
      Thread.sleep(500); // 模拟生产耗时
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }, "Producer");

      // 创建消费者线程
      Thread consumer = new Thread(() -> {
      for (int i = 1; i <= 10; i++) {
      try {
      buffer.consume();
      Thread.sleep(1000); // 模拟消费耗时
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }, "Consumer");

      // 启动线程
      producer.start();
      consumer.start();
      }
      }

2、Condition 接口

  • Lock配合使用,提供更精细的线程等待与唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.concurrent.locks.*;

class AdvancedBuffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 5;

public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待"非满"条件
}
queue.add(item);
System.out.println("生产: " + item + ",队列大小: " + queue.size());
notEmpty.signal(); // 触发"非空"条件
} finally {
lock.unlock();
}
}

public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待"非空"条件
}
int item = queue.poll();
System.out.println("消费: " + item + ",剩余队列大小: " + queue.size());
notFull.signal(); // 触发"非满"条件
return item;
} finally {
lock.unlock();
}
}
}

public class Main {
public static void main(String[] args) {
AdvancedBuffer buffer = new AdvancedBuffer();

// 创建生产者线程(Lambda实现Runnable)
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
buffer.produce(i);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Producer").start();

// 创建消费者线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
buffer.consume();
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Consumer").start();
}
}

并发编程

并发工具类(java.util.concurrent)

1、Executor 框架

  • 线程池管理:避免频繁创建/销毁线程的开销。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Main {
    public static void main(String[] args) {
    // 创建固定大小的线程池(4个线程)
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // 提交10个任务
    for (int i = 1; i <= 10; i++) {
    final int taskId = i;
    executor.submit(() -> {
    System.out.println("任务 " + taskId + " 正在由 "
    + Thread.currentThread().getName() + " 执行");
    try {
    Thread.sleep(1000); // 模拟任务耗时
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }

    // 关闭线程池(不再接受新任务)
    executor.shutdown();
    }
    }

2、并发集合

  • 线程安全容器:避免手动同步。

    • CopyOnWriteArrayList:读多写少场景。

    • ConcurrentHashMap:高并发哈希表。

    • BlockingQueue:阻塞队列(如LinkedBlockingQueue)。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      // ConcurrentHashMap 示例
      import java.util.concurrent.ConcurrentHashMap;

      public class Main {
      public static void main(String[] args) {
      ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

      // 创建多个写线程
      for (int i = 0; i < 5; i++) {
      final int threadId = i;
      new Thread(() -> {
      for (int j = 0; j < 100; j++) {
      String key = "key-" + threadId + "-" + j;
      map.put(key, j);
      }
      System.out.println("写线程 " + threadId + " 完成");
      }).start();
      }

      // 创建读线程
      new Thread(() -> {
      while (true) {
      System.out.println("当前Map大小: " + map.size());
      try {
      Thread.sleep(500);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }).start();
      }
      }

3、原子类(Atomic)

  • 基于CAS(Compare-And-Swap)保证原子性,实现无锁线程安全。

  • 性能优于synchronized,适用于高并发场景。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import java.util.concurrent.atomic.AtomicInteger;

    public class AtomicExample {
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
    Runnable task = () -> {
    for (int i = 0; i < 10000; i++) {
    count.incrementAndGet(); // 原子操作
    }
    };

    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    System.out.println("Final count: " + count.get()); // 正确输出20000
    }
    }

4、CountDownLatch 与 CyclicBarrier

  • CountDownLatch:等待多个任务完成。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import java.util.concurrent.CountDownLatch;

    public class Main {
    public static void main(String[] args) throws InterruptedException {
    final int TASK_COUNT = 3;
    CountDownLatch latch = new CountDownLatch(TASK_COUNT);

    // 创建并启动多个任务线程
    for (int i = 1; i <= TASK_COUNT; i++) {
    final int taskId = i;
    new Thread(() -> {
    System.out.println("任务 " + taskId + " 开始执行");
    try {
    Thread.sleep(2000); // 模拟任务耗时
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("任务 " + taskId + " 完成");
    latch.countDown(); // 计数器减1 多个线程调用 latch.countDown()
    }).start();
    }

    // 主线程等待所有任务完成
    latch.await();
    System.out.println("所有任务已完成,继续主线程逻辑");
    }
    }
  • CyclicBarrier:多个线程相互等待至屏障点。

    1
    2
    CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("All threads reached barrier"));
    // 每个线程调用 barrier.await()

典型陷阱

1、死锁(Deadlock)

四个必要条件:

  1. 互斥:资源只能被一个线程持有。
  2. 占有且等待:线程持有资源并等待其他资源。
  3. 不可抢占:资源不能被强制释放。
  4. 循环等待:多个线程形成环形等待链。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 死锁示例
// Thread 1: lock A → try lock B
// Thread 2: lock B → try lock A
public class DeadlockDemo {
private static final Object lockA = new Object();
private static final Object lockB = new Object();

public static void main(String[] args) {
// 线程1:先获取lockA,再请求lockB
new Thread(() -> {
synchronized (lockA) {
System.out.println("线程1 持有lockA");
try {
Thread.sleep(100); // 模拟操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println("线程1 获取lockB");
}
}
}).start();

// 线程2:先获取lockB,再请求lockA
new Thread(() -> {
synchronized (lockB) {
System.out.println("线程2 持有lockB");
synchronized (lockA) {
System.out.println("线程2 获取lockA");
}
}
}).start();
}
}

解决方案:破坏任一条件,如按固定顺序获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 避免死锁(统一锁顺序)
public class DeadlockSolution {
private static final Object lockA = new Object();
private static final Object lockB = new Object();

public static void main(String[] args) {
// 所有线程按相同顺序获取锁(先lockA后lockB)
new Thread(() -> acquireLocks(lockA, lockB, "线程1")).start();
new Thread(() -> acquireLocks(lockA, lockB, "线程2")).start();
}

private static void acquireLocks(Object firstLock, Object secondLock, String threadName) {
synchronized (firstLock) {
System.out.println(threadName + " 持有 " + firstLock);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (secondLock) {
System.out.println(threadName + " 获取 " + secondLock);
}
}
}
}

2、活锁(Livelock)

线程不断重试失败的操作(如谦让式资源释放),但无法推进。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 活锁示例:两个线程互相让出CPU
while (true) {
if (tryLock(lockA)) {
if (tryLock(lockB)) { /* ... */ }
else { unlock(lockA); }
}
Thread.yield(); // 让出CPU但未解决问题
}

public class LivelockDemo {
static class Worker {
private boolean active = false;

public void work(Runnable task, Worker otherWorker) {
while (!active) {
if (otherWorker.active) {
System.out.println("让出执行权...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
try {
active = true;
task.run();
active = false;
} catch (Exception e) {
active = false;
}
}
}
}

public static void main(String[] args) {
final Worker worker1 = new Worker();
final Worker worker2 = new Worker();

new Thread(() -> worker1.work(() -> System.out.println("Worker1执行任务"), worker2)).start();
new Thread(() -> worker2.work(() -> System.out.println("Worker2执行任务"), worker1)).start();
}
}

3、资源竞争与性能问题

资源竞争和性能问题是并发编程中的核心挑战之一。以下通过具体示例展示资源竞争导致的数据错误,以及不同锁策略对性能的影响,并给出优化方案。

  • 锁粒度:粗粒度锁(简单但低效) vs 细粒度锁(复杂但高效)。
  • 锁分离:如ReadWriteLock分离读锁与写锁。

示例1:资源竞争导致数据错误(未同步的计数器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class UnsafeCounterExample {
private static int count = 0;

public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
for (int i = 0; i < 10000; i++) {
count++; // 非原子操作:read → modify → write
}
};

// 启动两个线程同时修改count
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();

// 预期结果:20000,实际结果可能小于20000
System.out.println("Final count: " + count); // 结果不确定,因线程竞争而异
}
}

问题分析

  • count++ 是非原子操作,实际包含以下步骤:

    1
    2
    3
    int tmp = count;  // Step 1: 读取当前值
    tmp = tmp + 1; // Step 2: 修改值
    count = tmp; // Step 3: 写回新值
  • 当两个线程同时执行时,可能出现以下时序:

    • 线程A读取count=100 → 线程B读取count=100 → 线程A写入101 → 线程B写入101
    • 最终结果为101,而非预期的102

示例2:粗粒度锁的性能问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CoarseLockExample {
private static final Object lock = new Object();
private static int count = 0;

public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
for (int i = 0; i < 100000; i++) {
synchronized (lock) { // 粗粒度锁:锁住整个循环
count++;
}
}
};

long start = System.currentTimeMillis();
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
long end = System.currentTimeMillis();

System.out.println("Final count: " + count);
System.out.println("耗时: " + (end - start) + "ms");
}
}

// 输出
Final count: 200000
耗时: 120ms // 实际时间因机器性能而异,但明显较长

问题分析

  • 粗粒度锁:将整个循环包裹在synchronized块内,每次循环都会获取/释放锁。
  • 性能问题:频繁的锁竞争导致大量线程上下文切换,CPU时间浪费在锁管理而非实际计算。

示例3:细粒度锁优化(减少锁范围)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FineGrainedLockExample {
private static final Object lock = new Object();
private static int count = 0;

public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
int localCount = 0;
for (int i = 0; i < 100000; i++) {
localCount++; // 先在线程本地累加
}
synchronized (lock) { // 仅对最终合并操作加锁
count += localCount;
}
};

long start = System.currentTimeMillis();
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
long end = System.currentTimeMillis();

System.out.println("Final count: " + count);
System.out.println("耗时: " + (end - start) + "ms");
}
}

Final count: 200000
耗时: 5ms // 性能显著提升

优化分析

  • 细粒度锁:线程先在本地变量localCount中累加,最后仅对合并操作加锁。
  • 性能提升:锁竞争频率从每次循环减少到每线程一次,大幅降低同步开销。

示例4:锁分离(读写锁优化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
private static final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private static int value = 0;

public static void main(String[] args) {
// 写线程(频繁修改数据)
Runnable writer = () -> {
for (int i = 0; i < 1000; i++) {
rwLock.writeLock().lock();
try {
value++;
Thread.sleep(1); // 模拟写操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
};

// 读线程(频繁读取数据)
Runnable reader = () -> {
for (int i = 0; i < 1000; i++) {
rwLock.readLock().lock();
try {
System.out.println("读取 value: " + value);
Thread.sleep(1); // 模拟读操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
};

// 启动2个写线程和4个读线程
new Thread(writer).start();
new Thread(writer).start();
new Thread(reader).start();
new Thread(reader).start();
new Thread(reader).start();
new Thread(reader).start();
}
}

优化分析

  • 读写锁分离:
    • 读锁(共享锁):允许多个线程同时读数据。
    • 写锁(独占锁):仅允许单个线程写数据,且与读锁互斥。
  • 适用场景:读多写少(如缓存系统),通过减少锁竞争提升吞吐量。

关键性总结

  1. 资源竞争问题
    • 表现:多线程同时修改共享数据导致结果错误(如计数器值不准确)。
    • 解决方案:使用同步机制(synchronizedLock)保证原子性。
  2. 性能优化策略
    • 减少锁粒度:仅对必要代码块加锁(如示例3的本地累加优化)。
    • 锁分离:读写锁(ReadWriteLock)区分读写操作,提升并发度。
    • 无锁编程:使用原子类(AtomicInteger)或并发容器(ConcurrentHashMap)。
  3. 性能测试建议
    • 对比不同锁策略的耗时(如示例2和示例3的耗时差异)。
    • 使用性能分析工具(如JProfilerVisualVM)定位瓶颈。

最佳实践

  1. **优先使用高层并发工具:**如ExecutorConcurrentHashMap
  2. 避免过早优化:仅在性能瓶颈出现时考虑低层同步。
  3. 测试并发代码:使用压力测试工具(如JMeter)和静态分析工具(如FindBugs)。
  4. 遵循不变性(Immutability):使用final字段和不可变对象(如String)。