kafka erhält die Partitionsanzahl für ein Thema

Lesezeit: 4 Minuten

Wie kann ich die Anzahl der Partitionen für jedes Kafka-Thema aus dem Code abrufen. Ich habe viele Links recherchiert, aber keiner scheint zu funktionieren.

Um nur einige zu nennen:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

die wie ähnliche Diskussionen aussehen.

Es gibt auch ähnliche Links auf SO, die keine funktionierende Lösung dafür haben.

  • Welche Kafka-Version?

    – Marko Bonaci

    16. Februar 2016 um 17:48 Uhr

  • vish4071, wie wäre es, wenn Sie die Lösung akzeptieren, die Sie letztendlich verwendet haben?

    – Marko Bonaci

    7. Oktober 2018 um 9:09 Uhr

Benutzeravatar von peter.petrov
peter.petrow

Gehen Sie zu Ihrem kafka/bin Verzeichnis.

Führen Sie dann Folgendes aus:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

Sie sollten unten sehen, was Sie brauchen PartitionCount.

Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001

Bei Verwendung einer Version, in der Zookeeper nicht mehr von Kafka abhängig ist

kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name

  • Upvoting, weil ich zufällig nach einer Nicht-Code-Lösung gesucht habe und diese perfekt war.

    – David Kaczyński

    2. März 2017 um 15:24 Uhr

  • Wenn Sie alle Partitionen (einschließlich Replikate) für eine Topic-Regex zusammenfassen möchten, lesen Sie meine Antwort unten.

    – Pragmatischer Programmierer

    14. Januar 2021 um 20:29 Uhr

  • Für mich gibt es kein “PartitionCount” -Feld. jede Partition hat ihre eigene Zeile, aber kein sichtbares Feld für die Gesamtzahl der Partitionen.

    – Bünyamin Sentürk

    29. Juli 2022 um 8:47 Uhr

Benutzeravatar von Sunil Patil
Sunil Patil

In der 0.82 Producer API und 0.9 Consumer API können Sie so etwas wie verwenden

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")

In Java-Code können wir verwenden AdminClient um Summenpartitionen eines Themas zu erhalten.

Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);

DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>>  values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);

Benutzeravatar von Marko Bonaci
Markus Bonaci

So mache ich es:

  /**
   * Retrieves list of all partitions IDs of the given {@code topic}.
   * 
   * @param topic
   * @param seedBrokers List of known brokers of a Kafka cluster
   * @return list of partitions or empty list if none found
   */
  public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<Integer> partitions = new ArrayList<>();
        // find our partition's metadata
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            partitions.add(part.partitionId());
          }
        }
        return partitions;  // leave on first successful broker (every broker has this info)
      } catch (Exception e) {
        // try all available brokers, so just report error and go to next one
        LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    throw new RuntimeError("Could not get partitions");
  }

Beachten Sie, dass ich nur Partitions-IDs herausziehen musste, aber Sie können zusätzlich alle anderen Partitionsmetadaten abrufen, wie z leader, isr, replicas
Und BrokerInfo ist nur ein einfaches POJO, das hat host Und port Felder.

Benutzeravatar von MD5
MD5

Unter Shell cmd kann die Anzahl der Partitionen gedruckt werden. Sie sollten sich im Verzeichnis kafka bin befinden, bevor Sie den cmd ausführen:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'

Beachten Sie, dass Sie den Themennamen nach Bedarf ändern müssen. Sie können dies auch mit der if-Bedingung weiter validieren:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'

Der obige cmd-Befehl druckt gültige Partitionen, wenn die Anzahl 16 ist. Sie können die Anzahl je nach Bedarf ändern.

Benutzeravatar von pjkmgs
pjkmgs

Verwenden Sie PartitionList von KafkaConsumer

     //create consumer then loop through topics
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);

    ArrayList<Integer> partitionList = new ArrayList<>();
    System.out.println(partitions.get(0).partition());

    for(int i = 0; i < partitions.size(); i++){
        partitionList.add(partitions.get(i).partition());
    }

    Collections.sort(partitionList);

Sollte wie ein Zauber wirken. Lassen Sie mich wissen, ob es eine einfachere Möglichkeit gibt, auf die Partitionsliste von Topic zuzugreifen.

Benutzeravatar von Avinash Kumar Pandey
Avinash Kumar Pandey

Der folgende Ansatz funktioniert also für kafka 0.10 und verwendet keine Producer- oder Consumer-APIs. Es verwendet einige Klassen aus der Scala-API in Kafka wie ZkConnection und ZkUtils.

    ZkConnection zkConnection = new ZkConnection(zkConnect);
    ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
         JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());

1449580cookie-checkkafka erhält die Partitionsanzahl für ein Thema

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy