Wie kann ich die Anzahl der Partitionen für jedes Kafka-Thema aus dem Code abrufen. Ich habe viele Links recherchiert, aber keiner scheint zu funktionieren.
In Java-Code können wir verwenden AdminClient um Summenpartitionen eines Themas zu erhalten.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);
DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
Markus Bonaci
So mache ich es:
/**
* Retrieves list of all partitions IDs of the given {@code topic}.
*
* @param topic
* @param seedBrokers List of known brokers of a Kafka cluster
* @return list of partitions or empty list if none found
*/
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
for (BrokerInfo seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<Integer> partitions = new ArrayList<>();
// find our partition's metadata
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
partitions.add(part.partitionId());
}
}
return partitions; // leave on first successful broker (every broker has this info)
} catch (Exception e) {
// try all available brokers, so just report error and go to next one
LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
throw new RuntimeError("Could not get partitions");
}
Beachten Sie, dass ich nur Partitions-IDs herausziehen musste, aber Sie können zusätzlich alle anderen Partitionsmetadaten abrufen, wie z leader, isr, replicas…
Und BrokerInfo ist nur ein einfaches POJO, das hat host Und port Felder.
MD5
Unter Shell cmd kann die Anzahl der Partitionen gedruckt werden. Sie sollten sich im Verzeichnis kafka bin befinden, bevor Sie den cmd ausführen:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'
Beachten Sie, dass Sie den Themennamen nach Bedarf ändern müssen. Sie können dies auch mit der if-Bedingung weiter validieren:
Der obige cmd-Befehl druckt gültige Partitionen, wenn die Anzahl 16 ist. Sie können die Anzahl je nach Bedarf ändern.
pjkmgs
Verwenden Sie PartitionList von KafkaConsumer
//create consumer then loop through topics
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
ArrayList<Integer> partitionList = new ArrayList<>();
System.out.println(partitions.get(0).partition());
for(int i = 0; i < partitions.size(); i++){
partitionList.add(partitions.get(i).partition());
}
Collections.sort(partitionList);
Sollte wie ein Zauber wirken. Lassen Sie mich wissen, ob es eine einfachere Möglichkeit gibt, auf die Partitionsliste von Topic zuzugreifen.
Avinash Kumar Pandey
Der folgende Ansatz funktioniert also für kafka 0.10 und verwendet keine Producer- oder Consumer-APIs. Es verwendet einige Klassen aus der Scala-API in Kafka wie ZkConnection und ZkUtils.
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
14495800cookie-checkkafka erhält die Partitionsanzahl für ein Themayes
Welche Kafka-Version?
– Marko Bonaci
16. Februar 2016 um 17:48 Uhr
vish4071, wie wäre es, wenn Sie die Lösung akzeptieren, die Sie letztendlich verwendet haben?
– Marko Bonaci
7. Oktober 2018 um 9:09 Uhr