Как да използваме потребителски групи в Redis Streams

Рошан Кумар е старши продуктов мениджър в Redis Labs.

Redis Streams е нова структура от данни, въведена в Redis 5.0, която ви позволява да създавате и управлявате потоци от данни. В предишна статия показах как да добавяте данни към поток и как да ги четете по множество начини. В тази статия ще обясня как да използвам потребителски групи в Redis Streams. Потребителската група е начин за разделяне на поток от съобщения между множество клиенти, за да се ускори обработката или да се облекчи товара за по-бавни потребители.

В един перфектен свят както производителите на данни, така и потребителите работят с еднаква скорост и няма загуба на данни или изоставане на данните. За съжаление не е така в реалния свят. В почти всички случаи на използване на обработка на потоци от данни в реално време производителите и потребителите работят с различна скорост. Освен това има повече от един тип потребители, всеки със свои изисквания и темп на обработка. Redis Streams адресира тази необходимост с набор от функции, който силно гравитира към подкрепа на потребителите. Една от най-важните му характеристики е потребителската група.

Кога да се използва потребителска група Redis Streams

Целта на потребителските групи е да разшири процеса на консумация на данни. Нека разгледаме един пример - приложение за обработка на изображения. Решението изисква три основни компонента:

  1. Производител (една или повече камери, може би), който заснема и съхранява изображения;
  2. Redis Stream, който запазва изображенията (в хранилище за поточни данни) в реда, в който пристигат; и
  3. Обработващ изображения, който обработва всяко изображение. 
Лаборатории Redis

Да предположим, че вашият производител запазва 500 изображения в секунда, а процесорът за обработка на изображения обработва само 100 изображения в секунда с пълния си капацитет. Тази разлика в скоростта ще създаде изоставане и вашият процесор за изображения никога няма да може да навакса. Лесен начин за решаване на този проблем е да стартирате пет процесора за изображения (както е показано на фигура 2), като всеки обработва взаимно изключващ се набор от изображения. Можете да постигнете това чрез потребителска група, която ви позволява да разпределите работните си товари и да ги насочите към различни потребители.

Лаборатории Redis

Потребителска група прави повече от разделянето на данни - осигурява безопасност на данните и позволява възстановяване при бедствия.

Как работи потребителската група на Redis Streams

Потребителска група е структура от данни в потока Redis. Както е показано на Фигура 3, можете да мислите за потребителска група като колекция от списъци. Друго нещо, което трябва да си представите, е списък с артикули, които не се консумират от нито един потребител - за нашата дискусия нека наречем това „списък без консумация“. Тъй като данните пристигат в потока, те незабавно се изпращат към неконсумирания списък.

Лаборатории Redis

Потребителската група поддържа отделен списък за всеки потребител, обикновено с приложено приложение. На фигура 3 нашето решение има N идентични приложения (App 1, App 2, .... App n), които четат данни чрез Consumer 1, Consumer 2, ... Consumer n съответно.

Когато дадено приложение чете данни с помощта на командата XREADGROUP, конкретни записи от данни се премахват от неконсумирания списък и се изпращат в списъка с чакащи записи, който принадлежи на съответния потребител. По този начин няма двама потребители да консумират едни и същи данни.

И накрая, когато приложението уведоми потока с командата XACK, ще премахне елемента от списъка с чакащи записи на потребителя.

Сега, след като обясних основите на потребителските групи, нека разгледаме по-задълбочено как работи този жизнен цикъл на данните.

Създаване на потребителска група Redis Streams

Можете да създадете нова потребителска група, като използвате командата XGROUP CREATE, както е показано по-долу.

XGROUP СЪЗДАЙ mystream mygroup $ MKSTREAM

Както при XREAD, знакът $ в края на командата казва на потока да доставя само нови данни от този момент във времето. Алтернативната опция е 0 или друг ID от записа на потока. Когато използвате 0, потокът ще достави всички данни от началото на потока.

MKSTREAM създава нов поток, mystream в този случай, ако вече не съществува.

Четене и управление на данните на Redis Stream

Да приемем, че имате Redis Stream (mystream) и вече сте създали потребителска група (mygroup), както е показано по-горе. Вече можете да добавяте елементи с имена a, b, c, d, e както в следващия пример.

XADD mystream * име a

Изпълнението на тази команда за имена от a до e ще попълни Redis Stream, mystream и неконсумирания списък на потребителската група mystream. Това е илюстрирано на фигура 4.

Лаборатории Redis

Тук можете да видите, че потребителите Алис и Боб все още не са започнали работата си. Приложение А консумира данни чрез потребителя Алис, докато приложение Б консумира данни чрез Боб.

Консумиране на данни на Redis Streams

Командата за четене на данни от група е XREADGROUP. В нашия пример, когато приложение А започва да обработва данни, то извиква потребителя (Алиса) да извлече данни, както в:

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>

По същия начин App B чете данните чрез Bob, както следва:

XREADGROUP GROUP mygroup COUNT 2 Bob STREAMS mystream>

Специалният знак> в края казва на Redis Streams да извлича само записи от данни, които не са доставени на други потребители. Също така имайте предвид, че няма двама потребители да консумират едни и същи данни, което ще доведе до преместване на данни от неконсумирания списък към Алис и Боб, както е показано на фигура 5.

Лаборатории Redis

Премахване на обработени съобщения от чакащи списъци с записи

Данните в списъците с чакащи записи на вашите потребители ще останат там, докато Приложения А и Приложение Б не потвърдят на Redis Streams, че успешно са използвали данните. Това се прави с помощта на командата XACK. Например App A ще потвърди, както следва, след консумиране на d и e, които имат идентификаторите 1526569411111-0 и 1526569411112-0.

XACK mystream mygroup 1526569411111-0 1526569411112-0

Комбинацията от XREADGROUP и XACK е аналогична на започването на транзакция и извършването й, което гарантира безопасността на данните. 

След стартиране на XACK, нека приемем, че приложение A изпълнява XREADGROUP, както е показано по-долу. Сега структурата на данните изглежда като Фигура 6.

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>
Лаборатории Redis

Възстановяване след неуспехи

Ако приложението B се прекрати поради неуспех по време на обработката на b и c, тогава структурата на данните ще изглежда като Фигура 7.

Лаборатории Redis

Сега ви остават две възможности:

1. Рестартирайте App B и презаредете данните от потребителя (Bob).

В този случай App B трябва да чете данни от вашия потребител (Bob) с помощта на командата XREADGROUP, но с една разлика. Вместо> в края, приложение B ще предаде 0 (или идентификатора по-нисък от предишния въведен данни, който е бил обработен). Не забравяйте, че> изпраща нови данни от неконсумирания списък до потребителя.

XREADGROUP GROUP mygroup COUNT 2 Bob STREAMS mystream 0

Горната команда ще извлече записи от данни, които вече се съхраняват в списъка за потребител Боб. Той няма да извлича нови данни от неконсумирания списък. Приложение Б може да извърши итерация през всички данни в потребителя Боб, преди да извлече нови данни.

2. Принудете Алис да поиска всички данни от Боб и да ги обработи чрез приложение А.

Това е особено полезно, ако не можете да възстановите приложение B поради повреда на възел, диск или мрежа. В такива случаи всеки друг потребител (като Алис) може да поиска данните на Боб и да продължи да обработва тези данни, като по този начин предотвратява прекъсването на услугата. За да заявите данните на Боб, трябва да изпълните два набора от команди:

XPENDING mystream mygroup - + 10 Боб

Това ще извлече всички чакащи записи на данни за Боб. Опциите - и + извличане на целия диапазон. Ако b и c имаха идентификаторите 1526569411113-0 и 1526569411114-0 съответно, командата, която ще премести данните на Боб в Алис, е както следва:

XCLAIM mystream mygroup Alice 0 1526569411113-0 1526569411114-0

Потребителските групи поддържат текущ часовник за данни в консумирания списък. Например, когато приложение B чете b, часовникът започва, докато Боб не получи ACK. С опцията за време в командата XCLAIM можете да кажете на групата потребители да премества само данни, които не работят по-дълго от определено време. Можете също да игнорирате това, като предадете 0, както е показано в примера по-горе. Резултатът от тези команди е илюстриран на фигура 8. XCLAIM също е полезен, когато един от вашите потребителски процесори е бавен, което води до натрупване на необработени данни.

Лаборатории Redis

В предишната статия разгледахме основите на това как да използваме Redis Streams. Навлязохме малко по-дълбоко в тази статия и обяснихме кога да използваме потребителски групи и как работят. Потребителските групи в Redis Streams намаляват тежестта ви, когато става въпрос за управление на дялове на данни, техния жизнен цикъл и безопасност на данните. Освен това възможностите за мащабиране на потребителските групи могат да се възползват от много приложения в реално време.

В предстоящата трета статия за Redis Streams ще демонстрирам как да разработя приложение за класификация в реално време, използвайки Redis Streams и Lettuce, базирана на Java библиотека с отворен код за Redis. Междувременно можете да научите повече, като работите чрез урока за Redis Streams на уебсайта на проекта Redis. 

Рошан Кумар е старши продуктов мениджър в  Redis Labs . Притежава богат опит в разработването на софтуер и технологичния маркетинг. Рошан е работил в Hewlett-Packard и много успешни стартиращи компании в Силициевата долина, включително ZillionTV, Salorix, Alopa и ActiveVideo. Като ентусиазиран програмист, той проектира и разработи mindzeal.com, онлайн платформа, хостваща курсове по компютърно програмиране за млади студенти. Рошан има бакалавърска степен по компютърни науки и MBA от университета Санта Клара.

-

Форумът New Tech предоставя място за изследване и обсъждане на нововъзникващите корпоративни технологии в безпрецедентна дълбочина и широчина. Изборът е субективен, базиран на нашия избор на технологиите, които смятаме, че са важни и представляват най-голям интерес за читателите. не приема маркетингово обезпечение за публикуване и си запазва правото да редактира цялото съдържание. Изпращайте всички запитвания на  [email protected] .