Pyspark RDD - trasformazioni

Pyspark RDD - trasformazioni
In Python, Pyspark è un modulo Spark utilizzato per fornire un tipo di elaborazione simile come Spark.

RDD sta per set di dati distribuiti resilienti. Possiamo chiamare RDD una struttura di dati fondamentali in Apache Spark.

Dobbiamo importare RDD dal pyspark.Modulo RDD.

Quindi, in Pyspark per creare un RDD, possiamo usare il metodo parallelize ().

Sintassi:

Spark_app.SparkContext.parallelizza (dati)

Dove,

I dati possono essere un dati monodimensionali (dati lineari) o bidimensionali (dati di riga-colonna).

Trasformazioni RDD:

Una trasformazione RDD è un'operazione applicata a un RDD per creare nuovi dati dall'esistente RDD. Usando le trasformazioni, siamo in grado di filtrare l'RDD applicando alcune trasformazioni.

Vediamo le trasformazioni che vengono eseguite su RDD dato.

Ne discuteremo uno per uno.

1. carta geografica()

La trasformazione della mappa () viene utilizzata per mappare un valore agli elementi presenti nella RDD. Prende una funzione anonima come parametro, come lambda e trasforma gli elementi in un RDD.

Sintassi:

Rdd_data.mappa (anonymous_function)

Parametri:

Anonymous_Function sembra:

Elemento lambda: operazione

Ad esempio, l'operazione è aggiungere/sottrarre tutti gli elementi con un nuovo elemento.

Vediamo gli esempi per capire meglio questa trasformazione.

Esempio 1:

In questo esempio, creiamo un RDD chiamato Student_Marks con 20 elementi e applicare la trasformazione Map () aggiungendo ogni elemento con 20 e visualizzandoli usando l'azione da collezione ().

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#Display Data in RDD
Stampa ("Dati effettivi in ​​RDD:", Student_Marks.Mappa (Lambda Element: Element).raccogliere())
#Apply Map () trasformazione aggiungendo 20 a ciascun elemento in RDD
Stampa ("Dopo aver aggiunto 20 ad ogni elemento in RDD:", Student_Marks.Mappa (Lambda Element: Element+ 20).raccogliere())

Produzione:

Dati effettivi in ​​RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Dopo aver aggiunto 20 a ciascun elemento in RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

Dall'output sopra, possiamo vedere che l'elemento 20 viene aggiunto a ogni elemento in RDD attraverso la funzione Lambda usando la trasformazione mappa ().

Esempio 2:

In questo esempio, creiamo un RDD chiamato Student_Marks con 20 elementi e applicare la trasformazione mappa () sottraendo ciascun elemento per 15 e visualizzandoli usando l'azione da collezione ().

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#Display Data in RDD
Stampa ("Dati effettivi in ​​RDD:", Student_Marks.Mappa (Lambda Element: Element).raccogliere())
#Apply map () trasformazione sottraendo 15 da ciascun elemento in RDD
Stampa ("Dopo aver sottratto 15 da ciascun elemento in RDD:", Student_Marks.Mappa (elemento lambda: elemento-15).raccogliere())

Produzione:

Dati effettivi in ​​RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Dopo aver sottratto 15 da ciascun elemento in RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 41, 19]

Dall'output sopra, possiamo vedere che l'elemento 15 viene sottratto a ogni elemento in RDD attraverso la funzione Lambda usando la trasformazione MAP ().

2. filtro()

La trasformazione del filtro () viene utilizzata per filtrare i valori da RDD. Prende una funzione anonima come lambda e restituisce gli elementi filtrando elementi da un RDD.

Sintassi:

Rdd_data.filtro (anonymous_function)

Parametri:

Anonymous_Function sembra:

Elemento lambda: condizione/espressione

Ad esempio, la condizione viene utilizzata per specificare le istruzioni espressive per filtrare RDD.

Vediamo esempi per capire meglio questa trasformazione.

Esempio 1:

In questo esempio, creiamo un RDD chiamato Student_Marks con 20 elementi e applicare la trasformazione Filter () filtrando solo multipli di 5 e visualizzandoli usando l'azione da collezione ().

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#Display Data in RDD
Stampa ("Dati effettivi in ​​RDD:", Student_Marks.Mappa (Lambda Element: Element).raccogliere())
#Apply Filter () trasformazione restituendo multipli di 5.
stampa ("multipli di 5 da un RDD:", Student_Marks.filtro (elemento lambda: elemento%5 == 0).raccogliere())
)

Produzione:

Dati effettivi in ​​RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Multipli di 5 da un RDD: [90, 100, 45]

Dall'output sopra, possiamo vedere che multipli di 5 elementi sono filtrati dalla RDD.

Esempio 2:

In questo esempio, creiamo un RDD denominato Student_Marks con 20 elementi e applicare la trasformazione di filtro () filtrando elementi che sono superiori a 45 e visualizzandoli usando l'azione collection ().

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#Display Data in RDD
Stampa ("Dati effettivi in ​​RDD:", Student_Marks.Mappa (Lambda Element: Element).raccogliere())
#Apply Filter () trasformazione filtrando valori superiori a 45
Stampa ("Valori superiori a 45:", Student_Marks.filtro (elemento lambda: elemento> 45).raccogliere())

Produzione:

Dati effettivi in ​​RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Valori superiori a 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

Dall'output di cui sopra, possiamo vedere che quegli elementi superiori a 45 sono filtrati dalla RDD.

3. unione()

La trasformazione di Union () viene utilizzata per combinare due RDD. Possiamo eseguire questa trasformazione su due RDD ..

Sintassi:

Rdd_data1.Union (RDD_DATA2)

Vediamo esempi per capire meglio questa trasformazione.

Esempio 1:

In questo esempio, creeremo un singolo RDD con i dati degli studenti e genereremo due RDD dal singolo RDD filtrando alcuni valori utilizzando la trasformazione di filtro (). Successivamente, possiamo eseguire la trasformazione di Union () sui due RDD filtrati.

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#Display Data in RDD
Stampa ("Dati effettivi in ​​RDD:", Student_Marks.Mappa (Lambda Element: Element).raccogliere())
First_filter = Student_Marks.Filtro (elemento lambda: elemento> 90)
Second_filter = Student_Marks.filtro (elemento lambda: elemento <40)
#Display Prima trasformazione filtrata
Stampa ("Elements in RDD maggiore di 90", First_filter.raccogliere())
#Display Seconda trasformazione filtrata
Stampa ("Elementi in RDD inferiore a 40", second_filter.raccogliere())
#Apply Union () trasformazione eseguendo Union sui 2 filtri sopra
Print ("Union Transformation su due dati filtrati", First_filter.Union (Second_filter).raccogliere())

Produzione:

Dati effettivi in ​​RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Elementi in RDD superiori a 90 [100]
Elementi in RDD inferiore a 40 [34, 22, 23, 21, 34, 34, 34]
Trasformazione dell'Unione su due dati filtrati [100, 34, 22, 23, 21, 34, 34, 34]

Dall'output sopra, puoi vedere che abbiamo eseguito Union su First_filter e Second_filter.

First_filter è ottenuto ottenendo elementi dagli studenti marchi RDD superiori a 90 e second_filter viene ottenuto ottenendo elementi dagli studenti marchi RDD inferiore a 40 usando la trasformazione di filtro ().

Esempio 2:

In questo esempio, creeremo due RDD in modo tale che il primo RDD abbia 20 elementi e il secondo RDD ha 10 elementi. Successivamente, possiamo applicare una trasformazione di Union () a questi due RDD.

#import il modulo pyspark
importare pyspark
#IMPORT SPARKSESSION per la creazione di una sessione
da pyspark.SQL Importazione di importazione
# Importa RDD da Pyspark.rdd
da pyspark.RDD Import RDD
#Crea un'app chiamata LinuxHint
Spark_app = SparkSession.costruttore.appname ('LinuxHint').getOrCreate ()
# Crea dati per gli studenti con 20 elementi
Student_Marks1 = Spark_app.SparkContext.parallelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
# Crea i dati degli studenti con 10 elementi
Student_Marks2 = Spark_app.SparkContext.parallelize ([45,43,23,56,78,21,34,34,56,34])
#Display Data in RDD
Stampa ("Dati effettivi in ​​Student Marks 1 RDD:", Student_Marks1.Mappa (Lambda Element: Element).raccogliere())
#Display Data in RDD
Stampa ("Dati effettivi in ​​Student Marks 2 RDD:", Student_Marks2.Mappa (Lambda Element: Element).raccogliere())
#Apply Union () trasformazione eseguendo unione sui 2 RDD sopra
Print ("Union Transformation su Two RDD", Student_Marks1.Union (Student_marks2).raccogliere())

Produzione:

Dati effettivi nei segni degli studenti 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Dati effettivi nei segni degli studenti 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Trasformazione dell'Unione su due RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Possiamo vedere che due RDD sono combinati usando la trasformazione di Union ().

Conclusione

Da questo tutorial Pyspark, vediamo tre trasformazioni applicate a RDD. La trasformazione della mappa () viene utilizzata per mappare trasformando gli elementi in un RDD, il filtro () viene utilizzato per eseguire operazioni di filtro e creare un nuovo RDD filtrato dall'RDD esistente. Infine, abbiamo discusso di Union () RDD che viene utilizzato per combinare due RDD.