
     I'm running Kafka 2.1.0, managing it with the latest kafka-manager (1.3.3.18), and I wrote a producer and a consumer program. After running them, the consumer's group stubbornly refused to show up in kafka-manager.
    The producer code is as follows:
    package com.kafka.producer;

    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class ProducerDemo {

        private static Producer<String, Object> producer;
        private static final String server = "127.0.0.1:9092";

        static {
            Properties props = buildProducerConfig();
            producer = new KafkaProducer<>(props);
        }

        public static void main(String[] args) {
            int i = 0;
            while (true) {
                i++;
                try {
                    send("test", String.format("test_%d", i), "123");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println(String.format("Written to Kafka: %d", i));
            }
        }

        private static Properties buildProducerConfig() {
            Properties props = new Properties();
            // bootstrap.servers is the address of the Kafka cluster, i.e. the broker address
            props.put("bootstrap.servers", server);
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }

        // Synchronous send: blocks until the broker acknowledges the record.
        public static RecordMetadata send(String topic, String key, Object obj) throws InterruptedException, ExecutionException {
            return producer.send(new ProducerRecord<String, Object>(topic, key, obj)).get();
        }

        // Asynchronous send: failures are reported through the callback.
        public static void sendAsync(String topic, String key, Object obj) {
            producer.send(new ProducerRecord<String, Object>(topic, key, obj), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        System.out.println(ExceptionUtils.getStackTrace(e));
                    }
                }
            });
        }
    }
    The consumer program is as follows:
    package com.kafka.consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Arrays;
    import java.util.Properties;
    public class ConsumerDemo {
        public static void main(String[] args) {
            consume();
        }
        private static KafkaConsumer<String, Object> consumer;
        private static final String server = "127.0.0.1:9092";
        static {
            Properties props = buildConsumerConfig();
            consumer = new KafkaConsumer<>(props);
        }
        private static Properties buildConsumerConfig() {
            Properties props;
            props = new Properties();
            props.put("bootstrap.servers", server);
            // Consumer group id
            props.put("group.id", "testGroup");
            props.put("enable.auto.commit", "true");
            // How often to auto-commit the offsets of consumed messages
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
        public static void consume() {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // Poll for new records, waiting up to 100 ms per call
                ConsumerRecords<String, Object> records = consumer.poll(100);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.printf("partition=%d,offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
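    Before digging into kafka-manager itself, a quick way to rule out the client side is to ask the broker directly which consumer groups it knows about. The sketch below is not part of the programs above; it is a hedged example using the standard Kafka AdminClient (the package name is made up for illustration), pointed at the same broker as the demos. If "testGroup" is listed here, the group exists and the display problem is on the kafka-manager side.
    package com.kafka.admin;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ConsumerGroupListing;

    public class ListGroupsSketch {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            // Same broker address as the producer/consumer demos (assumption: local single-node setup).
            props.put("bootstrap.servers", "127.0.0.1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // listConsumerGroups() returns every group id the brokers currently track.
                for (ConsumerGroupListing g : admin.listConsumerGroups().all().get()) {
                    System.out.println(g.groupId());
                }
            }
        }
    }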
    The consumer group would not show up in kafka-manager's consumer view, so to track down the cause I looked at the kafka-manager log and found the following error:
    [warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster test-Kafka!
    kafka.common.KafkaException: Unknown offset schema version 3
            at kafka.manager.utils.one10.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:428) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
            at kafka.manager.utils.one10.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:532) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
            at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:332) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
            at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
            at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.12.jar:na]
            at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
            at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
    My first diagnosis was that this is a kafka-manager problem, so I decided to dig into it. kafka-manager is written in Scala, which I don't know, so at first I had no idea where to start.
    Still, most programs are built along similar lines, so I went ahead and analyzed it: the stack trace points at line 428 of GroupMetadataManager.scala, so the bug should be right around there.
    Google turned up no good solution, so I filed an issue on the kafka-manager GitHub project and found that someone had already got the groups to display by patching the source. Following that user's hints,
    I modified the Scala source, recompiled and repackaged kafka-manager, and the problem was finally solved.
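    The underlying cause, as far as I can tell, is that a Kafka 2.1.0 broker writes offset commits to __consumer_offsets using value schema version 3 (which adds a leader_epoch field), while kafka-manager 1.3.3.18 only has schemas registered for versions 0 and 1. The schema version is simply the first int16 of the record value, which is exactly what schemaForOffset() at the line in the stack trace looks up and rejects. Below is a rough, hypothetical sketch of what such a v3 value looks like on the wire, with the field layout inferred from the schema added in the patch that follows.
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class OffsetValueV3Sketch {
        public static void main(String[] args) {
            // Layout: [version:int16][offset:int64][leader_epoch:int32][metadata:string][commit_timestamp:int64]
            byte[] metadata = "".getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 4 + 2 + metadata.length + 8);
            buf.putShort((short) 3);                 // value schema version written by a 2.1.0 broker (assumption)
            buf.putLong(42L);                        // offset
            buf.putInt(0);                           // leader_epoch (the field new in v3)
            buf.putShort((short) metadata.length);   // STRING length prefix
            buf.put(metadata);                       // metadata bytes
            buf.putLong(System.currentTimeMillis()); // commit_timestamp
            buf.flip();

            // The first short is the version kafka-manager's schemaForOffset() looks up;
            // unpatched 1.3.3.18 has no entry for 3 and throws the KafkaException shown above.
            System.out.println("offset value schema version = " + buf.getShort());
        }
    }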
    The modified Scala source, as a diff against master, is as follows:
    git diff origin/master
    diff --git a/app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
    index 85771cd..f16b1a3 100644
    --- a/app/kafka/manager/utils/one10/GroupMetadataManager.scala
    +++ b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
    @@ -368,6 +368,25 @@ object GroupMetadataManager {
         new Field(SUBSCRIPTION_KEY, BYTES),
         new Field(ASSIGNMENT_KEY, BYTES))
     
    +  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
    +
    +  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
    +    new Field("metadata", STRING, "Associated metadata.", ""),
    +    new Field("commit_timestamp", INT64))
    +  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
    +  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
    +  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
    +
    +  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
    +    new Field("offset", INT64),
    +    new Field("leader_epoch", INT32),
    +    new Field("metadata", STRING, "Associated metadata.", ""),
    +    new Field("commit_timestamp", INT64))
    +  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
    +  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
    +  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
    +  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
    +
       private val PROTOCOL_TYPE_KEY = "protocol_type"
       private val GENERATION_KEY = "generation"
       private val PROTOCOL_KEY = "protocol"
    @@ -388,6 +407,12 @@ object GroupMetadataManager {
         new Field(LEADER_KEY, NULLABLE_STRING),
         new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
     
    +  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    +    new Field(PROTOCOL_TYPE_KEY, STRING),
    +    new Field(GENERATION_KEY, INT32),
    +    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    +    new Field(LEADER_KEY, NULLABLE_STRING),
    +    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
     
       // map of versions to key schemas as data types
       private val MESSAGE_TYPE_SCHEMAS = Map(
    @@ -398,13 +423,18 @@ object GroupMetadataManager {
       // map of version of offset value schemas
       private val OFFSET_VALUE_SCHEMAS = Map(
         0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
    -    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
    +    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
    +    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
    +    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
    +
       private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
     
       // map of version of group metadata value schemas
       private val GROUP_VALUE_SCHEMAS = Map(
         0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
    -    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
    +    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
    +    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
    +
       private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
     
       private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
    @@ -545,6 +575,20 @@ object GroupMetadataManager {
             val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
     
             OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
    +      } else if (version == 2) {
    +        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
    +        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
    +        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
    +
    +        OffsetAndMetadata(offset, metadata, commitTimestamp)
    +      } else if (version == 3) {
    +        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
    +        val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
    +        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
    +        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
    +
    +        // val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
    +        OffsetAndMetadata(offset, metadata, commitTimestamp)
           } else {
             throw new IllegalStateException("Unknown offset message version")
           }
    The complete app/kafka/manager/utils/one10/GroupMetadataManager.scala after the change looks like this:
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package kafka.manager.utils.one10

    import java.io.PrintStream
    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets
    import java.util.UUID

    import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
    import kafka.utils.{Logging, nonthreadsafe}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.internals.{ConsumerProtocol, PartitionAssignor}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.types.Type._
    import org.apache.kafka.common.protocol.types._
    import org.apache.kafka.common.utils.Utils

    import scala.collection.JavaConverters._
    import scala.collection.{Seq, immutable, mutable, _}


    /**
      * Case class used to represent group metadata for the ListGroups API
      */
    case class GroupOverview(groupId: String,
                             protocolType: String)

    /**
      * We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset
      * commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving
      * information of the commit record offset, compaction of the offsets topic itself may result in the wrong offset commit
      * being materialized.
      */
    case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {
      def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get
    }

    /**
      * Group contains the following metadata:
      *
      *  Membership metadata:
      *  1. Members registered in this group
      *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
      *  3. Protocol metadata associated with group members
      *
      *  State metadata:
      *  1. group state
      *  2. generation id
      *  3. leader id
      */
    @nonthreadsafe
    class GroupMetadata(val groupId: String
                                       , var protocolType: Option[String]
                                       , var generationId: Int
                                       , var protocol: Option[String]
                                       , var leaderId: Option[String]
                                      ) extends Logging {

      private val members = new mutable.HashMap[String, MemberMetadata]
      private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
      private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
      private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
      private var receivedTransactionalOffsetCommits = false
      private var receivedConsumerOffsetCommits = false

      var newMemberAdded: Boolean = false

      def has(memberId: String) = members.contains(memberId)
      def get(memberId: String) = members(memberId)

      def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
      def leaderOrNull: String = leaderId.orNull
      def protocolOrNull: String = protocol.orNull

      def add(member: MemberMetadata) {
        if (members.isEmpty)
          this.protocolType = Some(member.protocolType)

        assert(groupId == member.groupId)
        assert(this.protocolType.orNull == member.protocolType)
        assert(supportsProtocols(member.protocols))

        if (leaderId.isEmpty)
          leaderId = Some(member.memberId)
        members.put(member.memberId, member)
      }

      def remove(memberId: String) {
        members.remove(memberId)
        if (isLeader(memberId)) {
          leaderId = if (members.isEmpty) {
            None
          } else {
            Some(members.keys.head)
          }
        }
      }

      def allMembers = members.keySet

      def allMemberMetadata = members.values.toList

      // TODO: decide if ids should be predictable or random
      def generateMemberIdSuffix = UUID.randomUUID().toString

      private def candidateProtocols = {
        // get the set of protocols that are commonly supported by all members
        allMemberMetadata
          .map(_.protocols)
          .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
      }

      def supportsProtocols(memberProtocols: Set[String]) = {
        members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
      }

      def overview: GroupOverview = {
        GroupOverview(groupId, protocolType.getOrElse(""))
      }

      def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
                            pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {
        this.offsets ++= offsets
        this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
      }

      def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {
        if (pendingOffsetCommits.contains(topicPartition)) {
          if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
            throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
              "in the log.")
          if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
            offsets.put(topicPartition, offsetWithCommitRecordMetadata)
        }

        pendingOffsetCommits.get(topicPartition) match {
          case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
            pendingOffsetCommits.remove(topicPartition)
          case _ =>
          // The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
          // its entries would be removed from the cache by the `removeOffsets` method.
        }
      }

      def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
        pendingOffsetCommits.get(topicPartition) match {
          case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
          case _ =>
        }
      }

      def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
        receivedConsumerOffsetCommits = true
        pendingOffsetCommits ++= offsets
      }

      def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {
        trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")
        receivedTransactionalOffsetCommits = true
        val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
          mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])

        offsets.foreach { case (topicPartition, offsetAndMetadata) =>
          producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
        }
      }

      def hasReceivedConsistentOffsetCommits : Boolean = {
        !receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits
      }

      /* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.
       * We will return an error and the client will retry the request, potentially to a different coordinator.
       */
      def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {
        pendingTransactionalOffsetCommits.get(producerId) match {
          case Some(pendingOffsets) =>
            val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
            trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +
              s"to be appended to the log")
            if (pendingOffsets.isEmpty)
              pendingTransactionalOffsetCommits.remove(producerId)
          case _ =>
          // We may hit this case if the partition in question has emigrated already.
        }
      }

      def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
                                  commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {
        pendingTransactionalOffsetCommits.get(producerId) match {
          case Some(pendingOffset) =>
            if (pendingOffset.contains(topicPartition)
              && pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)
              pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)
          case _ =>
          // We may hit this case if the partition in question has emigrated.
        }
      }

      /* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written
       * to the log.
       */
      def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
        val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
        if (isCommit) {
          pendingOffsetsOpt.foreach { pendingOffsets =>
            pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
              if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
                throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
                  s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")

              val currentOffsetOpt = offsets.get(topicPartition)
              if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
                trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
                  "committed and loaded into the cache.")
                offsets.put(topicPartition, commitRecordMetadataAndOffset)
              } else {
                trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
                  s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")
              }
            }
          }
        } else {
          trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")
        }
      }

      def activeProducers = pendingTransactionalOffsetCommits.keySet

      def hasPendingOffsetCommitsFromProducer(producerId: Long) =
        pendingTransactionalOffsetCommits.contains(producerId)

      def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
        topicPartitions.flatMap { topicPartition =>

          pendingOffsetCommits.remove(topicPartition)
          pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
            pendingOffsets.remove(topicPartition)
          }
          val removedOffset = offsets.remove(topicPartition)
          removedOffset.map(topicPartition -> _.offsetAndMetadata)
        }.toMap
      }

      def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
        val expiredOffsets = offsets
          .filter {
            case (topicPartition, commitRecordMetadataAndOffset) =>
              commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
          }
          .map {
            case (topicPartition, commitRecordOffsetAndMetadata) =>
              (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
          }
        offsets --= expiredOffsets.keySet
        expiredOffsets.toMap
      }

      def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
        (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
      }.toMap

      def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)

      // visible for testing
      def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)

      def numOffsets = offsets.size

      def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty


      override def toString: String = {
        "GroupMetadata(" +
          s"groupId=$groupId, " +
          s"generation=$generationId, " +
          s"protocolType=$protocolType, " +
          s"members=$members)"
      }

    }


    /**
      * Messages stored for the group topic has versions for both the key and value fields. Key
      * version is used to indicate the type of the message (also to differentiate different types
      * of messages from being compacted together if they have the same field values); and value
      * version is used to evolve the messages within their data types:
      *
      * key version 0:       group consumption offset
      *    -> value version 0:       [offset, metadata, timestamp]
      *
      * key version 1:       group consumption offset
      *    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]
      *
      * key version 2:       group metadata
      *     -> value version 0:       [protocol_type, generation, protocol, leader, members]
      */
    object GroupMetadataManager {

      private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
      private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
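      // Newly added: key schema version 3 is treated the same way as the group-metadata key in readMessageKey below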
      private val CURRENT_GROUP_KEY_SCHEMA_VERSION2 = 3.toShort

      private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
        new Field("topic", STRING),
        new Field("partition", INT32))
      private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
      private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
      private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")

      private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
        new Field("metadata", STRING, "Associated metadata.", ""),
        new Field("timestamp", INT64))
      private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
      private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
      private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")

      private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
        new Field("metadata", STRING, "Associated metadata.", ""),
        new Field("commit_timestamp", INT64),
        new Field("expire_timestamp", INT64))

      private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
      private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
      private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
      private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")

      // Newly added: offset commit value schema v2 (commit_timestamp only, no expire_timestamp)
      private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
        new Field("metadata", STRING, "Associated metadata.", ""),
        new Field("commit_timestamp", INT64))
      private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
      private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
      private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")

      // Newly added: offset commit value schema v3 (adds leader_epoch)
      private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
        new Field("offset", INT64),
        new Field("leader_epoch", INT32),
        new Field("metadata", STRING, "Associated metadata.", ""),
        new Field("commit_timestamp", INT64))
      private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
      private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
      private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
      private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
      // End of newly added offset value schemas

      private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
      private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")

      private val MEMBER_ID_KEY = "member_id"
      private val CLIENT_ID_KEY = "client_id"
      private val CLIENT_HOST_KEY = "client_host"
      private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
      private val SESSION_TIMEOUT_KEY = "session_timeout"
      private val SUBSCRIPTION_KEY = "subscription"
      private val ASSIGNMENT_KEY = "assignment"

      private val MEMBER_METADATA_V0 = new Schema(
        new Field(MEMBER_ID_KEY, STRING),
        new Field(CLIENT_ID_KEY, STRING),
        new Field(CLIENT_HOST_KEY, STRING),
        new Field(SESSION_TIMEOUT_KEY, INT32),
        new Field(SUBSCRIPTION_KEY, BYTES),
        new Field(ASSIGNMENT_KEY, BYTES))

      private val MEMBER_METADATA_V1 = new Schema(
        new Field(MEMBER_ID_KEY, STRING),
        new Field(CLIENT_ID_KEY, STRING),
        new Field(CLIENT_HOST_KEY, STRING),
        new Field(REBALANCE_TIMEOUT_KEY, INT32),
        new Field(SESSION_TIMEOUT_KEY, INT32),
        new Field(SUBSCRIPTION_KEY, BYTES),
        new Field(ASSIGNMENT_KEY, BYTES))

      // Newly added: member metadata schema v2 (same layout as v1)
      private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1

      private val PROTOCOL_TYPE_KEY = "protocol_type"
      private val GENERATION_KEY = "generation"
      private val PROTOCOL_KEY = "protocol"
      private val LEADER_KEY = "leader"
      private val MEMBERS_KEY = "members"

      private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
        new Field(PROTOCOL_TYPE_KEY, STRING),
        new Field(GENERATION_KEY, INT32),
        new Field(PROTOCOL_KEY, NULLABLE_STRING),
        new Field(LEADER_KEY, NULLABLE_STRING),
        new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))

      private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
        new Field(PROTOCOL_TYPE_KEY, STRING),
        new Field(GENERATION_KEY, INT32),
        new Field(PROTOCOL_KEY, NULLABLE_STRING),
        new Field(LEADER_KEY, NULLABLE_STRING),
        new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))

      // Newly added: group metadata value schema v2 (reuses the v1 member layout)
      private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
        new Field(PROTOCOL_TYPE_KEY, STRING),
        new Field(GENERATION_KEY, INT32),
        new Field(PROTOCOL_KEY, NULLABLE_STRING),
        new Field(LEADER_KEY, NULLABLE_STRING),
        new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))

      // map of versions to key schemas as data types
      private val MESSAGE_TYPE_SCHEMAS = Map(
        0 -> OFFSET_COMMIT_KEY_SCHEMA,
        1 -> OFFSET_COMMIT_KEY_SCHEMA,
        2 -> GROUP_METADATA_KEY_SCHEMA
      )

      // map of version of offset value schemas
      private val OFFSET_VALUE_SCHEMAS = Map(
        0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
        1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
        2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
        3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
      private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort

      // map of version of group metadata value schemas
      private val GROUP_VALUE_SCHEMAS = Map(
        0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
        1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
        2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
      private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort

      private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
      private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)

      private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
      private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)

      private def schemaForKey(version: Int) = {
        val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
        schemaOpt match {
          case Some(schema) => schema
          case _ => throw new KafkaException("Unknown offset schema version " + version)
        }
      }

      private def schemaForOffset(version: Int) = {
        val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
        println("version is:"+version+", schemaOpt is: "+schemaOpt)
        schemaOpt match {
          case Some(schema) => schema
          case _ => throw new KafkaException("Unknown offset schema version " + version)
        }
      }

      private def schemaForGroup(version: Int) = {
        val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
        schemaOpt match {
          case Some(schema) => schema
          case _ => throw new KafkaException("Unknown group metadata version " + version)
        }
      }

      /**
        * Generates the key for offset commit message for given (group, topic, partition)
        *
        * @return key for offset commit message
        */
      def offsetCommitKey(group: String, topicPartition: TopicPartition,
                                         versionId: Short = 0): Array[Byte] = {
        val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
        key.set(OFFSET_KEY_GROUP_FIELD, group)
        key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
        key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)

        val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
        byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
        key.writeTo(byteBuffer)
        byteBuffer.array()
      }

      /**
        * Generates the key for group metadata message for given group
        *
        * @return key bytes for group metadata message
        */
      def groupMetadataKey(group: String): Array[Byte] = {
        val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
        key.set(GROUP_KEY_GROUP_FIELD, group)

        val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
        byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
        key.writeTo(byteBuffer)
        byteBuffer.array()
      }

      /**
        * Generates the payload for offset commit message from given offset and metadata
        *
        * @param offsetAndMetadata consumer's current offset and metadata
        * @return payload for offset commit message
        */
      def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
        // generate commit value with schema version 1
        val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
        value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
        value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
        value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
        val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
        byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
        value.writeTo(byteBuffer)
        byteBuffer.array()
      }

      /**
        * Decodes the offset messages' key
        *
        * @param buffer input byte-buffer
        * @return a GroupTopicPartition object
        */
      def readMessageKey(buffer: ByteBuffer): BaseKey = {
        val version = buffer.getShort
        val keySchema = schemaForKey(version)
        val key = keySchema.read(buffer)

        if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
          // version 0 and 1 refer to offset
          val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
          val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
          val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]

          OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))

        } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
          // version 2 refers to offset
          val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

          GroupMetadataKey(version, group)
        } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION2) { // newly added
          // version 3 is also treated as a group metadata key
          val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

          GroupMetadataKey(version, group)
        } else {
          throw new IllegalStateException("Unknown version " + version + " for group metadata message")
        }
      }

      /**
        * Decodes the offset messages' payload and retrieves offset and metadata from it
        *
        * @param buffer input byte-buffer
        * @return an offset-metadata object from the message
        */
      def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
        if (buffer == null) { // tombstone
          null
        } else {
          val version = buffer.getShort
          val valueSchema = schemaForOffset(version)
          val value = valueSchema.read(buffer)

          if (version == 0) {
            val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
            val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
            val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]

            OffsetAndMetadata(offset, metadata, timestamp)
          } else if (version == 1) {
            val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
            val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
            val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
            val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]

            OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
          } else if (version == 2) {
            val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
            val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
            val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]

            OffsetAndMetadata(offset, metadata, commitTimestamp)
          } else if (version == 3) {
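            // Newly added branch: v3 also carries leader_epoch; it is read below but not passed on to OffsetAndMetadata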
            val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
            val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
            val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
            val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
            OffsetAndMetadata(offset, metadata, commitTimestamp)
          } else {
            throw new IllegalStateException("Unknown offset message version")
          }
        }
      }

      /**
        * Decodes the group metadata messages' payload and retrieves its member metadata from it
        *
        * @param buffer input byte-buffer
        * @return a group metadata object from the message
        */
      def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
        if (buffer == null) { // tombstone
          null
        } else {
          val version = buffer.getShort
          val valueSchema = schemaForGroup(version)
          val value = valueSchema.read(buffer)

          if (version == 0 || version == 1) {
            val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
            val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
            val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
            val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
            val memberMetadataArray = value.getArray(MEMBERS_KEY)

            val members = memberMetadataArray.map { memberMetadataObj =>
              val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
              val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
              val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
              val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
              val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
              val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
              val member = new MemberMetadata(memberId
                , groupId
                , clientId
                , clientHost
                , protocolType
                , List((protocol, subscription.topics().asScala.toSet))
                , assignment.partitions().asScala.map(tp => (tp.topic(), tp.partition())).toSet)
              member
            }
            val finalProtocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
            val group = new GroupMetadata(groupId = groupId
              , generationId = generationId
              , protocolType = finalProtocolType
              , protocol = Option(protocol)
              , leaderId = Option(leaderId)
            )
            members.foreach(group.add)
            group
          } else {
            throw new IllegalStateException("Unknown group metadata message version")
          }
        }
      }

      // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
      // (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
      class OffsetsMessageFormatter extends MessageFormatter {
        def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
          Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
            // Only print if the message is an offset record.
            // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
            case offsetKey: OffsetKey =>
              val groupTopicPartition = offsetKey.key
              val value = consumerRecord.value
              val formattedValue =
                if (value == null) "NULL"
                else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
              output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
              output.write("::".getBytes(StandardCharsets.UTF_8))
              output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
              output.write("\n".getBytes(StandardCharsets.UTF_8))
            case _ => // no-op
          }
        }
      }

      // Formatter for use with tools to read group metadata history
      class GroupMetadataMessageFormatter extends MessageFormatter {
        def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
          Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
            // Only print if the message is a group metadata record.
            // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
            case groupMetadataKey: GroupMetadataKey =>
              val groupId = groupMetadataKey.key
              val value = consumerRecord.value
              val formattedValue =
                if (value == null) "NULL"
                else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
              output.write(groupId.getBytes(StandardCharsets.UTF_8))
              output.write("::".getBytes(StandardCharsets.UTF_8))
              output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
              output.write("\n".getBytes(StandardCharsets.UTF_8))
            case _ => // no-op
          }
        }
      }

    }

    case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {

      def this(group: String, topic: String, partition: Int) =
        this(group, new TopicPartition(topic, partition))

      override def toString: String =
        "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
    }

    trait BaseKey{
      def version: Short
      def key: Any
    }

    case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {

      override def toString: String = key.toString
    }

    case class GroupMetadataKey(version: Short, key: String) extends BaseKey {

      override def toString: String = key
    }

    posted on 2018-12-13 15:36 by fly (category: J2EE)