<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-159  評論-114  文章-7  trackbacks-0
    最近在funplus做游戲,進而研究了一個新型架構。

    之前做游戲都是自己使用java搭建架構,經過幾年的積累確實也達到了最初的設想,多進程,進程內多線程,無鎖,0延遲純jdbc寫庫。對于單服架構來說,已經趨近于極致。

    今年小游戲盛行,如海盜來了,瘋狂游戲那家公司,全部使用的都是go+mongodb實現的,因為go的語言級別支援高并發,這點是java無法比擬的。不過java開源項目多,有很多的高手鋪墊了超多的框架,比如vertx,akka都可以更加充分的釋放java的能力。就看使用者的認識水平了。

    本次選擇vertx,主要是其在網絡通訊這塊,對netty的包裝,加上自己的eventloop模型,使得響應web請求速度基本屬于前3的水平。

    netServer = vertx.createHttpServer(httpServerOptions);
            netServer.requestHandler();
            netServer.requestHandler(hs 
    -> {
                
    if (hs.path().equals("/ping")) {
                    hs.response().end(
    "pong");
                    
    return;
                }
                hs.response().close();
                
    return;
            });
            
            netServer.websocketHandler(ws 
    -> {
                
    if (!ws.path().equals("/" + wsname)) {
                    ws.reject();
                    
    return;
                }
                Player player 
    = new Player(ws, ws.binaryHandlerID());
                players.put(player.getConnId(), player);
                player.setServerUUID(gateAdress);
                
    //日志
                if (log.isDebugEnabled()) {
                    SocketAddress addrLocal 
    = ws.localAddress();
                    log.debug(
    "新建一個連接:連接ID={},  本地端口={}, 遠程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
                }
                
    //有連接過來了
                ws.binaryMessageHandler(data -> {
                    
    int readableBytes = data.length();
                    
    if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                        
    return;
                    }
                    
    int len = data.getShort(0);
                    
    if (len > 64 * 1024) {
                        log.error(
    "conn:" + player.getId() + "  發送數據包過大:" + len);
                        
    return;
                    }
                    
    if (readableBytes < len) {
                        
    return;
                    }

                    CGMessage msg 
    = decode(data);
                    
    if (msg == nullreturn;
                    inputHandler(msg, player);
                });
                ws.exceptionHandler(e 
    -> {
                    
    if (e.getMessage().equals("WebSocket is closed")) {
    //                    player.disconnect();
                    }
                    
    //斷連的日志就不打印堆棧了
                    if (e.getMessage().contains("Player reset by peer"|| e.getMessage().contains("遠程主機強迫關閉了一個現有的連接")) {
                        log.error(
    "主動斷開:connId={},cause={}", player.getconnId(), e.getCause());
                    } 
    else {
                        
    //輸出錯誤日志
                        log.error("發生異常:connId={},cause={}", player.getconnId(), e.getCause());
                    }
                });
                ws.closeHandler(t 
    -> {
    //                if (player == null) return;
                    
    //連接狀態
                    
    //日志
                    if (log.isDebugEnabled()) {
                        log.debug(
    "連接關閉:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                    }
                    
    if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                        player.setState(PlayerState.logouted);
                        
    //Remove掉 session connId = Player
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    
    if (player.getUserInfo() == null) {
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    gateService.closePlayer(player.getconnId(), ar 
    -> {
                        
    if (ar.failed()) {
                            Loggers.coreLogger.error(
    "player connId:" + player.getconnId() + " 離線退出異常!!!" + ar.cause().getMessage());
                        }
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                    });

                });
            }).listen(port, host, res 
    -> {
                
    if (res.succeeded()) {
                    
    //啟動日志信息
                    log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                    future.complete();
                }
            });
    vertx能方便的使用eventloop線程池響應玩家發來的請求,并永遠在特定線程進行代碼調用。

    比自己使用hash線程池靠譜很多。ps. 自己造輪子不是不好,主要實現方法不一定測試完整,有意想不到的情況,就要自己來趟坑。

    后面主要是說一下,但如果大規模請求MongoDB,需要更高的MongoDB響應要求。進而想到要加緩存機制,最初想到的是redis+mongodb,自己實現讀通過,寫通過。
    如果redis不存在,則從mongodb讀取,并放入緩存,寫數據先寫緩存,后寫mongodb。

    自己實現的這種存儲機制,比較low。所以繼續尋找緩存方案。

    過程中,發現了一個曝光率不高的框架,也就是Apache Ignite。最新一代數據網格。

    關鍵的一步,就是如果讓vertx與Ignite工作到一起。這是一個必要的條件。

    package cn.empires;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.support.observer.Event;
    import cn.empires.verticle.OnlineVerticle;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Launcher;
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;

    public class MainLaunch extends Launcher {

        
    private JsonObject config;
        
        
    public static void main(String[] args) {
            System.setProperty(
    "logFileName""gateServer");
            
    new MainLaunch().dispatch(args);
        }
        
        @Override
        
    protected String getDefaultCommand() {
            
    return super.getDefaultCommand();
        }
     
        @Override
        
    protected String getMainVerticle() {
            
    return "cn.empires.verticle.GateVerticle";
        }
        
        @Override
        
    public void afterConfigParsed(JsonObject config) {
            
    super.afterConfigParsed(config);
            
    this.config = config;
        }
        
        @Override
        
    public void beforeStartingVertx(VertxOptions options) {
            options.setClustered(
    true);
        }
        
        @Override
        
    public void afterStartingVertx(Vertx vertx) {
            
    super.afterStartingVertx(vertx);
            
    //config.put("redis.password", "123456");
            
    //初始化全局相關信息
            ListenerInit.init(Event.instance);
            Loggers.coreLogger.info(
    "Globals init .");
            Globals.init(vertx, config);
            vertx.deployVerticle(OnlineVerticle.
    classnew DeploymentOptions().setConfig(config));
        }
        
        @Override
        
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
            
    super.beforeDeployingVerticle(deploymentOptions);
        }
        
        @Override
        
    public void beforeStoppingVertx(Vertx vertx) {
            
    super.beforeStoppingVertx(vertx);
        }
        
        @Override
        
    public void afterStoppingVertx() {
            
    super.afterStoppingVertx();
        }
        
        @Override
        
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
            
    super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
        }
        
    }

    如果想使用Ignite的緩存,必須需要Ignite實例對象。否則無法獲取。
    if (ignite == null) {
         ClusterManager clusterManager 
    = ((VertxInternal) vertx).getClusterManager();
         String uuid 
    = clusterManager.getNodeID();
         ignite 
    = Ignition.ignite(UUID.fromString(uuid));
    }

    在classpath中,配置一個ignite.xml,vertx啟動的時候自動會加載ignite.xml,然后使用IgniteManager進行集群管理。
    我只貼一遍ignite.xml配置
    <?xml version="1.0" encoding="UTF-8"?>

    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements.  See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License.  You may obtain a copy of the License at

           http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->

    <!--
        Ignite Spring configuration file to startup Ignite cache.

        This file demonstrates how to configure cache using Spring. Provided cache
        will be created on node startup.

        Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

        When starting a standalone node, you need to execute the following command:
        {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

        When starting Ignite from Java IDE, pass path to this file to Ignition:
        Ignition.start("examples/config/example-cache.xml");
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation
    ="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd"
    >
        
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            
    <property name="dataStorageConfiguration">
                
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                    
    <!-- Set the page size to 4 KB -->
                      
    <property name="pageSize" value="4096"/>
                    
    <!-- Set concurrency level -->
                      
    <property name="concurrencyLevel" value="6"/>
                      
    <property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                      
    <property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                      
    <property name="defaultDataRegionConfiguration">
                        
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <property name="name" value="Default_Region"/>
                            
    <!-- 設置默認內存區最大內存為 512M. -->
                            
    <property name="maxSize" value="#{512L * 1024 * 1024}"/>
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                        
    </bean>
                    
    </property>
                   
    <property name="dataRegionConfigurations">
                        
    <list>
                          
    <!--
                              Defining a data region that will consume up to 500 MB of RAM and 
                              will have eviction and persistence enabled.
                          
    -->
                          
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <!-- Custom region name. -->
                            
    <property name="name" value="500MB_Region"/>
                
                            
    <!-- 100 MB initial size. -->
                            
    <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                
                            
    <!-- 500 MB maximum size. -->
                            
    <property name="maxSize" value="#{500L * 1024 * 1024}"/>
                            
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                          
    </bean>
                        
    </list>
                    
    </property>
                
    </bean>
            
    </property>
            
    <property name="cacheConfiguration">
                
    <list>
                       
    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                               
    <property name="name" value="UserInfo"/>
                               
    <property name="cacheMode" value="PARTITIONED"/>
                               
    <property name="atomicityMode" value="ATOMIC"/>
                               
    <property name="backups" value="0"/>
                               
    <property name="cacheStoreFactory">
                                   
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                       
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                                   
    </bean>
                               
    </property>
                               
    <property name="readThrough" value="true"/>
                               
    <property name="writeThrough" value="true"/>
                               
    <property name="writeBehindEnabled" value="true"/>
                               
    <property name="writeBehindFlushSize" value="1024"/>
                               
    <property name="writeBehindFlushFrequency" value="5"/>
                               
    <property name="writeBehindFlushThreadCount" value="1"/>
                               
    <property name="writeBehindBatchSize" value="512"/>
                               
    <property name="dataRegionName" value="Default_Region"/>
                    
    </bean>
                
    </list>
            
    </property>
            
    <property name="failureDetectionTimeout" value="60000"/>
            
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
            
    <property name="discoverySpi">
                
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    
    <property name="ipFinder">
                        
    <!--
                            Ignite provides several options for automatic discovery that can be used
                            instead os static IP based discovery. For information on all options refer
                            to our documentation: http://apacheignite.readme.io/docs/cluster-config
                        
    -->
                        
    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                        
    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        
    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                            
    <property name="addresses">
                                
    <list>
                                    
    <!-- In distributed environment, replace with actual host IP address. -->
                                    
    <value>127.0.0.1:47500..47509</value>
                                
    </list>
                            
    </property>
                        
    </bean>
                    
    </property>
                
    </bean>
            
    </property>
        
    </bean>
    </beans>

    Ignite 對內存有細致的劃分,可以分多個區域Region,每個區域有自己的配置,比如設置初始大小和最大大小,以及淘汰策略。
    UserInfo對應的CacheConfigurationCache使用進行了配置,比如readThrough writeThrough writeBehindEnabled等等,細致的配置諸如后寫刷新頻率writeBehindFlushFrequency為5,表示5秒才會刷新一次更新數據。

        public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
            CacheConfiguration
    <String, T> cacheCfg = new CacheConfiguration<>(cacheName);
            
    return Globals.ignite().getOrCreateCache(cacheCfg);
        }

    在Globals工具類,提供工具方法獲得IgniteCache對象。

    package cn.empires.gs.player.service.impl;

    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.lang.IgniteFuture;

    import cn.empires.common.Globals;
    import cn.empires.common.cache.UserCacheStore;
    import cn.empires.common.service.ServiceBase;
    import cn.empires.gs.model.UserInfo;
    import cn.empires.gs.player.service.UserService;
    import io.vertx.core.AsyncResult;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.json.JsonObject;

    public class UserServiceImpl extends ServiceBase implements UserService {
        
        
    private final IgniteCache<String, UserInfo> cache;

        
    public UserServiceImpl(Vertx vertx, JsonObject config) {
            
    super(vertx, config);
            cache 
    = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
        }

        @Override
        
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <UserInfo> future = cache.getAsync(id);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(h.get()));
                }
            });        
            
    return this;
        }
        

        @Override
        
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <Void> future = cache.putAsync(userInfo.get_id(), userInfo);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(userInfo));
                }
            });
            
    return this;
        }

    }

    最后一件事,就是同步寫庫,可以讀通過從MongoDB進行讀取。

    package cn.empires.common.cache;

    import java.util.ArrayList;
    import java.util.List;

    import javax.cache.Cache.Entry;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;

    import org.apache.ignite.IgniteException;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lifecycle.LifecycleAware;
    import org.bson.Document;

    import com.mongodb.Block;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.UpdateOptions;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.model.UserInfo;
    import io.vertx.core.json.JsonObject;

    public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {
        
        
    /** Mongo collection. */
        
    private MongoCollection<Document> collection;
        
        @Override
        
    public void start() throws IgniteException {
        }

        @Override
        
    public UserInfo load(String key) throws CacheLoaderException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            FindIterable
    <Document> iter = collection.find(Filters.eq("_id", key));
            
    final List<JsonObject> result = new ArrayList<>(1);
            iter.forEach(
    new Block<Document>() {
                
    public void apply(Document _doc) {
                    result.add(
    new JsonObject(_doc.toJson()));
                }
            });
            
    if(result != null && !result.isEmpty()) {
                Loggers.userLogger.info(
    "CacheStore load UserInfo.");
                JsonObject jsonObj 
    = result.get(0);
                
    return UserInfo.fromDB(jsonObj);
            }
            
    return null;
        }

        @Override
        
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            Document filter 
    = new Document();
            filter.append(
    "_id", entry.getKey());
            
            Document replacement 
    = new Document();
            replacement.append(
    "value", entry.getValue().toString());
            collection.replaceOne(filter, replacement, 
    new UpdateOptions().upsert(true));
            Loggers.userLogger.info(
    "CacheStore saved UserInfo.");
        }

        @Override
        
    public void delete(Object key) throws CacheWriterException {
            
        }



        @Override
        
    public void stop() throws IgniteException {
            
        }

    }

    由于在ignite.xml中進行了配置
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
    </bean>
    ,所以在使用Cache獲取UserInfo的時候,如果不存在對應的信息,就會從MongoDB讀取。

    更多的信息只能下一篇文章再寫了。有問題可以留言。

    posted on 2018-11-13 14:29 北國狼人的BloG 閱讀(1612) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
    博客園   IT新聞   Chat2DB   C++博客   博問  
     
    主站蜘蛛池模板: 18禁网站免费无遮挡无码中文| 杨幂最新免费特级毛片| 久久久久久一品道精品免费看| 亚洲精品无码久久久久AV麻豆| 国产综合激情在线亚洲第一页| 日本成人在线免费观看| 亚洲综合色一区二区三区| 最近中文字幕mv免费高清电影| 国产精品亚洲自在线播放页码| 在线观看日本免费a∨视频| 亚洲色www永久网站| 免费黄色大片网站| 国产亚洲精品91| 奇米影视亚洲春色| 国产又黄又爽又刺激的免费网址 | 亚洲av无码乱码国产精品 | 亚洲成在人线aⅴ免费毛片| 色在线亚洲视频www| 永久免费看bbb| 人妻仑乱A级毛片免费看| 国产亚洲真人做受在线观看| 亚洲av无码一区二区三区人妖 | 亚洲欧美日韩中文二区| 国产成人免费片在线观看| 乱人伦中文视频在线观看免费| 3344在线看片免费| 久久亚洲AV成人无码软件| 女人被男人桶得好爽免费视频| 日本亚洲中午字幕乱码| 国产亚洲精品美女久久久| 18女人毛片水真多免费| 国产精品无码亚洲精品2021| 久久久久久久亚洲精品| 222www在线观看免费| 婷婷国产偷v国产偷v亚洲| 亚洲va中文字幕无码久久| 毛片在线看免费版| 国产黄色片免费看| 久久狠狠爱亚洲综合影院| 亚洲另类激情专区小说图片| 亚洲精品视频在线观看免费|