java 7 ConcurrentHashMap 数据结构
- 整个
ConcurrentHashMap
可以认为是一个Segment
数组,这个数组是不可以扩容的 - 每个
Segment
都维护着一个HashEntry
数组,这个数组有key``value``hash``next
,所以Segment
相当于一个HahsMap
- 所以可以认为
ConcurrentHashMap
是一个大数组
包含着一个数组+链表
的数据结构
问题
ConcurrentHashMap
怎么做到线程安全的ConcurrentHashMap
大数组(Segment
)和小数组(HashEntry
)容量是怎么定的,怎么扩容的
烧脑的实例化
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//concurrencyLevel 是期望的 Segment 数组大小
//1.ssize 是 Segment 数组的大小
//2.通过循环,如果满足 `ssize < concurrencyLevel` 就 将 ssize 左移动
//3.ssize初始值是1 每次左移之后的结果都是二的幂次方
//4.所以是想通过循环找到大于等于 concurrencyLevel 的 值
//5.总结:Segment 数组的大小 = ssize,结果是大于等于 concurrencyLevel ,二的幂次方
//6.跟java 7 HahsMap 有点像,不过 HahsMap 没有通过循环,而是通过Integer的一个方法
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//sshift 是 ssize 经过几次左移
//segmentShift 主要是跟后面获得hash槽位有关
this.segmentShift = 32 - sshift;
//hash & segmentMask 得到hash槽位,至于 为什么 segmentMask = ssize - 1
//因为数组长度减一跟hash & 才能得到 0-(ssize-1)的槽位
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//1.initialCapacity 期望 整个的ConcurrentHashMap 容量大小
//2.initialCapacity / ssize 是为了计算出每个 Segment 下面的 HashEntry 数组要大概多少
int c = initialCapacity / ssize;
//1.c * ssize 相当于 HashEntry的长度*Segment长度
//2.如果 c * ssize < initialCapacity 就不满足 ConcurrentHashMap 期望的总长度
//所以需要将 c++
if (c * ssize < initialCapacity)
++c;
//cap 默认是2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//1.c是刚才计算出来的 HashEntry 数组长度
//2.通过循环比较,左移动,获取比c大的二的幂次方,跟Segment获取长度一样
//都是为了获取二的幂次方,因为 HashEntry 也是数组
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化一个 Segment,阈值,加载因子 HashEntry(数组大小为 cap)
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//初始化 Segment 数组,大小为 ssize
Segment Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//通过 UNSAFE 去将 Segment 对象放到 Segment数组的第一个位
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
总结:
- 1.
concurrencyLevel
是期望Segment
数组的长度 - 2.通过循环左移动,获取到大于等于
concurrencyLevel
的二的幂次方,作为Segment
数组的长度,最后计算ssize
作为Segment
数组长度 - 3.
initialCapacity
是期望ConcurrentHashMap
的长度 - 4.
HashEntry
数组长度的获取有三步。a.c=initialCapacity / ssize
期望总的大小除以Segment
的大小,表示每个Segment
的HashEntry
长度**至少为c ** b.如果c * ssize < initialCapacity
,就是HashEntry
的长度 *Segment
的长度,如果小于期望总长度,HashEntry
长度需要++ c.通过循环右移动获取HashEntry
最后的长度为大于等于c,二的幂次方。 - 5.实例化
Segment
数组,大小为ssize
-
- 实例化一个
Segment
对象用UNSAFE
方法放到Segment
第一位,为什么做这个操作,因为上面一堆计算需要记录下来作为模板,接下来Segment
数组还会放第n个Segment
对象,Segment
需要的HashEntry
数组大小都在第一个Segment
里面
- 实例化一个
put
public V put(K key, V value) {
Segment<K,V> s;
//不能存key为null,跟hashMap不一样
if (value == null)
throw new NullPointerException();
//通过key获取hash值
int hash = hash(key);
//1.在初始化的时候 segmentMask = ssize -1
//2.segmentShift = = 32 - sshift,sshift是 ssize获得过程中经过几次左移 32减去左移位数
//获得高位为零的位数
//3.hash >>> segmentShift ,hash右移segmentShift(ssize高位为零的位数),目的就是为了让
//hash高位参与到hash槽位的定位
int j = (hash >>> segmentShift) & segmentMask;
//通过 UNSAFE 获取到槽位为 j Segment,可能为空,因为初始化只做了第一位 Segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//如果 Segment 为空就去获取
s = ensureSegment(j);
//往 Segment put 参数
return s.put(key, hash, value, false);
}
ensureSegment
获取 Segment
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//通过 UNSAFE 从主内存获取 Segment 数组中第 k 位
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//拿到数组第1位作为模板
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
//从模板拿到 HashEntry 数组长度,新建一个数组长度
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//重新 通过 UNSAFE 从主内存获取 Segment 数组中第 k 位
//防止多线程安全问题
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//死循环 通过 UNSAFE 从主内存获取 Segment 数组中第 k 位
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//通过cas 真正将 Segment 设置到 Segment 数组的k位
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
总结:
- 1.
ensureSegment
获取Segment
数组第一位作为模板来构造新的Segment
- 2.
ensureSegment
方法是线程安全的,通过UNSAFE.getObjectVolatile
获取可见的Segment
,同时通过UNSAFE.compareAndSwapObject
新增元素到Segment
数组中。
Segment#put
Segment#put
才是真正的插入参数
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//1.tryLock 尝试加锁
//2.加锁失败 进入 scanAndLockForPut
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
//获得锁
try {
HashEntry<K,V>[] tab = table;
//计算出需要存放的hash槽位
int index = (tab.length - 1) & hash;
//通过 UNSAFE 获取到 第index 槽位的 HashEntry
HashEntry<K,V> first = entryAt(tab, index);
//HashEntry 是个链表,遍历链表
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//判断是否有key相同的,如果有就覆盖
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果 e == null 就采用尾插法插入新数据
//如果新的节点不为空,直接将新的节点设置为头节点
if (node != null)
node.setNext(first);
else
//如果新节点不为空就 new 一个,下一个节点执行原头节点
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//进行扩容,所以扩容只对 Segment 进行扩容
rehash(node);
else
//新节点放到数组里面
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
总结:
- 1.
Segment#put
是通过ReentrantLock
+UNSAFE
的一些方法来实现线程安全 - 2.
put
进去走两个分支,一个是存在相同的key直接覆盖,一个是不同的key需要新建节点 - 3.新建节点采用的也是头插法
- 4.这个方法里面还有两个方法一个是等待锁
scanAndLockForPut
,一个是扩容rehash
- 5.问题:为什么新节点有时候会null有时候不会null
scanAndLockForPut
当Segment#put
操作tryLock
失败之后就会进入scanAndLockForPut
去等待获取锁,在里面等待的过程会去new 一个新的节点。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//尝试加锁失败
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//初始化 retries = -1 满足条件
if (retries < 0) {
//如果头节点为空,就新建一个节点,retries 改为 0
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
//如果存在key相同的 直接将 retries 改为 0
else if (key.equals(e.key))
retries = 0;
else
//相当于遍历链表,直到 e = null 就不会走这块 if的逻辑了
e = e.next;
}
//如果 retries 大于某个值就 直接 lock ,不在走循环
//这里的 retries 相当于重试次数
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//1.(retries & 1) == 0 相当于偶数
//2.当 retries 是偶数的时候就会 走 entryForHash
//3.entryForHash 去主内存获取最新的Segment的下面 HashEntry 数组的某个元素的头节点
//4.如果f = entryForHash(this, hash)) != first 说明获取到锁的那个线程改变了 的Segment的下面 HashEntry
//5.如果改变了 retries = -1,改为-1 才能接着走上面的if逻辑
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
总结:
- 1.
retries
这个属性是关键,有三个if
,retries=-1
会走第一个if
- 2.第一个
if
会遍历链表,要么链表中有key相同的就不遍历,要么链表没有key相同的就新建一个节点,为了是后面拿到锁之后就不用新建节点了 - 3.第二个
if
是判断重试次数是否超过MAX_SCAN_RETRIES
,MAX_SCAN_RETRIES
这个参数跟cpu
有关,如果超过就直接lock
等待获取锁,不在尝试获取锁 - 4.第三个
if
,如果尝试次数是偶数,就去获取当前Segment
下面HahEntry
,如果数据变了,修改retries=-1
,然后重新走第一个if。
rehash
rehash
是对Segment
下面的HashEntry
进行扩容
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//新数组大小是旧数组大小两倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
//对旧数组进行循环遍历迁移数据
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
//获得新的hash槽位
int idx = e.hash & sizeMask;
//单个节点,直接设置到数组中
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//遍历整个链表,获取到最后几个元素得坑位一样得
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//把最后几个坑位一样的直接设置到数组中去
newTable[lastIdx] = lastRun;
// Clone remaining nodes
//遍历链表,知道 lastRun 因为 lastRun 已经插入到新的槽位去了
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
//头插法
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//重新计算需要插入新节点的 槽位
int nodeIndex = node.hash & sizeMask; // add the new node
//头插法到链表中
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
总结:
- 1.是对
HashEntry
进行扩容 - 2.扩容之后是原数组两倍
- 3.扩容过程中有两个分支比较重要
- 4.一个是循环遍历链表,找到后几个节点最后分配的槽位一样,直接把最后几个节点放到新的操作上
- 5.一个是循环遍历链表,从头节点到已经被上一个循环放到新槽位的位置,使用头插法插到新的槽位上
面试题 java7 ConcurrentHashMap 的 size 是怎么统计
public int size() {
final Segment<K,V>[] segments = this.segments;
//记录总的长度
int size;
//是否溢出
boolean overflow; // true if size overflows 32 bits
//modCount 次数
long sum;
//上一次的 modcount
long last = 0L; // previous sum
//每次尝试获取 size 都会+1 当加到 2 的时候就会去将每个 segment锁住
//在 finally 的时候循环释放锁锁
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//1.RETRIES_BEFORE_LOCK = 2
//2.每次循环 retries 会先跟 RETRIES_BEFORE_LOCK 比较是否相等,在加一
//3.所以如果存在频繁修改map,就会出现多次循环
//4.当 retries=2 的时候,进入segment加锁的逻辑,所以加锁之后 retries=3
//5.ensureSegment(j).lock() 先获取segment,在对每个segment加锁
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//遍历所有个segement
for (int j = 0; j < segments.length; ++j) {
//通过 UNSAFE.getObjectVolatile 获取segement
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
//sum 记录此次循环整个容器的修改次数
sum += seg.modCount;
int c = seg.count;
//size 叠加记录所有的segement长度
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//last是上一次的modcount,如果本次的modcount==上一次的modcount
//说明容器大小没有变,可以直接返回size
//如果modcount 发生变化,说明容器有被修改了,需要重新计算长度
if (sum == last)
break;
last = sum;
}
} finally {
// retries > RETRIES_BEFORE_LOCK 说明有对segement进行上锁
if (retries > RETRIES_BEFORE_LOCK) {
//循环对segment 解锁
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
总结:
- 1.计算总大小的时候会通过
UNSAFE.getObjectVolatile
遍历拿到最新的segement
,然后叠加每个segement
的数组长度 - 2.有两个点去保证获取到最新的
size
, - 第一个点是每次都会记录下上一次的
modcount
,如果上一次的modcount
跟本次的modcount
相同说明map没被修改过的,本次获取到的size
是正常的 - 3.第二个是当循环都不满足第一个点,那就对每个
segment
进行上锁,然后循环去获取每个segment
的长度,最后再finally
解锁
get
- 1.
get
方法就没有加锁了 - 2.就通过
hash
定位到具体的segment
,然后再通过hash
定位到HashEntry
,再遍历链表,比较hash
remove
- 1.
remove
也是先通过hash
定位到segment
- 2.然后调用
segment
的remove
- 3.对
segment
进行加锁
总结
java7 ConcurrentHashMap
在 put
和remove
的似时候就是对segment
进行加锁,这就是分段锁的由来。
未完待续……….