The showcase example in SpringSide 3.x includes a demo that stores log4j logs in a database. Below are my notes from studying that example.
1. First, let's look at the relevant log4j configuration:
#Async Database Appender (Store business message)
log4j.appender.DB=org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.DB.QueueName=dblog

#Demo level with Async Database appender
log4j.logger.DBLogExample=INFO,Console,DB
log4j.additivity.DBLogExample=false
Here org.springside.examples.showcase.log.appender.QueueAppender is SpringSide's extension of log4j: each logging event (which carries the log content) is placed into a BlockingQueue. When several kinds of logs need to be stored in different places, they are distinguished by the QueueName property.
2. Next, let's look at what org.springside.examples.showcase.log.appender.QueueAppender contains:
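For a sense of how this is used, here is a minimal sketch (my own, not showcase code) of business code logging through the DBLogExample logger configured above; the class name and message are made up:

import org.apache.log4j.Logger;

public class DbLogDemo {
    // The name must match the logger configured with the DB appender above.
    private static final Logger DB_LOGGER = Logger.getLogger("DBLogExample");

    public static void main(String[] args) {
        // INFO and above goes to Console and to the QueueAppender ("DB");
        // the appender only enqueues the event, persistence happens elsewhere.
        DB_LOGGER.info("an order has been created");
    }
}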

/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 *
 * $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
 */
package org.springside.examples.showcase.log.appender;

import java.util.concurrent.BlockingQueue;

import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.springside.examples.showcase.queue.QueuesHolder;


/**
 * A lightweight asynchronous Log4j appender.
 *
 * Puts every logging event into the blocking queue managed by QueuesHolder.
 *
 * @see QueuesHolder
 *
 * @author calvin
 */

public class QueueAppender extends org.apache.log4j.AppenderSkeleton
{

protected String queueName;

protected BlockingQueue<LoggingEvent> queue;


/**
 * AppenderSkeleton callback: puts the event into the queue when it arrives.
 */
@Override
public void append(LoggingEvent event)
{

if (queue == null)
{
queue = QueuesHolder.getQueue(queueName);
}

boolean success = queue.offer(event);

if (success)
{
LogLog.debug("put event to queue success:" + new LoggingEventWrapper(event).convertToString());


} else
{
LogLog.error("Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
}
}


/**
 * AppenderSkeleton callback: clean-up when the logger is closed.
 */
public void close()
{
}


/**
 * AppenderSkeleton callback: whether a Layout is required.
 */
public boolean requiresLayout()
{
return false;
}


/**
 * Log4j injects the same-named property from log4j.properties via this getter/setter pair.
 */
public String getQueueName()
{
return queueName;
}


/**
 * @see #getQueueName()
 */
public void setQueueName(String queueName)
{
this.queueName = queueName;
}
}

This is the standard way to extend Log4j: subclass the abstract class AppenderSkeleton and implement its abstract protected void append(LoggingEvent event) method.
The implementation here is simple: look up a BlockingQueue&lt;LoggingEvent&gt; from QueuesHolder by queueName, then offer the LoggingEvent into that queue. QueuesHolder is covered below. At this point log4j's part of the job is done; everything that follows is plain java.util.concurrent work.
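A small self-contained sketch (my own, not SpringSide code) of the offer semantics the appender relies on: offer never blocks, it simply returns false once the bounded queue is full, so a slow consumer causes events to be dropped rather than blocking the logging thread.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferDemo {
    public static void main(String[] args) {
        // Bounded queue with capacity 2, standing in for the dblog queue.
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

        System.out.println(queue.offer("event-1")); // true
        System.out.println(queue.offer("event-2")); // true
        System.out.println(queue.offer("event-3")); // false - queue full, event dropped
    }
}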
3. Now let's turn to the Spring configuration and see how SpringSide picks up the work from here. This is a fragment of applicationContext-log.xml:
<!-- Message queue manager -->
<bean class="org.springside.examples.showcase.queue.QueuesHolder">
<property name="queueSize" value="1000" />
</bean>

<!-- Task that reads log messages from the queue and writes them to the database -->
<bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
<property name="queueName" value="dblog" />
<property name="batchSize" value="10" />
<property name="sql">
<value>
insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
values(:thread_name,:logger_name,:log_time,:level,:message)
</value>
</property>
</bean>
Let's start with the simple part, QueuesHolder:
private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap(); // message queues, keyed by queue name


/**
 * Static method that returns the message queue for the given queueName.
 * If the queue does not exist yet, it is created automatically.
 */
public static <T> BlockingQueue<T> getQueue(String queueName)
{
BlockingQueue queue = queueMap.get(queueName);


if (queue == null)
{
BlockingQueue newQueue = new LinkedBlockingQueue(queueSize);

//If no queue existed before, the new queue is stored and null is returned; otherwise the existing queue is returned.
queue = queueMap.putIfAbsent(queueName, newQueue);

if (queue == null)
{
queue = newQueue;
}
}
return queue;
}
This class is really just a map: the key is the queueName from the log4j configuration above, and the value is a BlockingQueue. That way several log queues can coexist, each handled in its own way.
4. Now for the main act. Here is the full source of JdbcLogWriter:
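As a quick illustration (my own sketch, not SpringSide code, using a plain ConcurrentHashMap instead of Guava's MapMaker) of why the second null check after putIfAbsent is needed: two threads may race to create the queue for the same name, only one new queue actually lands in the map, and the loser must fall back to the winner's queue. The same idiom in isolation:

import java.util.concurrent.*;

public class RegistryDemo {
    private static final ConcurrentMap<String, BlockingQueue<String>> QUEUES =
            new ConcurrentHashMap<String, BlockingQueue<String>>();

    public static BlockingQueue<String> getQueue(String name) {
        BlockingQueue<String> queue = QUEUES.get(name);
        if (queue == null) {
            BlockingQueue<String> newQueue = new LinkedBlockingQueue<String>(1000);
            // putIfAbsent returns the previous value, or null if our queue was installed.
            queue = QUEUES.putIfAbsent(name, newQueue);
            if (queue == null) {
                queue = newQueue; // we won the race
            }
        }
        return queue;
    }

    public static void main(String[] args) {
        // Both calls return the same queue instance.
        System.out.println(getQueue("dblog") == getQueue("dblog")); // true
    }
}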

/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 *
 * $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
 */
package org.springside.examples.showcase.log.appender;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.log4j.spi.LoggingEvent;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springside.examples.showcase.queue.BlockingConsumer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


/**
 * Consumer task that writes log4j events from the queue into the database.
 *
 * Reads events from the queue with an immediate blocking take; once the buffer limit is reached,
 * the events are written in JDBC batch mode.
 * To switch to a timed-read mode instead, extend PeriodConsumer with minor changes.
 *
 * @see BlockingConsumer
 *
 * @author calvin
 */

public class JdbcLogWriter extends BlockingConsumer
{

protected String sql;
protected int batchSize = 10;

protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
protected SimpleJdbcTemplate jdbcTemplate;
protected TransactionTemplate transactionTemplate;


/**
 * The insert SQL with named parameters.
 *
 * The named parameter names are defined as constants in AppenderUtils.
 */
public void setSql(String sql)
{
this.sql = sql;
}


/**
 * Number of events read per batch, 10 by default.
 */
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}


/**
 * Creates the jdbcTemplate from the injected DataSource.
 */
@Resource
public void setDataSource(DataSource dataSource)
{
jdbcTemplate = new SimpleJdbcTemplate(dataSource);
}


/**
 * Creates the transactionTemplate from the injected PlatformTransactionManager.
 */
@Resource
public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager)
{
transactionTemplate = new TransactionTemplate(defaultTransactionManager);
}


/**
 * Message handler: adds the message to the buffer; when the buffer reaches batchSize, runs the batch update.
 */
@Override
protected void processMessage(Object message)
{
LoggingEvent event = (LoggingEvent) message;
eventsBuffer.add(event);


if (logger.isDebugEnabled())
{
logger.debug("get event: {}", new LoggingEventWrapper(event).convertToString());
}

//Buffer has reached batchSize, flush it with a batch insert

if (eventsBuffer.size() >= batchSize)
{
updateBatch();
}
}


/**
 * Batch-inserts the buffered events into the database.
 */
@SuppressWarnings("unchecked")
public void updateBatch()
{

try
{
//Convert the buffered events into JDBC batch parameters.
int i = 0;
Map[] paramMapArray = new HashMap[eventsBuffer.size()];

for (LoggingEvent event : eventsBuffer)
{
paramMapArray[i++] = parseEvent(event);
}
final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);

//Execute the batch insert; on failure call the error handler.

transactionTemplate.execute(new TransactionCallbackWithoutResult()
{
@Override
protected void doInTransactionWithoutResult(TransactionStatus status)
{

try
{
jdbcTemplate.batchUpdate(getActualSql(), batchParams);

if (logger.isDebugEnabled())
{

for (LoggingEvent event : eventsBuffer)
{
logger.debug("saved event: {}", new LoggingEventWrapper(event).convertToString());
}
}

} catch (DataAccessException e)
{
status.setRollbackOnly();
handleDataAccessException(e, eventsBuffer);
}
}
});

//Clear the buffer now that it has been processed
eventsBuffer.clear();

} catch (Exception e)
{
logger.error("批量提交任務(wù)時(shí)發(fā)生錯(cuò)誤.", e);
}
}


/**
 * Clean-up on exit: flushes the remaining messages in the buffer.
 */
@Override
protected void clean()
{

if (!eventsBuffer.isEmpty())
{
updateBatch();
}
logger.debug("cleaned task {}", this);
}


/**
 * Parses the event into a parameter map used to bind the named parameters in the SQL.
 */
protected Map<String, Object> parseEvent(LoggingEvent event)
{
Map<String, Object> parameterMap = Maps.newHashMap();
LoggingEventWrapper eventWrapper = new LoggingEventWrapper(event);

parameterMap.put("thread_name", eventWrapper.getThreadName());
parameterMap.put("logger_name", eventWrapper.getLoggerName());
parameterMap.put("log_time", eventWrapper.getDate());
parameterMap.put("level", eventWrapper.getLevel());
parameterMap.put("message", eventWrapper.getMessage());
return parameterMap;
}


/**
 * Data-access error handler that subclasses may override, e.g. to persist the failed events to a file.
 */
protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch)
{

if (e instanceof DataAccessResourceFailureException)
{
logger.error("database connection error", e);

} else
{
logger.error("other database error", e);
}


for (LoggingEvent event : errorEventBatch)
{
logger.error("event insert to database error, ignore it: "
+ new LoggingEventWrapper(event).convertToString(), e);
}
}


/**
 * SQL provider that subclasses may override to post-process the statement,
 * e.g. a log table name with a date suffix such as LOG_2009_02_31.
 */
protected String getActualSql()
{
return sql;
}
}

This class does the following:
1) As long as there are fewer than 1000 unprocessed log events (the queueSize configured on the QueuesHolder bean), processing just keeps going; once the queue is full, the offer call in QueueAppender.append fails, and the event is reported via LogLog.error and dropped (see QueueAppender's append method).
2) Each incoming message is added to a buffer; once 10 events have accumulated, they are batch-inserted into the database. Both the batch size and the insert SQL are defined in the Spring configuration shown above (a standalone sketch of this named-parameter batch insert follows this list).
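Here is that named-parameter batch insert in isolation (my own sketch, not the showcase's code; it uses NamedParameterJdbcTemplate, the non-deprecated counterpart of the SimpleJdbcTemplate in JdbcLogWriter, and assumes Spring 3.x; table and column names come from the configuration above):

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;

public class BatchInsertSketch {

    private static final String SQL =
        "insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE) "
            + "values(:thread_name,:logger_name,:log_time,:level,:message)";

    public static void insert(DataSource dataSource) {
        // One parameter map per buffered event; keys match the :named parameters in the SQL.
        Map<String, Object> row = new HashMap<String, Object>();
        row.put("thread_name", "main");
        row.put("logger_name", "DBLogExample");
        row.put("log_time", new java.util.Date());
        row.put("level", "INFO");
        row.put("message", "an order has been created");

        @SuppressWarnings("unchecked")
        Map<String, Object>[] rows = new Map[] { row };
        SqlParameterSource[] batch = SqlParameterSourceUtils.createBatch(rows);

        // One round trip for the whole batch.
        new NamedParameterJdbcTemplate(dataSource).batchUpdate(SQL, batch);
    }
}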
As you can see, the key method is processMessage. So where does processMessage get called?
Looking at the JdbcLogWriter code above, it extends BlockingConsumer, so let's see what BlockingConsumer contains:

/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 *
 * $Id$
 */
package org.springside.examples.showcase.queue;


/**
 * Consumer that reads messages from the queue using an immediate blocking-take strategy.
 */
public abstract class BlockingConsumer extends QueueConsumer
{


/**
 * Thread body: blocks to take a message and calls processMessage() to handle it.
 */
public void run()
{
//Keep taking messages (blocking) until the thread is interrupted.

try
{

while (!Thread.currentThread().isInterrupted())
{
Object message = queue.take();
processMessage(message);
}

} catch (InterruptedException e)
{
// Ignore.

} finally
{
//Run the clean-up hook before the thread exits.
clean();
}
}


/**
 * Message handler.
 */
protected abstract void processMessage(Object message);


/**
 * Clean-up hook called on exit.
 */
protected abstract void clean();
}
Clearly BlockingConsumer must either extend Thread or implement Runnable (as we will see below, its parent QueueConsumer implements Runnable): once the thread starts, run() loops and calls processMessage for each message. When we need to process log content in some other way, we can extend BlockingConsumer and write our own processMessage.
5. Now let's look at how this thread task gets started. A glance at BlockingConsumer shows that it in turn extends another class, QueueConsumer:
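For example, here is a hypothetical consumer (my own sketch; StderrLogWriter and its behavior are made up, only BlockingConsumer and LoggingEventWrapper come from the showcase code above) that echoes every queued event to stderr instead of the database:

package org.springside.examples.showcase.log.appender;

import org.apache.log4j.spi.LoggingEvent;
import org.springside.examples.showcase.queue.BlockingConsumer;

/**
 * Hypothetical consumer: writes every queued event to stderr instead of the database.
 */
public class StderrLogWriter extends BlockingConsumer {

    @Override
    protected void processMessage(Object message) {
        LoggingEvent event = (LoggingEvent) message;
        System.err.println(new LoggingEventWrapper(event).convertToString());
    }

    @Override
    protected void clean() {
        // nothing is buffered, so there is nothing to flush on exit
    }
}

Wired up like the jdbcLogWriter bean but with its own queueName, and with a second QueueAppender in log4j.properties pointing at that queue, it would receive its own stream of events (a configuration sketch appears at the end of these notes).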

/**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 *
 * $Id$
 */
package org.springside.examples.showcase.queue;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.utils.ThreadUtils;
import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;


/**
 * Base class for tasks that consume queue messages in a single thread.
 *
 * Defines the QueueConsumer start-up and shutdown flow.
 *
 * TODO: support multi-threaded execution.
 *
 * @see QueuesHolder
 *
 * @author calvin
 */
@SuppressWarnings("unchecked")
public abstract class QueueConsumer implements Runnable
{

protected Logger logger = LoggerFactory.getLogger(getClass());

protected String queueName;
protected int shutdownTimeout = Integer.MAX_VALUE;

protected boolean persistence = true;
protected String persistencePath = System.getProperty("java.io.tmpdir") + File.separator + "queue";
protected Object persistenceLock = new Object(); //lock used to coordinate between backup and restore

protected BlockingQueue queue;
protected ExecutorService executor;


/**
 * Name of the queue consumed by this task.
 */
public void setQueueName(String queueName)
{
this.queueName = queueName;
}


/**
 * Maximum time to wait when stopping the task, in milliseconds.
 */
public void setShutdownTimeout(int shutdownTimeout)
{
this.shutdownTimeout = shutdownTimeout;
}


/**
 * Whether unprocessed messages should be persisted to a file on JVM shutdown.
 */
public void setPersistence(boolean persistence)
{
this.persistence = persistence;
}


/**
 * Directory to which unprocessed messages are persisted on shutdown;
 * defaults to the queue directory under the system temp folder.
 */
public void setPersistencePath(String persistencePath)
{
this.persistencePath = persistencePath;
}


/**
 * Task initialization.
 */
@PostConstruct
public void start() throws IOException, ClassNotFoundException, InterruptedException
{

queue = QueuesHolder.getQueue(queueName);


if (persistence)
{

synchronized (persistenceLock)
{
restoreQueue();
}
}

executor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
executor.execute(this);
}


/**
 * Task shutdown.
 */
@PreDestroy
public void stop() throws IOException
{

try
{
ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);

} finally
{

if (persistence)
{

synchronized (persistenceLock)
{
backupQueue();
}
}
}

}


/**
 * Persists the messages remaining in the queue to a file.
 */
protected void backupQueue() throws IOException
{
List list = new ArrayList();
queue.drainTo(list);


if (!list.isEmpty())
{
ObjectOutputStream oos = null;

try
{
File file = new File(getPersistenceDir(), getPersistenceFileName());
oos = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));

for (Object message : list)
{
oos.writeObject(message);
}

logger.info("隊(duì)列{}已持久化{}個(gè)消息到{}", new Object[]
{ queueName, list.size(), file.getAbsolutePath() });

} finally
{

if (oos != null)
{
oos.close();
}
}

} else
{
logger.debug("隊(duì)列{}為空,不需要持久化 .", queueName);
}
}


/**
 * Loads messages from the persistence file back into the queue.
 */
protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException
{
ObjectInputStream ois = null;
File file = new File(getPersistenceDir(), getPersistenceFileName());


if (file.exists())
{

try
{
ois = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
int count = 0;

while (true)
{

try
{
Object message = ois.readObject();
queue.put(message);
count++;

} catch (EOFException e)
{
break;
}
}

logger.info("隊(duì)列{}已從{}中恢復(fù){}個(gè)消息.", new Object[]
{ queueName, file.getAbsolutePath(), count });

} finally
{

if (ois != null)
{
ois.close();
}
}
file.delete();

} else
{
logger.debug("隊(duì)列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
}
}


/**
 * Returns the persistence directory.
 * The default persistence file path is java.io.tmpdir/queue/&lt;queue name&gt;.
 * The java.io.tmpdir/queue/ directory is created if it does not exist.
 */
protected File getPersistenceDir()
{
File parentDir = new File(persistencePath + File.separator);

if (!parentDir.exists())
{
parentDir.mkdirs();
}
return parentDir;
}


/**
 * Returns the persistence file name, queueName by default; may be overridden.
 */
protected String getPersistenceFileName()
{
return queueName;
}
}

這里終于可以確信JdbcLogWriter是一個(gè)實(shí)現(xiàn)了Runnable的線程類了。我們先略過那些保存日志到文件的方法,關(guān)注它的啟動(dòng)方法start()。在start方法中,用到了concurrent包的Executors來執(zhí)行線程任務(wù)。所以整個(gè)的過程是這樣的:
1. Spring starts with the application, creating the QueuesHolder (a static registry holding one log queue per queueName) and the JdbcLogWriter, which starts a thread that loops continuously processing log events.
2. log4j also starts with the application; as logs are produced, QueueAppender puts them into the queue (using offer).
3. JdbcLogWriter keeps taking logs out of the queue (using take).
4. Every time 10 log events have accumulated, JdbcLogWriter's updateBatch method batch-inserts them into the database; this is triggered from processMessage.
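To close the loop with the hypothetical StderrLogWriter sketched earlier, the extra wiring might look roughly like this (my own sketch, not part of the showcase): a second queue name in log4j.properties and a second consumer bean in the Spring configuration.

#Async stderr appender (hypothetical)
log4j.appender.STDERRQ=org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.STDERRQ.QueueName=stderrlog

log4j.logger.StderrLogExample=INFO,STDERRQ
log4j.additivity.StderrLogExample=false

<!-- Consumer for the hypothetical stderrlog queue -->
<bean id="stderrLogWriter" class="org.springside.examples.showcase.log.appender.StderrLogWriter">
<property name="queueName" value="stderrlog" />
</bean>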
That is the whole process of how SpringSide writes logs to the database; noted here for my own record.
My Weibo:
http://t.sina.com.cn/1401900445