Pyspark - Funzione di ritardo

Pyspark - Funzione di ritardo
La funzione Lag () in Pyspark è disponibile nel modulo della finestra che viene utilizzato per restituire i valori delle righe precedenti alle righe corrente. Firstl, la funzione Lag () restituisce null per le migliori righe. Prende un parametro di offset che rappresenta il numero totale di righe in modo tale che i valori di riga precedente vengano restituiti alle righe successive. Per le prime righe superiori, vengono posizionati i null (offset).

È possibile partizione delle righe nel frame dati utilizzando la funzione della finestra. È disponibile in pyspark.SQL.finestra modulo.

Sintassi:

dataframe_obj.WithColumn ("lag_column", lag ("colonna", offset).Over (partizione))

Ci vogliono due parametri:

  1. La colonna è il nome della colonna nel frame dati pyspark in cui i valori di riga ritardati sono posizionati in base ai valori in questa colonna.
  2. L'offset specifica l'intero per restituire quel numero di righe precedenti ai valori della riga corrente.

Passaggi:

  1. Crea un frame dati pyspark che ha alcuni valori simili in almeno una colonna.
  2. Partizione I dati utilizzando il metodo PartitionBy () disponibili nella funzione della finestra e ordinarli in base alla colonna utilizzando la funzione orderBy ().

Sintassi:

partizione = finestra.Partitionby ("colonna").orderby ("colonna")

Possiamo ordinare i dati partizionati con la colonna partizionata o qualsiasi altra colonna.

Ora puoi usare la funzione Lag () sulle righe partizionate usando il Sopra() funzione.

Aggiungiamo una colonna per archiviare il numero di riga usando il withColumn () funzione.

Sintassi:

dataframe_obj.WithColumn ("lag_column", lag ("colonna", offset).Over (partizione))

Qui, il nome specifica il nome della riga e DataFrame_Obj è il nostro PysPark DataFrame.

Implementamoci il codice.

Esempio 1:

Qui, creiamo un frame dati pyspark con 5 colonne - ['soggetto_id', 'name', 'age', 'tecnologia1', 'tecnologia2] con 10 righe e partizione le righe basate su Tecnologia1 Usando la funzione della finestra. Dopodiché, in ritardo di 1 riga.

importare pyspark
da pyspark.SQL Import *
Spark_app = SparkSession.costruttore.nome dell'applicazione('_').getOrCreate ()
Studenti = [(4, "Sravan", 23, "Php", "Test"),
(4, "Sravan", 23, "Php", "Test"),
(46, "Mounika", 22, ".Net ',' html '),
(4, "Deepika", 21, "Oracle", "HTML"),
(46, "Mounika", 22, "Oracle", "Test"),
(12, "Chandrika", 22, "Hadoop", "C#"),
(12, "Chandrika", 22, "Oracle", "Test"),
(4, "Sravan", 23, "Oracle", "C#"),
(4, "Deepika", 21, "Php", "C#"),
(46, "Mounika", 22, ".Net ',' test ')
"
DataFrame_obj = Spark_app.CreateAtaFrame (Students, ['soggetto_id', 'name', 'age', 'technology1', 'technology2'])
print ("---------- Frame dati effettivo ----------")
dataframe_obj.spettacolo()
# Importa la funzione della finestra
da pyspark.SQL.finestra di importazione della finestra
#import il ritardo da pyspark.SQL.funzioni
da pyspark.SQL.funzioni in ritardo di importazione
#partition Il frame dati in base ai valori nella colonna Technology1 e
#ordina le righe in ogni partizione in base alla colonna soggetto_id
partizione = finestra.Partitionby ("Technology1").orderby ('soggetto_id')
print ("---------- DataFrame partizionato ----------")
#Now menzione di ritardo con offset-1 in base a soggetto_id
dataframe_obj.WithColumn ("lag", lag ("soggetto_id", 1).Over (partizione)).spettacolo()

Produzione:

Spiegazione:

Nel primo output rappresenta i dati effettivi presenti nel frame dati. Nel secondo output, la partizione viene eseguita in base al Tecnologia1 colonna.

Il numero totale di partizioni è 4.

Partizione 1:

IL .La rete si è verificata due volte nella prima partizione. Dal momento che abbiamo specificato il lag-offset come 1, il primo .Il valore netto è nullo e il prossimo .Il valore netto è il valore di riga soggetto_id precedente - 46.

Partizione 2:

Hadoop si è verificato una volta nella seconda partizione. Quindi, il ritardo è nullo.

Partizione 3:

Oracle si è verificato quattro volte nella terza partizione.

Per il primo oracolo, il ritardo è nullo.

Per il secondo oracolo, il valore di ritardo è 4 (poiché il valore di riga precedente di riga è 4).

Per il terzo oracolo, il valore di ritardo è 4 (poiché il valore di riga precedente di riga è 4).

Per il quarto oracolo, il valore di ritardo è 12 (poiché il valore di riga precedente di riga è 12).

Partizione 4:

PHP si è verificato tre volte nella quarta partizione.

Il valore di ritardo per il 1 ° PHP è nullo.

Il valore di ritardo per il 2 ° PHP è 4 (poiché il valore di riga precedente di riga è 4).

Il valore LAG per il 3 ° PHP è 4 (poiché il valore di riga di riga precedente è 4).

Esempio 2:

LAG le file di 2. Assicurati di creare il telaio dati PysPark come visualizzato nell'esempio 1.

# Importa la funzione della finestra
da pyspark.SQL.finestra di importazione della finestra
#import il ritardo da pyspark.SQL.funzioni
da pyspark.SQL.funzioni in ritardo di importazione
#partition Il frame dati in base ai valori nella colonna Technology1 e
#ordina le righe in ogni partizione in base alla colonna soggetto_id
partizione = finestra.Partitionby ("Technology1").orderby ('soggetto_id')
print ("---------- DataFrame partizionato ----------")
#Now menzione di ritardo con offset-2 in base a soggetto_id
dataframe_obj.WithColumn ("lag", lag ("soggetto_id", 2).Over (partizione)).spettacolo()

Produzione:

Spiegazione:

La partizione viene eseguita in base al file Tecnologia1 colonna. Il numero totale di partizioni è 4.

Partizione 1:

IL .La rete si è verificata due volte nella prima partizione. Poiché abbiamo specificato il lag-offset come 2, l'offset è nullo per entrambi i valori.

Partizione 2:

Hadoop si è verificato una volta nella seconda partizione. Quindi, il ritardo è nullo.

Partizione 3:

Oracle si è verificato quattro volte nella terza partizione.

Per il primo e il secondo oracolo, il ritardo è nullo.

Per il terzo oracolo, il valore di ritardo è 4 (poiché il valore di 2 righe soggetto_id precedente è 4).

Per il quarto Oracle, il valore di ritardo è 4 (poiché il valore di 2 righe soggetti a 2 righe precedenti è 4).

Partizione 4:

PHP si è verificato tre volte nella quarta partizione.

Il valore di ritardo per il 1 ° e il 2 ° PHP è nullo.

Il valore di ritardo per il 3 ° PHP è 4 (poiché il valore soggetto_id 2 delle righe precedenti è 4).

Esempio 3:

LAG le righe di 2 in base alla colonna di età. Assicurati di creare il telaio dati PysPark come visualizzato nell'esempio 1.

# Importa la funzione della finestra
da pyspark.SQL.finestra di importazione della finestra
#import il ritardo da pyspark.SQL.funzioni
da pyspark.SQL.funzioni in ritardo di importazione
#partition Il frame dati in base ai valori nella colonna Technology1 e
#Ordina le righe in ogni partizione in base alla colonna di età
partizione = finestra.Partitionby ("Technology1").Orderby ('Age')
print ("---------- DataFrame partizionato ----------")
#Now menzione di ritardo con offset-2 in base all'età
dataframe_obj.WithColumn ("Lag", Lag ("Age", 2).Over (partizione)).spettacolo()

Produzione:

Spiegazione:

La partizione viene eseguita in base al file Tecnologia1 colonna e ritardo è definito in base alla colonna di età. Il numero totale di partizioni è 4.

Partizione 1:

IL .La rete si è verificata due volte nella prima partizione. Poiché abbiamo specificato il lag-offset come 2, l'offset è nullo per entrambi i valori.

Partizione 2:

Hadoop si è verificato una volta nella seconda partizione. Quindi, il ritardo è nullo.

Partizione 3:

Oracle si è verificato quattro volte nella terza partizione.

Per il primo e il secondo oracolo, il ritardo è nullo.

Per il terzo oracolo, il valore di ritardo è 21 (il valore di età rispetto alle due righe precedenti è 21).

Per il quarto Oracle, il valore di ritardo è 22 (il valore di età rispetto alle due righe precedenti è 22).

Partizione 4:

PHP si è verificato tre volte nella quarta partizione.

Il valore di ritardo per il 1 ° e il 2 ° PHP è nullo.

Il valore di ritardo per il terzo HP è 21 (il valore di età rispetto alle due righe precedenti è 21).

Conclusione

Abbiamo imparato come ottenere i valori di ritardo nel frame dati Pyspark in righe partizionate. La funzione Lag () in Pyspark è disponibile nel modulo della finestra che viene utilizzato per restituire i valori delle righe precedenti alle righe corrente. Abbiamo imparato i diversi esempi impostando i diversi offset.