Redis Xread

Redis Xread

Redis Stream, consumatori e operazioni di blocco

Redis ha introdotto i flussi che imitano la struttura dei dati del registro con la versione 5.0. Stream è una struttura di dati solo per appendici con un set di operazioni più ricco che in un file di registro. È uno dei tipi di dati più complessi in Redis poiché implementa ulteriori operazioni di blocco che consentono ai clienti di attendere i nuovi dati di flusso. Questo è in qualche modo simile al comportamento di Redis Pub/Sub o blocco elenchi, ma sono presenti differenze fondamentali quando si tratta di come i consumatori consumano i dati di Redis Stream.

Come mostrato nell'illustrazione precedente, si possono vedere diversi vantaggi rispetto a Redis Pub/Sub e blocco elenchi. Ogni nuovo elemento di dati viene consegnato a ogni consumatore. A differenza degli elenchi di rimozione dell'elemento dell'elenco ogni volta che viene chiamato a BLPOP o BRPOP, gli elementi del flusso rimangono come lo è nel flusso. Il comando XRead opera come un candidato a blocco e non bloccante sui flussi di Redis.

Il comando XRead

Il comando XRead può recuperare le voci da più flussi contemporaneamente mentre le voci restituite hanno un ID più grande dell'ultimo ID ricevuto per un determinato consumatore. Può operare sia in modo bloccante che non bloccante. Nella natura non bloccante, il comando si comporta molto simile al comando XRange ma con alcune funzionalità aggiuntive elencate di seguito:

  • Può recuperare le voci a partire dalla voce più recente che ha l'ID più grande di qualsiasi altro elemento nel flusso.
  • Può leggere da più flussi contemporaneamente.

Questo comando ha una complessità del tempo lineare quando il numero N di elementi è memorizzato nel flusso. Quindi, con un conteggio di rendimento fisso, la complessità del tempo è costante.

Il comando XRead segue la seguente sintassi:

Sintassi:

XRead [CONTERE NUMERO_OF_ROTURNED_ELEMENTS] [blocco bloccante_time_in_milliseconds] Tasto flussi [chiave ...] ID [id…]

CONTARE : Il numero di elementi da restituire dal comando. Limita le righe restituite a un numero specificato.

Blocco: Il tempo massimo per attendere che un nuovo elemento appaia nel flusso.

Le due opzioni precedenti sono opzionali per il comando.

FLUSSI : La chiave del flusso. Questa è un'opzione obbligatoria e deve essere l'ultima opzione nel comando poiché accetta la lunghezza variabile delle chiavi e gli ID di iscrizione.

: L'ID della voce del flusso.

È possibile specificare più chiavi poiché il comando consente di leggere da più di un flusso. Allo stesso tempo, possono essere forniti più ID.

Questo comando restituisce una risposta array. Ogni elemento dell'array è costituito da due elementi come mostrato nel seguente formato:

Esempio 1: ispezionare i dati meteorologici per due posizioni con XRead non bloccante

Supponiamo che abbiamo ottenuto due flussi contenenti i dati meteorologici per Los Angeles e New York. Nel nostro sito di pubblicazione dei dati meteorologici, dobbiamo consumare da entrambi i flussi e recuperare gli ultimi dati meteorologici per queste due posizioni. Il comando XRead è il candidato ideale da utilizzare in questo scenario con la sua variante non bloccante.

È ora di creare due flussi chiamati Meteo: New York E Meteo: la e popola un paio di voci con alcune coppie di valori di campo come mostrato nel seguente:

Xadd Weather: New York * Wind 45 Umidità 78 Temp 12
Xadd Weather: La * Wind 12 Umidità 45 Temp 22

Entrambi i flussi Meteo: New York E Meteo: LC sono creati correttamente e gli ID di iscrizione restituiti 1658114094434-0 E 1658114110474-0, rispettivamente.

Usiamo il comando XRead per leggere da entrambi i flussi contemporaneamente in modo non bloccante.

XREAD Streams Weather: NYC Weather: LA 0 0

Come previsto, l'output contiene le voci di entrambi i flussi con la sequenza ID a partire da 0. È accettabile specificare gli ID incompleti come precedentemente illustrato in cui entrambi gli ID sono 0 che è il timestamp di millisecondi senza la parte del numero di sequenza. Quindi, il comando precedente può essere scritto come segue:

XREAD Streams Weather: NYC Weather: LA 0-0 0-0

Aggiungiamo un paio di voci ad entrambi i flussi ora.

Xadd Weather: New York * Wind 10 Umidità 60 Temp 10
Xadd Weather: La * Wind 18 Umidità 80 Temp 5

Dal momento che abbiamo già gli ultimi ID di voce per entrambi i flussi dai comandi precedenti, chiamiamo di nuovo il comando Xread per recuperare tutte le voci con ID più grandi di quelli che abbiamo già interrogato.

Xread Streams Weather: NYC Weather: LA 1658114094434-0 1658114110474-0

Come potresti vedere, gli ID specificati provengono dalla query precedente. Ora, la chiamata di comando restituisce tutte le voci che hanno ID maggiori di quelli specificati.

Come puoi vedere, le voci appena aggiunte vengono restituite dal comando precedente. Successivamente, quello che puoi fare è prendere gli ID di voce restituiti dal comando precedente e chiamare XRead con quegli ID fino a quando l'array restituito è vuoto.

Esempio 2: Ottieni le ultime promozioni per la pizza con il blocco di Xread

C'è un'altra variante del comando XRead che può essere utilizzato per attendere fino a quando gli editori pubblicano un nuovo dati nel flusso senza terminare immediatamente come una chiamata non bloccante. Supponiamo che uno scenario in cui i ragazzi della pizza vogliono spingere le notifiche a tutti i clienti per quanto riguarda le ultime promozioni disponibili. Potrebbero non esserci promozioni in determinati giorni. Quindi, i clienti dovrebbero aspettare fino a quando le nuove promozioni sono disponibili. Può essere ottenuto con il comando XRead con l'opzione Blocco in posizione.

Supponiamo che la società di pizza stia pubblicando i dettagli promozionali in un flusso chiamato Pizzapromos: ogni giorno. Quindi, possiamo usare il comando XRead per attendere fino a quando un nuovo elemento promozionale non viene aggiunto al flusso.

Xread Block 50000 Streams Pizzapromosnew: giornaliero $

In questo caso, specifichiamo l'ID di voce come $ che viene interpretato come ID di voce superiore. Quindi, il comando interrogherà solo le nuove voci aggiunte al flusso e non le voci storiche.

Dal momento che non abbiamo aggiunto nuove voci allo streaming, sarà timeout dopo 50000 millisecondi con un zero ritorno come mostrato nel seguente:

Ora, aggiungiamo una voce al flusso usando XADD mentre un altro consumatore è in attesa dei dati con il comando XRead come mostrato nel seguente:

Come previsto, la voce aggiunta viene consumata immediatamente dal consumatore. Dalla prossima chiamata, dobbiamo assicurarci di passare l'ID che viene restituito da questo comando e non il $. In caso contrario, ci mancheranno le voci aggiunte tra.

Se più client stanno aspettando lo stesso flusso, i dati appena aggiunti vengono spinti a tutti immediatamente. Il comando XRead è un comando molto utile e consigliato da utilizzare per bloccare le applicazioni naturali.

Conclusione

Per riassumere, il comando XRead è uno dei comandi ampiamente usati che operano sui flussi di Redis. Può operare in modi sia bloccanti che non bloccanti. Come discusso, la variante non bloccante è molto simile al comando XRange con un paio di differenze. Inoltre, questo comando può essere utilizzato con l'opzione Blocco per attendere fino a quando gli editori pubblicano un nuovo dati nel flusso. Nel complesso, il comando XRead è specializzato nel consumo dei dati da più flussi contemporaneamente. È una caratteristica utile che le applicazioni moderne stanno cercando.