<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    走自己的路

    路漫漫其修遠(yuǎn)兮,吾將上下而求索

      BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      50 隨筆 :: 4 文章 :: 118 評(píng)論 :: 0 Trackbacks

    背景

    應(yīng)用項(xiàng)目組每個(gè)小時(shí)會(huì)定時(shí)的run一個(gè)存儲(chǔ)過(guò)程進(jìn)行結(jié)算,每次執(zhí)行的時(shí)間也許會(huì)超過(guò)一個(gè)小時(shí),而且需要絕對(duì)保證存儲(chǔ)過(guò)程的串行執(zhí)行。因?yàn)槭褂脙?nèi)存鎖不能絕對(duì)保證兩個(gè)存儲(chǔ)過(guò)程的串行執(zhí)行,因?yàn)閼?yīng)用服務(wù)器down掉重啟后可能會(huì)出現(xiàn)并發(fā)執(zhí)行的情況,因?yàn)橄惹暗拇鎯?chǔ)過(guò)程還在db中運(yùn)行。我們是使用LTS,對(duì)quartz進(jìn)行了封裝來(lái)做任務(wù)調(diào)度的。我們決定鎖的管理操作由framework來(lái)實(shí)現(xiàn)。原因是:

    l         鎖管理器可以做成通用的模塊

    l         申請(qǐng)鎖,釋放鎖是比較危險(xiǎn)的操作,擔(dān)心業(yè)務(wù)開(kāi)發(fā)人員由于遺忘導(dǎo)致死鎖或者并發(fā)問(wèn)題

    l         可以很好的集成到我們現(xiàn)有的framework中,方便地開(kāi)放給業(yè)務(wù)開(kāi)發(fā)人員使用

    注意:我們極其不推薦使用悲觀離線鎖,如果沖突出現(xiàn)的概率比較少,可以用其他方法比如樂(lè)觀離線鎖,DB Constraint再通過(guò)補(bǔ)償操作能解決的問(wèn)題,請(qǐng)不要使用悲觀離線鎖。

     

    原理

    PLSQL UL LOCKoracle提供出來(lái)給開(kāi)發(fā)人員使用的鎖資源,功能和DML鎖是類似的,當(dāng)然我們可以通過(guò)DML鎖來(lái)完成并發(fā)控制,select…for update或者自己維護(hù)一張鎖表,考慮到實(shí)現(xiàn)代價(jià),我們打算使用PLSQL UL LOCK。而且Oracle保證了session釋放時(shí),UL lock都會(huì)被釋放。

    但是用它時(shí),需要注意到它的DBMS_LOCK.Unique函數(shù)它每次都會(huì)commit數(shù)據(jù)。如果是在分布式事務(wù)當(dāng)中,會(huì)拋出事務(wù)已提交的異常。因?yàn)槲覀兪褂玫氖?/span>XA resource并且transaction levelglobal的,也就是JTA。為了使得鎖的申請(qǐng)和釋放不影響分布式業(yè)務(wù)事務(wù),或者我們是使用非xaresourcelocaltransaction來(lái)完成鎖操作,或者也可以暫停已有事務(wù),等鎖操作完成后resume暫停的分布式事務(wù)??紤]到重用已有的xa resource我們打算使用后一種方法,其實(shí)這種方法我們也會(huì)經(jīng)常使用,暫停分布式事務(wù)做DDL操作,再釋放事務(wù)。

     

    實(shí)現(xiàn)方法:

    l         封裝DBMS_LOCK包中的幾個(gè)存儲(chǔ)過(guò)程為我們所用

    l         Java端提供一個(gè)基于PLSQL UL鎖的管理器

    l         Java端定義好申請(qǐng)鎖,業(yè)務(wù)操作,釋放鎖的使用流程,作為一個(gè)模板

     

    DB存儲(chǔ)過(guò)程:

    對(duì)DBMS_LOCK做了簡(jiǎn)單的封裝,避免直接調(diào)用DBMS_LOCK。這樣做的好處是:

    l         Oracle解耦,如果其他數(shù)據(jù)庫(kù)可以提供類似的功能,我們也可以用同名的存儲(chǔ)過(guò)程實(shí)現(xiàn)

    l         方便以后對(duì)存儲(chǔ)過(guò)程重構(gòu),升級(jí)

    l         我們需要對(duì)DBMS_LOCK進(jìn)行簡(jiǎn)單的封裝,因?yàn)?/span>DBMS_LOCK.Unique獲取lockhandle oracle中鎖的唯一標(biāo)識(shí),輸入是lockname,邏輯名,輸出是鎖的唯一標(biāo)識(shí),對(duì)java端應(yīng)該是透明的,java端應(yīng)該只關(guān)心鎖的邏輯名。

    create or replace package body frm_lts_processor_lock_pkg is
      
    /* table to store lockhandles by name */
       TYPE handle_tbltype IS TABLE OF varchar2(
    128)
          INDEX BY varchar2(
    128);

       v_lockhandle_tbl handle_tbltype;
      
     procedure frm_lts_lock_acquire(i_lock_name in varchar2, i_expiration_time in Integer default
    864000, i_wait_time in Integer default DBMS_LOCK.maxwait, o_result out number) as
          v_result     number;
          v_lockhandle varchar2(
    128);
       begin
       if v_lockhandle_tbl.count =
    0 then
          sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
          v_lockhandle_tbl(i_lock_name) := v_lockhandle;
       elsif v_lockhandle_tbl.exists(i_lock_name) then
          dbms_output.put_line(
    'atttacked');
          v_lockhandle := v_lockhandle_tbl(i_lock_name);
       else
         dbms_output.put_line(
    'new acquire');
         
    --acquire a unique lock id
         sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
         v_lockhandle_tbl(i_lock_name) := v_lockhandle;
           
       end if;
       
    --acquire a lock
        v_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
     
       
    --set return values
         o_result := v_result;
     end frm_lts_lock_acquire;
     
     function frm_lts_lock_release(i_lock_name in varchar2) return number as
           v_result number :=
    6;
           v_lockhandle varchar2(
    128);
     begin
         
    --release lock according to lockhandle
          if v_lockhandle_tbl.exists(i_lock_name) then
              v_lockhandle :=  v_lockhandle_tbl(i_lock_name);
              v_result := sys.dbms_lock.release(v_lockhandle);
              v_lockhandle_tbl.delete(i_lock_name);       
          end if;
          return v_result;
     end frm_lts_lock_release;

    end frm_lts_processor_lock_pkg;
    /

     

     

    鎖管理器:

    其實(shí)應(yīng)用項(xiàng)目組有多個(gè)這樣的存儲(chǔ)過(guò)程,而這些存儲(chǔ)過(guò)程之間的串行執(zhí)行可以有多個(gè)business key來(lái)決定的,比如job order numberdelivery order等。所以我們需要給他們提供多鎖管理機(jī)制。我們會(huì)對(duì)這多個(gè)鎖進(jìn)行排序,以避免死鎖,并強(qiáng)烈推薦應(yīng)用項(xiàng)目設(shè)置超時(shí)時(shí)間。這些business key是由String對(duì)象構(gòu)成的,為了防止大量的業(yè)務(wù)操作被鎖在null或者空string這樣沒(méi)有意義的business key上面,我們對(duì)application提供的鎖集合還需要進(jìn)行過(guò)濾。

    原理還是很簡(jiǎn)單的,就是在本地事務(wù)中調(diào)用db端的申請(qǐng)鎖,釋放鎖的存儲(chǔ)過(guò)程,然后對(duì)返回的結(jié)果進(jìn)行一系列處理。

    在使用多鎖機(jī)制的時(shí)候要保證,如果只申請(qǐng)到了部分鎖,在申請(qǐng)其中另外一個(gè)鎖時(shí)發(fā)生了錯(cuò)誤或者超時(shí),要能夠安全地將已申請(qǐng)的鎖釋放掉,所以多鎖申請(qǐng)需要記錄已申請(qǐng)到的鎖,并且記錄發(fā)生的錯(cuò)誤,區(qū)分timeout和異常。Timeout返回false,如果出現(xiàn)異常記錄下來(lái),最后拋出。釋放多鎖時(shí),不能被中斷,記錄釋放每個(gè)鎖后的結(jié)果,最后判定如果其中一些鎖釋放時(shí)發(fā)生了錯(cuò)誤,拋出。

     

    handleLock定義暫停jta事務(wù),執(zhí)行鎖操作,釋放jta事務(wù)流程

    private Object handleLock(Connection connection,
                LocalTransactionCallback localTransactionCallback)
                
    throws LockException {
            TransactionManager tm 
    = null;
            Transaction currentTx 
    = null;
            Object result 
    = null;
            
    try {
                Context initialContext 
    = new InitialContext();
                UserTransaction userTrx 
    = (javax.transaction.UserTransaction) initialContext
                        .lookup(
    "java:comp/UserTransaction");
                
    if (!(userTrx.getStatus() == Status.STATUS_NO_TRANSACTION)) {
                    tm 
    = TransactionUtils.getTransactionManager(userTrx);
                    
    if (tm != null{
                        currentTx 
    = tm.suspend();
                    }

                }

                result 
    = localTransactionCallback
                        .executeInLocalTransaction(connection);

                
    if (null != currentTx) {
                    tm.resume(currentTx);
                }

            }
     catch (NamingException e) {
            }
     catch (SystemException e) {
            }
     catch (InvalidTransactionException e) {
            }
     catch (IllegalStateException e) {
            }

            
    return result;
        }


    多鎖申請(qǐng)操作是上面流程的一個(gè)回調(diào)

    private class ObtainMutipleLocksLocalTransactionCallback implements
                LocalTransactionCallback 
    {
            
    private Set<String> lockNames;
            
    private int waitTime;

            ObtainMutipleLocksLocalTransactionCallback(Set
    <String> lockNames,
                    
    int waitTime) {
                
    this.lockNames = lockNames;
                
    this.waitTime = waitTime;
            }

            
    public Object executeInLocalTransaction(Connection conn) {
                CallableStatement lockAcquireStmt 
    = null;
                Set
    <String> obtainedLockNames = new HashSet<String>();
                
    boolean timeOut = false;
                String timeOutLockName 
    = null;
                Exception mifLockException 
    = null;
                
    try {
                    lockAcquireStmt 
    = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);
                    
    for (String lockName : lockNames) {
                        lockAcquireStmt.setString(
    1, lockName);
                        lockAcquireStmt.setInt(
    2, LCOK_EXPIRE_TIME);
                        lockAcquireStmt.setInt(
    3, waitTime);
                        lockAcquireStmt.registerOutParameter(
    4,
                                java.sql.Types.INTEGER);
                        lockAcquireStmt.registerOutParameter(
    5,
                                java.sql.Types.VARCHAR);
                        lockAcquireStmt.executeUpdate();
                        
    int lockacquireResult = lockAcquireStmt.getInt(4);
                        
    if (lockacquireResult == ULLockResultType.SUCCESSFUL) 
                            obtainedLockNames.add(lockName);
                        }
     else if (lockacquireResult == ULLockResultType.TIMEOUT) {
                            timeOut 
    = true;
                            timeOutLockName 
    = lockName;
                            
    break;
                        }
     else if (lockacquireResult != ULLockResultType.ALREADY_OWNED) {
                            String lockResultDesc 
    = ULLockResultType
                                    .getAcquireTypeDesc(lockacquireResult);
                            LockException lockException 
    = new LockException(
                                    
    "Obtain lock " + lockName
                                            
    + " fails, the reason is "
                                            
    + lockResultDesc + " .");
                            lockException.setLockName(lockName);
                            lockException.setLockHandlingResult(lockResultDesc);
                            
    throw lockException;
                        }
     else {
                        }

                    }

                }
     catch (Exception ex) {
                    mifLockException 
    = ex;
                }
     finally {
                    
    if (null != lockAcquireStmt) {
                        
    try {
                            lockAcquireStmt.close();
                        }
     catch (SQLException e) {
                            
    // swallow
                        }

                    }

                }

                
    boolean success = true;
                
    if (timeOut || mifLockException != null{
                    success 
    = false;
                }

                
    return new ObtainMultipleLocksResult(success, obtainedLockNames,
                        timeOut, timeOutLockName, mifLockException);
            }

        }


    多鎖釋放操作也是事務(wù)暫停流程的一個(gè)回調(diào)

    private class ReleaseMultipleLocksLocalTransactionCallback implements
                LocalTransactionCallback 
    {
            
    private Set<String> lockNames;

            ReleaseMultipleLocksLocalTransactionCallback(Set
    <String> lockNames) {
                
    this.lockNames = lockNames;
            }


            
    public Object executeInLocalTransaction(Connection conn) {
                CallableStatement lockReleaseStmt 
    = null;
                Map
    <String, Exception> mifLockErrors = new HashMap<String, Exception>();
                Set
    <String> releasedLocks = new HashSet<String>();
                
    try {
                    
    try {
                        lockReleaseStmt 
    = conn.prepareCall(RELEASE_LOCK_PROC_CALL);
                    }
     catch (Exception ex) {
                        
    for (String lockName : lockNames) {
                            mifLockErrors.put(lockName, ex);
                        }

    return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
                    }


                    
    for (String lockName : lockNames) {
                        
    try {
                            lockReleaseStmt.registerOutParameter(
    1,
                                    java.sql.Types.INTEGER);
                            lockReleaseStmt.setString(
    2, lockName);
                            lockReleaseStmt.executeUpdate();
                            
    int lockReleaseResult = lockReleaseStmt.getInt(1);
                            
    if (lockReleaseResult == ULLockResultType.SUCCESSFUL) {
                                releasedLocks.add(lockName);
                            }
     else {
                                String lockResultDesc 
    = ULLockResultType
                                        .getReleaseTypeDesc(lockReleaseResult);
                                LockException lockException 
    = new LockException(
                                        
    "Release lock " + lockName
                                                
    + " fails, the reason is "
                                                
    + lockResultDesc + " .");
                                lockException.setLockName(lockName);
                                lockException.setLockHandlingResult(lockResultDesc);
                                mifLockErrors.put(lockName, lockException);
                            }

                        }
     catch (Exception ex) {
                            mifLockErrors.put(lockName, ex);
                        }

                    }

                }
     finally {
                    
    if (null != lockReleaseStmt) {
                        
    try {
                            lockReleaseStmt.close();
                        }
     catch (SQLException e) {
                        }

                    }

                }

                
    boolean success = releasedLocks.size() == this.lockNames.size();
                
    return new ReleaseMutipleLocksResult(success, releasedLocks,
                        mifLockErrors);
            }

        }


    使用模板:注意鎖的釋放要寫在finally語(yǔ)句塊里面,保證鎖的釋放。

    定義好模板,防止Application用戶直接調(diào)用鎖管理器或者濫用鎖,忘記釋放鎖。我們決定定義一個(gè)模板,做到鎖的申請(qǐng)和釋放對(duì)application用戶來(lái)說(shuō)是透明的,把它做成了隱含鎖。

    public void execute(JobExecutionContext context)
                
    throws JobExecutionException {
            Map jobDataMap 
    = context
            .getJobDetail().getJobDataMap();
            Collection
    <String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
            Integer waitTimeInteger 
    = (Integer) jobDataMap
            .get(LOCK_WAIT_TIME_SECONDS_KEY);
            
    int waitTime = MAX_WAITTIME;
            
    if (waitTimeInteger != null{
                waitTime 
    = waitTimeInteger.intValue();
            }

            Set
    <String> uniqueLockKeys = new HashSet<String>(lockKeys);

            
    // filter empty keys
            Iterator<String> keyIterator = uniqueLockKeys.iterator();
            
    while (keyIterator.hasNext()) {
                String key 
    = keyIterator.next();
                
    if (StringUtils.isEmptyNoOffset(key)) {
                    keyIterator.remove();
                }

            }

            
    if (CollectionUtils.isNotEmptyCollection(uniqueLockKeys)) {
                Set
    <String> obtainedLockNames = null;
                Connection connection 
    = null;
                
    try {
                    connection 
    = DataSource.getConnection();
                    ObtainMultipleLocksResult result 
    = LOCK_MANAGER.obtainLock(
                            connection, uniqueLockKeys, waitTime);
                    obtainedLockNames 
    = result.getObtainedLockNames();
                    
    if (!result.isSuccess()) {
                        
    if (result.isTimeout()) {
                          
    //do log
                            return;
                        }
     else {
                            JobExecutionException jobException 
    = new JobExecutionException(
                                    
    "Obtain locks failed! "
                                            
    + result.getMifLockException()
                                                    .getMessage(), result
                                            .getMifLockException());
                            
    throw jobException;
                        }

                    }

                    
    this. executeInLock (context);
                }
     catch (Throwable e) {
                    
    throw new JobExecutionException(
                            
    "Get db connection failed!" + e.getMessage(), e);
                }
     finally {
                    
    if (null != connection) {
                        
    this.releaseLocks(connection, obtainedLockNames);
                        
    try {
                            connection.close();
                        }
     catch (SQLException e) {
                            
    throw new JobExecutionException(
                                    
    "close  db connection failed!" + e.getMessage(), e);
                        }

                    }

                }

            }
     else {
                
    this.executeInLock(context);
            }

        }


     

    executeInLockapplication的子類繼承實(shí)現(xiàn)

     

    緩存

    l         緩存悲觀離線鎖

    l         緩存lockhandle

    因?yàn)槭褂玫氖潜^離線鎖,每次申請(qǐng)鎖都要跑一趟db,但如果當(dāng)前線程已經(jīng)是lock的所有者就不需要白跑一趟了。可以用ThreadLocal把當(dāng)前線程已經(jīng)擁有的鎖緩存起來(lái),釋放鎖時(shí)對(duì)應(yīng)的需要清除緩存。

    在申請(qǐng)鎖時(shí),需要獲得UL Lock時(shí)的lockhandle,釋放鎖時(shí)也需要提供鎖的lockhandle,我們需要將它緩存起來(lái),主要是因?yàn)?/span>DBMS_LOCK.Unique每次都會(huì)commit,會(huì)影響性能,這樣每次釋放鎖時(shí)就可以直接使用lockhandle了。有兩種方法對(duì)lockhandle進(jìn)行緩存,緩存在java端作為實(shí)例變量,緩存在plsql包的全局變量中。緩存在java端需要注意的是,lock manager不能作為單例或者享元來(lái)使用,否則lock handle的緩存在多jvm之間也存在著并發(fā)控制和同步的問(wèn)題。

    源代碼:

    Java:
    ULLock-sources.rar

    PLSQL:
    lockplsql.rar

    參考:

    http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm




    posted on 2009-07-03 19:24 叱咤紅人 閱讀(1508) 評(píng)論(0)  編輯  收藏

    只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 久久久久亚洲av无码尤物| 国产免费av片在线无码免费看 | 亚洲午夜无码久久久久小说 | 国产成人无码区免费内射一片色欲| 成人A毛片免费观看网站| 久久免费福利视频| 成人福利免费视频| 国产精品二区三区免费播放心| 亚洲黄片毛片在线观看| 亚洲av无码潮喷在线观看| 亚洲一级免费毛片| 日本一区二区在线免费观看 | 特级av毛片免费观看| 三年在线观看免费观看完整版中文| 国产麻豆一精品一AV一免费| 国产在线观看麻豆91精品免费| 成人黄页网站免费观看大全| 亚洲精品无码专区2| 亚洲小视频在线观看| 亚洲人成色99999在线观看| 国产免费伦精品一区二区三区| 99re免费在线视频| 日韩免费毛片视频| 黄色免费网站在线看| 国产午夜精品理论片免费观看| 思思re热免费精品视频66| 亚洲成年人啊啊aa在线观看| 青青草原精品国产亚洲av| 亚洲av无码专区在线电影天堂 | 亚欧人成精品免费观看| 国产成人免费高清在线观看| 亚洲VA中文字幕无码毛片| 亚洲色欲色欱wwW在线| 西西人体免费视频| 好吊妞在线新免费视频| 亚洲无线观看国产精品| 中文无码亚洲精品字幕| 国产精品免费无遮挡无码永久视频| 日本黄色免费观看| 久久丫精品国产亚洲av不卡| 成人a毛片免费视频观看|