Java - откриване и обработка на висящи нишки

От Алекс. C. Punnen

Архитект - Nokia Siemens Networks

Бангалор

Висящите нишки са често срещано предизвикателство при разработването на софтуер, който трябва да взаимодейства със собствени устройства, използвайки собствени или стандартизирани интерфейси като SNMP, Q3 или Telnet. Този проблем не се ограничава до управлението на мрежата, но се появява в широк спектър от полета като уеб сървъри, процеси, извикващи извиквания за отдалечени процедури и т.н.

Нишка, която инициира заявка към устройство, се нуждае от механизъм за откриване, в случай че устройството не реагира или реагира само частично. В някои случаи, когато се установи такова задържане, трябва да се предприемат конкретни действия. Конкретното действие може да бъде или повторно проучване, или уведомяване на крайния потребител за неуспеха на задачата или друга опция за възстановяване. В някои случаи, когато голям брой задачи трябва да бъдат задействани от голям брой мрежови елементи от даден компонент, откриването на висяща нишка е важно, за да не се превърне в пречка за друга обработка на задачи. Така че има два аспекта на управлението на висящи нишки: изпълнение и уведомяване .

За аспекта на уведомяването можем да приспособим шаблона на Java Observer, за да се побере в многонишковия свят.

Приспособяване на Java Observer Pattern към многонишкови системи

Поради висящи задачи, използването на ThreadPoolкласа Java с подходяща стратегия е първото решение, което идва на ум. Въпреки това използването на Java ThreadPoolв контекста на някои нишки, произволно висящи за определен период от време, дава нежелано поведение въз основа на конкретната използвана стратегия, като гладуване на нишки в случай на фиксирана стратегия за пула на нишки. Това се дължи главно на факта, че Java ThreadPoolняма механизъм за откриване на зависване на нишка.

Можем да опитаме кеширан пул от нишки, но той също има проблеми. Ако има висока скорост на задействане на задачи и някои нишки увиснат, броят на нишките може да се изстреля, което в крайна сметка ще доведе до глад на ресурси и изключения извън паметта. Или бихме могли да използваме персонализирана ThreadPoolстратегия, извикваща a CallerRunsPolicy. И в този случай закъсването на нишка може да доведе до задържане на всички нишки. (Основната нишка никога не трябва да бъде повикващият, тъй като има вероятност всяка задача, предадена на основната нишка, да увисне, което да доведе до спиране на всичко.)

И така, какво е решението? Ще демонстрирам не толкова прост модел на ThreadPool, който регулира размера на пула според скоростта на задачата и въз основа на броя висящи нишки. Нека първо да преминем към проблема с откриването на висящи нишки.

Откриване на висящи нишки

Фигура 1 показва абстракция на шаблона:

Тук има два важни класа: ThreadManagerи ManagedThread. И двете се простират от Threadкласа Java . В ThreadManagerтрюмовете на контейнер, който държи ManagedThreads. Когато се създава нов, ManagedThreadтой се добавя към този контейнер.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

На ThreadManagerизпълнява за този списък и призовава ManagedThreadте години isHung()метод. По принцип това е логика за проверка на клеймото за време.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Ако установи, че нишка е влязла в цикъл на задача и никога не е актуализирала резултатите си, е необходим механизъм за възстановяване, както е предвидено в ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

За ManagedThreadда бъде създаден и използван нов вместо окачения, той не трябва да съдържа никакво състояние или контейнер. За това контейнерът, върху който ManagedThreadтрябва да се отделят актовете. Тук използваме базирания на ENUM модел Singleton, за да съхраним списъка със задачи. Така че контейнерът, съдържащ задачите, е независим от нишката, която обработва задачите. Щракнете върху следната връзка, за да изтеглите източника за описания модел: Java Thread Manager Source.

Висящи нишки и стратегии за Java ThreadPool

Java ThreadPoolняма механизъм за откриване на висящи нишки. Използването на стратегия като фиксиран пул от нишки ( Executors.newFixedThreadPool()) няма да работи, защото ако някои задачи увиснат с течение на времето, всички нишки в крайна сметка ще бъдат в закачено състояние. Друга опция е използването на кеширана политика на ThreadPool (Executors.newCachedThreadPool()). Това може да гарантира, че винаги ще има налични нишки за обработка на задача, ограничени само от VM памет, CPU и ограничения на нишките. С тази политика обаче няма контрол върху броя на нишките, които се създават. Независимо дали нишката за обработка виси или не, използването на тази политика, докато степента на задачите е висока, води до създаването на огромен брой нишки. Ако нямате достатъчно ресурси за JVM много скоро, ще достигнете максималния праг на паметта или високия процесор. Доста често се среща броят на нишките, ударени стотици или хиляди. Въпреки че те се освобождават, след като задачата бъде обработена, понякога по време на обработка на пакети големият брой нишки ще претовари системните ресурси.

Трети вариант е използването на персонализирани стратегии или политики. Една такава опция е да имате пул от нишки, който се мащабира от 0 до някакъв максимален брой. Така че дори ако една нишка виси, нова нишка ще бъде създадена, стига да е достигнат максималният брой нишки:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Тук 3 е максималният брой нишки и времето за поддържане на живо е настроено на 60 секунди, тъй като това е процес, който изисква много задачи. Ако дадем достатъчно висок максимален брой нишки, това е повече или по-малко разумна политика за използване в контекста на висящи задачи. Единственият проблем е, че ако висящите нишки в крайна сметка не се освободят, има малък шанс всички нишки в даден момент да се закачат. Ако максималният брой нишки е достатъчно висок и ако приемем, че заданието е рядко явление, тогава тази политика би паснала на сметката.

Би било сладко, ако ThreadPoolимаше и включен механизъм за откриване на висящи конци. Ще обсъдя един такъв дизайн по-късно. Разбира се, ако всички нишки са замразени, можете да конфигурирате и използвате политиката за отхвърлени задачи на пула от нишки. Ако не искате да отхвърлите задачите, ще трябва да използвате CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

В този случай, ако висящата нишка доведе до отхвърляне на задача, тази задача ще бъде дадена на извикващата нишка, която трябва да бъде обработена. Винаги има шанс тази задача да виси твърде много. В този случай целият процес ще замръзне. Затова е по-добре да не добавяте такава политика в този контекст.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Персонализиран ThreadPool с разпознаване на закачане

Библиотека за пул от нишки с възможността за откриване и обработка на закачане на задачи би било чудесно да имате. Разработил съм такъв и ще го демонстрирам по-долу. Това всъщност е порт от пул от нишки C ++, който проектирах и използвах преди известно време (вижте препратките). По принцип това решение използва шаблона Command и Chain of Responsibility. Прилагането на командния модел в Java без помощта на функцията за поддръжка на обекти е малко трудно. За това трябваше леко да променя изпълнението, за да използвам Java отражение. Обърнете внимание, че контекстът, в който е проектиран този модел, е, когато трябва да се монтира / включи пул от нишки, без да се променя някой от съществуващите класове.(Вярвам, че голямата полза от обектно-ориентираното програмиране е, че ни дава начин да проектираме класове, така че да използваме ефективно Отворения затворен принцип. Това важи особено за сложния стар наследствен код и може да има по-малко значение за разработване на нов продукт.) Затова използвах размисъл, вместо да използвам интерфейс за реализиране на командния модел. Останалата част от кода може да бъде пренесена без значителни промени, тъй като почти всички примитиви за синхронизация на нишки и сигнализация са налични в Java 1.5 нататък.Останалата част от кода може да бъде пренесена без значителни промени, тъй като почти всички примитиви за синхронизация на нишки и сигнализация са налични в Java 1.5 нататък.Останалата част от кода може да бъде пренесена без значителни промени, тъй като почти всички примитиви за синхронизация на нишки и сигнализация са налични в Java 1.5 нататък.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Пример за използване на този модел:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Сега имаме още два важни класа тук. Единият е ThreadChainкласът, който прилага модела Chain of Responsibility:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Този клас има два основни метода. Единият е Boolean, CanHandle()който се инициира от ThreadPoolкласа и след това продължава рекурсивно. Това проверява дали текущата нишка (текущ ThreadChainекземпляр) е свободна да се справи със задачата. Ако вече се справя със задача, тя извиква следващата във веригата.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Имайте предвид, че HandleRequestметодът е метод, ThreadChainкойто се извиква от Thread run()метода и изчаква сигнала от canHandleметода. Също така обърнете внимание как се обработва задачата чрез командния шаблон.