Java - Pengesanan dan Pengendalian Benang Gantung

Oleh Alex. C. Punnen

Arkitek - Rangkaian Nokia Siemens

Bangalore

Benang gantung adalah cabaran umum dalam pengembangan perisian yang harus berinteraksi dengan peranti proprietari menggunakan antara muka proprietari atau standard seperti SNMP, Q3, atau Telnet. Masalah ini tidak terhad kepada pengurusan rangkaian tetapi berlaku di pelbagai bidang seperti pelayan web, proses yang memanggil panggilan prosedur jauh, dan sebagainya.

Benang yang memulakan permintaan ke perangkat memerlukan mekanisme untuk mengesan sekiranya peranti tidak merespon atau merespon hanya sebahagian. Dalam beberapa kes di mana hang seperti itu dikesan, tindakan khusus mesti diambil. Tindakan khusus boleh dilakukan sama ada percubaan semula atau memberitahu pengguna akhir tentang kegagalan tugas atau beberapa pilihan pemulihan lain. Dalam beberapa kes di mana sebilangan besar tugas mesti dilancarkan ke sebilangan besar elemen rangkaian oleh komponen, pengesanan benang gantung adalah penting agar tidak menjadi halangan untuk pemprosesan tugas lain. Oleh itu, terdapat dua aspek untuk menguruskan tali gantung: prestasi dan pemberitahuan .

Untuk aspek pemberitahuan kita dapat menyesuaikan pola Java Observer agar sesuai dengan dunia multithread.

Menyesuaikan Pola Pengamat Java ke Sistem Multithreaded

Oleh kerana menggantung tugas, menggunakan ThreadPoolkelas Java dengan strategi yang sesuai adalah penyelesaian pertama yang terlintas di fikiran. Namun menggunakan Java ThreadPooldalam konteks beberapa utas yang tergantung secara rawak dalam jangka waktu tertentu memberikan perilaku yang tidak diinginkan berdasarkan strategi tertentu yang digunakan, seperti kelaparan benang dalam hal strategi kumpulan benang tetap. Ini terutama disebabkan oleh fakta bahawa Java ThreadPooltidak memiliki mekanisme untuk mengesan tali gantung.

Kami boleh mencuba kolam benang Cached, tetapi juga mempunyai masalah. Sekiranya terdapat kadar tugas yang tinggi, dan beberapa utas menggantung, jumlah utas dapat meningkat, akhirnya menyebabkan kebuluran sumber daya dan pengecualian kehabisan memori. Atau kita bisa menggunakan ThreadPoolstrategi Custom memanggil a CallerRunsPolicy. Dalam kes ini juga, tali gantung boleh menyebabkan semua utas tergantung akhirnya. (Benang utama tidak boleh menjadi pemanggil, kerana ada kemungkinan bahawa tugas yang diserahkan ke utas utama dapat digantung, menyebabkan semuanya berhenti.)

Jadi, apakah penyelesaiannya? Saya akan menunjukkan corak ThreadPool yang tidak begitu sederhana yang menyesuaikan ukuran kolam mengikut kadar tugas dan berdasarkan bilangan utas gantung. Mari kita pergi ke masalah mengesan benang gantung terlebih dahulu.

Mengesan Benang Gantung

Rajah 1 menunjukkan pengabaian corak:

Terdapat dua kelas penting di sini: ThreadManagerdan ManagedThread. Kedua-duanya melanjutkan dari Threadkelas Java . Tempat ThreadManagermemegang bekas yang memegang ManagedThreads. Apabila yang baru ManagedThreaddibuat, ia menambahkan dirinya ke dalam bekas ini.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

The ThreadManageriterates melalui senarai ini dan memanggil ManagedThread's isHung()kaedah. Ini pada dasarnya adalah logik periksa cap waktu.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Sekiranya mendapati bahawa utas telah memasuki gelung tugas dan tidak pernah mengemas kini hasilnya, diperlukan mekanisme pemulihan seperti yang ditentukan oleh ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Untuk yang baru ManagedThreaddibuat dan digunakan di tempat yang digantung, ia tidak boleh menahan keadaan atau bekas apa pun. Untuk ini wadah di mana ManagedThreadtindakan harus dipisahkan. Di sini kita menggunakan corak Singleton berasaskan ENUM untuk memegang senarai Tugas. Oleh itu, wadah yang memegang tugas-tugas itu tidak bergantung pada utas yang memproses tugas. Klik pautan berikut untuk memuat turun sumber untuk corak yang dijelaskan: Java Thread Manager Source.

Strategi Hanging Threads dan Java ThreadPool

Java ThreadPooltidak mempunyai mekanisme untuk mengesan benang gantung. Menggunakan strategi seperti threadpool tetap ( Executors.newFixedThreadPool()) tidak akan berfungsi kerana jika beberapa tugas tergantung dari masa ke masa, semua utas akhirnya akan berada dalam keadaan tergantung. Pilihan lain adalah menggunakan polisi ThreadPool yang dicache (Executors.newCachedThreadPool()). Ini dapat memastikan bahawa selalu ada utas yang tersedia untuk memproses tugas, hanya dibatasi oleh memori VM, CPU, dan had utas. Walau bagaimanapun, dengan dasar ini tidak ada kawalan terhadap bilangan utas yang dibuat. Tidak kira sama ada benang pemprosesan tergantung atau tidak, menggunakan dasar ini semasa kadar tugas tinggi menyebabkan sejumlah besar utas dibuat. Sekiranya anda tidak mempunyai sumber daya yang mencukupi untuk JVM tidak lama lagi, anda akan mencapai ambang memori maksimum atau CPU yang tinggi. Cukup biasa melihat bilangan utas mencecah ratusan atau ribuan. Walaupun mereka dilepaskan setelah tugas diproses, kadang-kadang semasa menangani pecah, jumlah utas yang tinggi akan membanjiri sumber sistem.

Pilihan ketiga adalah menggunakan strategi atau polisi tersuai. Salah satu pilihan tersebut ialah mempunyai kumpulan utas yang berkisar dari 0 hingga beberapa bilangan maksimum. Oleh itu, walaupun satu utas digantung, benang baru akan dibuat selagi jumlah utas maksimum dicapai:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Di sini 3 adalah jumlah utas maksimum dan masa bertahan hidup ditetapkan pada 60 saat kerana ini adalah proses yang memerlukan tugas. Sekiranya kita memberikan jumlah utas maksimum yang cukup tinggi, ini lebih kurang merupakan dasar yang wajar untuk digunakan dalam konteks tugas gantung. Satu-satunya masalah ialah jika benang gantung tidak dilepaskan akhirnya ada sedikit kemungkinan semua benang dapat tergantung. Sekiranya utas maksimum cukup tinggi dan menganggap bahawa tugas menggantung adalah fenomena yang jarang terjadi, maka dasar ini sesuai dengan rang undang-undang.

Alangkah manisnya jika ThreadPoolada juga mekanisme untuk mengesan benang gantung. Saya akan membincangkan satu reka bentuk itu kemudian. Sudah tentu jika semua utas dibekukan, anda boleh mengkonfigurasi dan menggunakan dasar tugas yang ditolak dari kumpulan utas. Sekiranya anda tidak mahu membuang tugas, anda harus menggunakan CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

Dalam kes ini, jika tali utas menyebabkan tugas ditolak, tugas itu akan diberikan kepada utas panggilan yang akan ditangani. Selalu ada kemungkinan tugas itu juga tergantung. Dalam kes ini, keseluruhan proses akan membeku. Oleh itu, lebih baik tidak menambahkan polisi sedemikian dalam konteks ini.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

ThreadPool Tersuai dengan Pengesanan Hang

Perpustakaan kumpulan utas dengan kemampuan pengesanan dan pengendalian tugas gantung akan sangat bagus untuk dimiliki. Saya telah mengembangkannya dan saya akan menunjukkannya di bawah. Ini sebenarnya adalah port dari kumpulan benang C ++ yang saya reka dan gunakan beberapa waktu lalu (lihat rujukan). Pada dasarnya, penyelesaian ini menggunakan corak Perintah dan corak Rantai Tanggungjawab. Walau bagaimanapun untuk melaksanakan corak Perintah di Jawa tanpa bantuan Fungsi sokongan objek agak sukar. Untuk ini saya harus mengubah implementasi sedikit untuk menggunakan refleksi Java. Perhatikan bahawa konteks di mana corak ini dirancang adalah di mana kumpulan utas harus dipasang / dipasang tanpa mengubah kelas yang ada.(Saya percaya satu keuntungan besar dari pengaturcaraan berorientasikan objek adalah bahawa ia memberi kita cara untuk merancang kelas agar dapat menggunakan Prinsip Tertutup Terbuka dengan berkesan. Ini terutama berlaku untuk kod warisan lama yang kompleks dan mungkin kurang relevan untuk pengembangan produk baru.) Oleh itu, saya menggunakan refleksi dan bukannya menggunakan antara muka untuk melaksanakan corak Perintah. Selebihnya kod boleh dibawa tanpa perubahan besar kerana hampir semua primitif penyegerakan dan pemberian isyarat tersedia di Java 1.5 dan seterusnya.Selebihnya kod dapat dibawa tanpa perubahan besar kerana hampir semua primitif penyegerakan dan pemberian isyarat tersedia di Java 1.5 dan seterusnya.Selebihnya kod dapat dibawa tanpa perubahan besar kerana hampir semua primitif penyegerakan dan pemberian isyarat tersedia di Java 1.5 dan seterusnya.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Contoh penggunaan corak ini:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Sekarang kita mempunyai dua kelas yang lebih penting di sini. Salah satunya ialah ThreadChainkelas, yang menerapkan corak Rantai Tanggungjawab:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Kelas ini mempunyai dua kaedah utama. Salah satunya adalah Boolean CanHandle()yang dimulakan oleh ThreadPoolkelas dan kemudian diteruskan secara berulang. Ini memeriksa sama ada thread semasa ( ThreadChaincontoh semasa ) bebas untuk menangani tugas tersebut. Sekiranya sudah menjalankan tugas, ia memanggil tugas berikutnya dalam rantai.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Perhatikan bahawa HandleRequestadalah kaedah ThreadChainyang dipanggil dari Thread run()kaedah dan menunggu isyarat dari canHandlekaedah tersebut. Perhatikan juga bagaimana tugas tersebut ditangani melalui corak Perintah.