Aggregatfunktion Zählen Sie die Nutzung mit groupBy in Spark

Lesezeit: 2 Minuten

Benutzer-Avatar
Adel

Ich versuche, mehrere Operationen in einer Codezeile in pySpark durchzuführen, und bin mir nicht sicher, ob das für meinen Fall möglich ist.

Meine Absicht ist es nicht, die Ausgabe als neuen Datenrahmen zu speichern.

Mein aktueller Code ist ziemlich einfach:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

Und meine Absicht ist hinzuzufügen count() nach der Verwendung groupByum die Anzahl der Datensätze zu erhalten, die mit jedem Wert von übereinstimmen timePeriod Spalte, gedruckt\gezeigt als Ausgabe.

Beim Versuch zu verwenden groupBy(..).count().agg(..) Ich bekomme Ausnahmen.

Gibt es eine Möglichkeit beides zu erreichen count() und agg().show() druckt, ohne den Code in zwei Befehlszeilen aufzuteilen, zB:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

Oder noch besser, um eine zusammengeführte Ausgabe zu erhalten agg.show() Ausgabe – Eine zusätzliche Spalte, die die gezählte Anzahl von Datensätzen angibt, die mit dem Wert der Zeile übereinstimmen. z.B:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

Benutzer-Avatar
Mrsrinivas

count() kann innen verwendet werden agg() wie groupBy Ausdruck ist gleich.

Mit Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark SQL-Funktionen doc

Mit Scala

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) zählt die Datensätze nach der ersten Spalte, die gleich ist count("timePeriod")

Mit Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

  • Gibt es im Datenrahmen eine Möglichkeit, alle Datensätze partitionsweise zu zählen und zur endgültigen Zählung zu aggregieren, wenn ja, wie?

    – BdIngenieur

    7. Januar 2019 um 11:16 Uhr

  • du meinst, etwas ähnliches reduceBy ?

    – Mrsrinivas

    7. Januar 2019 um 12:40 Uhr

  • Ein kleiner Kommentar zur Syntax: Ich bin ein großer Fan der dict-Syntax in Python, zB .agg({"X: "sum", "Y": "sum", "Z": "sum", "blah": "count"})das funktioniert wirklich gut mit .withColumn("blah", lit(1)) – es könnte einen besseren Weg geben, aber ich habe ihn (noch!) nicht gefunden.

    – m-dz

    7. Januar 2019 um 15:36 Uhr

  • Dein Code funktioniert wunderbar! Gibt es eine Möglichkeit, Alias ​​zu verwenden und die Spalten umzubenennen? Denn im Moment kommt das Ergebnis in Spaltennamen wie sum(X), sum(Z), …

    – RFAI

    2. Juni um 1:26

  • @RFAI: lit(1) bedeutet die erste Spalte im Ergebnis, die ist timePeriod .

    – Mrsrinivas

    4. Juni um 11:35 Uhr

1054620cookie-checkAggregatfunktion Zählen Sie die Nutzung mit groupBy in Spark

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy