1、 同步容器。它的原理是将状态封装起来,并对每个公有方法都实行同步,使得每次只有1个线程能够访问容器的状态。
* Vector和HashTable
* Collections.synchronizedXXX方法
> ## 同步容器的问题 ##
>
> 1. 这种方式使得对容器的访问都串行化,严重降低了并发性,如果多个线程来竞争容器的锁时,吞吐量严重降低
> 2. 对容器的多个方法的复合操作,是线程不安全的,比如一个线程负责删除,另一个线程负责查询,有可能出现越界的异常
2、 并发容器。java.util.concurrent包里面的一系列实现
* Concurrent开头系列。以ConcurrentHashMap为例,它的实现原理为分段锁。默认情况下有16个,每个锁守护1/16的散列数据,这样保证了并发量能达到16
> 分段锁缺陷在于虽然一般情况下只要一个锁,但是遇到需要扩容等类似的事情,只能去获取所有的锁
>
> ## ConcurrentHashMap一些问题 ##
>
> 1. 需要对整个容器中的内容进行计算的方法,比如size、isEmpty、contains等等。由于并发的存在,在计算的过程中可能已进过期了,它实际上就是个估计值,但是在并发的场景下,需要使用的场景是很少的。
> 以ConcurrentHashMap的size方法为例:
>
> /**
> * Returns the number of key-value mappings in this map. If the
> * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
> * <tt>Integer.MAX_VALUE</tt>.
> *
> * @return the number of key-value mappings in this map
> */
> public int size() {
> //为了能够算准数量,会算2次,如果两次算的不准,就锁住再算
> final Segment<K,V>[] segments = this.segments;
> int size;
> boolean overflow; // true if size overflows 32 bits
> long sum; // sum of modCounts
> long last = 0L; // previous sum
> int retries = -1; // 第一轮的计算总数不重试
> try {
> for (;;) {
> if (retries++ == RETRIES_BEFORE_LOCK) {
> //RETRIES_BEFORE_LOCK 默认是2
> for (int j = 0; j < segments.length; ++j)
> ensureSegment(j).lock(); // force creation
> }
> sum = 0L;
> size = 0;
> overflow = false;
> for (int j = 0; j < segments.length; ++j) {
> Segment<K,V> seg = segmentAt(segments, j);
> if (seg != null) {
> sum += seg.modCount;
> int c = seg.count;
> if (c < 0 || (size += c) < 0)
> overflow = true;
> }
> }
> //第一次计算的时候
> if (sum == last)
> break; //如果前后两次数数一致,就认为已经算好了
> last = sum;
> }
> } finally {
> if (retries > RETRIES_BEFORE_LOCK) {
> for (int j = 0; j < segments.length; ++j)
> segmentAt(segments, j).unlock();
> }
> }
> return overflow ? Integer.MAX_VALUE : size;
> }
>
>
> 1. 不能提供线程独占的功能
* CopyOnWrite系列。以CopyOnWriteArrayList为例,只在每次修改的时候,进行加锁控制,修改会创建并重新一个新的容器副本,其它时候由于都是事实上不可变的,也就不会出现线程安全问题
> ## CopyOnWrite的问题 ##
>
> 每次修改都复制底层数组,存在开销,因此使用场景一般是迭代操作远多于修改操作
>
> ### CopyOnWriteArrayList的读写示例 ###
>
> /**
> * Appends the specified element to the end of this list.
> *
> * @param e element to be appended to this list
> * @return <tt>true</tt> (as specified by {@link Collection#add})
> */
> public boolean add(E e) {
> final ReentrantLock lock = this.lock;
> lock.lock();
> try {
> Object[] elements = getArray();
> int len = elements.length;
> Object[] newElements = Arrays.copyOf(elements, len + 1);
> newElements[len] = e;
> setArray(newElements);
> return true;
> } finally {
> lock.unlock();
> }
> }
> /**
> * {@inheritDoc}
> *
> * @throws IndexOutOfBoundsException {@inheritDoc}
> */
> public E get(int index) {
> return get(getArray(), index);
> }
> /**
> * Gets the array. Non-private so as to also be accessible
> * from CopyOnWriteArraySet class.
> */
> final Object[] getArray() {
> return array;
> }
> private E get(Object[] a, int index) {
> return (E) a[index];
> }
>
java中的同步工具类
1、 阻塞队列,BlockingQueue。它提供了put和take方法,在队列不满足各自条件时将产生阻塞
> BlockingQueue使用示例,生产者-消费者
>
> public static void main(String[] args) throws Exception {
> BlockingQueue queue = new ArrayBlockingQueue(1024);
> Producer producer = new Producer(queue);
> Consumer consumer = new Consumer(queue);
> new Thread(producer).start();
> new Thread(consumer).start();
> }
> }
> public class Producer implements Runnable{
> protected BlockingQueue queue = null;
>
> public Producer(BlockingQueue queue) {
> this.queue = queue;
> }
>
> public void run() {
> try {
> queue.put("1");
> Thread.sleep(1000);
> queue.put("2");
> Thread.sleep(2000);
> queue.put("3");
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> }
> public class Consumer implements Runnable{
>
> protected BlockingQueue queue = null;
>
> public Consumer(BlockingQueue queue) {
> this.queue = queue;
> }
>
> public void run() {
> try {
> System.out.println(queue.take());
> System.out.println("Wait 1 sec");
> System.out.println(queue.take());
> System.out.println("Wait 2 sec");
> System.out.println(queue.take());
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> }
>
>
> 输出为
>
> 1
> Wait 1 sec
> 2
> Wait 2 sec
> 3
>
2、 闭锁
* CountDownLatch。使多个线程等待一组事件发生,它包含一个计数器,表示需要等待的事件的数量,每发生一个事,就递减一次,当减为0时,所有事情发生,允许“通行”
> CountDownLatch示例:
>
> public class TestHarness{
> public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
> final CountDownLatch startGate = new CountDownLatch(1);
> final CountDownLatch endGate = new CountDownLatch(nThreads);
> for (int i=0;i<nThreads;i++){
> Thread t = new Thread(){
> public void run(){
> try {
> startGate.await();
> try {
> task.run();
> }finally {
> endGate.countDown();
> }
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> };
> t.start();
> }
> long start = System.nanoTime();
> startGate.countDown();
> endGate.await();
> long end=System.nanoTime();
> return end-start;
> }
> }
>
启动门使主线程能够同时释放所有的工作线程,结束门使得主线程能够等待最后一个线程执行完
* FutureTask。Future.get的如果任务执行完成,则立即返回,否则将阻塞直到任务完结,再返回结果或者是抛出异常
3、 信号量,Semaphore 。它管理着一组虚拟的许可,许可的数量可通过构造函数指定,在执行操作时首先获得许可,并在使用后释放许可,如果没有,那么accquire将阻塞直到有许可。
> Semaphore示例
>
> public class BoundedHashSet<T>{
> private final Set<T> set;
> private final Semaphore sem;
>
> public BoundedHashSet(int bound) {
> this.set = Collections.synchronizedSet(new HashSet<T>());
> this.sem = new Semaphore(bound);
> }
> public boolean add(T o) throws InterruptedException {
> sem.acquire();
> boolean wasAdded = false;
> try {
> wasAdded = set.add(o);
> return wasAdded;
> }finally {
> if (!wasAdded){
> sem.release();
> }
> }
> }
> public boolean remove(Object o){
> boolean wasRemoved = set.remove(o);
> if(wasRemoved){
> sem.release();
> }
> return wasRemoved;
>
> }
> }
>
4、 栅栏。它能阻塞一组线程直到某个事件发生。 与闭锁的区别:
* 所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。
* 闭锁一旦进入终止状态,就不能被重置,它是一次性对象,而栅栏可以重置
* CyclicBarrier。可以使一定数量的参与方反复地在栅栏位置汇集
CyclicBarrier使用示例
> public static void main(String[] args) {
> //第k步执行完才能执行第k+1步
> CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne());
> StageK[] stageKs = new StageK[3];
> for (int i=0;i<3;i++){
> stageKs[i] = new StageK(barrier,"k part "+(i+1));
> }
> for (int i=0;i<3;i++){
> new Thread(stageKs[i]).start();
> }
> }
> class StageKPlusOne implements Runnable{
> @Override
> public void run() {
> System.out.println("stage k over");
> System.out.println("stage k+1 start counting");
> }
> }
> class StageK implements Runnable{
> private CyclicBarrier barrier;
> private String stage;
>
> public StageK(CyclicBarrier barrier, String stage) {
> this.barrier = barrier;
> this.stage = stage;
> }
>
> @Override
> public void run() {
> System.out.println("stage "+stage+" counting...");
> try {
> TimeUnit.MILLISECONDS.sleep(500);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> System.out.println("stage "+stage+" count over");
> try {
> barrier.await();
> } catch (InterruptedException e) {
> e.printStackTrace();
> } catch (BrokenBarrierException e) {
> e.printStackTrace();
> }
> }
> }
>
>
> 输出为
>
> stage k part 1 counting...
> stage k part 3 counting...
> stage k part 2 counting...
> stage k part 2 count over
> stage k part 3 count over
> stage k part 1 count over
> stage k over
> stage k+1 start counting
>
* Exchanger。它是一种两方栅栏,各方在栅栏位置交换数据
Exchanger 使用示例:
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
new Thread(er1).start();
new Thread(er2).start();
}
class ExchangerRunnable implements Runnable{
private Exchanger e;
private Object o;
public ExchangerRunnable(Exchanger e, Object o) {
this.e = e;
this.o = o;
}
@Override
public void run() {
Object pre=o;
try {
o=e.exchange(o);
System.out.println("pre:"+pre+" now:"+o);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
输出如下
pre:1 now:2
pre:2 now:1