I dati nel flusso sono immutabili. I nuovi dati possono essere aggiunti solo alla fine del flusso. Redis Streams può contenere voci che non sono solo stringhe. Ogni voce può contenere una o più coppie di valore di campo come negli hash Redis. Queste voci hanno un ID univoco per identificarle all'interno del flusso, simile ai numeri di riga o agli offset di byte utilizzati in un file di registro. La seguente illustrazione fornirà una migliore comprensione di come appare il flusso Redis:
Il comando XADD viene utilizzato per aggiungere voci a un determinato flusso che è semplice. Il meccanismo di accesso ai dati è diverso con i tipi di flusso rispetto ad altri tipi. Il vantaggio principale dei flussi è che possono spingere i messaggi appena aggiunti a più clienti o consumatori. È solo un modo di guardare i rossi flussi. Inoltre, possiamo vederlo come un negozio di serie temporali in cui è possibile ripetere l'intero flusso per recuperare tutte le voci per un determinato lasso di tempo.
Redis Stream Consumer Groups
Come accennato, i flussi di Redis consentono a più consumatori di leggere i dati da esso. Inoltre, estende questa funzionalità a un livello di accesso a un sottoinsieme dei messaggi di flusso da parte di diversi consumatori. Ogni consumatore prenderà dati diversi da elaborare, mentre Kafka implementa lo stesso comportamento con i gruppi di consumatori. La tecnica dei gruppi di consumatori è disponibile nei flussi Redis che consentono di distribuire i dati di flusso forniti tra più consumatori.
Possiamo utilizzare il comando XReadGroup per leggere i dati tramite un gruppo di consumatori. Ogni gruppo di consumatori può contenere più consumatori identificati con un nome univoco.
Il comando xack
Come accennato in precedenza, i consumatori all'interno di un gruppo di consumatori ricevono messaggi dal flusso in cui gli ID del messaggio sono maggiori dell'ultimo ID consegnato. Dopo aver consegnato un messaggio a un consumatore, il suo stato sarà impostato in sospeso e memorizzato nell'elenco delle voci in sospeso del gruppo di consumatore (PEL). È un effetto collaterale di chiamare i comandi XReadGroup o XClaim. Ciò causerebbe comunque il server restituire i messaggi in sospeso ogni volta che effettua una chiamata utilizzando il comando XReadGroup per recuperare la cronologia dei messaggi per consumatore. Quindi, i gruppi di consumatori Redis hanno introdotto un processo di riconoscimento dei messaggi. Il comando XACK può avvisare il server che un messaggio recuperato è stato elaborato correttamente. Rimuoverebbe la voce per un tale messaggio nel PEL.
Sintassi
XackIl comando XACK restituisce il numero di voci riconosciute come risposta.
Esempio: il bilanciamento del carico serve client diversi da più nodi del server
Diamo un'occhiata a uno scenario del mondo reale in cui un bilanciamento del carico legge da un flusso di indirizzi IP client e serve ogni client da diversi nodi del server. Possiamo pensare a questo come a un gruppo di consumatori che legge da un flusso Redis in cui il gruppo contiene più nodi di consumo.
Innanzitutto, dovremmo creare un gruppo di consumatori a cui appartiene ogni nodo server. Possiamo utilizzare il seguente comando XGroup per creare un gruppo di consumatori per un determinato flusso:
XGroup Crea iPAddressStream UkserverGroup $ mkstreamIl comando è autoesplicativo, dove il iPaddressStream ha un nuovo gruppo di consumatori chiamato UkserverGroup che fornisce solo i nuovi messaggi disponibili per lo streaming quando un consumatore è connesso. Crea anche il iPaddressStream flusso perché il Mkstream è stato specificato il parametro.
Quindi, dovremmo aggiungere diversi indirizzi IP a iPaddressStream Creato in precedenza utilizzando il seguente comando Redis XADD:
XADD iPaddressStream * IP 123.456.12.1Output dopo ogni comando:
Questo aggiungerebbe 5 voci al iPaddressStream flusso. Ogni voce è assegnata a un ID univoco generato dal server che è tornato dopo aver chiamato il comando XADD.
Ora leggiamo il iPaddressStream tramite il UkserverGroup's consumatore chiamato Server0.
XReadGroup Group UkserverGroup Server0 Conteggio 2 streams iPAddressStream>Produzione
Come previsto, il Server0 Il consumatore riceve due nuovi messaggi da iPaddressStream flusso. Questi due indirizzi IP sono stati aggiunti all'elenco delle voci in sospeso. Se chiamiamo il comando XReadGroup con l'ID 0, restituirà i messaggi in sospeso per il Server0 consumatore.
XReadGroup Group UkserverGroup Server0 Streams iPAddressStream 0Produzione
Ciò significa che il server sta ancora aspettando il Server0 consumatore per riconoscere questi due messaggi. Riconosciamo i messaggi per il Server0 consumatore utilizzando il seguente comando XACK.
XACK IPADDRESSTREAM UKSERVERGROUP 1656192378464-0 1656192389344-0Qui, stiamo riconoscendo entrambe le voci identificate dai rispettivi ID. Il comando restituisce anche il conteggio dei messaggi elaborati correttamente. Sono due in questo caso.
Produzione
Dopo il processo precedente, questi due messaggi avrebbero dovuto essere rimossi dall'elenco delle voci in sospeso (PEL). Quindi il Server0 Il consumatore non restituirà alcun messaggio in sospeso dopo aver chiamato il comando XReadGroup tramite il gruppo di consumatori UkserverGroup.
XReadGroup Group UkserverGroup Server0 Streams iPAddressStream 0Produzione
Restituisce un array vuoto che significa nessun messaggio in sospeso per questo consumatore. La funzione di riconoscimento del messaggio è molto utile in questi tipi di scenari.
Conclusione
Redis viene fornito con il tipo di dati del flusso la cui implementazione sottostante si basa sulla struttura dei dati del registro. Quindi, le nuove voci vengono aggiunte alla fine del flusso. Il vantaggio più grande è che più consumatori possono interrogare gli ultimi messaggi aggiunti allo streaming. Inoltre, la tecnica del gruppo di consumatori Redis consente il flusso di lettura da parte di un gruppo di consumatori in cui ogni consumatore vede solo un sottoinsieme dei messaggi di flusso. Dopo aver letto la voce di un consumatore dal flusso, tale voce viene aggiunta all'elenco delle voci in sospeso. Quindi, il consumatore deve riconoscere ciascuno dei messaggi in sospeso. Notirà al server di rimuovere la voce da PEL e rilasciare la memoria. Il comando XACK può essere utilizzato per riconoscere i messaggi di Redis Stream. Supporta il riconoscimento di più messaggi contemporaneamente.