专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Java7 ConcurrentHashMap源码详解

java 7 ConcurrentHashMap 数据结构

70_1.png

  • 整个ConcurrentHashMap可以认为是一个Segment数组,这个数组是不可以扩容的
  • 每个Segment都维护着一个HashEntry数组,这个数组有key``value``hash``next,所以Segment相当于一个HahsMap
  • 所以可以认为ConcurrentHashMap是一个大数组包含着一个数组+链表的数据结构

问题

  • ConcurrentHashMap 怎么做到线程安全的
  • ConcurrentHashMap 大数组(Segment)和小数组(HashEntry)容量是怎么定的,怎么扩容的

烧脑的实例化

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //concurrencyLevel 是期望的 Segment 数组大小
        //1.ssize 是 Segment 数组的大小
        //2.通过循环,如果满足 `ssize < concurrencyLevel` 就 将 ssize 左移动
        //3.ssize初始值是1 每次左移之后的结果都是二的幂次方
        //4.所以是想通过循环找到大于等于 concurrencyLevel 的 值
        //5.总结:Segment 数组的大小 =  ssize,结果是大于等于 concurrencyLevel ,二的幂次方
        //6.跟java 7 HahsMap 有点像,不过 HahsMap 没有通过循环,而是通过Integer的一个方法
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //sshift 是 ssize 经过几次左移
        //segmentShift 主要是跟后面获得hash槽位有关
        this.segmentShift = 32 - sshift;
        //hash & segmentMask 得到hash槽位,至于 为什么 segmentMask = ssize - 1
        //因为数组长度减一跟hash & 才能得到 0-(ssize-1)的槽位
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //1.initialCapacity 期望 整个的ConcurrentHashMap 容量大小
        //2.initialCapacity / ssize 是为了计算出每个 Segment 下面的 HashEntry 数组要大概多少
        int c = initialCapacity / ssize;
        //1.c * ssize 相当于 HashEntry的长度*Segment长度
        //2.如果 c * ssize < initialCapacity 就不满足 ConcurrentHashMap 期望的总长度
        //所以需要将  c++
        if (c * ssize < initialCapacity)
            ++c;
        //cap 默认是2
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        //1.c是刚才计算出来的 HashEntry 数组长度
        //2.通过循环比较,左移动,获取比c大的二的幂次方,跟Segment获取长度一样
        //都是为了获取二的幂次方,因为 HashEntry  也是数组
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        //初始化一个 Segment,阈值,加载因子  HashEntry(数组大小为 cap)
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        //初始化 Segment 数组,大小为 ssize
        Segment Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        //通过 UNSAFE 去将 Segment 对象放到 Segment数组的第一个位
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

总结:

  • 1.concurrencyLevel 是期望 Segment 数组的长度
  • 2.通过循环左移动,获取到大于等于 concurrencyLevel的二的幂次方,作为 Segment 数组的长度,最后计算 ssize 作为Segment数组长度
  • 3.initialCapacity是期望ConcurrentHashMap的长度
  • 4.HashEntry数组长度的获取有三步。a. c=initialCapacity / ssize 期望总的大小除以Segment的大小,表示每个SegmentHashEntry长度**至少为c ** b.如果c * ssize < initialCapacity ,就是 HashEntry的长度 * Segment 的长度,如果小于期望总长度,HashEntry长度需要++ c.通过循环右移动获取HashEntry最后的长度为大于等于c,二的幂次方。
  • 5.实例化Segment数组,大小为ssize
    1. 实例化一个 Segment对象用UNSAFE方法放到Segment第一位,为什么做这个操作,因为上面一堆计算需要记录下来作为模板,接下来Segment数组还会放第n个Segment对象,Segment需要的HashEntry数组大小都在第一个Segment里面

put

  public V put(K key, V value) {
        Segment<K,V> s;
        //不能存key为null,跟hashMap不一样
        if (value == null)
            throw new NullPointerException();
    //通过key获取hash值        
        int hash = hash(key);
    //1.在初始化的时候 segmentMask = ssize -1    
    //2.segmentShift = = 32 - sshift,sshift是 ssize获得过程中经过几次左移 32减去左移位数 
    //获得高位为零的位数
    //3.hash >>> segmentShift ,hash右移segmentShift(ssize高位为零的位数),目的就是为了让
    //hash高位参与到hash槽位的定位
        int j = (hash >>> segmentShift) & segmentMask;
    //通过 UNSAFE 获取到槽位为 j  Segment,可能为空,因为初始化只做了第一位 Segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
    //如果 Segment 为空就去获取          
            s = ensureSegment(j);
    //往 Segment put 参数        
        return s.put(key, hash, value, false);
    }

ensureSegment

获取 Segment

 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //通过 UNSAFE 从主内存获取 Segment 数组中第 k 位
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //拿到数组第1位作为模板
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
        //从模板拿到 HashEntry 数组长度,新建一个数组长度    
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        //重新 通过 UNSAFE 从主内存获取 Segment 数组中第 k 位
        //防止多线程安全问题
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
        //死循环   通过 UNSAFE 从主内存获取 Segment 数组中第 k 位       
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
        //通过cas 真正将 Segment 设置到 Segment 数组的k位              
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

总结:

  • 1.ensureSegment 获取Segment数组第一位作为模板来构造新的Segment
  • 2.ensureSegment方法是线程安全的,通过UNSAFE.getObjectVolatile获取可见的Segment,同时通过UNSAFE.compareAndSwapObject新增元素到Segment数组中。

Segment#put

Segment#put 才是真正的插入参数

  final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  //1.tryLock 尝试加锁
  //2.加锁失败 进入 scanAndLockForPut 
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
    //获得锁        
            try {
                HashEntry<K,V>[] tab = table;
            //计算出需要存放的hash槽位    
                int index = (tab.length - 1) & hash;
            //通过 UNSAFE 获取到 第index 槽位的 HashEntry    
                HashEntry<K,V> first = entryAt(tab, index);
            //HashEntry 是个链表,遍历链表    
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //判断是否有key相同的,如果有就覆盖
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    //如果 e == null 就采用尾插法插入新数据
                    //如果新的节点不为空,直接将新的节点设置为头节点
                        if (node != null)
                            node.setNext(first);
                        else
                    //如果新节点不为空就 new 一个,下一个节点执行原头节点    
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        //进行扩容,所以扩容只对 Segment 进行扩容
                            rehash(node);
                        else
                        //新节点放到数组里面
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

总结:

  • 1.Segment#put是通过ReentrantLock+UNSAFE的一些方法来实现线程安全
  • 2.put进去走两个分支,一个是存在相同的key直接覆盖,一个是不同的key需要新建节点
  • 3.新建节点采用的也是头插法
  • 4.这个方法里面还有两个方法一个是等待锁scanAndLockForPut,一个是扩容rehash
  • 5.问题:为什么新节点有时候会null有时候不会null

scanAndLockForPut

Segment#put操作tryLock失败之后就会进入scanAndLockForPut去等待获取锁,在里面等待的过程会去new 一个新的节点。

   private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            //尝试加锁失败
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                //初始化 retries = -1 满足条件 
                if (retries < 0) {
                //如果头节点为空,就新建一个节点,retries 改为 0
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                //如果存在key相同的 直接将 retries 改为 0     
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                //相当于遍历链表,直到 e = null 就不会走这块 if的逻辑了    
                        e = e.next;
                }
                //如果 retries 大于某个值就 直接 lock ,不在走循环
                //这里的 retries 相当于重试次数
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                //1.(retries & 1) == 0 相当于偶数
                //2.当 retries 是偶数的时候就会 走 entryForHash 
                //3.entryForHash 去主内存获取最新的Segment的下面 HashEntry 数组的某个元素的头节点
                //4.如果f = entryForHash(this, hash)) != first 说明获取到锁的那个线程改变了 的Segment的下面 HashEntry
                //5.如果改变了 retries = -1,改为-1 才能接着走上面的if逻辑
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

总结:

  • 1.retries 这个属性是关键,有三个ifretries=-1会走第一个if
  • 2.第一个if会遍历链表,要么链表中有key相同的就不遍历,要么链表没有key相同的就新建一个节点,为了是后面拿到锁之后就不用新建节点了
  • 3.第二个if是判断重试次数是否超过MAX_SCAN_RETRIESMAX_SCAN_RETRIES这个参数跟cpu有关,如果超过就直接lock等待获取锁,不在尝试获取锁
  • 4.第三个if,如果尝试次数是偶数,就去获取当前Segment下面HahEntry,如果数据变了,修改retries=-1,然后重新走第一个if。

rehash

rehash 是对Segment下面的HashEntry进行扩容

  private void rehash(HashEntry<K,V> node) {

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            //新数组大小是旧数组大小两倍
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            //对旧数组进行循环遍历迁移数据
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    //获得新的hash槽位
                    int idx = e.hash & sizeMask;
                    //单个节点,直接设置到数组中
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        //遍历整个链表,获取到最后几个元素得坑位一样得
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        //把最后几个坑位一样的直接设置到数组中去
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        //遍历链表,知道 lastRun 因为 lastRun 已经插入到新的槽位去了
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        //头插法
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            //重新计算需要插入新节点的 槽位
            int nodeIndex = node.hash & sizeMask; // add the new node
            //头插法到链表中
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

总结:

  • 1.是对HashEntry进行扩容
  • 2.扩容之后是原数组两倍
  • 3.扩容过程中有两个分支比较重要
  • 4.一个是循环遍历链表,找到后几个节点最后分配的槽位一样,直接把最后几个节点放到新的操作上
  • 5.一个是循环遍历链表,从头节点到已经被上一个循环放到新槽位的位置,使用头插法插到新的槽位上

面试题 java7 ConcurrentHashMap 的 size 是怎么统计

 public int size() {
        final Segment<K,V>[] segments = this.segments;
        //记录总的长度
        int size;
        //是否溢出
        boolean overflow; // true if size overflows 32 bits
        //modCount 次数
        long sum;     
        //上一次的 modcount
        long last = 0L;   // previous sum
        //每次尝试获取 size 都会+1 当加到 2 的时候就会去将每个 segment锁住
        //在 finally 的时候循环释放锁锁
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
            //1.RETRIES_BEFORE_LOCK = 2
            //2.每次循环 retries 会先跟 RETRIES_BEFORE_LOCK 比较是否相等,在加一
            //3.所以如果存在频繁修改map,就会出现多次循环
            //4.当 retries=2 的时候,进入segment加锁的逻辑,所以加锁之后 retries=3
            //5.ensureSegment(j).lock() 先获取segment,在对每个segment加锁
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                //遍历所有个segement
                for (int j = 0; j < segments.length; ++j) {
                //通过 UNSAFE.getObjectVolatile 获取segement
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                    //sum 记录此次循环整个容器的修改次数
                        sum += seg.modCount;
                        int c = seg.count;
                    //size 叠加记录所有的segement长度    
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                //last是上一次的modcount,如果本次的modcount==上一次的modcount
                //说明容器大小没有变,可以直接返回size
                //如果modcount 发生变化,说明容器有被修改了,需要重新计算长度
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
        // retries > RETRIES_BEFORE_LOCK 说明有对segement进行上锁
            if (retries > RETRIES_BEFORE_LOCK) {
            //循环对segment 解锁
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

总结:

  • 1.计算总大小的时候会通过UNSAFE.getObjectVolatile遍历拿到最新的segement,然后叠加每个segement的数组长度
  • 2.有两个点去保证获取到最新的size
  • 第一个点是每次都会记录下上一次的modcount,如果上一次的modcount跟本次的modcount相同说明map没被修改过的,本次获取到的size是正常的
  • 3.第二个是当循环都不满足第一个点,那就对每个segment进行上锁,然后循环去获取每个segment的长度,最后再finally解锁

get

  • 1.get 方法就没有加锁了
  • 2.就通过hash定位到具体的segment,然后再通过hash定位到HashEntry,再遍历链表,比较hash

remove

  • 1.remove也是先通过hash定位到segment
  • 2.然后调用segmentremove
  • 3.对segment进行加锁

总结

java7 ConcurrentHashMapputremove的似时候就是对segment进行加锁,这就是分段锁的由来。

未完待续……….

文章永久链接:https://tech.souyunku.com/26685

未经允许不得转载:搜云库技术团队 » Java7 ConcurrentHashMap源码详解

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们