专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Java 对线程安全支持有哪些?

1、 同步容器。它的原理是将状态封装起来,并对每个公有方法都实行同步,使得每次只有1个线程能够访问容器的状态。

 *  Vector和HashTable
 *  Collections.synchronizedXXX方法

> ## 同步容器的问题 ##
> 
> 1.  这种方式使得对容器的访问都串行化,严重降低了并发性,如果多个线程来竞争容器的锁时,吞吐量严重降低
> 2.  对容器的多个方法的复合操作,是线程不安全的,比如一个线程负责删除,另一个线程负责查询,有可能出现越界的异常

2、 并发容器。java.util.concurrent包里面的一系列实现

 *  Concurrent开头系列。以ConcurrentHashMap为例,它的实现原理为分段锁。默认情况下有16个,每个锁守护1/16的散列数据,这样保证了并发量能达到16

> 分段锁缺陷在于虽然一般情况下只要一个锁,但是遇到需要扩容等类似的事情,只能去获取所有的锁
> 
> ## ConcurrentHashMap一些问题 ##
> 
> 1.  需要对整个容器中的内容进行计算的方法,比如size、isEmpty、contains等等。由于并发的存在,在计算的过程中可能已进过期了,它实际上就是个估计值,但是在并发的场景下,需要使用的场景是很少的。  
>     以ConcurrentHashMap的size方法为例:
> 
>     /**
>         * Returns the number of key-value mappings in this map.  If the
>         * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
>         * <tt>Integer.MAX_VALUE</tt>.
>         *
>         * @return the number of key-value mappings in this map
>         */
>        public int size() {
>            //为了能够算准数量,会算2次,如果两次算的不准,就锁住再算
>            final Segment<K,V>[] segments = this.segments;
>            int size;
>            boolean overflow; // true if size overflows 32 bits
>            long sum;         // sum of modCounts
>            long last = 0L;   // previous sum
>            int retries = -1; // 第一轮的计算总数不重试
>            try {
>                for (;;) {
>                    if (retries++ == RETRIES_BEFORE_LOCK) {
>                    //RETRIES_BEFORE_LOCK 默认是2
>                        for (int j = 0; j < segments.length; ++j)
>                            ensureSegment(j).lock(); // force creation
>                    }
>                    sum = 0L;
>                    size = 0;
>                    overflow = false;
>                    for (int j = 0; j < segments.length; ++j) {
>                        Segment<K,V> seg = segmentAt(segments, j);
>                        if (seg != null) {
>                            sum += seg.modCount;
>                            int c = seg.count;
>                            if (c < 0 || (size += c) < 0)
>                                overflow = true;
>                        }
>                    }
>                    //第一次计算的时候
>                    if (sum == last)
>                        break; //如果前后两次数数一致,就认为已经算好了
>                    last = sum;
>                }
>            } finally {
>                if (retries > RETRIES_BEFORE_LOCK) {
>                    for (int j = 0; j < segments.length; ++j)
>                        segmentAt(segments, j).unlock();
>                }
>            }
>            return overflow ? Integer.MAX_VALUE : size;
>        }
>     
> 
> 1.  不能提供线程独占的功能

 *  CopyOnWrite系列。以CopyOnWriteArrayList为例,只在每次修改的时候,进行加锁控制,修改会创建并重新一个新的容器副本,其它时候由于都是事实上不可变的,也就不会出现线程安全问题

> ## CopyOnWrite的问题 ##
> 
> 每次修改都复制底层数组,存在开销,因此使用场景一般是迭代操作远多于修改操作
> 
> ### CopyOnWriteArrayList的读写示例 ###
> 
>     /**
>        * Appends the specified element to the end of this list.
>         *
>        * @param e element to be appended to this list
>       * @return <tt>true</tt> (as specified by {@link Collection#add})
>      */
>     public boolean add(E e) {
>            final ReentrantLock lock = this.lock;
>           lock.lock();
>          try {
>             Object[] elements = getArray();
>            int len = elements.length;
>           Object[] newElements = Arrays.copyOf(elements, len + 1);
>          newElements[len] = e;
>         setArray(newElements);
>        return true;
>     } finally {
>       lock.unlock();
>     }
>     }
>            /**
>           * {@inheritDoc}
>          *
>          * @throws IndexOutOfBoundsException {@inheritDoc}
>          */
>         public E get(int index) {
>             return get(getArray(), index);
>         }
>         /**
>        * Gets the array.  Non-private so as to also be accessible
>        * from CopyOnWriteArraySet class.
>        */
>         final Object[] getArray() {
>            return array;
>         }
>         private E get(Object[] a, int index) {
>             return (E) a[index];
>          }
>     

java中的同步工具类

1、 阻塞队列,BlockingQueue。它提供了put和take方法,在队列不满足各自条件时将产生阻塞

> BlockingQueue使用示例,生产者-消费者
> 
>     public static void main(String[] args) throws Exception {
>            BlockingQueue queue = new ArrayBlockingQueue(1024);
>            Producer producer = new Producer(queue);
>            Consumer consumer = new Consumer(queue);
>            new Thread(producer).start();
>            new Thread(consumer).start();
>        }
>     }
>     public class Producer implements Runnable{
>        protected BlockingQueue queue = null;
>     
>        public Producer(BlockingQueue queue) {
>            this.queue = queue;
>        }
>        
>        public void run() {
>            try {
>                queue.put("1");
>                Thread.sleep(1000);
>                queue.put("2");
>                Thread.sleep(2000);
>                queue.put("3");
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            }
>        }
>     }
>     public class Consumer implements Runnable{
>        
>        protected BlockingQueue queue = null;
>        
>        public Consumer(BlockingQueue queue) {
>            this.queue = queue;
>        }
>        
>        public void run() {
>            try {
>                System.out.println(queue.take());
>                System.out.println("Wait 1 sec");
>                System.out.println(queue.take());
>                System.out.println("Wait 2 sec");
>                System.out.println(queue.take());
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            }
>        }
>     }
>     
> 
> 输出为
> 
>     1
>     Wait 1 sec
>     2
>     Wait 2 sec
>     3
>     

2、 闭锁

 *  CountDownLatch。使多个线程等待一组事件发生,它包含一个计数器,表示需要等待的事件的数量,每发生一个事,就递减一次,当减为0时,所有事情发生,允许“通行”

> CountDownLatch示例:
> 
>     public class TestHarness{
>        public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
>        final CountDownLatch startGate = new CountDownLatch(1);
>        final CountDownLatch endGate = new CountDownLatch(nThreads);
>        for (int i=0;i<nThreads;i++){
>            Thread t = new Thread(){
>                public void run(){
>                    try {
>                        startGate.await();
>                        try {
>                            task.run();
>                        }finally {
>                            endGate.countDown();
>                        }
>                    } catch (InterruptedException e) {
>                        e.printStackTrace();
>                    }
>                }
>            };
>            t.start();
>        }
>        long start = System.nanoTime();
>        startGate.countDown();
>        endGate.await();
>        long end=System.nanoTime();
>        return end-start;
>        }
>     }
>     
启动门使主线程能够同时释放所有的工作线程,结束门使得主线程能够等待最后一个线程执行完

 *  FutureTask。Future.get的如果任务执行完成,则立即返回,否则将阻塞直到任务完结,再返回结果或者是抛出异常

3、 信号量,Semaphore 。它管理着一组虚拟的许可,许可的数量可通过构造函数指定,在执行操作时首先获得许可,并在使用后释放许可,如果没有,那么accquire将阻塞直到有许可。

> Semaphore示例
> 
>     public class BoundedHashSet<T>{
>        private final Set<T> set;
>        private final Semaphore sem;
>     
>        public BoundedHashSet(int bound) {
>            this.set = Collections.synchronizedSet(new HashSet<T>());
>            this.sem = new Semaphore(bound);
>        }
>        public boolean add(T o) throws InterruptedException {
>            sem.acquire();
>            boolean wasAdded = false;
>            try {
>                wasAdded = set.add(o);
>               return wasAdded;
>            }finally {
>                if (!wasAdded){
>                    sem.release();
>                }
>            }
>        }
>        public boolean remove(Object o){
>            boolean wasRemoved = set.remove(o);
>            if(wasRemoved){
>               sem.release();
>            }
>            return wasRemoved;
>                
>        }
>     }
>     

4、 栅栏。它能阻塞一组线程直到某个事件发生。 与闭锁的区别:

 *  所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。
 *  闭锁一旦进入终止状态,就不能被重置,它是一次性对象,而栅栏可以重置
 *  CyclicBarrier。可以使一定数量的参与方反复地在栅栏位置汇集

CyclicBarrier使用示例
>     public static void main(String[] args) {
>     //第k步执行完才能执行第k+1步
>            CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne());
>            StageK[] stageKs = new StageK[3];
>            for (int i=0;i<3;i++){
>                stageKs[i] = new StageK(barrier,"k part "+(i+1));
>            }
>            for (int i=0;i<3;i++){
>                new Thread(stageKs[i]).start();
>            }
>     }    
>     class StageKPlusOne implements Runnable{
>        @Override
>        public void run() {
>            System.out.println("stage k over");
>            System.out.println("stage k+1 start counting");
>        }
>     }
>     class StageK implements Runnable{
>        private CyclicBarrier barrier;
>        private String stage;
>        
>        public StageK(CyclicBarrier barrier, String stage) {
>            this.barrier = barrier;
>            this.stage = stage;
>        }
>        
>        @Override
>        public void run() {
>            System.out.println("stage "+stage+" counting...");
>            try {
>                TimeUnit.MILLISECONDS.sleep(500);
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            }
>            System.out.println("stage "+stage+" count over");
>            try {
>                barrier.await();
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            } catch (BrokenBarrierException e) {
>                e.printStackTrace();
>            }
>        }
>     }
>     
> 
> 输出为
> 
>     stage k part 1 counting...
>     stage k part 3 counting...
>     stage k part 2 counting...
>     stage k part 2 count over
>     stage k part 3 count over
>     stage k part 1 count over
>     stage k over
>     stage k+1 start counting
>     
 *  Exchanger。它是一种两方栅栏,各方在栅栏位置交换数据  
    Exchanger 使用示例:
    public static void main(String[] args) {
           Exchanger exchanger = new Exchanger();
            ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
            ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
            new Thread(er1).start();
            new Thread(er2).start();

        }
        class ExchangerRunnable implements Runnable{

        private Exchanger e;
        private Object o;

        public ExchangerRunnable(Exchanger e, Object o) {
           this.e = e;
            this.o = o;
    }

        @Override
        public void run() {
           Object pre=o;
            try {
                o=e.exchange(o);
                System.out.println("pre:"+pre+" now:"+o);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

输出如下
    pre:1 now:2
    pre:2 now:1

附录

案例

文章永久链接:https://tech.souyunku.com/47205

未经允许不得转载:搜云库技术团队 » Java 对线程安全支持有哪些?

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们