Lekce 6 - Java server - Client dispatcher
V minulé lekci, Java server - Správce spojení, jsme implementovali správce spojení.
Dnes se podíváme na správu klientů, kteří byli přesunuti do čekací fronty, protože byly naplněny maximální kapacity serveru.
Client dispatcher
Třída bude mít jednoduchý úkol. Bude se snažit udržet aktivní spojení mezi serverem a klientem. V definovaném intervalu bude procházet jednotlivé klienty v čekací frontě a odešle jim (prozatím) jednoduchou textovou zprávu s informací o počtu klientů ve frontě. V případě, že se zprávu nepodaří doručit, ukončí se spojení s klientem a klient se vyřadí z čekací fronty. Celý tento proces se bude dít pouze, pokud ve frontě budou nějací klienti.
Funkce
Funkce jsem již popsal v odstavci výše, nyní si je přehledně sepíšeme v bodech:
- vložení klienta do fronty
- získání klienta z fronty
- dotaz, zda-li je ve frontě klient
V balíčku core
založíme nový balíček s názvem
dispatcher
, ve kterém vytvoříme rozhraní
IClientDispatcher
. Rozhraní bude definovat funkce
dispatchera:
public interface IClientDispatcher extends IThreadControl { boolean hasClientInQueue(); Client getClientFromQueue(); boolean addClientToQueue(Client client); }
Rozhraní dědí od IThreadControl
, abychom
mohli ovládat vlákno, ve kterém dispatcher poběží.
Implementace třídy
Třídu, která bude implementovat rozhraní, nazveme jednoduše
ClientDispatcher
, necháme ji dědit od třídy
Thread
a implementovat rozhraní
IClientDispatcher
:
public class ClientDispatcher extends Thread implements IClientDispatcher
Nadefinujeme si jednu třídní konstantu, která bude reprezentovat interval, ve kterém se bude opakovat komunikace s klienty ve frontě:
private static final int SLEEP_TIME = 5000;
Následují instanční proměnné:
private boolean interupt = false; private final Semaphore semaphore = new Semaphore(0); private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>(); private final Collection<Client> clientsToRemove = new ArrayList<>(); private final int waitingQueueSize;
Proměnná interrupt
bude řídit vlákno. Dokud bude mít
hodnotu false
, vlákno bude běžet. Semafor zde bude mít
řídící funkci. Dokud ve frontě nebudou žádní klienti, vlákno bude na
semaforu čekat. Jakmile se připojený klient dostane do fronty, vlákno projde
přes semafor a bude dělat svoji práci. Po odebrání všech klientů z fronty
se vlákno opět uspí na semaforu. Následují dvě kolekce. Ve
waitingQueue
se budou uchovávat klienti a
clientsToRemove
bude obsahovat klienty, kteří ukončili spojení
a je potřeba je odebrat z fronty. Proměnná waitingQueueSize
obsahuje maximální počet klientů ve frontě.
Konstruktor třídy bude vyžadovat jediný parametr. Bude to právě maximální počet klientů, kteří budou čekat ve frontě:
public ClientDispatcher(int waitingQueueSize) { this.waitingQueueSize = waitingQueueSize; }
Implementace funkcí
Začneme implementovat metody z rozhraní
IClientDispatcher
:
@Override public boolean hasClientInQueue() { return !waitingQueue.isEmpty(); } @Override public Client getClientFromQueue() { return waitingQueue.poll(); } @Override public boolean addClientToQueue(Client client) { if (waitingQueue.size() < waitingQueueSize) { waitingQueue.add(client); semaphore.release(); return true; } return false; }
První dvě metody mají implementaci jednoduchou a není třeba je
komentovat. U funkce pro přidání klienta do fronty musíme nejdříve
zjistit, zda-li fronta pojme dalšího klienta. Pokud ho pojme, uvolní se
semafor a vrátí se true
, jinak se vrátí false
a
nic víc se nestane.
Výkonný kód vlákna
Nyní implementujeme nejdůležitější metodu, run()
, ve
které se bude odehrávat veškerá logika:
@Override public void run() { while(!interupt) { while(waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} } if (interupt) { clientsToRemove.addAll(waitingQueue); } else { final int count = waitingQueue.size(); waitingQueue.iterator().forEach(client -> { try { client.writer.write(("count: " + count + "\n").getBytes()); client.writer.flush(); } catch (IOException e) { clientsToRemove.add(client); } }); } waitingQueue.removeAll(clientsToRemove); for (Client client : clientsToRemove) { client.close(); } clientsToRemove.clear(); try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException ignored) {} } }
Jako první se nadefinuje nekonečná smyčka, která bude závislá na
proměnné interupt
. Následuje další smyčka, která bude
závislá na semaforu. Vždycky je lepší mít čekání na semaforu ve smyčce
než v jedné podmínce. Rozdíl mezi kódem:
if (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
a kódem:
while (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
je sice pouze v jednom slově (if
a
while
), ale významově je rozdíl velký. Během
čekání na semaforu se může vyvolat ona
InterruptedException
výjimka. Pokud bychom měli
čekání na semaforu pomocí if
, tak by vlákno
začalo zbytečně vykonávat výkonný kód. Proto je důležité čekat na
semaforu ve smyčce. Ve smyčce se kontrolují dvě věci:
- obsazenost fronty
- příznak
interupt
Pokud se vlákno vzbudí na semaforu a fronta bude prázdná, nebo příznak
interupt
bude false
, tak se vlákno opět uspí. Je
důležité, aby byl příznak interupt
přítomný. Jinak bychom
nemohli ukončit vlákno při vypínání serveru.
Když opustíme čekání vlákna na semaforu, následuje vyhodnocení, zda-li se bude vlákno ukončovat či nikoliv. Pokud má nastat ukončení vlákna, tak se všichni klienti z fronty vloží do kolekce pro odstranění klientů z fronty. V případě standardního průchodu se pošle všem klientům jednoduchá zpráva. Pokud se zprávu nepodaří doručit, vloží se klient do kolekce pro odstranění klientů z fronty, protože klient nejspíš neudržel spojení.
Ve finále se ukončí spojení se všemi uživateli, kteří byli v kolekci pro odstranění klientů, a počká se definovaný čas a celý cyklus začne od začátku.
Ukončení vlákna
Nakonec implementujeme metodu shutdown()
, kterou nám
předepisuje rozhraní IThreadControl
:
@Override public void shutdown() { interupt = true; semaphore.release(); try { join(); } catch (InterruptedException ignored) { } }
V této metodě uděláme tři věci:
- nastavíme příznak
interupt
natrue
- uvolníme semafor
- počkáme, až se vlákno ukončí
Uvolněním semaforu se spustí vlákno dispatchera. Díky tomu, že jsme
nastavili příznak interupt
na true
,
přidají se všichni klienti z fronty na seznam adeptů pro ukončení
spojení. Po ukončení spojení s klienty a odebrání z fronty se již
nesplní podmínka v nekonečné smyčce a vlákno se bezpečně ukončí.
Propojení logiky
Ve druhé části dnešního článku použijeme client dispatcher ve
třídě ConnectionManager
. Pro začátek
přidáme novou třídní konstantu typu
IClientDispatcher
a do konstruktoru třídy
ConnectionManager
parametr stejného typu, kterým
konstantu inicializujeme:
public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.pool = pool; this.maxClients = maxClients; }
Dále dokončíme implementaci metody
insertClientToListOrQueue()
. Upravíme
connectionClosedListener
tak, aby se server
pokusil získat z fronty čekajícího klienta a přidal ho mezi aktivní
klienty:
client.setConnectionClosedListener(() - > { clients.remove(client); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
Místo druhého TODO naimplementujeme vložení klienta do fronty:
if (!clientDispatcher.addClientToQueue(client)) {
client.close();
}
Zde využijeme návratové hodnoty metody addClientToQueue()
,
která vrátí true
, pokud klienta vloží do fronty. Pokud je
fronta plná, vrátí false
a jako reakci na plnou frontu odpojíme
klienta.
Nyní je potřeba již jen spustit vlákno dispatchera. O spuštění se
postaráme v metodě onServerStart()
třídy
ConnectionManager
, kde zavoláme:
clientDispatcher.start();
V metodě onServerStop()
ukončíme dispatchera:
clientDispatcher.shutdown();
Nad dispatcherem zavoláme metodu shutdown()
, kterou se nastaví
příznak a probudí vlákno. Po chvíli se vlákno dispatchera ukončí.
Nakonec vytvoříme továrnu na výrobu dispatchera a zaregistrujeme ji.
Vytvoříme tedy rozhraní
IClientDispatcherFactory
, které bude mít jedinou
metodu getClientDispatcher()
, která bude v parametru přijímat
maximální počet klientů ve frontě.
public interface IClientDispatcherFactory { IClientDispatcher getClientDispatcher(int waitingQueueSize); }
Implementace tohoto rozhraní bude velice jednoduchá. Vytvoříme tedy
třídu ClientDispatcherFactory
, která bude toto
rozhraní implementovat a implementujeme jedinou metodu
getClientDispatcher()
:
public class ClientDispatcherFactory implements IClientDispatcherFactory { @Override public IClientDispatcher getClientDispatcher(int waitingQueueSize) { return new ClientDispatcher(waitingQueueSize); } }
Továrnu zaregistrujeme ve třídě ServerModule
obvyklým
způsobem:
bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);
Vše je téměř v pořádku, až na továrnu správce spojení
ConnectionManagerFactory
. Třídě
ConnectionManager
jsme změnili signaturu
konstruktoru přidáním parametru typu
IClientDispatcher
. Vytvoříme tedy v této
továrně novou instanční konstantu typu
IClientDispatcherFactory
. Tuto konstantu bude
dostávat továrna správce spojení v konstruktoru:
private final IClientDispatcherFactory clientDispatcherFactory; @Inject ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) { this.clientDispatcherFactory = clientDispatcherFactory; }
Nyní nám nic nebrání upravit metodu getConnectionManager()
.
Pro vytvoření nové instance správce spojení využijeme právě továrnu
client dispatchera:
return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);
Tím bychom měli hotovou správu klientů čekajících ve frontě.
Příště, v lekci Java server - Zapisovací vlákno, vytvoříme vlákno, které bude asynchronně odesílat zprávy ze serveru ke klientům.
Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.
Stáhnout
Stažením následujícího souboru souhlasíš s licenčními podmínkami
Staženo 22x (156.7 kB)
Aplikace je včetně zdrojových kódů v jazyce Java