<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-159  評論-114  文章-7  trackbacks-0
    最近在funplus做游戲,進而研究了一個新型架構。

    之前做游戲都是自己使用java搭建架構,經過幾年的積累確實也達到了最初的設想,多進程,進程內多線程,無鎖,0延遲純jdbc寫庫。對于單服架構來說,已經趨近于極致。

    今年小游戲盛行,如海盜來了,瘋狂游戲那家公司,全部使用的都是go+mongodb實現的,因為go的語言級別支援高并發,這點是java無法比擬的。不過java開源項目多,有很多的高手鋪墊了超多的框架,比如vertx,akka都可以更加充分的釋放java的能力。就看使用者的認識水平了。

    本次選擇vertx,主要是其在網絡通訊這塊,對netty的包裝,加上自己的eventloop模型,使得響應web請求速度基本屬于前3的水平。

    netServer = vertx.createHttpServer(httpServerOptions);
            netServer.requestHandler();
            netServer.requestHandler(hs 
    -> {
                
    if (hs.path().equals("/ping")) {
                    hs.response().end(
    "pong");
                    
    return;
                }
                hs.response().close();
                
    return;
            });
            
            netServer.websocketHandler(ws 
    -> {
                
    if (!ws.path().equals("/" + wsname)) {
                    ws.reject();
                    
    return;
                }
                Player player 
    = new Player(ws, ws.binaryHandlerID());
                players.put(player.getConnId(), player);
                player.setServerUUID(gateAdress);
                
    //日志
                if (log.isDebugEnabled()) {
                    SocketAddress addrLocal 
    = ws.localAddress();
                    log.debug(
    "新建一個連接:連接ID={},  本地端口={}, 遠程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
                }
                
    //有連接過來了
                ws.binaryMessageHandler(data -> {
                    
    int readableBytes = data.length();
                    
    if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                        
    return;
                    }
                    
    int len = data.getShort(0);
                    
    if (len > 64 * 1024) {
                        log.error(
    "conn:" + player.getId() + "  發送數據包過大:" + len);
                        
    return;
                    }
                    
    if (readableBytes < len) {
                        
    return;
                    }

                    CGMessage msg 
    = decode(data);
                    
    if (msg == nullreturn;
                    inputHandler(msg, player);
                });
                ws.exceptionHandler(e 
    -> {
                    
    if (e.getMessage().equals("WebSocket is closed")) {
    //                    player.disconnect();
                    }
                    
    //斷連的日志就不打印堆棧了
                    if (e.getMessage().contains("Player reset by peer"|| e.getMessage().contains("遠程主機強迫關閉了一個現有的連接")) {
                        log.error(
    "主動斷開:connId={},cause={}", player.getconnId(), e.getCause());
                    } 
    else {
                        
    //輸出錯誤日志
                        log.error("發生異常:connId={},cause={}", player.getconnId(), e.getCause());
                    }
                });
                ws.closeHandler(t 
    -> {
    //                if (player == null) return;
                    
    //連接狀態
                    
    //日志
                    if (log.isDebugEnabled()) {
                        log.debug(
    "連接關閉:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                    }
                    
    if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                        player.setState(PlayerState.logouted);
                        
    //Remove掉 session connId = Player
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    
    if (player.getUserInfo() == null) {
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    gateService.closePlayer(player.getconnId(), ar 
    -> {
                        
    if (ar.failed()) {
                            Loggers.coreLogger.error(
    "player connId:" + player.getconnId() + " 離線退出異常!!!" + ar.cause().getMessage());
                        }
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                    });

                });
            }).listen(port, host, res 
    -> {
                
    if (res.succeeded()) {
                    
    //啟動日志信息
                    log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                    future.complete();
                }
            });
    vertx能方便的使用eventloop線程池響應玩家發來的請求,并永遠在特定線程進行代碼調用。

    比自己使用hash線程池靠譜很多。ps. 自己造輪子不是不好,主要實現方法不一定測試完整,有意想不到的情況,就要自己來趟坑。

    后面主要是說一下,但如果大規模請求MongoDB,需要更高的MongoDB響應要求。進而想到要加緩存機制,最初想到的是redis+mongodb,自己實現讀通過,寫通過。
    如果redis不存在,則從mongodb讀取,并放入緩存,寫數據先寫緩存,后寫mongodb。

    自己實現的這種存儲機制,比較low。所以繼續尋找緩存方案。

    過程中,發現了一個曝光率不高的框架,也就是Apache Ignite。最新一代數據網格。

    關鍵的一步,就是如果讓vertx與Ignite工作到一起。這是一個必要的條件。

    package cn.empires;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.support.observer.Event;
    import cn.empires.verticle.OnlineVerticle;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Launcher;
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;

    public class MainLaunch extends Launcher {

        
    private JsonObject config;
        
        
    public static void main(String[] args) {
            System.setProperty(
    "logFileName""gateServer");
            
    new MainLaunch().dispatch(args);
        }
        
        @Override
        
    protected String getDefaultCommand() {
            
    return super.getDefaultCommand();
        }
     
        @Override
        
    protected String getMainVerticle() {
            
    return "cn.empires.verticle.GateVerticle";
        }
        
        @Override
        
    public void afterConfigParsed(JsonObject config) {
            
    super.afterConfigParsed(config);
            
    this.config = config;
        }
        
        @Override
        
    public void beforeStartingVertx(VertxOptions options) {
            options.setClustered(
    true);
        }
        
        @Override
        
    public void afterStartingVertx(Vertx vertx) {
            
    super.afterStartingVertx(vertx);
            
    //config.put("redis.password", "123456");
            
    //初始化全局相關信息
            ListenerInit.init(Event.instance);
            Loggers.coreLogger.info(
    "Globals init .");
            Globals.init(vertx, config);
            vertx.deployVerticle(OnlineVerticle.
    classnew DeploymentOptions().setConfig(config));
        }
        
        @Override
        
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
            
    super.beforeDeployingVerticle(deploymentOptions);
        }
        
        @Override
        
    public void beforeStoppingVertx(Vertx vertx) {
            
    super.beforeStoppingVertx(vertx);
        }
        
        @Override
        
    public void afterStoppingVertx() {
            
    super.afterStoppingVertx();
        }
        
        @Override
        
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
            
    super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
        }
        
    }

    如果想使用Ignite的緩存,必須需要Ignite實例對象。否則無法獲取。
    if (ignite == null) {
         ClusterManager clusterManager 
    = ((VertxInternal) vertx).getClusterManager();
         String uuid 
    = clusterManager.getNodeID();
         ignite 
    = Ignition.ignite(UUID.fromString(uuid));
    }

    在classpath中,配置一個ignite.xml,vertx啟動的時候自動會加載ignite.xml,然后使用IgniteManager進行集群管理。
    我只貼一遍ignite.xml配置
    <?xml version="1.0" encoding="UTF-8"?>

    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements.  See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License.  You may obtain a copy of the License at

           http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->

    <!--
        Ignite Spring configuration file to startup Ignite cache.

        This file demonstrates how to configure cache using Spring. Provided cache
        will be created on node startup.

        Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

        When starting a standalone node, you need to execute the following command:
        {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

        When starting Ignite from Java IDE, pass path to this file to Ignition:
        Ignition.start("examples/config/example-cache.xml");
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation
    ="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd"
    >
        
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            
    <property name="dataStorageConfiguration">
                
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                    
    <!-- Set the page size to 4 KB -->
                      
    <property name="pageSize" value="4096"/>
                    
    <!-- Set concurrency level -->
                      
    <property name="concurrencyLevel" value="6"/>
                      
    <property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                      
    <property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                      
    <property name="defaultDataRegionConfiguration">
                        
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <property name="name" value="Default_Region"/>
                            
    <!-- 設置默認內存區最大內存為 512M. -->
                            
    <property name="maxSize" value="#{512L * 1024 * 1024}"/>
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                        
    </bean>
                    
    </property>
                   
    <property name="dataRegionConfigurations">
                        
    <list>
                          
    <!--
                              Defining a data region that will consume up to 500 MB of RAM and 
                              will have eviction and persistence enabled.
                          
    -->
                          
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <!-- Custom region name. -->
                            
    <property name="name" value="500MB_Region"/>
                
                            
    <!-- 100 MB initial size. -->
                            
    <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                
                            
    <!-- 500 MB maximum size. -->
                            
    <property name="maxSize" value="#{500L * 1024 * 1024}"/>
                            
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                          
    </bean>
                        
    </list>
                    
    </property>
                
    </bean>
            
    </property>
            
    <property name="cacheConfiguration">
                
    <list>
                       
    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                               
    <property name="name" value="UserInfo"/>
                               
    <property name="cacheMode" value="PARTITIONED"/>
                               
    <property name="atomicityMode" value="ATOMIC"/>
                               
    <property name="backups" value="0"/>
                               
    <property name="cacheStoreFactory">
                                   
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                       
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                                   
    </bean>
                               
    </property>
                               
    <property name="readThrough" value="true"/>
                               
    <property name="writeThrough" value="true"/>
                               
    <property name="writeBehindEnabled" value="true"/>
                               
    <property name="writeBehindFlushSize" value="1024"/>
                               
    <property name="writeBehindFlushFrequency" value="5"/>
                               
    <property name="writeBehindFlushThreadCount" value="1"/>
                               
    <property name="writeBehindBatchSize" value="512"/>
                               
    <property name="dataRegionName" value="Default_Region"/>
                    
    </bean>
                
    </list>
            
    </property>
            
    <property name="failureDetectionTimeout" value="60000"/>
            
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
            
    <property name="discoverySpi">
                
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    
    <property name="ipFinder">
                        
    <!--
                            Ignite provides several options for automatic discovery that can be used
                            instead os static IP based discovery. For information on all options refer
                            to our documentation: http://apacheignite.readme.io/docs/cluster-config
                        
    -->
                        
    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                        
    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        
    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                            
    <property name="addresses">
                                
    <list>
                                    
    <!-- In distributed environment, replace with actual host IP address. -->
                                    
    <value>127.0.0.1:47500..47509</value>
                                
    </list>
                            
    </property>
                        
    </bean>
                    
    </property>
                
    </bean>
            
    </property>
        
    </bean>
    </beans>

    Ignite 對內存有細致的劃分,可以分多個區域Region,每個區域有自己的配置,比如設置初始大小和最大大小,以及淘汰策略。
    UserInfo對應的CacheConfigurationCache使用進行了配置,比如readThrough writeThrough writeBehindEnabled等等,細致的配置諸如后寫刷新頻率writeBehindFlushFrequency為5,表示5秒才會刷新一次更新數據。

        public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
            CacheConfiguration
    <String, T> cacheCfg = new CacheConfiguration<>(cacheName);
            
    return Globals.ignite().getOrCreateCache(cacheCfg);
        }

    在Globals工具類,提供工具方法獲得IgniteCache對象。

    package cn.empires.gs.player.service.impl;

    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.lang.IgniteFuture;

    import cn.empires.common.Globals;
    import cn.empires.common.cache.UserCacheStore;
    import cn.empires.common.service.ServiceBase;
    import cn.empires.gs.model.UserInfo;
    import cn.empires.gs.player.service.UserService;
    import io.vertx.core.AsyncResult;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.json.JsonObject;

    public class UserServiceImpl extends ServiceBase implements UserService {
        
        
    private final IgniteCache<String, UserInfo> cache;

        
    public UserServiceImpl(Vertx vertx, JsonObject config) {
            
    super(vertx, config);
            cache 
    = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
        }

        @Override
        
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <UserInfo> future = cache.getAsync(id);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(h.get()));
                }
            });        
            
    return this;
        }
        

        @Override
        
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <Void> future = cache.putAsync(userInfo.get_id(), userInfo);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(userInfo));
                }
            });
            
    return this;
        }

    }

    最后一件事,就是同步寫庫,可以讀通過從MongoDB進行讀取。

    package cn.empires.common.cache;

    import java.util.ArrayList;
    import java.util.List;

    import javax.cache.Cache.Entry;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;

    import org.apache.ignite.IgniteException;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lifecycle.LifecycleAware;
    import org.bson.Document;

    import com.mongodb.Block;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.UpdateOptions;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.model.UserInfo;
    import io.vertx.core.json.JsonObject;

    public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {
        
        
    /** Mongo collection. */
        
    private MongoCollection<Document> collection;
        
        @Override
        
    public void start() throws IgniteException {
        }

        @Override
        
    public UserInfo load(String key) throws CacheLoaderException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            FindIterable
    <Document> iter = collection.find(Filters.eq("_id", key));
            
    final List<JsonObject> result = new ArrayList<>(1);
            iter.forEach(
    new Block<Document>() {
                
    public void apply(Document _doc) {
                    result.add(
    new JsonObject(_doc.toJson()));
                }
            });
            
    if(result != null && !result.isEmpty()) {
                Loggers.userLogger.info(
    "CacheStore load UserInfo.");
                JsonObject jsonObj 
    = result.get(0);
                
    return UserInfo.fromDB(jsonObj);
            }
            
    return null;
        }

        @Override
        
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            Document filter 
    = new Document();
            filter.append(
    "_id", entry.getKey());
            
            Document replacement 
    = new Document();
            replacement.append(
    "value", entry.getValue().toString());
            collection.replaceOne(filter, replacement, 
    new UpdateOptions().upsert(true));
            Loggers.userLogger.info(
    "CacheStore saved UserInfo.");
        }

        @Override
        
    public void delete(Object key) throws CacheWriterException {
            
        }



        @Override
        
    public void stop() throws IgniteException {
            
        }

    }

    由于在ignite.xml中進行了配置
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
    </bean>
    ,所以在使用Cache獲取UserInfo的時候,如果不存在對應的信息,就會從MongoDB讀取。

    更多的信息只能下一篇文章再寫了。有問題可以留言。

    posted on 2018-11-13 14:29 北國狼人的BloG 閱讀(1597) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 午夜一级毛片免费视频| 最近最好最新2019中文字幕免费| 青青青国产在线观看免费| 亚洲天堂在线播放| 亚洲毛片免费观看| 亚洲xxxx18| 日本免费人成视频播放| 精品国产亚洲一区二区三区在线观看| 国内一级一级毛片a免费| 亚洲天堂2016| 免费观看美女裸体网站| 美女被羞羞网站免费下载| 亚洲乱码日产精品a级毛片久久| 人与动性xxxxx免费| 亚洲人成色777777在线观看| 久久久免费的精品| 亚洲人成免费电影| 国产jizzjizz免费视频| 国产99精品一区二区三区免费| 不卡一卡二卡三亚洲| 毛片在线全部免费观看| 亚洲精品亚洲人成在线观看麻豆| 成熟女人牲交片免费观看视频| 亚洲AV日韩综合一区| 一本久久a久久精品亚洲| 国产成人久久AV免费| 亚洲香蕉久久一区二区三区四区| 国产免费观看a大片的网站| 成人A毛片免费观看网站| 亚洲欧洲日产国码二区首页| 在线观看免费宅男视频| aa毛片免费全部播放完整| 亚洲精品国产成人| 亚洲国产成人乱码精品女人久久久不卡| 水蜜桃视频在线观看免费播放高清| 亚洲网站在线播放| AV在线播放日韩亚洲欧| 午夜国产精品免费观看| 人成免费在线视频| 亚洲男人的天堂久久精品| 亚洲国产婷婷香蕉久久久久久|