Come leggere i dati da Kafka con Python

Come leggere i dati da Kafka con Python
Kafka è un sistema di messaggistica distribuito a source open source per inviare il messaggio in argomenti partiti e diversi. Lo streaming dei dati in tempo reale può essere implementato utilizzando Kafka per ricevere dati tra le applicazioni. Ha tre parti principali. Questi sono produttori, consumatori e argomenti. Il produttore viene utilizzato per inviare un messaggio a un argomento particolare e ogni messaggio è allegato a una chiave. Il consumatore viene utilizzato per leggere un messaggio su un argomento particolare dall'insieme delle partizioni. I dati ricevuti dal produttore e archiviati sulle partizioni in base a un argomento particolare. Molte biblioteche esistono in Python per creare produttori e consumatori per costruire un sistema di messaggistica utilizzando Kafka. In questo tutorial è mostrato come i dati di kafka utilizzando Python.

Prerequisito

È necessario installare la libreria Python necessaria per leggere i dati da Kafka. Python3 è usato in questo tutorial per scrivere la sceneggiatura del consumatore e del produttore. Se il pacchetto PIP non è installato prima nel sistema operativo Linux, è necessario installare PIP prima di installare la libreria Kafka per Python. Python3-Kafka viene utilizzato in questo tutorial per leggere i dati da Kafka. Esegui il comando seguente per installare la libreria.

$ pip Installa Python3-Kafka

Leggere semplici dati di testo da Kafka

Diversi tipi di dati possono essere inviati dal produttore su un argomento particolare che può essere letto dal consumatore. In questa parte di questo tutorial è mostrato come un semplice dati di testo può essere inviato e ricevuto da Kafka usando produttore e consumatore.

Crea un file denominato produttore1.Py con la seguente sceneggiatura Python. Kafkaproducer Il modulo viene importato dalla libreria Kafka. L'elenco dei broker deve definire al momento dell'inizializzazione dell'oggetto produttore per connettersi con il server Kafka. La porta predefinita di Kafka è '9092'. L'argomento bootstrap_servers viene utilizzato per definire il nome host con la porta. 'First_topic'è impostato come nome argomento con cui verrà inviato il messaggio di testo. Successivamente, un semplice messaggio di testo, "Ciao da Kafka'viene inviato usando Inviare() metodo di Kafkaproducer all'argomento "First_topic'.

produttore1.PY:

# Importa Kafkaproducer dalla biblioteca Kafka
da kafka import kafkaproduttore
# Definisci il server con porta
bootstrap_servers = ['localhost: 9092']
# Definisci il nome dell'argomento in cui il messaggio pubblicherà
TopicName = 'First_topic'
# Inizializza la variabile del produttore
produttore = kafkaproducer (bootstrap_servers = bootstrap_servers)
# Pubblica il testo nell'argomento definito
produttore.Invia (TopicName, B'Hello da Kafka ... ')
# Messaggio di stampa
Stampa ("Messaggio inviato")

Crea un file denominato Consumer1.Py con la seguente sceneggiatura Python. Kafkaconsumer Il modulo viene importato dalla libreria Kafka per leggere i dati da Kafka. sys Il modulo viene utilizzato qui per terminare lo script. Lo stesso nome host e il numero di porta del produttore sono utilizzati nella sceneggiatura del consumatore per leggere i dati da Kafka. Il nome dell'argomento del consumatore e del produttore deve essere lo stessoFirst_topic'. Successivamente, l'oggetto del consumatore viene inizializzato con i tre argomenti. Nome argomento, ID gruppo e informazioni sul server. per Loop viene utilizzato qui per leggere il testo Invia dal produttore Kafka.

Consumer1.PY:

# Importa KafkaConsumer dalla biblioteca Kafka
da kafka import kafkaconsumer
# Importa modulo sys
Import sys
# Definisci il server con porta
bootstrap_servers = ['localhost: 9092']
# Definisci il nome dell'argomento da dove riceverà il messaggio
TopicName = 'First_topic'
# Inizializza la variabile del consumatore
consumer = kafkaconsumer (argomentazione, gruppo_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Leggi e stampa il messaggio dal consumatore
Per MSG nel consumatore:
print ("Nome argomento =%s, messaggio =%s"%(msg.Argomento, msg.valore))
# Termina lo script
sys.Uscita()

Produzione:

Esegui il seguente comando da un terminale per eseguire lo script del produttore.

$ python3 produttore1.Py

Verrà visualizzato il seguente output dopo aver inviato il messaggio.

Esegui il seguente comando da un altro terminale per eseguire lo script del consumatore.

$ python3 consumer1.Py

L'output mostra il nome dell'argomento e il messaggio di testo inviato dal produttore.

Leggere i dati formattati JSON da Kafka

I dati formattati JSON possono essere inviati dal produttore di Kafka e letto da Kafka Consumer utilizzando il json Modulo di Python. In che modo i dati JSON possono essere serializzati e de-serializzati prima di inviare e ricevere i dati utilizzando il modulo Python-Kafka è mostrato in questa parte di questo tutorial.

Crea uno script Python chiamato produttore2.Py Con il seguente script. Un altro modulo chiamato JSON viene importato con Kafkaproducer modulo qui. value_serializer L'argomento è usato con bootstrap_servers Argomento qui per inizializzare l'oggetto del produttore di Kafka. Questo argomento indica che i dati JSON saranno codificati utilizzando 'UTF-8'Carattere impostato al momento dell'invio. Successivamente, i dati formattati JSON vengono inviati all'argomento denominato JSONTOPICO.

produttore2.PY:

# Importa Kafkaproducer dalla biblioteca Kafka
da kafka import kafkaproduttore
# Importa il modulo JSON per serializzare i dati
Importa JSON
# Inizializza la variabile del produttore e imposta il parametro per JSON ENCODE
produttore = kafkaproducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda V: JSON.Dumps (V).ENCODE ('UTF-8'))
# Invia dati in formato JSON
produttore.Send ('JSonTopic', 'Name': 'Fahmida', 'Email': '[email protected] ')
# Messaggio di stampa
Stampa ("Messaggio inviato a JSontopic")

Crea uno script Python chiamato Consumer2.Py Con il seguente script. Kafkaconsumer, sys e i moduli JSON vengono importati in questo script. Kafkaconsumer Il modulo viene utilizzato per leggere i dati formattati JSON da Kafka. Il modulo JSON viene utilizzato per decodificare i dati JSON codificati inviati dal produttore Kafka. Sys Il modulo viene utilizzato per terminare lo script. value_deserializer L'argomento è usato con bootstrap_servers Per definire come i dati JSON saranno decodificati. Prossimo, per Loop viene utilizzato per stampare tutti i record dei consumatori e i dati JSON recuperati da Kafka.

Consumer2.PY:

# Importa KafkaConsumer dalla biblioteca Kafka
da kafka import kafkaconsumer
# Importa modulo sys
Import sys
# Importa il modulo JSON per serializzare i dati
Importa JSON
# Inizializza la variabile del consumatore e imposta la proprietà per JSON Decode
consumer = kafkaconsumer ('jsontopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.Carichi (m.decode ('utf-8')))
# Leggi i dati da Kafka
Per il messaggio nel consumatore:
Stampa ("Record dei consumatori: \ n")
Stampa (messaggio)
print ("\ nreading da JSON Data \ n")
print ("nome:", messaggio [6] ['name'])
print ("email:", messaggio [6] ['email'])
# Termina lo script
sys.Uscita()

Produzione:

Esegui il seguente comando da un terminale per eseguire lo script del produttore.

$ python3 produttore2.Py

Lo script stamperà il seguente messaggio dopo aver inviato i dati JSON.

Esegui il seguente comando da un altro terminale per eseguire lo script del consumatore.

$ python3 consumer2.Py

Il seguente output apparirà dopo aver eseguito lo script.

Conclusione:

I dati possono essere inviati e ricevuti in diversi formati da Kafka usando Python. I dati possono anche essere archiviati nel database e recuperati dal database utilizzando Kafka e Python. A casa, questo tutorial aiuterà l'utente di Python a iniziare a lavorare con Kafka.