Как да изграждаме приложения за стрийминг със състояние с Apache Flink

Фабиан Хюске е комисар и член на PMC на проекта Apache Flink и съосновател на Data Artisans.

Apache Flink е рамка за внедряване на приложения за обработка на потоци с състояние и тяхното стартиране в мащаб на изчислителен клъстер. В предишна статия разгледахме какво представлява обработката на потока с състояние, какви случаи на употреба се отнася и защо трябва да внедрите и стартирате вашите поточни приложения с Apache Flink.

В тази статия ще представя примери за два често използвани случая на обработка на потоци с състояние и ще обсъдя как те могат да бъдат приложени с Flink. Първият случай на употреба са приложения, управлявани от събития, т.е. приложения, които поглъщат непрекъснати потоци от събития и прилагат някаква бизнес логика към тези събития. Вторият е случаят на използване на поточна аналитика, където ще представя две аналитични заявки, изпълнени с SQL API на Flink, които обобщават поточни данни в реално време. Ние от Data Artisans предоставяме изходния код на всички наши примери в публично хранилище на GitHub.

Преди да се потопим в подробностите на примерите, ще представя потока от събития, погълнат от примерните приложения, и ще обясня как можете да стартирате кода, който предоставяме.

Поток от събития с такси

Нашите примерни приложения се основават на публичен набор от данни за пътувания с такси, които се случиха в Ню Йорк през 2013 г. Организаторите на 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge пренаредиха оригиналния набор от данни и го преобразуваха в един CSV файл, от който четем следните девет полета.

  • Медальон - идентификационен номер на такси в MD5
  • Hack_license - идентификационен номер на MD5 сума на лиценза за такси
  • Pickup_datetime - времето, когато пътниците са взети
  • Dropoff_datetime - времето, когато пътниците са били откарани
  • Pickup_longitude - дължина на мястото за вземане
  • Pickup_latitude - географската ширина на мястото за вземане
  • Dropoff_longitude - дължина на мястото за отпадане
  • Dropoff_latitude - географската ширина на мястото за отпадане
  • Обща_сума - общо платена в долари

CSV файлът съхранява записите във възходящ ред на техния атрибут време за отпадане. Следователно файлът може да се третира като подреден дневник на събитията, които са публикувани, когато пътуването приключи. За да стартирате примерите, които предоставяме на GitHub, трябва да изтеглите набора от данни за предизвикателството DEBS от Google Drive.

Всички примерни приложения четат последователно CSV файла и го поглъщат като поток от събития с такси. Оттам нататък приложенията обработват събитията точно както всеки друг поток, т.е. като поток, който се поглъща от базирана на дневника система за публикуване-абониране, като Apache Kafka или Kinesis. Всъщност четенето на файл (или друг вид запазени данни) и третирането му като поток е крайъгълен камък на подхода на Flink за обединяване на обработката на партиди и потоци.

Стартиране на примери за Flink

Както споменахме по-рано, публикувахме изходния код на нашите примерни приложения в хранилище на GitHub. Препоръчваме ви да разклоните и клонирате хранилището. Примерите могат лесно да бъдат изпълнени от избраната от вас IDE; не е необходимо да настройвате и конфигурирате клъстер Flink, за да ги стартирате. Първо, импортирайте изходния код на примерите като проект на Maven. След това изпълнете основния клас на приложение и предоставете местоположението за съхранение на файла с данни (вижте по-горе връзката за изтегляне на данните) като параметър на програмата.

След като стартирате приложение, то ще стартира локален, вграден екземпляр на Flink в процеса на JVM на приложението и ще изпрати приложението, за да го изпълни. Ще видите куп операторски записи, докато Flink стартира и задачите на заданието са планирани. След като приложението се изпълни, изходът му ще бъде записан в стандартния изход.

Изграждане на управлявано от събития приложение във Flink

Сега нека обсъдим първия ни случай на употреба, който е приложение, управлявано от събития. Приложенията, управлявани от събития, поглъщат потоци от събития, извършват изчисления при получаване на събитията и могат да излъчват нови събития или да задействат външни действия. Могат да се съставят множество приложения, управлявани от събития, като се свързват заедно чрез системи за регистриране на събития, подобно на това как големите системи могат да бъдат съставени от микроуслуги. Управляваните от събития приложения, дневници на събития и моментни снимки на състоянието на приложението (известни като точки за запис във Flink) съдържат много мощен модел на проектиране, тъй като можете да нулирате състоянието им и да възпроизведете входа им, за да се възстановите от повреда, да поправите грешка или да мигрирате приложение към различен клъстер.

В тази статия ще разгледаме приложение, управлявано от събития, което подкрепя услуга, която следи работното време на таксиметровите шофьори. През 2016 г. комисията по такси и лимузина в Ню Йорк реши да ограничи работното време на таксиметровите шофьори до 12-часови смени и да изисква почивка от най-малко осем часа преди да започне следващата смяна. Смяната започва с началото на първото пътуване. От този момент нататък водачът може да започне нови пътувания в рамките на 12 часа. Нашето приложение проследява пътуванията на шофьорите, отбелязва крайния час на 12-часовия им прозорец (т.е. времето, когато те могат да започнат последното пътуване) и маркира вози, които нарушават разпоредбите. Можете да намерите пълния изходен код на този пример в нашето хранилище на GitHub.

Нашето приложение е реализирано с API за данни DataStream на Flink и a KeyedProcessFunction. API на DataStream е функционален API и се основава на концепцията за типизирани потоци от данни. A DataStreamе логическото представяне на поток от събития от тип T. Потокът се обработва чрез прилагане на функция към него, която произвежда друг поток от данни, вероятно от различен тип. Flink обработва потоци паралелно, като разпределя събития към поточни дялове и прилага различни екземпляри от функции към всеки дял.

Следният кодов фрагмент показва потока на високо ниво на нашето приложение за наблюдение.

// поглъщаме поток от таксита.

DataStream вози = TaxiRides.getRides (env, inputPath);

DataStream известия = вози

   // дял поток от идентификатор на шофьорска книжка

   .keyBy (r -> r.licenseId)

   // наблюдаваме събития за каране и генерираме известия

   .process (нов MonitorWorkTime ());

// отпечатване на известия

notifications.print ();

Приложението започва да поглъща поток от събития с такси. В нашия пример събитията се четат от текстов файл, анализират се и се съхраняват в TaxiRidePOJO обекти. Приложението от реалния свят обикновено поглъща събитията от опашка от съобщения или дневник на събития, като Apache Kafka или Pravega. Следващата стъпка е да съберете TaxiRideсъбитията от licenseIdводача. На keyByдяловете на работа потока на обявената областта, така че всички събития с един и същ ключ се обработват по същия паралел инстанция на следната функция. В нашия случай разделяме licenseIdполето, защото искаме да следим работното време на всеки отделен шофьор.

След това прилагаме MonitorWorkTimeфункцията върху разделените TaxiRideсъбития. Функцията проследява пътуванията на водач и следи техните смени и времето за почивка. Той излъчва събития от тип Tuple2, където всеки кортеж представлява известие, състоящо се от идентификационния номер на драйвера и съобщение. И накрая, нашето приложение излъчва съобщенията, като ги отпечатва на стандартния изход. Приложение от реалния свят би записало известията във външно съобщение или система за съхранение, като Apache Kafka, HDFS или система от бази данни, или би задействало външно обаждане, което незабавно да ги изтласка.

Сега, когато обсъдихме цялостния поток на приложението, нека разгледаме MonitorWorkTimeфункцията, която съдържа по-голямата част от действителната бизнес логика на приложението. Най MonitorWorkTimeфункция е динамична защитна KeyedProcessFunctionче поглъща TaxiRideсъбития и излъчва Tuple2записи. В KeyedProcessFunctionинтерфейса разполага с две методи за данни процес: processElement()а onTimer(). В processElement()метода се нарича за всеки пристигащ събитие. В onTimer()метод се нарича, когато регистрираното преди таймера пожари. Следващият фрагмент показва скелета на MonitorWorkTimeфункцията и всичко, което е декларирано извън методите за обработка.

публичен статичен клас MonitorWorkTime

    разширява KeyedProcessFunction {

  // константи на времето в милисекунди

  частен статичен финал long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 часа

  частен статичен финал дълъг REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 часа

  частен статичен финал long CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 часа

 частно преходно форматиране DateTimeFormatter;

  // държач за съхранение, за да съхранява началния час на смяна

  ValueState shiftStart;

  @Override

  отворена публична невалидност (Configuration conf) {

    // регистратор за състояние на регистъра

    shiftStart = getRuntimeContext (). getState (

      нов ValueStateDescriptor (“shiftStart”, Types.LONG));

    // инициализиране на форматиране на време

    this.formatter = DateTimeFormat.forPattern („гггг-MM-dd HH: mm: ss“);

  }

  // processElement () и onTimer () са разгледани подробно по-долу.

}

Функцията декларира няколко константи за интервали от време в милисекунди, форматиране на време и манипулатор на състоянието за ключово състояние, управлявано от Flink. Управляваното състояние се проверява периодично и автоматично се възстановява в случай на повреда. Състоянието на ключа е организирано за ключ, което означава, че функцията ще поддържа по една стойност на манипулатор и ключ. В нашия случай MonitorWorkTimeфункцията поддържа Longстойност за всеки ключ, т.е. за всеки licenseId. В shiftStartдържавата съхранява времето на стартиране на смяна на шофьора. Манипулаторът на състоянието се инициализира в open()метода, който се извиква веднъж преди първото събитие да бъде обработено.

Сега, нека да разгледаме processElement()метода.

@Override

public void processElement (

    Пътуване с такси,

    Контекст ctx,

    Колекционер out) хвърля изключение {

  // търсим началния час на последната смяна

  Дълги стартове = shiftStart.value ();

  if (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // това е първото пътуване от нова смяна.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      „Имате право да приемате нови пътници до„ + formatter.print (endTs))));

    // регистрирайте таймер за почистване на състоянието за 24 часа

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } иначе ако (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // това пътуване започна след приключване на разрешеното работно време.

    // това е нарушение на разпоредбите!

    out.collect (Tuple2.of (ride.licenseId,

      „Това пътуване нарушава разпоредбите за работното време.“));

  }

}

В processElement()метода се нарича за всеки TaxiRideслучай. Първо, методът извлича началния час на смяната на водача от държача на състоянието. Ако състоянието не съдържа начален час ( startTs == null) или ако последната смяна е започнала с повече от 20 часа ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) по-рано от текущото пътуване, текущото пътуване е първото пътуване от нова смяна. И в двата случая функцията стартира нова смяна, като актуализира началния час на смяната до началния час на текущото пътуване, излъчва съобщение до водача с края на новата смяна и регистрира таймер за почистване на състояние за 24 часа.

Ако текущото пътуване не е първото пътуване от нова смяна, функцията проверява дали нарушава регулирането на работното време, т.е. дали е започнало повече от 12 часа по-късно от началото на текущата смяна на водача. Ако случаят е такъв, функцията излъчва съобщение, за да информира водача за нарушението.

В processElement()метода на MonitorWorkTimeфункцията регистрира таймер за изчистване на държавата 24 часа след началото на промяната. Премахването на състояние, което вече не е необходимо, е важно за предотвратяване на нарастващите размери на състоянието поради състояние на изтичане. Таймерът се задейства, когато времето на приложението премине клеймото на таймера. В този момент onTimer()се извиква методът. Подобно на състоянието, таймерите се поддържат за всеки ключ и функцията се поставя в контекста на свързания ключ, преди onTimer()методът да бъде извикан. Следователно целият достъп на държавата е насочен към ключа, който е бил активен при регистрирането на таймера.

Нека да разгледаме onTimer()метода на MonitorWorkTime.

@Override

публична невалидност onTimer (

    дълги таймери,

    OnTimerContext ctx,

    Колекционер out) хвърля изключение {

  // премахване на състоянието на смяна, ако вече не е стартирана нова смяна.

  Дълги стартове = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

На processElement()таймерите метод регистри за 24 часа след преминаването започнаха да почисти държава, която вече не е необходимо. Изчистването на състоянието е единствената логика, която onTimer()методът прилага. Когато таймерът се задейства, ние проверяваме дали водачът междувременно е започнал нова смяна, т.е. дали времето за стартиране на смяната се е променило. Ако случаят не е такъв, изчистваме състоянието на смяна за водача.