`

源码剖析之ThreadLocal

阅读更多
背景:1、如果一个对象中有成员,当通过调用对象的方法操作(修改、查询等)成员时,如果没有加锁或者同步访问,那么可能会存在线程安全的问题。
2、但是有时候又需要定义某些成员变量,来方便多个方法间共享对象数据的访问。以避免在方法间传递大量的参数,ThreadLocal 就提供了这样的效果。

解决问题1的方案:我们解决此问题的一个常用手段是加锁,然而加锁会造成程序伸缩性的降低,在高并发的激烈竞争锁资源,依然可能会成为性能的瓶颈。

解决问题2的方案:jdk 1.2 开始提供的ThreadLocal 类,为此类问题提供了较好的解决方案,他实现了为每一个线程提供一个本地对象的功能,这样就保证了线程间不会有数据的共享,那自然也是线程安全的。从测试的效果来看,当线程的并发量越高,其优越性越明显与锁的机制。(注意:锁机制是为了线程间的共享安全而做同步访问,ThreadLocal 提供线程的内本地变量,不共享的,从这个意义上讲,二者的比较是没有意义的,场景完全不同


遵循习惯:ThreadLocal 实例通常是类中的 private static 字段,它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。

内部的数据结构:
关键事理解ThreadLocalMap ,Thread ,ThreadLocal 之间的关系。


实现中心思路:把 ThreadLocal作为key ,(get,set)的变量作为值 保存在不同Thread线程的ThreadLocalMap 中,也即 ThreadLocal 在多个线程中使用,但是ThreadLocal 变量本身是不变的。

set流程:当在Thread调用 ThreadLocal.set()的时候,获取当前线程Thread.currentThread(),然后找到 Thread.threadLocals ,然后调用 map.set(this, value) 。

get流程:当在Thread调用 ThreadLocal.get() 的时候,ThreadLocal  会调用 ThreadLocal.initialValue 方法,然后调用Thread.threadLocals 变量,ThreadLocalMap ,然后 ThreadLocalMap.getEntry 方法。

注意:每个线程都保持对其线程局部变量副本的隐式引用,只要线程是活动的并且 ThreadLocal 实例是可访问的;在线程消失之后,其线程局部实例的所有副本都会被垃圾回收(除非存在对这些副本的其他引用)。


被废弃了的ThreadLocal所绑定对象的引用,会在以下4情况被清理:
1、 Thread结束时。
2、当Thread的ThreadLocalMap的threshold超过最大值时。
3、向Thread的ThreadLocalMap中存放一个ThreadLocal,hash算法没有命中既有Entry,而需要新建一个Entry时。
4、手工通过ThreadLocal的remove()方法或set(null)。



特别注意事项:ThreadLocal的使用在Tomcat的服务环境下要注意,并非每次web请求时候程序运行的ThreadLocal都是唯一的..ThreadLocal的绳命周期不等于一次Request的绳命周期..ThreadLocal与线程对象紧密绑定的,由于Tomcat使用了线程池,线程是可能存在复用情况...
1.ThreadLocal可以用于存放与请求无关对象,不能用来传递参数...
2.在所有使用线程池的地方都是如此
源码分析如下:


public class ThreadLocal<T> {
	
	/** 当前ThreadLocal 的hashcode值,在ThreadLocalMap 用作hash值
	作为ThreadLocal实例的变量只有 threadLocalHashCode 这一个
	*/
	private final int threadLocalHashCode = nextHashCode();

	/** 静态的成员,初始化值为0,线程安全的!!*/
private static AtomicInteger nextHashCode = 
	new AtomicInteger();

/** 
 下一个hashCode
*/
 private static int nextHashCode() {
	return nextHashCode.getAndAdd(HASH_INCREMENT); 
    }
    
/** 
  返回thread-local 变量的当前线程的副本的值。如果变量没有用于当前线程的值,则先将其初始化为调用 initialValue() 方法返回的值。
*/
 public T get() {
        Thread t = Thread.currentThread(); //获取当前线程
        ThreadLocalMap map = getMap(t);// 获取和当前线程绑定的所有thread-local 变量
        if (map != null) { //如果已经存在当前线程的ThreadLocalMap
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) //返回当前对象。
                return (T)e.value;
        }
        return setInitialValue();
    }

/** 
  返回当前线程的threadLocals Map对象。
*/
ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

/** 
 设置初始化值
*/
  private T setInitialValue() {
        T value = initialValue(); //调用初始化的值
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) //如果map!= null,那么设置this->value.(注意:这种情况,肯定是thread已经绑定了其他的threadLocal变量)
            map.set(this, value);
        else // 当前线程遇到的第一个thread-local
            createMap(t, value);
        return value;
    }
/**
初始化当前thread-local 的初始值。注意:此方法一般在定义TheadLocal对象时,需要重写!
*/
  protected T initialValue() {
        return null;
    }
    
   /**
   根据currentThread 和 fistValue 创建 当前线程的 ThreadLocalMap
   */ 
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    
    /** 为当前thread-local 设置值*/
     public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) //同setInitialValue
            map.set(this, value);
        else
            createMap(t, value); //同setInitialValue
    }
    
    /** 
     移除此线程局部变量当前线程的值。如果此线程局部变量随后被当前线程读取,且这期间当前线程没有设置其值,
     则将调用其 initialValue() 方法重新初始化其值。这将导致在当前线程多次调用 initialValue 方法。 
    */
     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }
    
}


另外ThreadLocalMap 的源码分析如下:
注意:
ThreadLocalMap 的内部实现和 HashMap 完全不同,底层依赖为Entry数组,但是如果散列冲突,Entry是没有下一个链的,而是扫描当前index+1的位置是否有数据,如果没有那么放进去!!!

static class ThreadLocalMap {
	

        /**
         Entry:代表 ThreadLocalMap 中的每一项
         继承WeakReference ,使用 ThreadLocal 作为ref field引用。如果entry.get() == null 可以认定ThreadLocal 没有引用了
         也意味着 entry可以从table中 擦出了。
         */
        static class Entry extends WeakReference<ThreadLocal> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal k, Object v) {
                super(k); //这句话至关重要,指明了ThreadLocal作为弱引用的值
                value = v;
            }
        }

        /**初始化的容量
         */
        private static final int INITIAL_CAPACITY = 16;

        /**
         * map中数据实体的承载数组         */
        private Entry[] table;

        /** 已经存在的数据量
         */
        private int size = 0;

        /** 下一次resize的 size
         */
        private int threshold; // Default to 0

        /**
         * 设置threshold 负载因子2/3
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

        /**
         * 轮询下一个
         */
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * 轮询上一个
         */
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

        /**
         * 初始化 ThreadLocalMap
         */
        ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY]; //生成数组
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); // 找到位置
            table[i] = new Entry(firstKey, firstValue); //赋值对象
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        /**
         * 通过ThreadLocalMap 构造
         */
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    ThreadLocal key = e.get(); //如果key == null 那么可以认为当前e已经没有对象引用了
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1); //使用threadLocalHashCode 昨晚hash值,来指定位置
                        while (table[h] != null) //如果table[h] != null,可以认为已经放的有值了,那么需要找下一个位置的值。
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

        /**  
          获取key 对应的Entry值,也是ThreadLocal.get() 方法的实现
        */
        private Entry getEntry(ThreadLocal key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key) //如果能直接定位
                return e;
            else //如果不能直接定位,那么需要找从e开始的下一个值
                return getEntryAfterMiss(key, i, e);
        }

        /**
         * 用来找未能直接定位的值
         *
         * @param  key the thread local object
         * @param  在table中的位置
         * @param  e the entry at table[i]
         */
        private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) { //如果e == null, 那么可以认定没有值
                ThreadLocal k = e.get();
                if (k == key) //如果e.get() == key ,可以返回Entry
                    return e;
                if (k == null) //如果k == null,可能已经没有地方引用到了。需要清理!!!!
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len); //寻找下一个位置
                e = tab[i];
            }
            return null;
        }

        /**
         * 对key 设置 value 值
         */
        private void set(ThreadLocal key, Object value) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
                ThreadLocal k = e.get();
                if (k == key) { //如果找到了,直接设置 value 值即可
                        e.value = value;
                    return;
                }
                if (k == null) { //可以认定 k 已经没有引用了。
                        replaceStaleEntry(key, value, i); //e != null && key == null,那么就重用!
                    return;
                }
            }
//最后的选择就是新增Entry项
            tab[i] = new Entry(key, value);
            int sz = ++size;
//如果清理过期的slot失败了  并且sz>= threadhold,进行rehash !(注意:是新增后检测,如果没有多余的了,那么就先rehash)
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

        /**
         *  根据key删除一项Entry
         */
        private void remove(ThreadLocal key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
		 e != null;
		 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) { //如果找到
                    e.clear(); //清理 。这个很重要!!!:	this.referent = null; 之后,那么e.get() 会返回null
                    expungeStaleEntry(i); //清除当前数据后,还要处理后续数据的rehash操作!!
                    return;
                }
            }
        }

        /**
         * 替换一个过期的entry项,在对特定key操作期间 ,value 会保存在entry中,对key 而言,无论an entry 是否存在
         *
         *副作用,是会擦出所以过期的entry项
         *
         */
        private void replaceStaleEntry(ThreadLocal key, Object value, int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            int slotToExpunge = staleSlot; //待过期的slot位置
            for (int i = prevIndex(staleSlot, len);
		             (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null) // 向前找,知道找到e.get() == null的为止
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
            for (int i = nextIndex(staleSlot, len);
		            (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
            tab[staleSlot].value = null;   
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

        /** 擦除一个过期的entry,通过rehash在staleSlot 和next null slot 之间任何可能冲突的entrys
            同样也会擦出任何其他 在找到下一个null entry知道遇到的 过期的entry 
         * 
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            tab[staleSlot].value = null;  //擦除在staleSlot位置的value
            tab[staleSlot] = null;
            size--; //数量减一

            // rehash直到遇到下一个null的值
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len); //找到staleSlot 对应的(可能是)同hash值的 下一个位置
            (e = tab[i]) != null;   //不为null就继续,知道遇到null终止
            i = nextIndex(i, len)) { //从i开始下一个元素
                ThreadLocal k = e.get();
                if (k == null) { //如果k = null,说明当前 e 对应的 ThreadLocal 已经没有引用
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    int h = k.threadLocalHashCode & (len - 1); //找到位置k.threadLocalHashCode 理论应该在的位置
                    if (h != i) { //如果h !=i ,那么需要把当前元素放到h应该在的位置上去! 如果h == i,那么可以认定不用动了,e在它应该在的位置了!(理解此处的逻辑非常关键)
                        tab[i] = null; 
                        while (tab[h] != null) //从h开始一直向下走,知道寻找到null
                            h = nextIndex(h, len);
                        tab[h] = e; //把e放在table[h]的位置或者h散列后的第一个null位置!!
                    }
                }
            }
            return i; //第一个不为null的位置。
        }

        /**
         * 清空一些过期的数据。考虑到速度和垃圾回收的效率,做>>>1的一个折中方案!
         */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len); //下一个位置
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;//数组的长度
                    removed = true;
                    i = expungeStaleEntry(i); //清除在i位置过期的entry项
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

        /**
         * rehash:先扫描过期的进行清除,清除后还不够就resize() 数据
         */
        private void rehash() {
            expungeStaleEntries(); //先清除过期的值

            // Use lower threshold for doubling to avoid hysteresis
            if (size >= threshold - threshold / 4)
                resize(); 
        }

        /** 扩增resize
         */
        private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen]; //两倍进行扩容
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) { //e == null 说明此处没有元素
                    ThreadLocal k = e.get();
                    if (k == null) { //如果ThreadLocal已经没有引用了,那么e.value = null,让GC回收。
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1); //找到k应该对应的位置
                        while (newTab[h] != null) //循环一直找到没有被占用的那个slot!!
                            h = nextIndex(h, newLen);
                        newTab[h] = e; //放到空的h位置
                        count++;
                    }
                }
            }

            setThreshold(newLen);
            size = count;
            table = newTab;
        }

        /**
         * 擦出所以在table中过期的entry
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null) //此处:e不为null,并且g.get() == null 意味着是当前e是过期的。所以进行expungeStaleEntry操作
                    expungeStaleEntry(j);
            }
        }
    
	
	}

  • 大小: 24.7 KB
分享到:
评论

相关推荐

    Flask核心机制之上下文源码剖析

    当然学习flask如果不了解其中的处理流程,可能在很多问题上不能得到解决,当然我在写本篇文章之前也看到了很多博文有关于对flask上下文管理的剖析都非常到位,当然为了学习flask我也把对flask上下文理解写下来供自己...

    Java并发编程原理与实战

    线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...

    龙果 java并发编程原理实战

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    Java 并发编程原理与实战视频

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    龙果java并发编程完整视频

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    java并发编程

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段04讲、线程生命周期以及start方法源码剖析.mp4 │ 高并发编程第一阶段05讲、采用多线程方式模拟银行排队叫号.mp4 │ 高并发编程第一阶段06讲、用Runnable接口将线程的逻辑执行单元从控制中...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段04讲、线程生命周期以及start方法源码剖析.mp4 │ 高并发编程第一阶段05讲、采用多线程方式模拟银行排队叫号.mp4 │ 高并发编程第一阶段06讲、用Runnable接口将线程的逻辑执行单元从控制中...

Global site tag (gtag.js) - Google Analytics