当前位置: 首页 > news >正文

Java锁代码解析

1. 数据竞争导致数据不一致

两个线程分别把一个变量增加10000次,理论上变量最后的值是20000,实际上小于20000

package org.example;

public class synchronizedTest {
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        // 线程 1:对 counter 进行 10000 次自增操作
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter++;
            }
        });

        // 线程 2:对 counter 进行 10000 次自增操作
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter++;
            }
        });

        // 启动两个线程
        thread1.start();
        thread2.start();

        // 等待两个线程执行完毕
        thread1.join();
        thread2.join();

        // 预期结果应该是 20000,但由于数据竞争,实际结果可能小于 20000
        System.out.println("Counter value: " + counter);
    }
}

counter++ 的底层操作

counter++ 这个操作在 Java 代码里看似是一个简单的自增操作,但在计算机底层,它实际上是由多个步骤组成的:

  1. 读取(Read):从内存中读取 counter 的当前值到 CPU 的寄存器中。
  2. 加 1(Increment):在寄存器中对读取的值进行加 1 操作。
  3. 写入(Write):将寄存器中加 1 后的值写回到内存中的 counter 变量。

执行过程

  1. 线程 1 读取 counter 的值:线程 1 从内存中读取 counter 的值 0 到它的寄存器中。此时线程 1 的寄存器中的值为 0,内存中 counter 的值仍为 0。
  2. 线程调度切换到线程 2:操作系统将 CPU 控制权交给线程 2。
  3. 线程 2 读取 counter 的值:线程 2 从内存中读取 counter 的值 0 到它的寄存器中。此时线程 2 的寄存器中的值为 0,内存中 counter 的值还是 0。
  4. 线程 2 完成自增并写入:线程 2 在寄存器中对值进行加 1 操作,寄存器中的值变为 1,然后将 1 写回到内存中。此时内存中 counter 的值变为 1。
  5. 线程调度切换回线程 1:操作系统将 CPU 控制权交回给线程 1。
  6. 线程 1 完成自增并写入:线程 1 在它的寄存器中对之前读取的 0 进行加 1 操作,寄存器中的值变为 1,然后将 1 写回到内存中。此时内存中 counter 的值还是 1。

在上述过程中,线程 1 和线程 2 都进行了一次自增操作,但由于数据竞争,内存中 counter 的值只增加了 1,而不是预期的 2,这就导致了数据不一致。

修改后,把修改数据的值操作封装为方法用synchronized修饰,或者用synchronized修饰代码块,确保同一时间只有一个线程能够访问方法

package org.example;

public class synchronizedTest {
    private static int counter = 0;
    private static final Object staticLock = new Object();
    public static synchronized void increment() {
        counter++;
    }
    public static void increment() {
        // 对当前对象(this)加锁
        synchronized (staticLock) {
            counter++;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        // 线程 1:对 counter 进行 10000 次自增操作
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                increment();
            }
        });

        // 线程 2:对 counter 进行 10000 次自增操作
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                increment();
            }
        });

        // 启动两个线程
        thread1.start();
        thread2.start();

        // 等待两个线程执行完毕
        thread1.join();
        thread2.join();

        // 预期结果应该是 20000,但由于数据竞争,实际结果可能小于 20000
        System.out.println("Counter value: " + counter);
    }
}

2,并发修改

线程1在迭代List的同时,线程2添加新元素

package org.example;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class synchronizedTest {
    public static void main(String[] args) {
        // 创建一个 ArrayList
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }

        // 线程 1:迭代 List
        Thread thread1 = new Thread(() -> {
            Iterator<Integer> iterator = list.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
                try {
                    // 模拟耗时操作
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 线程 2:修改 List
        Thread thread2 = new Thread(() -> {
            try {
                // 等待一段时间,确保线程 1 开始迭代
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            list.add(10);
        });

        // 启动两个线程
        thread1.start();
        thread2.start();
    }
}

解决问题,使用线程安全集合类或者synchronized修饰代码块

package org.example;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class synchronizedTest {
    private static final List<Integer> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        // 初始化列表
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }

        // 线程 1:迭代列表
        Thread thread1 = new Thread(() -> {
            Iterator<Integer> iterator = list.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 线程 2:修改列表
        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            list.add(10);
        });

        thread1.start();
        thread2.start();
    }
}

3,死锁问题

两个线程争夺资源造成互相等待现象

package org.example;

public class synchronizedTest {
    private static final Object resource1 = new Object();
    private static final Object resource2 = new Object();

    public static void main(String[] args) {
        // 线程 1:先获取 resource1,再获取 resource2
        Thread thread1 = new Thread(() -> {
            synchronized (resource1) {
                System.out.println("Thread 1: Holding resource 1...");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread 1: Waiting for resource 2...");
                //                resource2已经被tread2占有,无法执行下面代码
                synchronized (resource2) {
                    System.out.println("Thread 1: Holding resource 1 and 2...");
                }
            }
        });

        // 线程 2:先获取 resource2,再获取 resource1
        Thread thread2 = new Thread(() -> {
            synchronized (resource2) {
                System.out.println("Thread 2: Holding resource 2...");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread 2: Waiting for resource 1...");
                //                resource1已经被tread1占有,无法执行下面代码
                synchronized (resource1) {
                    System.out.println("Thread 2: Holding resource 1 and 2...");
                }
            }
        });

        // 启动两个线程
        thread1.start();
        thread2.start();
    }
}

修改,增加信号量,线程1获取resource1和信号量,线程2获取不到信号量,不能获取resource2,线程1接着获取resource2

package org.example;

import java.util.concurrent.Semaphore;

public class synchronizedTest {
    // 定义两个资源
    private static final Object resource1 = new Object();
    private static final Object resource2 = new Object();
    // 定义一个信号量,初始许可证数量为 1
    private static final Semaphore semaphore = new Semaphore(1);

    public static void main(String[] args) {
        // 线程 1
        Thread thread1 = new Thread(() -> {
            try {
                // 获取信号量的许可证
                semaphore.acquire();
                // 访问资源 1
                synchronized (resource1) {
                    System.out.println("Thread 1: Holding resource 1...");
                    Thread.sleep(100);
                    // 访问资源 2
                    synchronized (resource2) {
                        System.out.println("Thread 1: Holding resource 1 and 2...");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放信号量的许可证
                semaphore.release();
            }
        });

        // 线程 2
        Thread thread2 = new Thread(() -> {
            try {
                // 获取信号量的许可证
                semaphore.acquire();
                // 访问资源 2
                synchronized (resource2) {
                    System.out.println("Thread 2: Holding resource 2...");
                    Thread.sleep(100);
                    // 访问资源 1
                    synchronized (resource1) {
                        System.out.println("Thread 2: Holding resource 1 and 2...");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放信号量的许可证
                semaphore.release();
            }
        });

        // 启动线程
        thread1.start();
        thread2.start();
    }
}

ReentrantLock使用

private static int counter = 0;
    private static final Lock lock = new ReentrantLock();

    public static void increment() {
        // 获取锁
        lock.lock();
        try {
            // 临界区,对共享资源进行操作
            counter++;
            System.out.println(Thread.currentThread().getName() + " 执行后 counter 的值为: " + counter);
        } finally {
            // 释放锁,确保在任何情况下锁都会被释放
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // 线程 1
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                increment();
            }
        });

        // 线程 2
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                increment();
            }
        });

        thread1.start();
        thread2.start();
    }
package org.example;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockExample {
    private static final Lock lock = new ReentrantLock();

    public static void performTask() {
        if (lock.tryLock()) {
            try {
                System.out.println(Thread.currentThread().getName() + " 获取到锁,开始执行任务");
                // 模拟任务执行
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 释放锁");
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 未能获取到锁,放弃执行任务");
        }
    }

    public static void main(String[] args) {
        // 线程 1
        Thread thread1 = new Thread(TryLockExample::performTask);
        // 线程 2
        Thread thread2 = new Thread(TryLockExample::performTask);

        thread1.start();
        thread2.start();
    }
}

非公平锁,此处JVM用到了偏向锁优化,线程1第一次获取到锁,此后锁更偏向于这个线程

优点:减少锁开销,简化机制

公平锁,先来先得,避免饥饿问题

package org.example;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class FairLockInOneClass {
    // 创建公平锁
    private final Lock fairLock = new ReentrantLock(true);
    // 共享计数器
    private int counter = 0;

    // 内部类,代表工作线程
    private class Worker extends Thread {
        public Worker(String name) {
            super(name);
        }

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                // 获取锁
                fairLock.lock();
                try {
                    counter++;
                    System.out.println(Thread.currentThread().getName() + " 执行操作,当前计数器值为: " + counter);
                    // 模拟操作耗时
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁
                    fairLock.unlock();
                }
            }
        }
    }

    // 启动多个工作线程的方法
    public void startThreads() {
        Worker worker1 = new Worker("线程 1");
        Worker worker2 = new Worker("线程 2");
        Worker worker3 = new Worker("线程 3");

        worker1.start();
        worker2.start();
        worker3.start();

        try {
            // 等待所有线程执行完毕
            worker1.join();
            worker2.join();
            worker3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FairLockInOneClass example = new FairLockInOneClass();
        example.startThreads();
    }
}

volatile关键字

如果flag不用volatile修饰,thread2读到的flag为false,不会重新从内存里面读取,不能结束循环

package org.example;

public class volatileTest {
    // 使用 volatile 关键字修饰变量
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        // 线程 1:修改 flag 的值
        Thread thread1 = new Thread(() -> {
            try {
                // 模拟耗时操作
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("线程 1 已将 flag 设置为 true");
        });

        // 线程 2:不断检查 flag 的值
        Thread thread2 = new Thread(() -> {
            while (!flag) {
                // 循环等待
            }
            System.out.println("线程 2 检测到 flag 变为 true,退出循环");
        });

        // 启动线程
        thread2.start();
        thread1.start();

        try {
            // 等待两个线程执行完毕
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

作用保证可见性,禁止指令重排序,但是不能保证线程安全!

volatile 不能保证线程安全的原因

虽然 volatile 能保证变量的可见性和禁止指令重排序,但它不能保证复合操作的原子性。原子操作是指不可分割的操作,要么全部执行,要么全部不执行。像常见的 i++ 操作,它实际上包含了读取、加 1 和写入三个步骤,并非原子操作。

AQS使用

(写锁)独占锁也称为排他锁,是一种一次只能被一个线程持有的锁。当一个线程获取了独占锁后,其他线程若想获取该锁,就必须等待持有锁的线程释放锁,否则会被阻塞。

(读锁)共享锁是一种可以被多个线程同时持有的锁。当一个线程获取了共享锁后,其他线程也可以获取该共享锁,但如果有线程想要获取独占锁,则必须等待所有持有共享锁的线程释放锁。

自定义独占锁

package org.example;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;

// 自定义独占锁类,实现 Lock 接口
class CustomExclusiveLock implements Lock {
    // 内部类 Sync 继承自 AQS
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断当前是否处于锁定状态,即 state 是否为 1
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取锁,使用 CAS 操作将 state 从 0 变为 1
        @Override
        protected boolean tryAcquire(int acquires) {
            assert acquires == 1;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 尝试释放锁,将 state 置为 0
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    // 加锁操作,调用 AQS 的 acquire 方法
    @Override
    public void lock() {
        sync.acquire(1);
    }

    // 可中断的加锁操作,调用 AQS 的 acquireInterruptibly 方法
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // 尝试加锁,调用 AQS 的 tryAcquire 方法
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    // 在指定时间内尝试加锁,调用 AQS 的 tryAcquireNanos 方法
    @Override
    public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    // 解锁操作,调用 AQS 的 release 方法
    @Override
    public void unlock() {
        sync.release(1);
    }

    // 获取条件对象
    @Override
    public java.util.concurrent.locks.Condition newCondition() {
        return null;
    }
}

// 测试自定义独占锁
public class CustomExclusiveLockTest {
    public static void main(String[] args) {
        CustomExclusiveLock lock = new CustomExclusiveLock();

        // 创建一个线程
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread acquired the lock.");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("Thread released the lock.");
            }
        });

        thread.start();
    }
}

自定义共享锁

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// 自定义共享锁类
class CustomSharedLock {
    // 内部类 Sync 继承自 AQS
    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        // 获取共享资源
        @Override
        protected int tryAcquireShared(int acquires) {
            for (; ; ) {
                int current = getState();
                int newState = current - acquires;
                if (newState < 0 || compareAndSetState(current, newState)) {
                    return newState;
                }
            }
        }

        // 释放共享资源
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (; ; ) {
                int current = getState();
                int newState = current + releases;
                if (compareAndSetState(current, newState)) {
                    return true;
                }
            }
        }
    }

    private final Sync sync;

    public CustomSharedLock(int permits) {
        this.sync = new Sync(permits);
    }

    // 获取共享锁
    public void acquireShared() {
        sync.acquireShared(1);
    }

    // 释放共享锁
    public void releaseShared() {
        sync.releaseShared(1);
    }
}

// 测试自定义共享锁
public class CustomSharedLockTest {
    public static void main(String[] args) {
        CustomSharedLock sharedLock = new CustomSharedLock(2);

        // 创建线程 1
        Thread thread1 = new Thread(() -> {
            sharedLock.acquireShared();
            try {
                System.out.println("Thread 1 acquired the shared lock.");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sharedLock.releaseShared();
                System.out.println("Thread 1 released the shared lock.");
            }
        });

        // 创建线程 2
        Thread thread2 = new Thread(() -> {
            sharedLock.acquireShared();
            try {
                System.out.println("Thread 2 acquired the shared lock.");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sharedLock.releaseShared();
                System.out.println("Thread 2 released the shared lock.");
            }
        });

        thread1.start();
        thread2.start();
    }
}

相关文章:

  • 迅为RK3568开发板篇Openharmony配置HDF控制UART-UART 接口运作机制
  • Solon —— 容器
  • 【R语言】绘图
  • V4L2驱动之UVC
  • 【LLAMA】羊驼从LLAMA1到LLAMA3梳理
  • C语言——时基
  • 广度优先搜索详解--BFS--蒟蒻的学习之路
  • 【哇! C++】类和对象(一)
  • ⭐ Unity 横向滑动列表 首尾相连 轮转图
  • 在Linux上安装和使用Docker
  • 嵌入式linux利用标准字符驱动模型控制多个设备方法
  • STM32 USB 设备的描述信息作用
  • 【ISO 14229-1:2023 UDS诊断(ECU复位0x11服务)测试用例CAPL代码全解析⑰】
  • git,gitea - tool - creat Gitea Empty Rep Dir
  • 【异常错误】pycharm debug view变量的时候显示不全,中间会以...显示
  • nginx负载均衡, 解决iphash不均衡的问题之consistent
  • 【时时三省】(C语言基础)用N-S流程图表示算法
  • ok113i平台——多媒体播放器适配
  • “让App玩捉迷藏:Android教育平板的‘隐身术’开发实录”
  • 尚硅谷爬虫note009
  • 足球少年郎7月试锋芒,明日之星冠军杯构建顶级青少年赛事
  • 俄官员说将适时宣布与乌克兰谈判代表
  • 检疫期缩减至30天!香港优化内地进口猫狗检疫安排
  • 联合国秘书长欢迎中美经贸高层会谈成果
  • 江西省市场监管局原局长谢来发被双开:违规接受旅游活动安排
  • 普京提议重启俄乌直接谈判后,特朗普表态了