Cara membina aplikasi penstriman bernegara dengan Apache Flink

Fabian Hueske adalah komite dan ahli PMC projek Apache Flink dan pengasas bersama Data Artisans.

Apache Flink adalah kerangka kerja untuk menerapkan aplikasi pemrosesan aliran negara dan menjalankannya pada skala pada kluster komputasi. Dalam artikel sebelumnya, kami meneliti apa itu pemrosesan aliran bernegara, kasus penggunaan apa yang ditangani, dan mengapa anda harus melaksanakan dan menjalankan aplikasi streaming anda dengan Apache Flink.

Dalam artikel ini, saya akan mengemukakan contoh untuk dua kes penggunaan umum pemprosesan aliran bernegara dan membincangkan bagaimana ia dapat dilaksanakan dengan Flink. Kes penggunaan pertama adalah aplikasi yang didorong oleh peristiwa, iaitu, aplikasi yang menggunakan aliran peristiwa yang berterusan dan menerapkan beberapa logik perniagaan untuk peristiwa ini. Yang kedua adalah kes penggunaan analitik streaming, di mana saya akan membentangkan dua pertanyaan analitik yang dilaksanakan dengan Flink's SQL API, yang mengumpulkan data streaming dalam masa nyata. Kami di Data Artisans memberikan kod sumber semua contoh kami di repositori GitHub awam.

Sebelum kita menyelidiki perincian contoh, saya akan memperkenalkan aliran acara yang diserap oleh aplikasi contoh dan menjelaskan bagaimana anda dapat menjalankan kod yang kami sediakan.

Aliran acara menaiki teksi

Aplikasi contoh kami didasarkan pada kumpulan data awam mengenai perjalanan teksi yang berlaku di New York City pada tahun 2013. Penganjur Grand Challenge 2015 DEBS (Persidangan Antarabangsa ACM mengenai Sistem Acara Terdistribusi) menyusun semula set data asal dan mengubahnya menjadi satu fail CSV dari mana kami membaca sembilan bidang berikut.

  • Medallion - jumlah id MD5 teksi
  • Hack_license — id jumlah MD5 bagi lesen teksi
  • Pickup_datetime - masa ketika penumpang dijemput
  • Dropoff_datetime - masa ketika penumpang turun
  • Pickup_longitude — garis bujur lokasi pengambilan
  • Pickup_latitude — garis lintang lokasi pengambilan
  • Dropoff_longitude — garis bujur lokasi drop-off
  • Dropoff_latitude — garis lintang lokasi drop-off
  • Jumlah_jumlah - jumlah yang dibayar dalam dolar

Fail CSV menyimpan rekod dalam urutan menaik atribut waktu berhenti mereka. Oleh itu, fail boleh dianggap sebagai log peristiwa yang disusun yang diterbitkan ketika perjalanan berakhir. Untuk menjalankan contoh yang kami berikan di GitHub, anda perlu memuat turun set data cabaran DEBS dari Google Drive.

Semua aplikasi contoh membaca fail CSV secara berurutan dan memasukkannya sebagai aliran acara menaiki teksi. Dari situ, aplikasi memproses peristiwa seperti aliran lain, seperti aliran yang diserap dari sistem langganan penerbitan-langganan berasaskan log, seperti Apache Kafka atau Kinesis. Sebenarnya, membaca fail (atau jenis data berterusan lain) dan menganggapnya sebagai aliran adalah tonggak pendekatan Flink untuk menyatukan pemprosesan kumpulan dan aliran.

Menjalankan contoh Flink

Seperti disebutkan sebelumnya, kami menerbitkan kod sumber aplikasi contoh kami di repositori GitHub. Kami menggalakkan anda untuk membuat dan mengklon repositori. Contohnya dapat dilaksanakan dengan mudah dari dalam IDE pilihan anda; anda tidak perlu mengatur dan mengkonfigurasi Flink cluster untuk menjalankannya. Pertama, import kod sumber contoh sebagai projek Maven. Kemudian, jalankan kelas utama aplikasi dan sediakan lokasi penyimpanan fail data (lihat di atas untuk pautan untuk memuat turun data) sebagai parameter program.

Setelah anda melancarkan aplikasi, aplikasi akan memulai instance Flink lokal, tertanam di dalam proses JVM aplikasi dan mengirimkan aplikasi untuk melaksanakannya. Anda akan melihat banyak pernyataan log semasa Flink memulakan dan tugas tugas dijadualkan. Setelah aplikasi dijalankan, outputnya akan ditulis ke output standard.

Membangun aplikasi berdasarkan acara di Flink

Sekarang, mari bincangkan kes penggunaan pertama kami, yang merupakan aplikasi berdasarkan acara. Aplikasi yang didorong oleh peristiwa menelan aliran peristiwa, melakukan perhitungan ketika peristiwa diterima, dan dapat memancarkan peristiwa baru atau memicu tindakan luaran. Beberapa aplikasi berdasarkan peristiwa dapat disusun dengan menghubungkannya bersama melalui sistem log peristiwa, sama seperti seberapa besar sistem yang dapat disusun dari perkhidmatan mikro. Aplikasi berdasarkan peristiwa, log peristiwa, dan snapshot keadaan aplikasi (dikenali sebagai titik simpanan di Flink) merangkumi corak reka bentuk yang sangat kuat kerana anda dapat menetapkan semula keadaannya dan memainkan semula input mereka untuk pulih dari kegagalan, memperbaiki bug, atau memindahkan aplikasi ke kluster yang berbeza.

Dalam artikel ini kita akan memeriksa aplikasi berdasarkan peristiwa yang menyokong perkhidmatan, yang memantau waktu kerja pemandu teksi. Pada tahun 2016, NYC Taxi and Limousine Commission memutuskan untuk mengehadkan waktu bekerja pemandu teksi kepada 12 jam shift dan memerlukan rehat sekurang-kurangnya lapan jam sebelum peralihan seterusnya dapat dimulakan. Peralihan bermula dengan permulaan perjalanan pertama. Sejak itu, pemandu boleh memulakan perjalanan baru dalam masa 12 jam. Aplikasi kami mengesan perjalanan pemandu, menandakan waktu akhir jendela 12 jam mereka (iaitu, saat mereka memulakan perjalanan terakhir), dan menandakan perjalanan yang melanggar peraturan. Anda boleh mendapatkan kod sumber penuh contoh ini di repositori GitHub kami.

Aplikasi kami diimplementasikan dengan Flink's DataStream API dan a KeyedProcessFunction. DataStream API adalah API berfungsi dan berdasarkan konsep aliran data yang diketik. A DataStreamadalah perwakilan logik aliran peristiwa jenis T. Aliran diproses dengan menerapkan fungsi padanya yang menghasilkan aliran data lain, mungkin dari jenis yang berbeza. Flink memproses aliran secara selari dengan menyebarkan acara ke aliran partisi dan menerapkan fungsi yang berbeza pada setiap partisi.

Coretan kod berikut menunjukkan aliran tahap tinggi aplikasi pemantauan kami.

// mengambil aliran perjalanan teksi.

DataStream rides = TaxiRides.getRides (env, inputPath);

Strim Data pemberitahuan = tunggangan

   // aliran partition oleh id lesen memandu

   .keyBy (r -> r.licenseId)

   // pantau acara perjalanan dan buat pemberitahuan

   .proses (MonitorWorkTime baru ());

// mencetak pemberitahuan

notifikasi.cetakan ();

Aplikasi ini mula menelan arus acara menaiki teksi. Dalam contoh kami, peristiwa dibaca dari fail teks, dihuraikan, dan disimpan dalam TaxiRideobjek POJO. Aplikasi dunia nyata biasanya akan mengambil peristiwa dari barisan mesej atau log peristiwa, seperti Apache Kafka atau Pravega. Langkah seterusnya adalah memasukkan TaxiRideacara oleh licenseIdpemandu. The keyBypartition operasi aliran di atas padang yang diisytiharkan, seperti yang semua acara dengan kekunci yang sama diproses oleh contoh selari yang sama fungsi berikut. Dalam kes kami, kami berpisah di licenseIdlapangan kerana kami ingin memantau waktu kerja setiap pemandu.

Seterusnya, kami menerapkan MonitorWorkTimefungsi pada TaxiRideacara berpartisi . Fungsi mengesan perjalanan setiap pemandu dan memantau pergeseran dan waktu rehat mereka. Ini memancarkan peristiwa jenis Tuple2, di mana setiap tupel mewakili pemberitahuan yang terdiri daripada ID lesen pemandu dan pesan. Akhirnya, aplikasi kami mengeluarkan mesej dengan mencetaknya ke output standard. Aplikasi dunia nyata akan menulis pemberitahuan ke sistem pesanan atau penyimpanan luaran, seperti Apache Kafka, HDFS, atau sistem pangkalan data, atau akan memicu panggilan luaran untuk segera mengeluarkannya.

Setelah kita membincangkan aliran keseluruhan aplikasi, mari kita lihat MonitorWorkTimefungsinya, yang mengandungi sebahagian besar logik perniagaan sebenar aplikasi. Yang MonitorWorkTimefungsi ialah stateful KeyedProcessFunctionyang ingests TaxiRideacara dan mengeluarkan Tuple2rekod. Yang KeyedProcessFunctionantara muka mempunyai dua kaedah untuk memproses data: processElement()dan onTimer(). The processElement()kaedah dipanggil bagi setiap acara tiba. The onTimer()kaedah dipanggil apabila sebelum ini didaftarkan kebakaran pemasa. Coretan berikut menunjukkan kerangka MonitorWorkTimefungsi dan semua yang dinyatakan di luar kaedah pemprosesan.

MonitorWorkTime kelas statik awam

    memanjangkan KeyedProcessFunction {

  // pemalar masa dalam milisaat

  akhir statik peribadi ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 jam

  panjang statik peribadi REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 jam

  panjang akhir statik persendirian CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 jam

 pemformat DateTimeFormatter sementara sementara;

  // pegangan negeri untuk menyimpan waktu permulaan shift

  ValueState shiftStart;

  @Selamat

  kekosongan awam terbuka (Configuration conf) {

    // daftar pemegang negeri

    shiftStart = getRuntimeContext (). getState (

      ValueStateDescriptor baru ("shiftStart", Types.LONG));

    // memulakan pemformat masa

    this.formatter = DateTimeFormat.forPattern (“yyyy-MM-dd HH: mm: ss”);

  }

  // processElement () dan onTimer () dibincangkan secara terperinci di bawah.

}

Fungsi tersebut menyatakan beberapa pemalar untuk selang waktu dalam milisaat, pemformat masa, dan pemegang keadaan untuk keadaan yang dikendalikan oleh Flink. Keadaan yang diuruskan secara berkala diperiksa dan dipulihkan secara automatik sekiranya berlaku kegagalan. Keadaan kekunci disusun per kunci, yang bermaksud bahawa fungsi akan mengekalkan satu nilai per pegangan dan kunci. Dalam kes kami, MonitorWorkTimefungsi mengekalkan Longnilai untuk setiap kekunci, iaitu, untuk setiap kunci licenseId. The shiftStartnegeri menyimpan masa permulaan peralihan memandu. Pemegang keadaan diinisialisasi dalam open()kaedah, yang dipanggil sekali sebelum peristiwa pertama diproses.

Sekarang, mari kita lihat processElement()kaedahnya.

@Selamat

proses kekosongan awamElemen (

    Naik teksi,

    Konteks ctx,

    Pemungut keluar) membuang Pengecualian {

  // cari masa mula peralihan terakhir

  Permulaan panjang = shiftStart.value ();

  jika (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // ini adalah perjalanan pertama peralihan baru.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    endTs panjang = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (naik.licenseId,

      "Anda dibenarkan untuk menerima penumpang baru sehingga" + formatter.print (endTs)));

    // daftar pemasa untuk membersihkan keadaan dalam 24 jam

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } lain jika (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // perjalanan ini dimulakan setelah waktu kerja yang dibenarkan berakhir.

    // itu melanggar peraturan!

    out.collect (Tuple2.of (naik.licenseId,

      "Perjalanan ini melanggar peraturan waktu kerja."));

  }

}

The processElement()kaedah dipanggil bagi setiap TaxiRideacara. Pertama, kaedah ini mengambil masa permulaan peralihan pemandu dari pemegang keadaan. Sekiranya keadaan tidak mengandungi waktu permulaan ( startTs == null) atau jika pergeseran terakhir bermula lebih dari 20 jam ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) lebih awal daripada perjalanan semasa, perjalanan semasa adalah perjalanan pertama peralihan baru. Dalam mana-mana keadaan, fungsi memulakan pergeseran baru dengan memperbarui waktu mula pergeseran ke waktu mula perjalanan semasa, memancarkan pesan kepada pemandu dengan waktu akhir pergeseran baru, dan mendaftarkan pemasa untuk membersihkan nyatakan dalam 24 jam.

Sekiranya perjalanan semasa bukan perjalanan pertama pergeseran baru, fungsinya memeriksa jika melanggar peraturan waktu kerja, iaitu, apakah ia bermula lebih dari 12 jam lebih lambat dari permulaan peralihan semasa pemandu. Sekiranya demikian, fungsi memancarkan mesej untuk memberitahu pemandu mengenai pelanggaran tersebut.

The processElement()kaedah yang MonitorWorkTimefungsi mendaftarkan pemasa untuk membersihkan negeri ini 24 jam selepas permulaan satu syif. Membuang keadaan yang tidak diperlukan lagi adalah mustahak untuk mengelakkan saiz keadaan bertambah kerana keadaan bocor. Pemasa akan menyala apabila waktu aplikasi melepasi cap waktu pemasa. Pada ketika itu, onTimer()kaedah itu disebut. Mirip dengan keadaan, pemasa dikekalkan per kunci, dan fungsinya dimasukkan ke dalam konteks kunci yang berkaitan sebelum onTimer()kaedah dipanggil. Oleh itu, semua akses negeri diarahkan ke kunci yang aktif semasa pemasa didaftarkan.

Mari kita lihat onTimer()kaedah MonitorWorkTime.

@Selamat

kekosongan awam onTimer (

    pemasa panjang,

    OnTimerContext ctx,

    Pemungut keluar) membuang Pengecualian {

  // hapus keadaan shift jika belum ada shift baru yang dimulakan.

  Permulaan panjang = shiftStart.value ();

  jika (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

The processElement()Cara daftar pemasa untuk 24 jam selepas syif bermula untuk membersihkan negeri yang tidak lagi diperlukan. Membersihkan keadaan adalah satu-satunya logik yang onTimer()dilaksanakan oleh kaedah ini. Apabila pemasa menyala, kami memeriksa sama ada pemacu memulakan pergeseran baru untuk sementara waktu, iaitu, sama ada waktu mula pergeseran berubah. Sekiranya tidak berlaku, kami akan mengosongkan keadaan shift untuk pemandu.