Iniziare con Apache Kafka e Python

Iniziare con Apache Kafka e Python
In questa lezione, vedremo come possiamo usare Apache Kafka con Python e creare un'applicazione di esempio utilizzando il client Python per Apache Kafka.

Per completare questa lezione, è necessario avere un'installazione attiva per Kafka sulla macchina. Leggi Installa Apache Kafka su Ubuntu per sapere come farlo.

Installazione del client Python per Apache Kafka

Prima di poter iniziare a lavorare con Apache Kafka nel programma Python, dobbiamo installare il client Python per Apache Kafka. Questo può essere fatto usando PIP (Indice del pacchetto Python). Ecco un comando per raggiungere questo obiettivo:

PIP3 Installa Kafka-Python

Questa sarà una rapida installazione sul terminale:

Installazione client di Python Kafka utilizzando PIP

Ora che abbiamo un'installazione attiva per Apache Kafka e abbiamo anche installato il client Python Kafka, siamo pronti per iniziare la codifica.

Fare un produttore

La prima cosa da dover pubblicare messaggi su Kafka è un'applicazione del produttore che può inviare messaggi agli argomenti in Kafka.

Si noti che i produttori di kafka sono produttori di messaggi asincroni. Ciò significa che le operazioni eseguite mentre un messaggio è pubblicato sulla partizione dell'argomento di Kafka non bloccano. Per semplificare le cose, scriveremo un semplice editore JSON per questa lezione.

Per iniziare, fai un'istanza per il produttore di Kafka:

da kafka import kafkaproduttore
Importa JSON
importare pprint
produttore = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.Dumps (V).ENCODE ('UTF-8'))

L'attributo bootstrap_servers informa l'host e la porta per il server Kafka. L'attributo value_serializer è solo a scopo della serializzazione JSON dei valori JSON incontrati.

Per giocare con il produttore di Kafka, proviamo a stampare le metriche relative al produttore e al cluster Kafka:

Metriche = produttore.metrica()
pprint.pprint (metriche)

Vedremo quanto segue ora:

Kafka Mterics

Ora, provamo finalmente a inviare qualche messaggio alla coda Kafka. Un semplice oggetto JSON sarà un buon esempio:

produttore.Send ('LinuxHint', 'Topic': 'Kafka')

IL Linuxhint è la partizione dell'argomento su cui verrà inviato l'oggetto JSON. Quando esegui lo script, non otterrai alcun output poiché il messaggio viene appena inviato alla partizione dell'argomento. È tempo di scrivere un consumatore in modo da poter testare la nostra applicazione.

Fare un consumatore

Ora, siamo pronti a stabilire una nuova connessione come applicazione del consumatore e ottenere i messaggi dall'argomento Kafka. Inizia con una nuova istanza per il consumatore:

da kafka import kafkaconsumer
da Kafka Import Topic -Partition
Stampa ('Making Connection.')
consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')

Ora, assegna un argomento a questa connessione e anche a un possibile valore di offset.

Stampa ('Assegnazione dell'argomento.')
consumatore.Assegna ([TopicPartition ('LinuxHint', 2)])

Infine, siamo pronti a stampare il mssage:

stampa ('ricevere messaggio.')
Per il messaggio nel consumatore:
print ("offset:" + str (messaggio [0]) + "\ t msg:" + str (messaggio))

Attraverso questo, avremo un elenco di tutti i messaggi pubblicati sulla partizione dell'argomento del consumatore Kafka. L'output per questo programma sarà:

Consumatore Kafka

Solo per un rapido riferimento, ecco lo script del produttore completo:

da kafka import kafkaproduttore
Importa JSON
importare pprint
produttore = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.Dumps (V).ENCODE ('UTF-8'))
produttore.Send ('LinuxHint', 'Topic': 'Kafka')
# metrics = produttore.metrica()
# pprint.pprint (metriche)

Ed ecco il programma completo del consumatore che abbiamo usato:

da kafka import kafkaconsumer
da Kafka Import Topic -Partition
Stampa ('Making Connection.')
consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Stampa ('Assegnazione dell'argomento.')
consumatore.Assegna ([TopicPartition ('LinuxHint', 2)])
stampa ('ricevere messaggio.')
Per il messaggio nel consumatore:
print ("offset:" + str (messaggio [0]) + "\ t msg:" + str (messaggio))

Conclusione

In questa lezione, abbiamo esaminato come possiamo installare e iniziare a utilizzare Apache Kafka nei nostri programmi Python. Abbiamo mostrato quanto sia facile eseguire compiti semplici relativi a Kafka in Python con il cliente Kafka dimostrato per Python.