<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    badqiu

    XPer
    隨筆 - 46, 文章 - 3, 評論 - 195, 引用 - 0
    數據加載中……

    分布式應用上下文(Distributed ThreadLocal)

    1.問題

    單機應用內,在進程內部,我們可以使用ThreadLocal傳遞應用上下文的方式. 當前的 Spring Secrucity , Spring TransactionManager, Log4J MDC, Struts2 ActionContext等等應用場景隨處可見.

    但在是分布式系統下,由于不是在同一個進程內,所以無法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是將一個系統中的ThreadLocal信息可以傳遞至下一個系統,將兩者的調用可以關聯起來。如對應用有一個調用,我們生成一個請求ID (traceId),在后面所有分布式系統調用中,可以通過這個traceId將所有調用關聯起來,這樣查找調用日志都將十分方便.

    2.實現方式

    我們現在使用的通訊協議,一般都包含兩部分:Header,Body. 如 Soap Header,Http Header. 通過自定義Header,可以帶上我們的自定義信息。 然后在服務器端解析Header,再得到自定義信息。那么就可以完成Distributed ThreadLocal的功能。

    如上圖,通過兩個攔截器,client在調用之前,將DistrbiutedThreadLocal中的信息放在soap header中,在服務端方法調用之前,從soap header中取回 DistrbiutedThreadLocal信息。

    3. 實現代碼.

    以下為CXF webservice的實現代碼,一個DistributedThreadLocal及增加了兩個攔截器. hessian 也可以自定義Header,完成傳遞.

    DistributedThreadLocal

    /**
     * 分布式 ThreadLocal, 存放在ThreadLocal中的數據可以傳輸至另外一臺機器上
     * 
    @author badqiu
     
    */
    public class DistributedThreadLocal {
        
    public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
        
        
    public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();

        
    public static void putAll(Map<String, String> map) {
            getMap().putAll(map);
        }
        
        
    public static void put(String key, String value) {
            getMap().put(key, value);
        }

        
    public static String get(String key) {
            Map
    <String, String> map = threadLocal.get();
            
    if (map == null)
                
    return null;
            
    return (String) map.get(key);
        }

        
    public static Map<String, String> getMap() {
            Map
    <String, String> map = threadLocal.get();
            
    if (map == null) {
                map 
    = new HashMap();
                threadLocal.set(map);
            }
            
    return map;
        }

        
    public static void clear() {
            threadLocal.set(
    null);
        }

    }

    DistributedThreadLocalInSOAPHeaderInterceptor

    /**
     * 輸入(In)攔截器,用于從 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
     * 
     * 
    @author badqiu
     
    */
    public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
        
        
    private SAAJInInterceptor saajIn = new SAAJInInterceptor();  
        
        
    public DistributedThreadLocalInSOAPHeaderInterceptor() {  
            
    super(Phase.PRE_PROTOCOL);  
            getAfter().add(SAAJInInterceptor.
    class.getName());  
        }  

        
    public void handleMessage(SoapMessage message) throws Fault {
            SOAPMessage doc 
    = message.getContent(SOAPMessage.class);  
            
    if (doc == null) {  
                saajIn.handleMessage(message);  
                doc 
    = message.getContent(SOAPMessage.class);  
            }  
            
            Map
    <String,String> headers = toHeadersMap(doc);  
            DistributedThreadLocal.putAll(headers);
            
        }

        
    private Map toHeadersMap(SOAPMessage doc) {
            SOAPHeader header 
    = getSOAPHeader(doc);  
            
    if (header == null) {  
                
    return new HashMap(0);  
            } 
            
            Map
    <String,String> headersMap = new HashMap();
            NodeList nodes 
    = header.getChildNodes();
            
    for(int i=0; i<nodes.getLength(); i++) {  
                Node item 
    = nodes.item(i);
                
    if(item.hasChildNodes()) {
                    headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
                }
            }
            
    return headersMap;
        }

        
    private SOAPHeader getSOAPHeader(SOAPMessage doc) {
            SOAPHeader header;
            
    try {
                header 
    = doc.getSOAPHeader();
            } 
    catch (SOAPException e) {
                
    throw new RuntimeException(e);
            }
            
    return header;
        }

    }

    DistributedThreadLocalOutSOAPHeaderInterceptor

    /**
     * 輸出(Out)攔截器,用于將DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
     * 
     * 
    @author badqiu
     
    */
    public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
        
        
    public DistributedThreadLocalOutSOAPHeaderInterceptor() {
            
    super(Phase.WRITE);
        }

        
    public void handleMessage(SoapMessage message) throws Fault {
            
            List
    <Header> headers = message.getHeaders();
            Map
    <String,String> threadlocalMap = DistributedThreadLocal.getMap();
            
            
    for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
                headers.add(getHeader(entry.getKey(), entry.getValue()));
            }
        }

        
    private Header getHeader(String key, String value) {
            QName qName 
    = new QName(key);
            Document document 
    = DOMUtils.createDocument();
            Element element 
    = document.createElement(key);
            element.appendChild(document.createTextNode(value));
            SoapHeader header 
    = new SoapHeader(qName, element);
            
    return (header);
        }
    }

    CXF spring配置文件:

    server端:

    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jaxws
    ="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
        xsi:schemaLocation
    ="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
        
    default-lazy-init="true">

        
    <description>Apache CXF的Web Service配置</description>

        
    <import resource="classpath:META-INF/cxf/cxf.xml" />
        
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
        
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />

        
    <!-- jax-ws endpoint定義  -->
        
    <jaxws:endpoint address="/hello" >
            
    <jaxws:implementor ref="hello" />
            
    <jaxws:inInterceptors>
                
    <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
            
    </jaxws:inInterceptors>
        
    </jaxws:endpoint>
        
        
    <!-- WebService的實現Bean定義 -->
        
    <bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
    </beans>

    client端:

    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jaxws
    ="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
        xsi:schemaLocation
    ="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
        
    default-lazy-init="true">
        
    <description>Apache CXF Web Service Client端配置</description>

        
    <jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
            address
    ="http://localhost:8080/service/hello" >
            
    <jaxws:outInterceptors>
                
    <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
            
    </jaxws:outInterceptors>
        
    </jaxws:client>

    </beans>

    4. 應用場景.

    通過分布式應用上下文,暫時想到的幾個應用場景.

    1. Log4j MDC traceId傳遞. 通過一個traceId,將所有相關的 操作所有的日志信息關聯起來。

    2. sessionId 傳遞, 讓我們的應用也有狀態,可以使用session什么的

    3. Security(username,password)傳遞. 在需要安全調用的地方,避免污染接口,需要顯式的在接口傳遞username,password. 相對應的 WSSecurity也可以走這個通道

    分布式應用上下文的概念,全球首創,歡迎轉載(因為google 搜索不到相關文章,或許早已經有相同的概念了,歡迎提醒我)。

    posted on 2011-01-04 19:56 badqiu 閱讀(2299) 評論(3)  編輯  收藏

    評論

    # re: 分布式應用上下文(Distributed ThreadLocal)[未登錄]  回復  更多評論   

    就是報文傳輸過來,待了threadlocal的信息,然后攔截,再寫入當前threadlocal中,其實很多分布式的概率類似,就是信息共享,但是樓主就是將threadlocal信息共享?可以這樣說?

    恩,攔截器代入threadlocal,思路不錯,有點像servlet用filter處理threadloacal,這里利用了webservice實現了分布式,然后攔截器有點filter的意思。
    2011-01-05 11:24 | garfield

    # re: 分布式應用上下文(Distributed ThreadLocal)  回復  更多評論   

    @garfield

    我另外一個 hassian的實現就是如你所有通過一個 Filter來實現的。
    主要是對應用來說,可以透明的傳遞應用上下文。
    2011-01-05 11:37 | badqiu

    # re: 分布式應用上下文(Distributed ThreadLocal)[未登錄]  回復  更多評論   

    @badqiu
    恩,這樣主要是保證了開發兩端的可以一致的使用,保證來開發的一致性,省去了很多不必要的東西
    2011-01-05 15:35 | garfield

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 四虎影视永久免费观看| 四虎永久在线精品免费观看视频| 免费国产在线观看老王影院| 亚洲人成77777在线观看网| 18禁止看的免费污网站| 成全视频在线观看免费| 四虎在线免费播放| 亚洲日韩AV一区二区三区中文| 在线看免费观看AV深夜影院| 亚洲真人无码永久在线| 中文日本免费高清| 78成人精品电影在线播放日韩精品电影一区亚洲| 成人免费一区二区三区| 亚洲av永久无码精品漫画 | 国产亚洲精品美女久久久| 一级有奶水毛片免费看| 亚洲va无码va在线va天堂| 久久久免费精品re6| 亚洲日本久久久午夜精品| 免费观看美女裸体网站| 成人免费观看男女羞羞视频| 超清首页国产亚洲丝袜| 久久久久免费精品国产小说| 亚洲精品视频免费看| 午夜一级免费视频| 亚欧国产一级在线免费| 亚洲AV日韩精品久久久久久| 亚洲丶国产丶欧美一区二区三区| 日韩在线不卡免费视频一区| 亚洲1区1区3区4区产品乱码芒果| 精精国产www视频在线观看免费| 国产精品亚洲片在线| 美女视频黄是免费的网址| 老司机福利在线免费观看| 日韩精品免费电影| 亚洲精品乱码久久久久久V | 成年女人免费视频播放77777| 国产精品亚洲精品日韩电影| 亚洲国产精品不卡在线电影| 免费观看美女裸体网站| 久久免费公开视频|