<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Dict.CN 在線詞典, 英語學習, 在線翻譯

    都市淘沙者

    荔枝FM Everyone can be host

    統計

    留言簿(23)

    積分與排名

    優秀學習網站

    友情連接

    閱讀排行榜

    評論排行榜

    java線程控制器實現(轉)

    java線程控制器代碼分享-根據cpu情況決定線程運行數量和情況

    原文地址:請點擊標題即可。


    在人人網海量存儲系統的存儲引擎部分,為了提高CPU和網絡的使用情況,使用了java多線程管理并行操作的方式。

    在java中控制線程是一件很簡單的事情,jdk提供了諸多的方法,其中比常用的兩個是notify()和wait(),一個是喚醒,一個等待線程,在下面的代碼中,將看到一個線程分配器,根據cpu的負載情況,自動完成對應線程的喚醒或者是等待操作。整個過程是一個平滑的過程,不會因為線程的切換而導致機器負載出線鋸齒。

    先看一個類,讀取Linux系統TOP等指令拿到系統當前負載:

        
    import java.io.BufferedReader;
        
    import java.io.InputStreamReader;

        
    /**
        * 節點的cpu 內存 磁盤空間 情況
        *
        * 
    @author zhen.chen
        *
        
    */
        
    public class NodeLoadView {

        
    /**
        * 獲取cpu使用情況
        *
        * 
    @return
        * 
    @throws Exception
        
    */
        
    public double getCpuUsage() throws Exception {
        
    double cpuUsed = 0;

        Runtime rt 
    = Runtime.getRuntime();
        Process p 
    = rt.exec(“/usr/bin/uptime”);// 調用系統的“top”命令
        String[] strArray = null;
        BufferedReader in 
    = null;
        
    try {
        in 
    = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String str 
    = null;
        
    while ((str = in.readLine()) != null) {
        strArray 
    = str.split(“load average: “);
        strArray 
    = strArray[1].split(“,”);
        cpuUsed 
    = Double.parseDouble(strArray[0]);
        }
        } 
    catch (Exception e) {
        e.printStackTrace();
        } 
    finally {
        in.close();
        }
        
    return cpuUsed;
        }

        
    /**
        * 內存監控
        *
        * 
    @return
        * 
    @throws Exception
        
    */
        
    public double getMemUsage() throws Exception {

        
    double menUsed = 0;
        Runtime rt 
    = Runtime.getRuntime();
        Process p 
    = rt.exec(“top --1″);// 調用系統的“top”命令

        BufferedReader in 
    = null;
        
    try {
        in 
    = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String str 
    = null;
        String[] strArray 
    = null;

        
    while ((str = in.readLine()) != null) {
        
    int m = 0;

        
    if (str.indexOf(” R “) != -1) {// 只分析正在運行的進程,top進程本身除外 &&
        
    //
        
    // System.out.println(“——————3—————–”);
        strArray = str.split(” “);
        
    for (String tmp : strArray) {
        
    if (tmp.trim().length() == 0)
        
    continue;

        
    if (++== 10) {
        
    // 9)–第10列為mem的使用百分比(RedHat 9)

        menUsed 
    += Double.parseDouble(tmp);

        }
        }

        }
        }
        } 
    catch (Exception e) {
        e.printStackTrace();
        } 
    finally {
        in.close();
        }
        
    return menUsed;
        }

        
    /**
        * 獲取磁盤空間大小
        *
        * 
    @return
        * 
    @throws Exception
        
    */
        
    public double getDeskUsage() throws Exception {
        
    double totalHD = 0;
        
    double usedHD = 0;
        Runtime rt 
    = Runtime.getRuntime();
        Process p 
    = rt.exec(“df -hl”);// df -hl 查看硬盤空間

        BufferedReader in 
    = null;
        
    try {
        in 
    = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String str 
    = null;
        String[] strArray 
    = null;
        
    while ((str = in.readLine()) != null) {
        
    int m = 0;
        
    // if (flag > 0) {
        
    // flag++;
        strArray = str.split(” “);
        
    for (String tmp : strArray) {
        
    if (tmp.trim().length() == 0)
        
    continue;
        
    ++m;
        
    // System.out.println(“—-tmp—-” + tmp);
        if (tmp.indexOf(“G”) != -1) {
        
    if (m == 2) {
        
    // System.out.println(“—G—-” + tmp);
        if (!tmp.equals(“”) && !tmp.equals(“0″))
        totalHD 
    += Double.parseDouble(tmp.substring(0,
        tmp.length() – 
    1)) * 1024;

        }
        
    if (m == 3) {
        
    // System.out.println(“—G—-” + tmp);
        if (!tmp.equals(“none”) && !tmp.equals(“0″))
        usedHD 
    += Double.parseDouble(tmp.substring(0,
        tmp.length() – 
    1)) * 1024;

        }
        }
        
    if (tmp.indexOf(“M”) != -1) {
        
    if (m == 2) {
        
    // System.out.println(“—M—” + tmp);
        if (!tmp.equals(“”) && !tmp.equals(“0″))
        totalHD 
    += Double.parseDouble(tmp.substring(0,
        tmp.length() – 
    1));

        }
        
    if (m == 3) {
        
    // System.out.println(“—M—” + tmp);
        if (!tmp.equals(“none”) && !tmp.equals(“0″))
        usedHD 
    += Double.parseDouble(tmp.substring(0,
        tmp.length() – 
    1));
        
    // System.out.println(“—-3—-” + usedHD);
        }
        }

        }

        
    // }
        }
        } 
    catch (Exception e) {
        e.printStackTrace();
        } 
    finally {
        in.close();
        }
        
    return (usedHD / totalHD) * 100;
        }
        
    //
        
    //    public static void main(String[] args) throws Exception {
        
    //        NodeLoadView cpu = new NodeLoadView();
        
    //        System.out
        
    //                .println(“—————cpu used:” + cpu.getCpuUsage() + “%”);
        
    //        System.out
        
    //                .println(“—————mem used:” + cpu.getMemUsage() + “%”);
        
    //        System.out
        
    //                .println(“—————HD used:” + cpu.getDeskUsage() + “%”);
        
    //        System.out.println(“————jvm監控———————-”);
        
    //        Runtime lRuntime = Runtime.getRuntime();
        
    //        System.out.println(“————–Free Momery:” + lRuntime.freeMemory()
        
    //                + “K”);
        
    //        System.out.println(“————–Max Momery:” + lRuntime.maxMemory()
        
    //                + “K”);
        
    //        System.out.println(“————–Total Momery:”
        
    //                + lRuntime.totalMemory() + “K”);
        
    //        System.out.println(“—————Available Processors :”
        
    //                + lRuntime.availableProcessors());
        
    //    }
        }

    再來看關鍵的一個類,THreadScheduler:

        
    import java.util.Map;

        
    import org.apache.log4j.Logger;

        
    import test.NodeLoadView;
        
    public class ThreadScheduler {
        
    private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
        
    private Map<String, Thread> runningThreadMap;
        
    private Map<String, Thread> waitingThreadMap;
        
    private boolean isFinished = false;
        
    private int runningSize;

        
    public ThreadScheduler (Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap) {
        
    this.runningThreadMap = runningThreadMap;
        
    this.waitingThreadMap = waitingThreadMap;
        
    this.runningSize = waitingThreadMap.size();
        }

        
    /**
        * 開始調度線程
        * 
    @author zhen.chen
        * @createTime 2010-1-28 上午11:04:52
        
    */
        
    public void schedule(){
        
    long sleepMilliSecond = 1 * 1000;
        
    int allowRunThreads = 15;
        
    // 一次啟動的線程數,cpuLoad變大時以此值為參考遞減
        int allowRunThreadsRef = 15;
        
    double cpuLoad = 0;// 0-15
        NodeLoadView load = new NodeLoadView();

        
    while (true) {
        
    try {
        cpuLoad 
    = load.getCpuUsage();
        } 
    catch (Exception e1) {
        e1.printStackTrace();
        }
        
    // cpuLoad低 啟動的線程多
        allowRunThreads = (int) Math.floor(allowRunThreadsRef – cpuLoad);
        
    // threads不能為0
        if (allowRunThreads < 1) {
        allowRunThreads 
    = 1;
        }
        
    if (allowRunThreads > allowRunThreadsRef) {
        allowRunThreads 
    = allowRunThreadsRef;
        }
        
    if (logger.isDebugEnabled()) {
        logger.debug(“[ThreadScheduler]running Thread:” 
    + runningThreadMap.size() + “; waiting Thread:” + waitingThreadMap.size() + “; cpu:” + cpuLoad + ” allowRunThreads:” + allowRunThreads);
        }

        
    // 檢查runningSize個線程的情況,滿足條件則啟動
        for (int x = 0; x < runningSize; x++) {
        
    if (waitingThreadMap.get(x+") != null) {
        if (allowRunThreadsRef <= runningThreadMap.size()) {
        
    break;
        }
        
    synchronized (waitingThreadMap.get(x+")) {
        if (!waitingThreadMap.get(x+").isAlive()) {
        waitingThreadMap.get(x+").start();
        }else{
        waitingThreadMap.get(x
    +").notify();
        }
        }
        runningThreadMap.put(x
    +", waitingThreadMap.get(x+”"));
        waitingThreadMap.remove(x
    +");
        }
        }
        
    // 檢查runningSize個線程的情況,滿足條件則暫停
        for (int x = 0; x < runningSize; x++) {
        
    if (runningThreadMap.size() <= allowRunThreads) {
        
    break;
        }
        
    if (runningThreadMap.get(x+") != null) {
        synchronized (runningThreadMap.get(x+")) {
        try {
        
    if (runningThreadMap.get(x+").isAlive()) {
        runningThreadMap.get(x+").wait();
        }else{
        
    continue;
        }
        } 
    catch (InterruptedException e) {
        e.printStackTrace();
        }
        }
        waitingThreadMap.put(x
    +", runningThreadMap.get(x));
        runningThreadMap.remove(x+");
        }
        }
        
    // 全部跑完,返回
        if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0) {
        
    if (logger.isDebugEnabled()) {
        logger.debug(“[ThreadScheduler] over.total Threads size:” 
    + runningSize);
        }
        
    this.isFinished = true;
        
    return;
        }
        
    // 使主while循環慢一點
        try {
        Thread.sleep(sleepMilliSecond);
        } 
    catch (InterruptedException e1) {
        e1.printStackTrace();
        }
        }

        }

        
    public boolean isFinished() {
        
    return isFinished;
        }
        }

    這個類的作用:

    1.接收runningThreadMap和waitingThreadMap兩個map,里面對應存了運行中的線程實例和等待中的線程實例。

    2.讀cpu情況,自動判斷要notify等待中的線程還是wait運行中的線程。

    3.兩個map都結束,退出。(必須runningThreadMap內部的Thread自己將runningThreadMap對應的Thread remove掉)

    如何使用:

        
    public class TestThread {
        
    public static class Runner extends Thread {
        
    public Runner(int j, Map<String, Thread> threadMap) {

        }
        
    public void run() {
        
    // TODO 你的邏輯 完成后需要從threadMap中remove掉
        }
        }

        
    public static void main(String[] args) {
        
    // 運行中的線程
        Map<String, Thread> threadMap = new HashMap<String, Thread>();
        
    // 正在等待中的線程
        Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
        
    for (int j = 0; j < args.length; j++) {
        Thread t 
    = new Runner(j, threadMap);
        waitThreadMap.put(j 
    + “”, t);
        }

        ThreadScheduler threadScheduler 
    = new ThreadScheduler(threadMap, waitThreadMap);
        threadScheduler.schedule();
        
    if (threadScheduler.isFinished() == false) {
        
    //沒能正常結束
        }
        }
        }


    posted on 2010-08-18 07:59 都市淘沙者 閱讀(1160) 評論(0)  編輯  收藏 所屬分類: 多線程并發編程

    主站蜘蛛池模板: 最近新韩国日本免费观看| 国产亚洲精品免费视频播放| 久久精品国产这里是免费| 亚洲av无码专区在线观看下载| 亚洲国产精品无码av| 免费人成在线观看播放国产 | 国产成人免费爽爽爽视频 | 亚洲一区二区三区无码影院| 久久这里只有精品国产免费10| 久久免费观看国产精品88av| 久久国产免费直播| 深夜特黄a级毛片免费播放| 中文字幕 亚洲 有码 在线| 精品无码一区二区三区亚洲桃色| 亚洲午夜福利AV一区二区无码| 国产免费人人看大香伊| 免费的涩涩视频在线播放| 丁香花免费完整高清观看| 久久精品国产免费观看三人同眠 | 亚洲va久久久噜噜噜久久狠狠| 亚洲国产成人精品久久久国产成人一区二区三区综 | 中文永久免费观看网站| 美女黄频视频大全免费的| 中文无码亚洲精品字幕| 国产成人精品日本亚洲专| 亚洲网站在线播放| 亚洲欧洲综合在线| 亚洲第一区视频在线观看| 亚洲福利视频网址| 久久亚洲精品专区蓝色区| 亚洲www77777| 亚洲国产精品日韩av不卡在线 | 日韩免费一区二区三区| 青青草国产免费久久久下载| 午夜成人免费视频| 日韩免费在线观看| 亚洲精品成人片在线观看| 国产午夜亚洲精品午夜鲁丝片| 国产亚洲欧洲Aⅴ综合一区| 亚洲热线99精品视频| 久久精品国产96精品亚洲|