【亚洲必赢官网】HashMap与线程安全,ConcurrentBag的贯彻原理

目录

Java要点2

HashMap与线程安全

HashMap与线程安全

  • 一、前言
  • 二、ConcurrentBag类
  • 三、
    ConcurrentBag线程安全实现原理

    • 一.
      ConcurrentBag的个体字段
    • 贰.
      用于数据存款和储蓄的TrehadLocalList类
    • 3.
      ConcurrentBag落成新增成分
    • 四. ConcurrentBag
      怎么样促成迭代器形式
  • 四、总结
  • 作者水平有限,要是不当欢迎各位批评指正!

JAVA 集合类

1、HashMap 为什么是线程不安全的

一、HashMap 为啥是线程不安全的


一.JAVA常用集合类功效、不一样和性质

两大类:Collections,Map;Collections分List和Set

List接口与实际现类

List类似于数组,能够由此索引来访问元素,完成该接口的常用类有ArrayList、LinkedList、Vector、Stack等。

   
HashMap是通过散列表来实现存款和储蓄结构的,具体内容请看本人的另一篇博客《HashMap深度解析》,那么HashMap为什么线程不安全啊,首要有三个原因。

   
HashMap是通过散列表来达成存款和储蓄结构的,具体内容请看自身的另壹篇博客《HashMap深度剖析》,那么HashMap为啥线程不安全啊,重要有多个原因。


ArrayList

ArrayList是动态数组,能够依据插入的成分的数码自动扩大容积,而使用者不要求精晓其里面是何许时候举行扩大的,把它作为充分体积的数组来使用即可。
ArrayList访问成分的措施get是常数时间,因为是直接根据下标索引来访问的,而add方法的岁月复杂度是O(n),因为急需活动成分,将新成分插入到适当的职位。
ArrayList是非线程安全的,即它未有同步,然则,能够经过Collections.synchronizedList()静态方法重临三个同步的实例,如:

List synList = Collections.synchronizedList(list);

数组扩大体量:ArrayList在插入成分的时候,都会检讨当前的数组大小是不是丰富,若是不够,将会扩大体量到当前体积
* 一.五 +
1(加①是为了当前容积为一时,也能扩大到贰),即把原本的因素全体复制到二个两倍大小的新数组,将旧的数组放弃掉(等待垃圾回收),那几个操作是比较耗费时间,由此建议在开立ArrayList的时候,依照要插入的要素的多少来早先估价Capacity,并起先化ArrayList,如:

ArrayList list = new ArrayList(100);

这么,在插入小于九十几个成分的时候都以不供给举行扩大体量的,能够拉动品质的升级换代,当然,借使对这么些容积估量大了,恐怕会带来1些空中的费用。

先是肯定是八个线程同时去往集合里添加多少,第二个原因:三个线程同时添加相同的key值数据,当三个线程同时遍历完桶内的链表时,发现,未有该key值的数量,那是她们同时成立了一个Entry结点,都助长到了桶内的链表上,那样在该HashMap集合中就涌出了三个Key相同的数额。第3个原因:当多个线程同时检查评定到size/capacity>负载因卯时,在扩大体量的时候或许会在链表上发生死循环(为啥会发生死循环,能够看1些HashMap的死循环相关的博客),也只怕会发出存款和储蓄万分。

率先肯定是七个线程同时去往集合里添加多少,第贰个原因:七个线程同时添加相同的key值数据,当四个线程同时遍历完桶内的链表时,发现,未有该key值的数据,这是他俩还要成立了一个Entry结点,都增进到了桶内的链表上,这样在该HashMap集合中就涌出了三个Key相同的数码。第四个原因:当五个线程同时检查实验到size/capacity>负载因猴时,在扩大体积的时候可能会在链表上发生死循环(为啥会生出死循环,能够看1些HashMap的死循环相关的博客),也可能会时有发生存储十分。

一、前言

小编眼前在做三个品种,项目中为了进步吞吐量,使用了音讯队列,中间完成了生育消费形式,在生育消费者格局中供给有2个成团,来囤积生产者所生育的物料,作者利用了最常见的List<T>会晤类型。

由于生产者线程有好多少个,消费者线程也有好多个,所以不可幸免的就生出了线程同步的题目。开端小编是运用lock主要字,实行线程同步,不过品质并不是专门出彩,然后有网络好友说能够行使SynchronizedList<T>来替代使用List<T>直达到规定的分数线程安全的目标。于是小编就替换来了SynchronizedList<T>,但是发现品质还是倒霉,于是查看了SynchronizedList<T>的源代码,发现它正是简不难单的在List<T>提供的API的根底上加了lock,所以质量基本与作者完成格局相差无几。

末尾小编找到了消除的方案,使用ConcurrentBag<T>【亚洲必赢官网】HashMap与线程安全,ConcurrentBag的贯彻原理。类来兑现,品质有相当大的更动,于是笔者查阅了ConcurrentBag<T>的源代码,完成丰裕小巧,特此在这记录一下。

LinkedList

LinkedList也促成了List接口,其内部贯彻是选拔双向链表来保存成分,由此插入与删除成分的属性都表现不错。它还提供了1些任何操作方法,如在头顶、底部插入或然去除成分,由此,能够用它来兑现栈、队列、双向队列。
是因为是选用链表保存成分的,所以随便访问成分的时候速度会相比慢(供给遍历链表找到对象成分),这点相比ArrayList的即兴访问要差,ArrayList是使用数组达成格局,直接动用下标可以访问到成分而不需求遍历。由此,在急需反复随意走访成分的场馆下,建议选取ArrayList。
与ArrayList壹样,LinkedList也是非同步的,假诺必要达成二10八线程访问,则供给协调在表面完成协同方法。当然也得以动用Collections.synchronizedList()静态方法。

贰、怎样线程安全的利用HashMap

贰、怎么样线程安全的运用HashMap

二、ConcurrentBag类

ConcurrentBag<T>实现了IProducerConsumerCollection<T>接口,该接口重要用以生产者消费者格局下,可知该类基本便是为生育消费者形式定制的。然后还完成了平常的IReadOnlyCollection<T>类,完成了此类就须求完结IEnumerable<T>、IEnumerable、 ICollection类。

ConcurrentBag<T>对外提供的主意未有List<T>那么多,可是同样有Enumerable完毕的恢宏方法。类本人提供的不2秘诀如下所示。

名称 说明
Add 将对象添加到 ConcurrentBag 中。
CopyTo 从指定数组索引开始,将 ConcurrentBag 元素复制到现有的一维 Array 中。
Equals(Object) 确定指定的 Object 是否等于当前的 Object。 (继承自 Object。)
Finalize 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 Object。)
GetEnumerator 返回循环访问 ConcurrentBag 的枚举器。
GetHashCode 用作特定类型的哈希函数。 (继承自 Object。)
GetType 获取当前实例的 Type。 (继承自 Object。)
MemberwiseClone 创建当前 Object 的浅表副本。 (继承自 Object。)
ToArray 将 ConcurrentBag 元素复制到新数组。
ToString 返回表示当前对象的字符串。 (继承自 Object。)
TryPeek 尝试从 ConcurrentBag 返回一个对象但不移除该对象。
TryTake 尝试从 ConcurrentBag 中移除并返回对象。

Vector

Vector是ArrayList的线程同步版本,就是说Vector是一道的,协助十2线程访问。除外,还有少数见仁见智时,当体积不够时,Vector暗中同意扩大一倍容积,而ArrayList是现阶段体积
* 1.5 + 1

方法一:Hashtable

方法一:Hashtable

3、 ConcurrentBag线程安全完毕原理

Stack

Stack是1种后进先出的数据结构,继承自Vector类,提供了push、pop、peek(获得栈顶成分)等办法。

   
Hashtable是Java低版本中提议来的,由于其内部的加锁机制,是的其性质较低,如今一度不常用了。所以当一个线程访问Hashtable的联手方法时,别的线程若是也要拜访同步方法,会被阻塞住。举个例子,当三个线程使用put方法时,另一个线程不但不得以应用put方法,连get方法都不得以,效能非常的低。

   
Hashtable是Java低版本中提议来的,由于个中间的加锁机制,是的其天性较低,最近曾经不常用了。所以当贰个线程访问Hashtable的壹道方法时,其余线程假如也要访问同步方法,会被阻塞住。举个例子,当叁个线程使用put方法时,另三个线程不但无法动用put方法,连get方法都无法,作用非常低。

1. ConcurrentBag的私有字段

ConcurrentBag线程安全完成主即使经过它的数量存款和储蓄的组织和细颗粒度的锁。

   public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList对象包含每个线程的数据
        ThreadLocal<ThreadLocalList> m_locals;

        // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中
        // 允许在线程局部对象上枚举
        volatile ThreadLocalList m_headList, m_tailList;

        // 这个标志是告知操作线程必须同步操作
        // 在GlobalListsLock 锁中 设置
        bool m_needSync;

}

首要选取大家来看它证明的私有字段,当中需求小心的是集结的数码是存放在在ThreadLocal线程本地存款和储蓄中的。也正是说访问它的种种线程会维护三个投机的集合数据列表,二个聚众中的数据只怕会存放在差别线程的地方存款和储蓄空间中,所以如果线程访问本人本地存储的靶子,那么是平昔不难题的,这就是达成线程安全的首先层,利用线程本地存款和储蓄数据

然后能够看到ThreadLocalList m_headList, m_tailList;本条是存放在着地面列表对象的头指针和尾指针,通过这五个指针,大家就可以经过遍历的方式来访问具有地点列表。它应用volatile修饰,不允许线程举行本地缓存,每一个线程的读写都以向来操作在共享内部存款和储蓄器上,那就确认保证了变量始终存有一致性。任何线程在别的时刻进行读写操作均是风靡值。对于volatile修饰符,感谢自身是攻城狮提议描述失实。

最终又定义了叁个标志,这一个标志告知操作线程必须开始展览同步操作,那是落到实处了贰个细颗粒度的锁,因为唯有在多少个规格满意的景况下才必要进行线程同步。

Set接口

Set是不能够包罗重合成分的容器,其完结类有HashSet,继承于它的接口有SortedSet接口等。Set中提供了加、减、和交等集合操作函数。Set不能够根据索引随机访问成分,那是它与List的三个重要分化。

   
HashTable源码中是使用synchronized来担保线程安全的,比如上边包车型地铁get方法和put方法:
public synchronized V get(Object key) {}

   
HashTable源码中是运用synchronized来保管线程安全的,比如下面包车型大巴get方法和put方法:
public synchronized V get(Object key) {}

二. 用来数据存款和储蓄的TrehadLocalList类

接下去大家来看一下ThreadLocalList类的结构,该类正是实际存款和储蓄了多少的地方。实际上它是应用双向链表那种布局进行数量存款和储蓄。

[Serializable]
// 构造了双向链表的节点
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操作类型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 线程锁定的类
/// </summary>
internal class ThreadLocalList
{
    // 双向链表的头结点 如果为null那么表示链表为空
    internal volatile Node m_head;

    // 双向链表的尾节点
    private volatile Node m_tail;

    // 定义当前对List进行操作的种类 
    // 与前面的 ListOperation 相对应
    internal volatile int m_currentOp;

    // 这个列表元素的计数
    private int m_count;

    // The stealing count
    // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数
    internal int m_stealCount;

    // 下一个列表 可能会在其它线程中
    internal volatile ThreadLocalList m_nextList;

    // 设定锁定是否已进行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有当列表从空变为非空统计是底层
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 构造器
    /// </summary>
    /// <param name="ownerThread">拥有这个集合的线程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一个新的item到链表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新计数.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 因为进行初始化了,所以将空状态改为非空状态
        }
        else
        {
            // 使用头插法 将新的元素插入链表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新计数以避免此添加同步时溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 从列表的头部删除一个item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 双向链表删除头结点数据的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表头部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 从列表的尾部获取一个item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

从上边的代码中我们能够特别证实在此以前的观点,正是ConcurentBag<T>在八个线程中储存数据时,使用的是双向链表ThreadLocalList贯彻了1组对链表增加和删除改查的措施。

HashSet

HashSet达成了Set接口,当中间是采用HashMap完毕的。放入HashSet的目的最好重写hashCode、equals方法,因为暗中认可的那多个办法很大概与您的业务逻辑是不一样的,而且,要同时重写这五个函数,如若只重写在那之中3个,很简单生出意外的题材。
记住上边几条规则:

相当对象,hashCode一定相等。
不等对象,hashCode不肯定不等于。
几个指标的hashCode相同,不肯定相等。
七个对象的hashCode区别,一定相等。

public synchronized V put(K key, V value) {}

public synchronized V put(K key, V value) {}

三. ConcurrentBag贯彻新增成分

接下去我们看壹看ConcurentBag<T>是哪些新增成分的。

/// <summary>
/// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里
/// 这是避免内存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此时必须持有全局锁
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 从头线程列表开始枚举 找到那些已经被关闭的线程
    // 将它所在的列表对象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地帮助方法,通过线程对象检索线程线程本地列表
/// </summary>
/// <param name="forceCreate">如果列表不存在,那么创建新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 获取用于更新操作的 m_tailList 锁
        lock (GlobalListsLock)
        {
            // 如果头列表等于空,那么说明集合中还没有元素
            // 直接创建一个新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中
                // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程
                // 然后将已停止线程的数据 移交到当前线程管理
                list = GetUnownedList();
                // 如果没有 那么就新建一个列表 然后更新尾指针的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add)
    ThreadLocalList list = GetThreadList(true);
    // 实际的数据添加操作 在AddInternal中执行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁
        // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁
        if (list.Count < 2 || m_needSync)
        {
            // 将其重置为None 以避免与窃取线程的死锁
            list.m_currentOp = (int)ListOperation.None;
            // 锁定当前对象
            Monitor.Enter(list, ref lockTaken);
        }
        // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中
        // 如果已经锁定 那么说明线程安全  可以更新Count 计数
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

从地点代码中,我们得以很精通的了然Add()艺术是何许运维的,其中的显要便是GetThreadList()办法,通过该方法能够取妥贴前线程的数量存款和储蓄列表对象,即便不设有数量存款和储蓄列表,它会活动成立大概经过GetUnownedList()主意来探寻那多少个被终止然而还蕴藏有数量列表的线程,然后将数据列表重回给当下线程中,幸免了内部存款和储蓄器泄漏。

在数量拉长的进程中,实现了细颗粒度的lock同步锁,所以品质会很高。删除和任何操作与新增类似,本文不再赘述。

TreeSet

TreeSet同样的Set接口的兑现类,同样不能够存放相同的对象。它与HashSet不相同的是,TreeSet的要素是遵照顺序排列的,由此用TreeSet存放的靶子必要贯彻Comparable接口。

方法二:SynchronizedMap

方法二:SynchronizedMap

四. ConcurrentBag 怎么着兑现迭代器格局

看完上面包车型大巴代码后,笔者很惊叹ConcurrentBag<T>是何许兑现IEnumerator来贯彻迭代访问的,因为ConcurrentBag<T>是经过分流在差别线程中的ThreadLocalList来储存数据的,那么在达成迭代器情势时,进程会比较复杂。

末尾再查看了源码之后,发现ConcurrentBag<T>为了促成迭代器方式,将分在不相同线程中的数据全都存到二个List<T>见面中,然后回来了该副本的迭代器。所以每一次访问迭代器,它都会新建2个List<T>的副本,那样固然浪费了迟早的蕴藏空间,不过逻辑上特别简便易行了。

/// <summary>
/// 本地帮助器方法释放所有本地列表锁
/// </summary>
private void ReleaseAllLocks()
{
    // 该方法用于在执行线程同步以后 释放掉所有本地锁
    // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 从冻结状态解冻包的本地帮助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先释放掉 每个线程中 本地变量的锁
    // 然后释放全局锁
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地帮助器函数等待所有未同步的操作
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操作完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地帮助器方法获取所有本地列表锁
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;

    // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁
    while (currentList != null)
    {
        // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 这将强制同步任何将来的添加/执行操作
    m_needSync = true;

    // 获取所有列表的锁
    AcquireAllLocks();

    // 等待所有操作完成
    WaitAllOperations();
}

/// <summary>
/// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。
/// 这不是线程安全, 应该被称为冻结/解冻袋块
/// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 创建一个新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先冻结整个 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 然后ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由地方的代码可清楚,为了获取迭代器对象,总共举行了三步关键的操作。

  1. 使用FreezeBag()方法,冻结一切ConcurrentBag<T>见面。因为须求扭转集合的List<T>副本,生成副本时期不能够有其它线程更改损坏数据。
  2. ConcurrrentBag<T>生成List<T>副本。因为ConcurrentBag<T>积存数据的格局相比较独特,直接达成迭代器情势困难,思量到线程安全和逻辑,最好的法子是生成1个副本。
  3. 完毕以上操作之后,就足以选用UnfreezeBag()格局解冻整个集合。

那么FreezeBag()办法是怎么来冻结1切集合的吗?也是分为三步走。

  1. 先是得到全局锁,通过Monitor.Enter(GlobalListsLock, ref lockTaken);这么一条语句,那样任何线程就无法冻结集合。
  2. 接下来拿走具有线程中ThreadLocalList的锁,通过`亚洲必赢官网,AcquireAllLocks()方法来遍历获取。那样任何线程就无法对它实行操作损坏数据。
  3. 伺机已经跻身了操作流程线程甘休,通过WaitAllOperations()办法来促成,该方法会遍历每四个ThreadLocalList对象的m_currentOp天性,确认保障全部介乎None操作。

实现以上流程后,那么正是确实的冷冻了总体ConcurrentBag<T>集结,要解冻的话也就如。在此不再赘述。

Map接口

Map集合提供了服从“键值对”存款和储蓄成分的不二等秘书籍,多少个键唯1映射2个值。集合中“键值对”全部作为四个实体成分时,类似List集合,不过假若分别来年,Map是贰个两列成分的聚合:键是一列,值是壹列。与Set集合一样,Map也从没提供随机走访的能力,只好通过键来访问对应的值。
Map的每二个因素都以一个Map.Entry,那几个实体的结构是< Key, Value
>样式。

   
调用synchronizedMap()方法后会重回四个SynchronizedMap类的指标,而在SynchronizedMap类中动用了synchronized同步关键字来担保对Map的操作是线程安全的。

   
调用synchronizedMap()方法后会重临二个SynchronizedMap类的目的,而在SynchronizedMap类中行使了synchronized同步关键字来确定保证对Map的操作是线程安全的。

四、总结

上边给出一张图,描述了ConcurrentBag<T>是何等存款和储蓄数据的。通过种种线程中的ThreadLocal来完成线程本地存款和储蓄,每一个线程中都有这么的构造,互不困扰。然后各个线程中的m_headList连天指向ConcurrentBag<T>的首先个列表,m_tailList本着最终3个列表。列表与列表之间通过m_locals
下的 m_nextList不断,构成一个单链表。

数量存款和储蓄在每一种线程的m_locals中,通过Node类构成3个双向链表。
PS:
要注意m_tailListm_headList并不是储存在ThreadLocal中,而是有着的线程共享1份。

亚洲必赢官网 1

以上正是有关ConcurrentBag<T>类的落实,作者的壹些记录和剖析。

HashMap

HashMap完毕了Map接口,但它是非线程安全的。HashMap允许key值为null,value也得以为null。

源码如下

源码如下

作者水平有限,倘若不当欢迎各位批评指正!

附上ConcurrentBag<T>源码地址:戳一戳

Hashtable

Hashtable也是Map的落实类,继承自Dictionary类。它与HashMap不一样的是,它是线程安全的。而且它不允许key为null,value也无法为null。
出于它是线程安全的,在作用上紧跟于HashMap。

private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}
private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}

List总结

ArrayList内部贯彻应用动态数组,当体量不够时,自动扩大容积至(当前容积*壹.5+一)。成分的逐条依照插入的顺序排列。暗中认可初阶体积为十。
contains复杂度为O(n),add复杂度为分摊的常数,即添加n个要素须要O(n)时间,remove为O(n),get复杂度为O(一)
擅自访问成效高,随机插入、删除功效低。ArrayList是非线程安全的。

LinkedList内部采取双向链表实现,随机走访功效低,随机插入、删除功效高。能够视作储藏室、队列、双向队列来使用。LinkedList也是非线程安全的。

Vector跟ArrayList是接近的,内部贯彻也是动态数组,随机走访功效高。Vector是线程安全的。

Stack是栈,继承于Vector,其各样操作也是基于Vector的各样操作,由此当中间贯彻也是动态数组,先进后出。Stack是线程安全的。

方法三:ConcurrentHashMap

方法三:ConcurrentHashMap

List使用情况

对此急需赶快插入、删除成分,应该运用LinkedList
对于须求快速随机访问成分,应该利用ArrayList
若果List供给被八线程操作,应该使用Vector,借使只会被单线程操作,应该利用ArrayList
Set总结

HashSet内部是利用HashMap完毕的,HashSet的key值是不允许再度的,假如放入的对象是自定义对象,那么最佳能(CANON)够同时重写hashCode与equals函数,那样就能自定义添加的靶子在什么样的图景下是1样的,即能确认保证在工作逻辑下能添加对象到HashSet中,保险工作逻辑的不错。其余,HashSet里的因素不是根据顺序存款和储蓄的。HashSet是非线程安全的。

TreeSet存款和储蓄的因素是按顺序存款和储蓄的,假诺是储存的要素是自定义对象,那么供给贯彻Comparable接口。TreeSet也是非线程安全的。

LinkedHashSet继承自HashSet,它与HashSet差异的是,LinkedHashSet存款和储蓄成分的相继是遵照成分的插入顺序存款和储蓄的。LinkedHashSet也是非线程安全的。

    ConcurrentHashMap是java.util.concurrent包中的三个类,

    ConcurrentHashMap是java.util.concurrent包中的三个类,

Map总结

HashMap存款和储蓄键值对。当程序试图将一个key-value对放入 HashMap
中时,程序首先依据该key的hashCode()重临值决定该Entry的囤积地方:假使三个Entry的key的hashCode()
再次来到值相同,那它们的存款和储蓄地点相同。要是那七个Entry的key通过equals相比再次回到true,新添加Entry的value将覆盖集合中原始Entry的
value,但key不会覆盖。假使那七个Entry的key通过equals
相比再次来到false,新加上的Entry将与聚集中原来Entry形成Entry
链,而且新增加的 Entry 位于 Entry
链的底部。看下边HashMap添加键值对的源代码:

    public V put(K key, V value) {
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
    }
    void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    if (size++ >= threshold)
        resize(2 * table.length);
    }

HashMap允许key、value值为null。HashMap是非线程安全的。

Hashtable是HashMap的线程安全版本。而且,key、value都不允许为null。

哈希值的应用分歧: Hashtable直接行使对象的hashCode,如下代码:

int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;

而HashMap重新计算hash值,如下代码:

  int hash = hash(key.hashCode());
  int i = indexFor(hash, table.length);
  static int hash(int h) {
  // This function ensures that hashCodes that differ only by
  // constant multiples at each bit position have a bounded
  // number of collisions (approximately 8 at default load factor).
  h ^= (h >>> 20) ^ (h >>> 12);
  return h ^ (h >>> 7) ^ (h >>> 4);
  }
  static int indexFor(int h, int length) {
  return h & (length-1);
  }

恢宏容积差别: Hashtable中hash数组默许大小是1一,扩张的章程是
old*二+1。HashMap中hash数组的暗中认可大小是1陆,而且一定是2的指数。

第一,大家先来了然一下以此集合的原理。hashtable是做了合伙的,不过质量下落了,因为
hashtable每趟同步执行的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的2个桶,锁的粒度大大减小,如下图:

率先,大家先来打探一下这一个集合的法则。hashtable是做了1块的,不过品质下跌了,因为
hashtable每一遍同步执行的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的1个桶,锁的粒度大大减小,如下图:

二.并发相关的集合类

并发作用的升级是索要从底层JVM指令级别开端重新设计,优化,革新同步锁的建制才能完毕的,java.util.concurrent
的目标正是要促成 Collection
框架对数据结构所实施的面世操作。通过提供1组可相信的、高质量并发营造块,开发人士可以抓好并发类的线程安全、可伸缩性、品质、可读性和可信性。JDK
5.0 中的并发革新有以下三组:

• JVM 级别更改。一大半现代处理器对出现对某壹硬件级别提供援助,日常以
compare-and-swap (CAS)指令格局。CAS
是1种低级其余、细粒度的技巧,它同意多少个线程更新3个内存地点,同时可以检查测试别的线程的争辨并开始展览还原。它是不可胜举高质量并发算法的根基。在
JDK 伍.0 从前,Java
语言中用来协调线程之间的走访的惟壹原语是壹块,同步是更重量级和粗粒度的。公开
CAS 能够付出中度可伸缩的产出 Java 类。那么些改变主要由 JDK
库类使用,而不是由开发人士使用。

• 低级实用程序类 — 锁定和原子类。使用 CAS 作为并发原语,ReentrantLock
类提供与 synchronized
原语相同的锁定和内部存款和储蓄器语义,可是如此能够更好地操纵锁定(如计时的锁定等待、锁定轮询和可间歇的锁定等待)和提供更好的可伸缩性(竞争时的高质量)。抢先一半开发人员将不再直接运用
ReentrantLock 类,而是选拔在 ReentrantLock 类上营造的高级类。

 亚洲必赢官网 2

 亚洲必赢官网 3

高等实用程序类。那几个类完成并发创设块,各类计算机科学文本中都会讲述那个类

时域信号、互斥、闩锁、屏障、交流程序、线程池和线程安全集合类等。半数以上开发人士都得以在应用程序中用那一个类,来替换许多(若是否一切)同步、wait()
和 notify() 的选择,从而做实质量、可读性和不易。

 

 

Hashtable

Hashtable
提供了一种易于使用的、线程安全的、关联的map功效,那当然也是便利的。然则,线程安全性是凭代价换到的――
Hashtable 的有所办法都是手拉手的。
此时,无竞争的联手会促成可观的品质代价。 Hashtable 的后继者 HashMap
是用作JDK1.第22中学的集合框架的1有个别出现的,它通过提供3个不联合的基类和2个联机的包装器
Collections.synchronizedMap ,解决了线程安全性难题。
通过将大旨的效应从线程安全性中分离开来, Collections.synchronizedMap
允许需求共同的用户能够拥有两只,而不须求1起的用户则无需为1起付出代价。
Hashtable 和 synchronizedMap 所运用的拿走同步的简易方法(同步 Hashtable
中或然联合的 Map
包装器对象中的每种方法)有八个根本的不足。首先,这种办法对于可伸缩性是一种障碍,因为一回只好有1个线程可以访问hash表。
同时,那样仍不足以提供真正的线程安全性,许多公用的混合操作还是需求相当的联手。即便诸如
get() 和 put()
之类的简约操作能够在不必要额外同步的情状下安全地做到,但要么有壹些公用的操作体系,例如迭代或然put-if-absent(空则放入),需求外部的壹道,以制止数据争用。

并且ConcurrentHashMap的读取操作大致是截然的面世操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作差不多一贯不锁,所以比起此前的Hashtable大大变快了(这点在桶更加多时展现得更明显些)。唯有在求size等操作时才要求锁定任何表。小编觉着ConcurrentHashMap是线程安全的汇聚中最火速的。

还要ConcurrentHashMap的读取操作大约是截然的面世操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作大概从不锁,所以比起从前的Hashtable大大变快了(那一点在桶越来越多时展现得更简明些)。唯有在求size等操作时才须要锁定任何表。作者以为ConcurrentHashMap是线程安全的联谊中最高效的。

减小锁粒度

增加 HashMap
的并发性同时还提供线程安全性的一种艺术是遗弃对全体表使用2个锁的办法,而选取对hash表的各样bucket都利用几个锁的点子(或然,更广大的是,使用1个锁池,各样锁负责掩护多少个bucket)
。这代表七个线程能够同时地走访一个 Map
的例外部分,而毋庸争用单个的联谊范围的锁。那种办法能够一直进步插入、检索以及移除操作的可伸缩性。不幸的是,那种并发性是以一定的代价换成的――这使得对1切
集合举办操作的部分艺术(例如 size() 或 isEmpty()
)的兑现更为困难,因为这个艺术供给三次获得过多的锁,并且还留存再次来到不得法的结果的危机。然则,对于壹些景况,例如落到实处cache,那样做是一个很好的低头――因为检索和插入操作相比较频仍,而
size() 和 isEmpty() 操作则少得多。

   
而在迭代时,ConcurrentHashMap使用了不相同于古板集合的快捷失利迭代器的另壹种迭代格局,大家誉为弱1致迭代器。在那种迭代情势中,当iterator被创制后集合再发生变更就不再是抛出
ConcurrentModificationException,取而代之的是在改动时new新的数目从而不影响原有的数码,iterator完结后再将头指针替换为新的数额,这样iterator线程能够行使原来老的多少,而写线程也得以出现的做到改变,更主要的,那保险了七个线程并发执行的延续性和扩充性,是性质提高的严重性。

   
而在迭代时,ConcurrentHashMap使用了差异于古板集合的便捷退步迭代器的另①种迭代格局,大家誉为弱壹致迭代器。在那种迭代格局中,当iterator被创设后集合再发生转移就不再是抛出
ConcurrentModificationException,取而代之的是在改变时new新的数额从而不影响原有的数额,iterator完结后再将头指针替换为新的多少,那样iterator线程能够采取原来老的数据,而写线程也足以现身的成功改变,更首要的,那保险了多少个线程并发执行的接二连三性和扩充性,是性质升高的主要。

ConcurrentHashMap

util.concurrent 包中的 ConcurrentHashMap 类(也将应运而生在JDK 壹.5中的
java.util.concurrent 包中)是对 Map 的线程安全的兑现,比起
synchronizedMap
来,它提供了好得多的并发性。四个读操作差不离总能够并发地执行,同时拓展的读和写操作平日也能并发地执行,而与此同时开始展览的写操作如故可以时不时地涌出举办(相关的类也提供了近似的四个读线程的并发性,可是,只同意有三个移动的写线程)
。ConcurrentHashMap 被设计用来优化检索操作;实际上,成功的 get()
操作完毕今后平常根本不会有锁着的财富。要在不使用锁的状态下取得线程安全性需求肯定的技巧性,并且须要对Java内部存款和储蓄器模型(Java
Memory Model)的细节有深切的知道。

在Java 7中利用的是对Segment(Hash表的一个桶)加锁的点子

在Java 七中动用的是对Segment(Hash表的1个桶)加锁的措施

CopyOnWriteArrayList

在这个遍历操作大大地多于插入或移除操作的面世应用程序中,壹般用
CopyOnWriteArrayList 类替代 ArrayList
。假设是用于存放二个侦听器(listener)列表,例如在AWT或Swing应用程序中,大概在大面积的JavaBean中,那么那种景色很宽泛(相关的
CopyOnWriteArraySet 使用四个 CopyOnWriteArrayList 来落到实处 Set 接口) 。
比方您正在使用1个见惯不惊的 ArrayList
来存放在贰个侦听器列表,那么只要该列表是可变的,而且大概要被多少个线程访问,您
就不可能不要么在对其展开迭代操作时期,要么在迭代前进行的仿制操作时期,锁定任何列表,那二种做法的支付都极大。当对列表执行会唤起列表产生变化的操作时,
CopyOnWriteArrayList
并不是为列表成立三个全新的副本,它的迭代器肯定能够回来在迭代器被创设时列表的场所,而不会抛出
ConcurrentModificationException
。在对列表实行迭代以前不要克隆列表只怕在迭代里面锁
定列表,因为迭代器所观察的列表的副本是不变的。换句话说,
CopyOnWriteArrayList
含有对1个不得变数组的一个可变的引用,因而,只要保留好可怜引用,您就足以博得不可变的线程安全性的利益,而且并非锁
定列表。

ConcurrentHashMap中十分重要实体类正是七个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

ConcurrentHashMap中至关心珍贵要实体类正是多少个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

BlockingQueue

堵塞队列(BlockingQueue)是2个支持多个附加操作的序列。这五个叠加的操作是:在队列为空时,获取成分的线程会等待队列变为非空。当队列满时,存款和储蓄成分的线程会等待队列可用。阻塞队列常用于生产者和顾客的现象,生产者是往队列里添美金素的线程,消费者是从队列里拿成分的线程。阻塞队列就是生产者存放成分的容器,而顾客也只从容器里拿成分。

闭塞队列提供了各个处理方法:

方法处理格局 抛出卓殊 回来特殊值 直白不通 过期退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
反省措施 element() peek() 不可用 不可用

抛出十三分:是指当阻塞队列满时候,再往队列里插入成分,会抛出IllegalStateException(“Queue
full”)卓殊。当队列为空时,从队列里取得成分时会抛出NoSuchElementException非常。
再次来到特殊值:插入方法会重回是不是成功,成功则赶回true。移除方法,则是从队列里拿出三个成分,借使未有则赶回null
直白不通:当阻塞队列满时,假如劳动者线程往队列里put成分,队列会一直不通生产者线程,直到得到多少,恐怕响应中断退出。当队列空时,消费者线程试图从队列里take成分,队列也会堵塞消费者线程,直到队列可用。
逾期退出:当阻塞队列满时,队列会阻塞生产者线程壹段时间,要是跨越一定的时日,生产者线程就会退出。

  1. Java里的不通队列

JDK七提供了九个闭塞队列。分别是

ArrayBlockingQueue :三个由数组结构重组的有界阻塞队列。
LinkedBlockingQueue :2个由链表结构重组的有界阻塞队列。
PriorityBlockingQueue :多少个帮助先行级排序的无界阻塞队列。
DelayQueue:二个用到优先级队列完毕的无界阻塞队列。
SynchronousQueue:贰个不存款和储蓄成分的隔开分离队列。
LinkedTransferQueue:三个由链表结构重组的无界阻塞队列。
LinkedBlockingDeque:3个由链表结构重组的双向阻塞队列。

Segment的源码

Segment的源码

ArrayBlockingQueue

ArrayBlockingQueue是1个用数组完结的有界阻塞队列。此行列依照先进先出(FIFO)的尺度对成分举办排序。暗中认可景况下不保证访问者公平的拜访队列,所谓公平访问队列是指阻塞的拥有生产者线程或顾客线程,当队列可用时,能够依据阻塞的先后顺序访问队列,即先阻塞的劳动者线程,能够先往队列里插入成分,先堵塞的顾客线程,可以先从队列里获得成分。

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

LinkedBlockingQueue

LinkedBlockingQueue是三个用链表完结的有界阻塞队列。此行列的默许和最大尺寸为Integer.MAX_VALUE。此行列依据先进先出的尺度对成分实行排序。
LinkedBlockingDeque是多少个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的两边插入和移出成分。双端队列因为多了二个操作队列的进口,在10二线程同时入队时,也就收缩了大体上的竞争。比较其余的隔离队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等艺术,以First单词结尾的秘诀,表示插入,获取(peek)或移除双端队列的首先个因素。以Last单词结尾的措施,表示插入,获取或移除双端队列的尾声多个元素。别的插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却一样takeFirst,不知情是不是Jdk的bug,使用时照旧用饱含First和Last后缀的方法更了解。在开头化LinkedBlockingDeque时得以开首化队列的体积,用来防护其再扩大体积时过渡膨胀。其它双向阻塞队列能够应用在“工作窃取”方式中。

           private static final long serialVersionUID
= 2249069246763182397L;

           private static final long serialVersionUID
= 2249069246763182397L;

PriorityBlockingQueue

PriorityBlockingQueue是二个支撑先行级的无界队列。私下认可情状下成分运用自然顺序排列,也能够通过相比较器comparator来钦点成分的排序规则。成分依据升序排列。

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

DelayQueue

DelayQueue是三个支撑延时获取成分的无界阻塞队列。队列使用PriorityQueue来完结。队列中的成分必须兑现Delayed接口,在开立成分时方可钦命多长期才能从队列中拿走当前元素。唯有在延迟期满时才能从队列中领到成分。大家得以将DelayQueue运用在以下应用场景:

缓存系统的筹划:可以用DelayQueue保存缓存成分的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取成分时,表示缓存有效期到了。
定时义务调度。使用DelayQueue保存当天将会履行的任务和执行时间,一旦从DelayQueue中获取到职分就起来实施,从比如提姆erQueue正是使用DelayQueue达成的。
队列中的Delayed必须兑现compareTo来钦命成分的依次。比如让延时岁月最长的放在队列的末梢。达成代码如下:

    public int compareTo(Delayed other) {
       if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask x = (ScheduledFutureTask)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
   else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -
                  other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

怎么样促成Delayed接口

咱俩能够参见ScheduledThreadPoolExecutor里ScheduledFutureTask类。那么些类落成了Delayed接口。首先:在目的创设的时候,使用time记录前对象如哪天候能够利用,代码如下:

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

然后使用getDelay能够查询当前因素还索要延时多长期,代码如下:

    public long getDelay(TimeUnit unit) {
                 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

通过构造函数能够看出延迟时间参数ns的单位是飞秒,本人规划的时候最棒利用阿秒,因为getDelay时可以内定任意单位,壹旦以微秒作为单位,而延时的小运又准确不到皮秒就劳动了。使用时请注意当time小于当前岁月时,getDelay会重回负数。

何以促成延时队列

延时队列的实现很简短,当消费者从队列里获取成分时,假若成分未有直达延时时间,就卡住当前线程。

    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                else if (leader != null)
                    available.await();

                availableProcessors() > 1 ? 64 : 1;

                availableProcessors() > 1 ? 64 : 1;

卡住队列的兑现原理

假如队列是空的,消费者会直接等候,当生产者添欧成分时候,消费者是什么晓稳妥前队列有元素的吗?如果让你来规划阻塞队列你会怎么安顿,让劳动者和消费者能够高功效的进展报纸发表呢?让大家先来看看JDK是咋样落到实处的。

行使布告格局达成。所谓通告情势,正是当生产者往满的队列里添欧成分时会阻塞住生产者,当消费者消费了多少个系列中的成分后,会文告劳动者当前队列可用。通过查阅JDK源码发现ArrayBlockingQueue使用了Condition来贯彻,代码如下:

    private final Condition notFull;
    private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
    //省略其他代码
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

    public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
    }

    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return extract();
            } finally {
        lock.unlock();
    }
        }

        private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

当我们往队列里插入1个因素时,假诺队列不可用,阻塞生产者首要透过LockSupport.park(this);来促成

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)

                    reportInterruptAfterWait(interruptMode);
    }

继承进入源码,发现调用setBlocker先保存下就要阻塞的线程,然后调用unsafe.park阻塞当前线程。

    public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是个native方法,代码如下:

park这一个方法会阻塞当前线程,只有以下多种情景中的一种产生时,该情势才会回到。

与park对应的unpark执行或早已进行时。注意:已经进行是指unpark先实施,然后再履行的park。
线程被暂停时。
比方参数中的time不是零,等待了点名的纳秒数时。
爆发格外现象时。这个卓殊事先不恐怕明确。
大家延续看一下JVM是怎么落实park方法的,park在差异的操作系统使用不一致的办法贯彻,在linux下是使用的是系统方法pthread_cond_wait贯彻。完结代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的
os::Platform伊夫nt::park方法,代码如下:

    void os::PlatformEvent::park() {
         int v ;
     for (;;) {
             v = _Event ;
     if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
     }
     guarantee (v >= 0, "invariant") ;
     if (v == 0) {
     // Do this the hard way by blocking ...
     int status = pthread_mutex_lock(_mutex);
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
     while (_Event < 0) {
     status = pthread_cond_wait(_cond, _mutex);
     // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
     // Treat this the same as if the wait was interrupted
     if (status == ETIME) { status = EINTR; }
     assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;

     // In theory we could move the ST of 0 into _Event past the unlock(),
     // but then we'd need a MEMBAR after the ST.
     _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);
     assert_status(status == 0, status, "mutex_unlock");
     }
     guarantee (_Event >= 0, "invariant") ;
     }

 }

pthread_cond_wait是三个二十四线程的标准化变量函数,cond是condition的缩写,字面意思能够知晓为线程在伺机八个条件发出,这几个规格是三个全局变量。这几个艺术接收四个参数,贰个共享变量_cond,二个互斥量_mutex。而unpark方法在linux下是应用pthread_cond_signal达成的。park
在windows下则是行使WaitForSingleObject达成的。

           transient volatile HashEntry<K,V>[]
table;

           transient volatile HashEntry<K,V>[]
table;

三.局地常用集合类的个中贯彻格局

           transient int count;

           transient int count;

           transient int modCount;

           transient int modCount;

           transient int threshold;

           transient int threshold;

           final float loadFactor;

           final float loadFactor;

      }

      }

HashEntry的源码

HashEntry的源码

static final class HashEntry<K,V> { 

static final class HashEntry<K,V> { 

    final K key; 

    final K key; 

    final int hash; 

    final int hash; 

    volatile V value; 

    volatile V value; 

    final HashEntry<K,V> next; 

    final HashEntry<K,V> next; 

在Java 第88中学丢掉了Segment的概念,利用CAS算法做了新措施

在Java 第88中学丢掉了Segment的概念,利用CAS算法做了新点子

CAS算法采纳“数组+链表+红黑树”的艺术贯彻

CAS算法选用“数组+链表+红黑树”的法子实现

                                      ————亓慧杰

                                      ————亓慧杰

网站地图xml地图