Cara menggunakan Redis untuk pemprosesan aliran masa nyata

Roshan Kumar adalah pengurus produk kanan di Redis Labs.

Pengambilan data streaming masa nyata adalah keperluan biasa untuk banyak kes penggunaan data besar. Dalam bidang seperti IoT, e-commerce, keselamatan, komunikasi, hiburan, kewangan, dan runcit, di mana banyak bergantung pada pembuatan keputusan berdasarkan data tepat pada masanya dan tepat, pengumpulan dan analisis data masa nyata sebenarnya menjadi teras perniagaan.

Walau bagaimanapun, pengumpulan, penyimpanan dan pemprosesan data streaming dalam jumlah besar dan pada kelajuan tinggi menghadirkan cabaran seni bina. Langkah pertama yang penting dalam menyampaikan analisis data masa nyata adalah memastikan sumber daya rangkaian, komputasi, penyimpanan, dan memori yang mencukupi tersedia untuk menangkap aliran data yang cepat. Tetapi timbunan perisian syarikat mesti sepadan dengan prestasi infrastruktur fizikalnya. Jika tidak, perniagaan akan menghadapi tunggakan data yang besar, atau data yang lebih buruk, hilang atau tidak lengkap.

Redis telah menjadi pilihan popular untuk senario pengambilan data yang pantas. Platform pangkalan data dalam memori yang ringan, Redis mencapai throughput dalam berjuta-juta operasi sesaat dengan latensi sub-milisaat, sambil menggunakan sumber minimum. Ia juga menawarkan implementasi sederhana, yang diaktifkan oleh struktur dan fungsinya yang pelbagai.

Dalam artikel ini, saya akan menunjukkan bagaimana Redis Enterprise dapat menyelesaikan cabaran umum yang berkaitan dengan pengambilan dan pemprosesan data berkelajuan tinggi dalam jumlah besar. Kami akan melalui tiga pendekatan yang berbeza (termasuk kod) untuk memproses suapan Twitter dalam masa nyata, menggunakan Redis Pub / Sub, Redis Lists, dan Redis Sorted Sets, masing-masing. Seperti yang akan kita lihat, ketiga-tiga kaedah mempunyai peranan dalam pengambilan data yang cepat, bergantung pada kes penggunaan.

Cabaran dalam merancang penyelesaian pengambilan data yang cepat

Pengambilan data berkelajuan tinggi sering melibatkan pelbagai jenis kerumitan:

  • Jumlah data yang besar kadang-kadang tiba dalam keadaan meletup. Data bursty memerlukan penyelesaian yang mampu memproses data dalam jumlah besar dengan kependaman minimum. Sebaik-baiknya, ia mesti dapat berjuta-juta penulisan sesaat dengan latensi sub-milisaat, menggunakan sumber yang minimum.
  • Data dari pelbagai sumber. Penyelesaian pengambilan data mestilah cukup fleksibel untuk menangani data dalam pelbagai format, mengekalkan identiti sumber jika diperlukan dan mengubah atau menormalkan dalam masa nyata.
  • Data yang perlu disaring, dianalisis, atau diteruskan. Sebilangan besar penyelesaian pengambilan data mempunyai satu atau lebih pelanggan yang menggunakan data tersebut. Ini adalah aplikasi yang sering berbeza yang berfungsi di lokasi yang sama atau berlainan dengan pelbagai andaian. Dalam kes seperti itu, pangkalan data tidak hanya perlu mengubah data, tetapi juga menyaring atau mengumpulkan bergantung pada keperluan aplikasi yang memakan.
  • Data berasal dari sumber yang diedarkan secara geografi. Dalam senario ini, selalunya mengedarkan node pengumpulan data, meletakkannya dekat dengan sumber. Node itu sendiri menjadi sebahagian daripada penyelesaian pengambilan data yang cepat, untuk mengumpulkan, memproses, meneruskan, atau mengubah arah pengambilan data.

Mengendalikan pengambilan data pantas di Redis

Banyak penyelesaian yang menyokong pengambilan data yang cepat hari ini adalah kompleks, kaya dengan ciri, dan terlalu direkayasa untuk keperluan sederhana. Redis, sebaliknya, sangat ringan, cepat, dan mudah digunakan. Dengan pelanggan yang tersedia dalam lebih dari 60 bahasa, Redis dapat disatukan dengan mudah dengan tumpukan perisian yang popular.

Redis menawarkan struktur data seperti Daftar, Set, Kumpulan Diurutkan, dan Hash yang menawarkan pemprosesan data yang mudah dan serba boleh. Redis memberikan lebih daripada satu juta operasi baca / tulis sesaat, dengan latensi sub-milisaat pada contoh awan komoditi bersaiz sederhana, menjadikannya sangat cekap sumber untuk jumlah data yang banyak. Redis juga menyokong perkhidmatan pesanan dan perpustakaan pelanggan dalam semua bahasa pengaturcaraan yang popular, menjadikannya sangat sesuai untuk menggabungkan pengambilan data berkelajuan tinggi dan analisis masa nyata. Perintah Redis Pub / Sub membolehkannya memainkan peranan sebagai broker mesej antara penerbit dan pelanggan, satu ciri yang sering digunakan untuk menghantar pemberitahuan atau mesej antara node pengambilan data yang diedarkan.

Redis Enterprise meningkatkan Redis dengan penskalaan lancar, ketersediaan selalu, penyebaran automatik, dan kemampuan untuk menggunakan memori kilat yang menjimatkan sebagai pemanjangan RAM sehingga pemprosesan set data yang besar dapat dicapai dengan kos efektif.

Pada bahagian di bawah ini, saya akan menggariskan cara menggunakan Redis Enterprise untuk menangani cabaran pengambilan data biasa.

Redis dengan kelajuan Twitter

Untuk menggambarkan kesederhanaan Redis, kami akan meneroka contoh penyelesaian pengambilan data cepat yang mengumpulkan mesej dari suapan Twitter. Matlamat penyelesaian ini adalah memproses tweet secara real-time dan mendorongnya ke bawah semasa diproses.

Data Twitter yang diserap oleh penyelesaian tersebut kemudian digunakan oleh beberapa pemproses. Seperti yang ditunjukkan dalam Rajah 1, contoh ini berkaitan dengan dua pemproses - English Tweet Processor dan Influencer Processor. Setiap pemproses menyaring tweet dan menyebarkannya ke saluran masing-masing kepada pengguna lain. Rantai ini dapat mencapai sejauh mana penyelesaiannya memerlukan. Walau bagaimanapun, dalam contoh kita, kita berhenti di tahap ketiga, di mana kita mengumpulkan perbincangan yang popular di kalangan penutur bahasa Inggeris dan penggiat utama.

Makmal Redis

Perhatikan bahawa kami menggunakan contoh memproses suapan Twitter kerana kecepatan kedatangan data dan kesederhanaan. Perhatikan juga bahawa data Twitter mencapai pengambilan data pantas kami melalui satu saluran. Dalam banyak kes, seperti IoT, mungkin terdapat banyak sumber data yang menghantar data ke penerima utama.

Terdapat tiga cara yang mungkin untuk melaksanakan penyelesaian ini dengan menggunakan Redis: pengambilan dengan Redis Pub / Sub, pengambilan dengan struktur data Daftar, atau pengambilan dengan struktur data Set Diurutkan. Mari kita periksa setiap pilihan ini.

Dimakan dengan Redis Pub / Sub

Ini adalah pelaksanaan termudah pengambilan data cepat. Penyelesaian ini menggunakan ciri Redis's Pub / Sub, yang membolehkan aplikasi menerbitkan dan melanggan mesej. Seperti yang ditunjukkan dalam Gambar 2, setiap tahap memproses data dan mempublikasikannya ke saluran. Peringkat seterusnya melanggan saluran dan menerima mesej untuk pemprosesan atau penapisan lebih lanjut.

Makmal Redis

Kelebihan

  • Mudah dilaksanakan.
  • Berfungsi dengan baik apabila sumber data dan pemproses diedarkan secara geografi.

Keburukan 

  • Penyelesaiannya memerlukan penerbit dan pelanggan sentiasa aktif. Pelanggan kehilangan data ketika dihentikan, atau ketika sambungan terputus.
  • Ia memerlukan lebih banyak sambungan. Program tidak dapat menerbitkan dan melanggan sambungan yang sama, jadi setiap pemproses data perantaraan memerlukan dua sambungan - satu untuk melanggan dan satu lagi untuk menerbitkan. Sekiranya menjalankan Redis pada platform DBaaS, penting untuk mengesahkan sama ada pakej atau tahap perkhidmatan anda mempunyai had bilangan sambungan.

Catatan mengenai sambungan

Sekiranya lebih daripada satu pelanggan melanggan saluran, Redis mendorong data ke setiap pelanggan secara linear, satu demi satu. Muatan data yang besar dan banyak sambungan mungkin memperkenalkan latensi antara penerbit dan pelanggannya. Walaupun had keras lalai untuk bilangan sambungan maksimum ialah 10,000, anda mesti menguji dan menanda aras berapa banyak sambungan yang sesuai untuk muatan anda.

Redis mengekalkan buffer output klien untuk setiap pelanggan. Had lalai untuk penyangga output klien untuk Pub / Sub ditetapkan sebagai:

pelanggan-output-buffer-had pubsub 32mb 8mb 60

Dengan tetapan ini, Redis akan memaksa klien untuk memutuskan hubungan dalam dua keadaan: jika buffer output tumbuh melebihi 32MB, atau jika buffer output menyimpan 8MB data secara konsisten selama 60 saat.

Ini adalah petunjuk bahawa pelanggan menggunakan data dengan lebih perlahan daripada yang diterbitkan. Sekiranya timbul situasi seperti itu, pertama-tama cuba mengoptimumkan pengguna agar mereka tidak menambahkan kependaman semasa memakan data. Sekiranya anda menyedari bahawa pelanggan anda masih terputus, anda boleh meningkatkan had client-output-buffer-limit pubsubharta tanah di redis.conf. Perlu diingat bahawa sebarang perubahan pada tetapan dapat meningkatkan kependaman antara penerbit dan pelanggan. Sebarang perubahan mesti diuji dan disahkan secara menyeluruh.

Reka bentuk kod untuk penyelesaian Redis Pub / Sub

Makmal Redis

Ini adalah termudah dari tiga penyelesaian yang dijelaskan dalam makalah ini. Berikut adalah kelas Java penting yang dilaksanakan untuk penyelesaian ini. Muat turun kod sumber dengan pelaksanaan sepenuhnya di sini: //github.com/redislabsdemo/IngestPubSub.

The Subscriberkelas adalah kelas teras reka bentuk ini. Setiap Subscriberobjek mengekalkan hubungan baru dengan Redis.

Pelanggan kelas memperluas JedisPubSub melaksanakan Runnable {

       nama String peribadi;

       RedisConnection peribadi conn = null;

       Jedis peribadi jedis = null;

       pelanggan String peribadiChannel;

       Pelanggan Umum (String subscriberName, String channelName) membuang Pengecualian {

              nama = pelangganNama;

              subscriberChannel = channelName;

              Thread t = Thread baru (ini);

              t.start ();

       }

       @Selamat

       larian kekosongan awam () {

              cuba {

                      conn = RedisConnection.getRedisConnection ();

                      jedis = conn.getJedis ();

                      sementara (benar) {

                             jedis.subscribe (ini, ini.subscriberChannel);

                      }

              } tangkapan (Pengecualian e) {

                      e.printStackTrace ();

              }

       }

       @Selamat

       kekosongan awam onMessage (String channel, String message) {

              super.onMessage (saluran, mesej);

       }

}

The Publisherkelas mengekalkan sambungan yang berasingan untuk Redis kerana menyiarkan mesej kepada saluran.

Penerbit kelas awam {

       RedisConnection conn = null;

       Jedis jedis = null;

       saluran String peribadi;

       penerbit awam (String channelName) membuang Pengecualian {

              saluran = saluranNama;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();

       }

       public void publish (String msg) membuang Pengecualian {

              jedis.publish (saluran, msg);

       }

}

Yang EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, dan InfluencerCollectorpenapis melanjutkan Subscriber, yang membolehkan mereka untuk mendengar saluran masuk. Oleh kerana anda memerlukan sambungan Redis yang berasingan untuk melanggan dan menerbitkan, setiap kelas penapis mempunyai RedisConnectionobjeknya sendiri . Penapis mendengar mesej baru di saluran mereka secara berterusan. Berikut adalah contoh kod EnglishTweetFilterkelas:

kelas awam EnglishTweetFilter meluaskan Pelanggan

{

       RedisConnection peribadi conn = null;

       Jedis peribadi jedis = null; 

       penerbit String peribadiChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) membuang Pengecualian {

              super (nama, pelangganChannel);

              ini.publisherChannel = penerbitChannel;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();           

       }

       @Selamat

       kekosongan awam onMessage (String subscriberChannel, String message) {

              JsonParser jsonParser = JsonParser baru ();

              JsonElement jsonElement = jsonParser.parse (mesej);

              JsonObject jsonObject = jsonElement.getAsJsonObject ();

              // tapis mesej: hanya menerbitkan tweet Inggeris           

jika (jsonObject.get (“lang”)! = null &&

       jsonObject.get ("lang"). getAsString (). sama dengan ("en")) {

                      jedis.publish (penerbitChannel, mesej);

              }

       }

}

The Publisherkelas mempunyai satu kaedah yang menerbitkan mesej ke saluran yang dikehendaki menerbitkan.

Penerbit kelas awam {

.

.     

       public void publish (String msg) membuang Pengecualian {

              jedis.publish (saluran, msg);

       }

.

}

Kelas utama membaca data dari aliran penyerapan dan menghantarnya ke AllDatasaluran. Kaedah utama kelas ini memulakan semua objek penapis.

kelas awam IngestPubSub

{

.

       permulaan kosong awam () membuang Pengecualian {

       .

       .

              penerbit = Penerbit baru (“AllData”);

              englishFilter = EnglishTweetFilter baru ("Penapis Bahasa Inggeris", "AllData",

                                           "EnglishTweets");

              influencerFilter = InfluencerTweetFilter baru ("Influencer Filter",

                                           "AllData", "InfluencerTweets");

              hashtagCollector = HashTagCollector baru ("Hashtag Collector", 

                                           "EnglishTweets");

              influencerCollector = InfluencerCollector baru ("Influencer Collector",

                                           "InfluencerTweets");

       .

       .

}

Dimakan dengan Daftar Redis

Struktur data Senarai di Redis menjadikan pelaksanaan penyelesaian giliran mudah dan mudah. Dalam penyelesaian ini, pengeluar mendorong setiap mesej ke belakang barisan, dan pelanggan mengundi antrian dan menarik mesej baru dari ujung lain.

Makmal Redis

Kelebihan

  • Kaedah ini boleh dipercayai dalam kes kehilangan sambungan. Setelah data dimasukkan ke dalam senarai, data disimpan di sana sehingga pelanggan membacanya. Ini berlaku walaupun pelanggan dihentikan atau terputus hubungan dengan pelayan Redis.
  • Pengeluar dan pengguna tidak memerlukan hubungan antara mereka.

Keburukan

  • Setelah data diambil dari senarai, data tersebut akan dihapus dan tidak dapat diambil lagi. Kecuali pengguna menyimpan data tersebut, data tersebut akan hilang sebaik sahaja digunakan.
  • Setiap pengguna memerlukan barisan berasingan, yang memerlukan menyimpan banyak salinan data.

Reka bentuk kod untuk penyelesaian Redis Lists

Makmal Redis

Anda boleh memuat turun kod sumber untuk penyelesaian Daftar Redis di sini: //github.com/redislabsdemo/IngestList. Kelas utama penyelesaian ini dijelaskan di bawah.

MessageListmembenamkan struktur data Redis List. The push()kaedah menolak mesej baru di sebelah kiri barisan, dan pop()menunggu untuk mesej baru dari kanan jika barisan kosong.

Senarai Mesej kelas awam {

       dilindungi String name = "MyList"; // Nama

.

.     

       tolakan kekosongan awam (String msg) membuang Pengecualian {

              jedis.lpush (nama, msg); // Tolak Kiri

       }

       public String pop () melontarkan Pengecualian {

              kembalikan jedis.brpop (0, name) .toString ();

       }

.

.

}

MessageListeneradalah kelas abstrak yang menerapkan logik pendengar dan penerbit. A MessageListenerobjek mendengar hanya satu senarai, tetapi boleh menerbitkan pelbagai saluran ( MessageFilterobjek). Penyelesaian ini memerlukan MessageFilterobjek yang terpisah untuk setiap pelanggan di bawah paip.

class MessageListener melaksanakan Runnable {

       nama String peribadi = null;

       PrivateList inboundList = null;

       Peta keluarBoundMsgFilters = HashMap baru ();

.

.     

       public void registerOutBoundMessageList (MessageFilter msgFilter) {

              jika (msgFilter! = null) {

                      jika (outBoundMsgFilters.get (msgFilter.name) == null) {

                             outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

       @Selamat

       larian kekosongan awam () {

.

                      sementara (benar) {

                             Rentetan msg = inboundList.pop ();

                             processMessage (msg);

                      }                                  

.

       }

.

       dilindungi push videomessage (String msg) melontarkan Pengecualian {

              TetapkanBoundMsgNames = outBoundMsgFilters.keySet ();

              untuk (Nama rentetan: outBoundMsgNames) {

                      MessageFilter msgList = outBoundMsgFilters.get (nama);

                      msgList.filterAndPush (msg);

              }

       }

}

MessageFilteradalah kelas induk yang memudahkan filterAndPush()kaedah. Oleh kerana data mengalir melalui sistem pengambilan, data sering disaring atau diubah sebelum dikirim ke tahap berikutnya. Kelas yang memanjangkan MessageFilterkelas mengatasi filterAndPush()kaedah, dan menerapkan logik mereka sendiri untuk mendorong mesej yang disaring ke senarai seterusnya.

MessageFilter kelas awam {

       MessageList messageList = null;

.

.

       public void filterAndPush (String msg) membuang Pengecualian {

              messageList.push (msg);

       }

.

.     

}

AllTweetsListeneradalah contoh pelaksanaan MessageListenerkelas. Ini mendengarkan semua tweet di AllDatasaluran, dan menerbitkan data ke EnglishTweetsFilterdan InfluencerFilter.

kelas awam AllTweetsListener meluaskan MessageListener {

.

.     

       utama kekosongan statik awam (String [] args) membuang Pengecualian {

              MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (baru

              EnglishTweetsFilter ("EnglishTweetsFilter", "EnglishTweets"));

              allTweetsProcessor.registerOutBoundMessageList (baru

                             InfluencerFilter ("InfluencerFilter", "Influencers"));

              allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFiltermemanjang MessageFilter. Kelas ini menerapkan logik untuk memilih hanya tweet yang ditandai sebagai tweet Inggeris. Penapis membuang tweet bukan bahasa Inggeris dan mendorong tweet bahasa Inggeris ke senarai seterusnya.

kelas awam EnglishTweetsFilter memperluas MessageFilter {

       public EnglishTweetsFilter (String name, String listName) melontarkan Pengecualian {

              super (nama, listName);

       }

       @Selamat

       public void filterAndPush (String message) membuang Pengecualian {

              JsonParser jsonParser = JsonParser baru ();

              JsonElement jsonElement = jsonParser.parse (mesej);

              JsonArray jsonArray = jsonElement.getAsJsonArray ();

              JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

              jika (jsonObject.get (“lang”)! = null &&

jsonObject.get ("lang"). getAsString (). sama dengan ("en")) {

                             Jedis jedis = super.getJedisInstance ();

                             jika (jedis! = null) {

                                    jedis.lpush (nama super. jsonObject.toString ());

                             }

              }

       }

}