Background
The application team runs a settlement stored procedure every hour on a schedule. A single run may take longer than an hour, and the runs must be strictly serialized. An in-memory lock cannot guarantee this: if the application server goes down and restarts, the previous invocation may still be running in the database, so two runs could execute concurrently. We schedule jobs with LTS, our wrapper around Quartz, and we decided to have the framework manage the locks, for these reasons:
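- The lock manager can be built as a general-purpose module.
- Acquiring and releasing locks is risky; we worried that business developers might forget a step and cause deadlocks or concurrency problems.
- It integrates cleanly into our existing framework and is easy to expose to business developers.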
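Note: we strongly discourage pessimistic offline locks. If conflicts are rare and the problem can be solved by other means, such as an optimistic offline lock, or a DB constraint followed by a compensating action, do not use a pessimistic offline lock.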
How it works
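PL/SQL UL (user lock) locks are lock resources that Oracle exposes to developers, and they behave much like DML locks. We could also implement the concurrency control with DML locks, via select … for update or a lock table we maintain ourselves, but given the implementation cost we chose PL/SQL UL locks. In addition, Oracle guarantees that all UL locks held by a session are released when the session ends.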
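One caveat: DBMS_LOCK.ALLOCATE_UNIQUE commits every time it is called, so calling it inside a distributed transaction raises an exception about the commit. We use an XA resource with a global transaction level, i.e. JTA. To keep lock acquisition and release from interfering with the distributed business transaction, we can either perform the lock operations through a non-XA resource in a local transaction, or suspend the current distributed transaction and resume it once the lock operation completes. To reuse the existing XA resource we chose the latter. We use the same technique often anyway: suspend the distributed transaction, run DDL, then resume the transaction.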
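Implementation:
- Wrap several procedures of the DBMS_LOCK package for our own use
- Provide a lock manager on the Java side based on PL/SQL UL locks
- Define, on the Java side, the acquire lock / run business logic / release lock sequence as a template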
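DB stored procedures:
We put a thin wrapper around DBMS_LOCK instead of calling it directly. The benefits:
- It decouples us from Oracle: if another database offers similar functionality, we can implement stored procedures with the same names there.
- It makes later refactoring and upgrades of the stored procedures easier.
- DBMS_LOCK.ALLOCATE_UNIQUE takes a logical lock name and returns a lockhandle, Oracle's unique identifier for the lock. That handle should be transparent to the Java side, which should deal only with the logical lock name, so the wrapper maps names to handles internally.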
create or replace package frm_lts_processor_lock_pkg is
  procedure frm_lts_lock_acquire(i_lock_name       in varchar2,
                                 i_expiration_time in integer default 864000,
                                 i_wait_time       in integer default dbms_lock.maxwait,
                                 o_result          out number);
  function frm_lts_lock_release(i_lock_name in varchar2) return number;
end frm_lts_processor_lock_pkg;
/

create or replace package body frm_lts_processor_lock_pkg is

  /* Cache: logical lock name -> lockhandle from DBMS_LOCK.ALLOCATE_UNIQUE.
     Package variables are private to each DB session, so this cache belongs
     to the same session that owns the locks. */
  type handle_tbltype is table of varchar2(128) index by varchar2(128);
  v_lockhandle_tbl handle_tbltype;

  procedure frm_lts_lock_acquire(i_lock_name       in varchar2,
                                 i_expiration_time in integer default 864000,
                                 i_wait_time       in integer default dbms_lock.maxwait,
                                 o_result          out number) as
    v_lockhandle varchar2(128);
  begin
    if v_lockhandle_tbl.exists(i_lock_name) then
      -- reuse the cached handle and skip ALLOCATE_UNIQUE, which always commits
      v_lockhandle := v_lockhandle_tbl(i_lock_name);
    else
      -- map the logical name to a unique lockhandle (performs a COMMIT)
      sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
      v_lockhandle_tbl(i_lock_name) := v_lockhandle;
    end if;
    -- exclusive lock, NOT released on commit; DBMS_LOCK.REQUEST returns
    -- 0 success, 1 timeout, 2 deadlock, 3 parameter error,
    -- 4 already own lock, 5 illegal lock handle
    o_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
  end frm_lts_lock_acquire;

  function frm_lts_lock_release(i_lock_name in varchar2) return number as
    -- 6 is wrapper-specific: no lockhandle cached for this name in this session
    v_result number := 6;
  begin
    if v_lockhandle_tbl.exists(i_lock_name) then
      -- DBMS_LOCK.RELEASE returns 0 success, 3 parameter error,
      -- 4 lock not owned, 5 illegal lock handle
      v_result := sys.dbms_lock.release(v_lockhandle_tbl(i_lock_name));
      v_lockhandle_tbl.delete(i_lock_name);
    end if;
    return v_result;
  end frm_lts_lock_release;

end frm_lts_processor_lock_pkg;
/
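The Java callbacks compare the wrapper's return values against a `ULLockResultType` class whose source is not shown in this article. A minimal sketch of what it could look like, assuming it simply mirrors the documented return codes of `DBMS_LOCK.REQUEST` and `DBMS_LOCK.RELEASE` plus the wrapper-specific `6`; every constant name other than `SUCCESSFUL`, `TIMEOUT` and `ALREADY_OWNED`, and all description strings, are hypothetical:

```java
/**
 * Hypothetical sketch of ULLockResultType: maps the integer codes passed
 * through by frm_lts_lock_acquire / frm_lts_lock_release to constants and
 * readable descriptions.
 */
final class ULLockResultType {

    public static final int SUCCESSFUL = 0;
    public static final int TIMEOUT = 1;          // REQUEST only
    public static final int DEADLOCK = 2;         // REQUEST only
    public static final int PARAMETER_ERROR = 3;
    public static final int ALREADY_OWNED = 4;    // REQUEST: this session already holds the lock
    public static final int NOT_OWNED = 4;        // RELEASE: this session does not hold the lock
    public static final int ILLEGAL_LOCK_HANDLE = 5;
    public static final int NO_CACHED_HANDLE = 6; // wrapper-specific, see frm_lts_lock_release

    private ULLockResultType() {
    }

    /** Describes a result of frm_lts_lock_acquire (i.e. of DBMS_LOCK.REQUEST). */
    public static String getAcquireTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case TIMEOUT: return "timeout";
            case DEADLOCK: return "deadlock";
            case PARAMETER_ERROR: return "parameter error";
            case ALREADY_OWNED: return "already own lock";
            case ILLEGAL_LOCK_HANDLE: return "illegal lock handle";
            default: return "unknown result " + code;
        }
    }

    /** Describes a result of frm_lts_lock_release (i.e. of DBMS_LOCK.RELEASE). */
    public static String getReleaseTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case PARAMETER_ERROR: return "parameter error";
            case NOT_OWNED: return "do not own lock";
            case ILLEGAL_LOCK_HANDLE: return "illegal lock handle";
            case NO_CACHED_HANDLE: return "no lockhandle cached in this session";
            default: return "unknown result " + code;
        }
    }
}
```

Code 4 means different things for REQUEST and RELEASE, which is why the sketch keeps two description methods instead of one.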
Lock manager:
The application team actually has several stored procedures like this, and whether they must run serially can depend on several business keys, such as the job order number or the delivery order. We therefore provide a multi-lock mechanism. The lock names are sorted before acquisition to avoid deadlocks, and we strongly recommend that application projects set a wait timeout. The business keys are Strings; to keep large numbers of business operations from being serialized on meaningless keys such as null or the empty string, we also filter the set of lock names the application supplies.
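A minimal sketch of the filtering and ordering described above, assuming natural `String` order is an acceptable global lock order and that a key consisting only of whitespace counts as empty; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical helper: business keys -> de-duplicated, ordered lock names. */
final class LockNameNormalizer {

    private LockNameNormalizer() {
    }

    static SortedSet<String> normalize(Collection<String> businessKeys) {
        // A TreeSet yields one global acquisition order for every job, so two
        // jobs never wait on each other's locks in opposite order.
        SortedSet<String> lockNames = new TreeSet<String>();
        if (businessKeys == null) {
            return lockNames;
        }
        for (String key : businessKeys) {
            // Skip null and blank keys so unrelated jobs are not serialized
            // on a meaningless lock.
            if (key != null && key.trim().length() > 0) {
                lockNames.add(key);
            }
        }
        return lockNames;
    }

    public static void main(String[] args) {
        // prints [DO-42, JO-7]
        System.out.println(normalize(Arrays.asList("JO-7", null, "DO-42", " ", "JO-7")));
    }
}
```

Any total order works equally well, as long as every job uses the same one.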
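The mechanism itself is simple: call the DB-side acquire and release procedures in a local transaction, then process the results they return.
With multiple locks, if only some have been acquired and an error or timeout occurs while acquiring the next one, the locks already held must be released safely. Multi-lock acquisition therefore records which locks it obtained and what went wrong, distinguishing a timeout from an exception: a timeout yields false, while an exception is recorded and thrown at the end. Releasing multiple locks must not stop midway: the result of releasing each lock is recorded, and if any release failed, an exception is thrown at the end.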
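The acquire-all-or-release protocol above, sketched against a hypothetical `LockBackend` interface rather than the real stored-procedure calls so that it runs without a database. `InMemoryBackend` is a toy for illustration only, and reporting a timeout as `false` versus rethrowing an error follows the rule stated in the text:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class MultiLockProtocol {

    /** Hypothetical stand-in for the DB calls; acquire returns false on timeout. */
    interface LockBackend {
        boolean acquire(String name);
        void release(String name);
    }

    /** Toy in-memory backend: names listed in "busy" always time out. */
    static class InMemoryBackend implements LockBackend {
        final Set<String> held = new HashSet<String>();
        final Set<String> busy = new HashSet<String>();

        public boolean acquire(String name) {
            if (busy.contains(name)) {
                return false;              // simulate the wait time expiring
            }
            held.add(name);
            return true;
        }

        public void release(String name) {
            if (!held.remove(name)) {
                throw new IllegalStateException("not held: " + name);
            }
        }
    }

    /**
     * Acquires the locks in the given (already sorted) order. Returns true if
     * all were obtained, false on timeout, and rethrows an acquisition error.
     * On either failure the locks obtained so far are released first.
     */
    static boolean acquireAll(LockBackend backend, List<String> sortedNames) {
        List<String> obtained = new ArrayList<String>();
        RuntimeException error = null;
        boolean timedOut = false;
        for (String name : sortedNames) {
            try {
                if (!backend.acquire(name)) {
                    timedOut = true;
                    break;
                }
                obtained.add(name);
            } catch (RuntimeException e) {
                error = e;
                break;
            }
        }
        if (!timedOut && error == null) {
            return true;
        }
        try {
            releaseAll(backend, obtained); // roll back the partial acquisition
        } catch (RuntimeException releaseError) {
            if (error == null) {
                error = releaseError;
            } else {
                error.addSuppressed(releaseError);
            }
        }
        if (error != null) {
            throw error;
        }
        return false;
    }

    /** Attempts every release even if some fail, then reports all failures. */
    static void releaseAll(LockBackend backend, List<String> names) {
        List<String> failed = new ArrayList<String>();
        for (String name : names) {
            try {
                backend.release(name);
            } catch (RuntimeException e) {
                failed.add(name);
            }
        }
        if (!failed.isEmpty()) {
            throw new IllegalStateException("Failed to release locks: " + failed);
        }
    }
}
```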
handleLock defines the flow: suspend the JTA transaction, run the lock operation in a local transaction, then resume the JTA transaction.
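private Object handleLock(Connection connection,
        LocalTransactionCallback localTransactionCallback)
        throws LockException
{
    TransactionManager tm = null;
    Transaction suspendedTx = null;

    try
    {
        Context initialContext = new InitialContext();
        UserTransaction userTrx = (javax.transaction.UserTransaction) initialContext
                .lookup("java:comp/UserTransaction");

        if (userTrx.getStatus() != Status.STATUS_NO_TRANSACTION)
        {
            tm = TransactionUtils.getTransactionManager(userTrx);

            if (tm != null)
            {
                // Detach the global (JTA) transaction from this thread so that
                // the lock procedures, which commit, run in a local transaction.
                suspendedTx = tm.suspend();
            }
        }
        return localTransactionCallback
                .executeInLocalTransaction(connection);

    } catch (NamingException e)
    {
        LockException lockException = new LockException(
                "Cannot look up java:comp/UserTransaction: " + e.getMessage());
        lockException.initCause(e);
        throw lockException;
    } catch (SystemException e)
    {
        LockException lockException = new LockException(
                "Cannot suspend the JTA transaction: " + e.getMessage());
        lockException.initCause(e);
        throw lockException;
    } finally
    {
        // Always resume the business transaction, even if the lock operation failed.
        if (null != suspendedTx)
        {
            try
            {
                tm.resume(suspendedTx);
            } catch (Exception e)
            {
                LockException lockException = new LockException(
                        "Cannot resume the JTA transaction: " + e.getMessage());
                lockException.initCause(e);
                throw lockException;
            }
        }
    }
}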

Acquiring multiple locks is a callback executed inside the flow above:
private class ObtainMutipleLocksLocalTransactionCallback implements
        LocalTransactionCallback
{
    private Set<String> lockNames;
    private int waitTime;

    ObtainMutipleLocksLocalTransactionCallback(Set<String> lockNames,
            int waitTime)
    {
        // Copy into a TreeSet so the locks are always requested in the same
        // (natural String) order, which prevents lock-ordering deadlocks.
        // The names must already be filtered: a TreeSet rejects null.
        this.lockNames = new TreeSet<String>(lockNames);
        this.waitTime = waitTime;
    }

    public Object executeInLocalTransaction(Connection conn)
    {
        CallableStatement lockAcquireStmt = null;
        Set<String> obtainedLockNames = new HashSet<String>();
        boolean timeOut = false;
        String timeOutLockName = null;
        Exception mifLockException = null;

        try
        {
            // frm_lts_lock_acquire(name, expiration, wait time, out result)
            lockAcquireStmt = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);

            for (String lockName : lockNames)
            {
                lockAcquireStmt.setString(1, lockName);
                lockAcquireStmt.setInt(2, LCOK_EXPIRE_TIME);
                lockAcquireStmt.setInt(3, waitTime);
                lockAcquireStmt.registerOutParameter(4,
                        java.sql.Types.INTEGER);
                lockAcquireStmt.execute();
                int lockacquireResult = lockAcquireStmt.getInt(4);

                if (lockacquireResult == ULLockResultType.SUCCESSFUL)
                {
                    obtainedLockNames.add(lockName);

                } else if (lockacquireResult == ULLockResultType.TIMEOUT)
                {
                    // Stop at the first timeout; the caller releases
                    // whatever is in obtainedLockNames.
                    timeOut = true;
                    timeOutLockName = lockName;
                    break;

                } else if (lockacquireResult != ULLockResultType.ALREADY_OWNED)
                {
                    String lockResultDesc = ULLockResultType
                            .getAcquireTypeDesc(lockacquireResult);
                    LockException lockException = new LockException(
                            "Obtain lock " + lockName
                                    + " fails, the reason is "
                                    + lockResultDesc + " .");
                    lockException.setLockName(lockName);
                    lockException.setLockHandlingResult(lockResultDesc);
                    throw lockException;
                }
                // ALREADY_OWNED: this session already holds the lock, so it is
                // not recorded here and will not be released by this caller.
            }

        } catch (Exception ex)
        {
            // Record the error; obtainedLockNames still lists what to release.
            mifLockException = ex;

        } finally
        {
            if (null != lockAcquireStmt)
            {
                try
                {
                    lockAcquireStmt.close();

                } catch (SQLException e)
                {
                    // swallow: closing must not hide the lock result
                }
            }
        }
        boolean success = !timeOut && mifLockException == null;
        return new ObtainMultipleLocksResult(success, obtainedLockNames,
                timeOut, timeOutLockName, mifLockException);
    }
}

Releasing multiple locks is likewise a callback executed inside the transaction-suspension flow:
private class ReleaseMultipleLocksLocalTransactionCallback implements
        LocalTransactionCallback
{
    private Set<String> lockNames;

    ReleaseMultipleLocksLocalTransactionCallback(Set<String> lockNames)
    {
        this.lockNames = lockNames;
    }

    public Object executeInLocalTransaction(Connection conn)
    {
        CallableStatement lockReleaseStmt = null;
        Map<String, Exception> mifLockErrors = new HashMap<String, Exception>();
        Set<String> releasedLocks = new HashSet<String>();

        try
        {
            try
            {
                // ? = frm_lts_lock_release(name)
                lockReleaseStmt = conn.prepareCall(RELEASE_LOCK_PROC_CALL);

            } catch (Exception ex)
            {
                // Nothing can be released: report every lock as failed.
                for (String lockName : lockNames)
                {
                    mifLockErrors.put(lockName, ex);
                }
                return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
            }

            // Try every lock; one failure must not stop the others from being released.
            for (String lockName : lockNames)
            {
                try
                {
                    lockReleaseStmt.registerOutParameter(1,
                            java.sql.Types.INTEGER);
                    lockReleaseStmt.setString(2, lockName);
                    lockReleaseStmt.execute();
                    int lockReleaseResult = lockReleaseStmt.getInt(1);

                    if (lockReleaseResult == ULLockResultType.SUCCESSFUL)
                    {
                        releasedLocks.add(lockName);

                    } else
                    {
                        String lockResultDesc = ULLockResultType
                                .getReleaseTypeDesc(lockReleaseResult);
                        LockException lockException = new LockException(
                                "Release lock " + lockName
                                        + " fails, the reason is "
                                        + lockResultDesc + " .");
                        lockException.setLockName(lockName);
                        lockException.setLockHandlingResult(lockResultDesc);
                        mifLockErrors.put(lockName, lockException);
                    }

                } catch (Exception ex)
                {
                    mifLockErrors.put(lockName, ex);
                }
            }

        } finally
        {
            if (null != lockReleaseStmt)
            {
                try
                {
                    lockReleaseStmt.close();

                } catch (SQLException e)
                {
                    // swallow: the per-lock results are already recorded
                }
            }
        }
        boolean success = releasedLocks.size() == this.lockNames.size();
        return new ReleaseMutipleLocksResult(success, releasedLocks,
                mifLockErrors);
    }
}

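Usage template: lock release must be placed in a finally block so that it is guaranteed to happen.
To keep application developers from calling the lock manager directly, misusing locks, or forgetting to release them, we defined a template that makes lock acquisition and release transparent to them, effectively an implicit lock.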
@SuppressWarnings("unchecked")
public void execute(JobExecutionContext context)
        throws JobExecutionException
{
    Map jobDataMap = context.getJobDetail().getJobDataMap();
    Collection<String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
    Integer waitTimeInteger = (Integer) jobDataMap
            .get(LOCK_WAIT_TIME_SECONDS_KEY);
    int waitTime = (waitTimeInteger != null) ? waitTimeInteger.intValue() : MAX_WAITTIME;

    // Filter out null/empty keys and sort the rest (TreeSet) so that every
    // job requests its locks in the same order.
    Set<String> uniqueLockKeys = new TreeSet<String>();
    if (lockKeys != null)
    {
        for (String key : lockKeys)
        {
            if (!StringUtils.isEmptyNoOffset(key))
            {
                uniqueLockKeys.add(key);
            }
        }
    }

    if (uniqueLockKeys.isEmpty())
    {
        // No meaningful business key: nothing to serialize on.
        this.executeInLock(context);
        return;
    }

    Set<String> obtainedLockNames = null;
    Connection connection = null;

    try
    {
        connection = DataSource.getConnection();
        ObtainMultipleLocksResult result = LOCK_MANAGER.obtainLock(
                connection, uniqueLockKeys, waitTime);
        obtainedLockNames = result.getObtainedLockNames();

        if (!result.isSuccess())
        {
            if (result.isTimeout())
            {
                //do log; locks obtained so far are released in finally
                return;
            }
            throw new JobExecutionException("Obtain locks failed! "
                    + result.getMifLockException().getMessage(),
                    result.getMifLockException());
        }
        this.executeInLock(context);

    } catch (JobExecutionException e)
    {
        // already descriptive, do not wrap it again
        throw e;

    } catch (Exception e)
    {
        throw new JobExecutionException(
                "Lock handling failed! " + e.getMessage(), e);

    } finally
    {
        if (null != connection)
        {
            // Release exactly the locks this run obtained, even on failure.
            if (obtainedLockNames != null && !obtainedLockNames.isEmpty())
            {
                this.releaseLocks(connection, obtainedLockNames);
            }

            try
            {
                connection.close();

            } catch (SQLException e)
            {
                //do log; throwing here would hide the job's own exception
            }
        }
    }
}

executeInLock is implemented by the application's subclasses.
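A stripped-down sketch of the same template-method idea without Quartz or JDBC. `SimpleLockManager` and `RecordingLockManager` are hypothetical stand-ins for `LOCK_MANAGER`; the sketch assumes `obtain` is all-or-nothing (it rolls back partial acquisitions itself) and that the lock names are already filtered and sorted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

/** Hypothetical template: subclasses supply only executeInLock(). */
abstract class LockedJobTemplate {

    /** Hypothetical lock manager; obtain returns false on timeout. */
    interface SimpleLockManager {
        boolean obtain(SortedSet<String> names);
        void release(SortedSet<String> names);
    }

    /** Toy manager that always succeeds and records what happened. */
    static class RecordingLockManager implements SimpleLockManager {
        final List<String> events = new ArrayList<String>();

        public boolean obtain(SortedSet<String> names) {
            events.add("obtain " + names);
            return true;
        }

        public void release(SortedSet<String> names) {
            events.add("release " + names);
        }
    }

    private final SimpleLockManager lockManager;

    protected LockedJobTemplate(SimpleLockManager lockManager) {
        this.lockManager = lockManager;
    }

    /** Returns true if the business logic ran, false if the locks timed out. */
    public final boolean execute(SortedSet<String> lockNames) {
        if (lockNames.isEmpty()) {
            executeInLock();                  // no meaningful key: run unlocked
            return true;
        }
        if (!lockManager.obtain(lockNames)) {
            return false;                     // timeout: skip this run
        }
        try {
            executeInLock();
            return true;
        } finally {
            lockManager.release(lockNames);   // runs even if executeInLock throws
        }
    }

    /** Business logic supplied by the application subclass. */
    protected abstract void executeInLock();
}
```

Making `execute` final keeps subclasses from bypassing the acquire/release sequence, which is the point of hiding the lock from application code.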
Caching
- Caching the pessimistic offline locks
- Caching lockhandles
因?yàn)槭褂玫氖潜^離線鎖,每次申請(qǐng)鎖都要跑一趟db,但如果當(dāng)前線程已經(jīng)是lock的所有者就不需要白跑一趟了。可以用ThreadLocal把當(dāng)前線程已經(jīng)擁有的鎖緩存起來(lái),釋放鎖時(shí)對(duì)應(yīng)的需要清除緩存。
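Acquiring a UL lock requires its lockhandle, and so does releasing it. We cache the lockhandle mainly because DBMS_LOCK.ALLOCATE_UNIQUE commits on every call, which hurts performance; with the handle cached, each release can use it directly. There are two places to cache lockhandles: on the Java side as an instance variable, or in a global variable of the PL/SQL package (the package above takes this approach; package variables are private to each database session). When caching on the Java side, the lock manager must not be used as a singleton or flyweight; otherwise the lockhandle cache runs into concurrency-control and synchronization problems across multiple JVMs.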
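A sketch of the Java-side option, with one cache per lock-manager instance. `HandleAllocator` is a hypothetical stand-in for a call that wraps `DBMS_LOCK.ALLOCATE_UNIQUE`; the class is deliberately not thread-safe, in line with the advice not to share the lock manager as a singleton:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-instance cache of logical lock name -> lockhandle. */
class LockHandleCache {

    /** Hypothetical stand-in for a wrapper around DBMS_LOCK.ALLOCATE_UNIQUE. */
    interface HandleAllocator {
        String allocate(String lockName);
    }

    private final HandleAllocator allocator;
    private final Map<String, String> handles = new HashMap<String, String>();

    LockHandleCache(HandleAllocator allocator) {
        this.allocator = allocator;
    }

    /** Returns the cached handle, calling the allocator (and its commit) only on a miss. */
    String handleFor(String lockName) {
        String handle = handles.get(lockName);
        if (handle == null) {
            handle = allocator.allocate(lockName);
            handles.put(lockName, handle);
        }
        return handle;
    }

    /** Drops a cached handle, e.g. after release as the PL/SQL version does. */
    String evict(String lockName) {
        return handles.remove(lockName);
    }
}
```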
Source code:
Java:
ULLock-sources.rar
PL/SQL:
lockplsql.rar