1.問題
單機應用內,在進程內部,我們可以使用ThreadLocal傳遞應用上下文的方式. 當前的 Spring Secrucity , Spring TransactionManager, Log4J MDC, Struts2 ActionContext等等應用場景隨處可見.
但在是分布式系統下,由于不是在同一個進程內,所以無法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是將一個系統中的ThreadLocal信息可以傳遞至下一個系統,將兩者的調用可以關聯起來。如對應用有一個調用,我們生成一個請求ID (traceId),在后面所有分布式系統調用中,可以通過這個traceId將所有調用關聯起來,這樣查找調用日志都將十分方便.
2.實現方式
我們現在使用的通訊協議,一般都包含兩部分:Header,Body. 如 Soap Header,Http Header. 通過自定義Header,可以帶上我們的自定義信息。 然后在服務器端解析Header,再得到自定義信息。那么就可以完成Distributed ThreadLocal的功能。

如上圖,通過兩個攔截器,client在調用之前,將DistrbiutedThreadLocal中的信息放在soap header中,在服務端方法調用之前,從soap header中取回 DistrbiutedThreadLocal信息。
3. 實現代碼.
以下為CXF webservice的實現代碼,一個DistributedThreadLocal及增加了兩個攔截器. hessian 也可以自定義Header,完成傳遞.
DistributedThreadLocal
/**
* 分布式 ThreadLocal, 存放在ThreadLocal中的數據可以傳輸至另外一臺機器上
* @author badqiu
*/
public class DistributedThreadLocal {
public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();
public static void putAll(Map<String, String> map) {
getMap().putAll(map);
}
public static void put(String key, String value) {
getMap().put(key, value);
}
public static String get(String key) {
Map<String, String> map = threadLocal.get();
if (map == null)
return null;
return (String) map.get(key);
}
public static Map<String, String> getMap() {
Map<String, String> map = threadLocal.get();
if (map == null) {
map = new HashMap();
threadLocal.set(map);
}
return map;
}
public static void clear() {
threadLocal.set(null);
}
}
DistributedThreadLocalInSOAPHeaderInterceptor
/**
* 輸入(In)攔截器,用于從 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
*
* @author badqiu
*/
public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
private SAAJInInterceptor saajIn = new SAAJInInterceptor();
public DistributedThreadLocalInSOAPHeaderInterceptor() {
super(Phase.PRE_PROTOCOL);
getAfter().add(SAAJInInterceptor.class.getName());
}
public void handleMessage(SoapMessage message) throws Fault {
SOAPMessage doc = message.getContent(SOAPMessage.class);
if (doc == null) {
saajIn.handleMessage(message);
doc = message.getContent(SOAPMessage.class);
}
Map<String,String> headers = toHeadersMap(doc);
DistributedThreadLocal.putAll(headers);
}
private Map toHeadersMap(SOAPMessage doc) {
SOAPHeader header = getSOAPHeader(doc);
if (header == null) {
return new HashMap(0);
}
Map<String,String> headersMap = new HashMap();
NodeList nodes = header.getChildNodes();
for(int i=0; i<nodes.getLength(); i++) {
Node item = nodes.item(i);
if(item.hasChildNodes()) {
headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
}
}
return headersMap;
}
private SOAPHeader getSOAPHeader(SOAPMessage doc) {
SOAPHeader header;
try {
header = doc.getSOAPHeader();
} catch (SOAPException e) {
throw new RuntimeException(e);
}
return header;
}
}
DistributedThreadLocalOutSOAPHeaderInterceptor
/**
* 輸出(Out)攔截器,用于將DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
*
* @author badqiu
*/
public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
public DistributedThreadLocalOutSOAPHeaderInterceptor() {
super(Phase.WRITE);
}
public void handleMessage(SoapMessage message) throws Fault {
List<Header> headers = message.getHeaders();
Map<String,String> threadlocalMap = DistributedThreadLocal.getMap();
for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
headers.add(getHeader(entry.getKey(), entry.getValue()));
}
}
private Header getHeader(String key, String value) {
QName qName = new QName(key);
Document document = DOMUtils.createDocument();
Element element = document.createElement(key);
element.appendChild(document.createTextNode(value));
SoapHeader header = new SoapHeader(qName, element);
return (header);
}
}
CXF spring配置文件:
server端:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="true">
<description>Apache CXF的Web Service配置</description>
<import resource="classpath:META-INF/cxf/cxf.xml" />
<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
<import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
<!-- jax-ws endpoint定義 -->
<jaxws:endpoint address="/hello" >
<jaxws:implementor ref="hello" />
<jaxws:inInterceptors>
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
</jaxws:inInterceptors>
</jaxws:endpoint>
<!-- WebService的實現Bean定義 -->
<bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
</beans>
client端:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="true">
<description>Apache CXF Web Service Client端配置</description>
<jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
address="http://localhost:8080/service/hello" >
<jaxws:outInterceptors>
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
</jaxws:outInterceptors>
</jaxws:client>
</beans>
4. 應用場景.
通過分布式應用上下文,暫時想到的幾個應用場景.
1. Log4j MDC traceId傳遞. 通過一個traceId,將所有相關的 操作所有的日志信息關聯起來。
2. sessionId 傳遞, 讓我們的應用也有狀態,可以使用session什么的
3. Security(username,password)傳遞. 在需要安全調用的地方,避免污染接口,需要顯式的在接口傳遞username,password. 相對應的 WSSecurity也可以走這個通道
分布式應用上下文的概念,全球首創,歡迎轉載(因為google 搜索不到相關文章,或許早已經有相同的概念了,歡迎提醒我)。