Vánoční nadílka Vánoční nadílka
Vánoční akce! Daruj lepší budoucnost blízkým nebo sobě. Až +50 % zdarma na dárkové poukazy. Více informací

Lekce 6 - Java server - Client dispatcher

Java Server pro klientské aplikace Java server - Client dispatcher

ONEbit hosting Unicorn College Tento obsah je dostupný zdarma v rámci projektu IT lidem. Vydávání, hosting a aktualizace umožňují jeho sponzoři.

V minulé lekci, Java server - Správce spojení, jsme implementovali správce spojení. Dnes se podíváme na správu klientů, kteří byli přesunuti do čekací fronty, protože byly naplněny maximální kapacity serveru.

Client dispatcher

Třída bude mít jednoduchý úkol. Bude se snažit udržet aktivní spojení mezi serverem a klientem. V definovaném intervalu bude procházet jednotlivé klienty v čekací frontě a odešle jim (prozatím) jednoduchou textovou zprávu s informací o počtu klientů ve frontě. V případě, že se zprávu nepodaří doručit, ukončí se spojení s klientem a klient se vyřadí z čekací fronty. Celý tento proces se bude dít pouze, pokud ve frontě budou nějací klienti.

Funkce

Funkce jsem již popsal v odstavci výše, nyní si je přehledně sepíšeme v bodech:

  • vložení klienta do fronty
  • získání klienta z fronty
  • dotaz, zda-li je ve frontě klient

V balíčku core založíme nový balíček s názvem dispatcher, ve kterém vytvoříme rozhraní IClientDispatcher. Rozhraní bude definovat funkce dispatchera:

public interface IClientDispatcher extends IThreadControl {
    boolean hasClientInQueue();
    Client getClientFromQueue();
    boolean addClientToQueue(Client client);
}

Rozhraní dědí od IThreadControl, abychom mohli ovládat vlákno, ve kterém dispatcher poběží.

Implementace třídy

Třídu, která bude implementovat rozhraní, nazveme jednoduše ClientDispatcher, necháme ji dědit od třídy Thread a implementovat rozhraní IClientDispatcher:

public class ClientDispatcher extends Thread implements IClientDispatcher

Nadefinujeme si jednu třídní konstantu, která bude reprezentovat interval, ve kterém se bude opakovat komunikace s klienty ve frontě:

private static final int SLEEP_TIME = 5000;

Následují instanční proměnné:

private boolean interupt = false;
private final Semaphore semaphore = new Semaphore(0);
private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>();
private final Collection<Client> clientsToRemove = new ArrayList<>();
private final int waitingQueueSize;

Proměnná interrupt bude řídit vlákno. Dokud bude mít hodnotu false, vlákno bude běžet. Semafor zde bude mít řídící funkci . Dokud ve frontě nebudou žádní klienti, vlákno bude na semaforu čekat. Jakmile se připojený klient dostane do fronty, vlákno projde přes semafor a bude dělat svoji práci. Po odebrání všech klientů z fronty se vlákno opět uspí na semaforu. Následují dvě kolekce. Ve waitingQueue se budou uchovávat klienti a clientsToRemove bude obsahovat klienty, kteří ukončili spojení a je potřeba je odebrat z fronty. Proměnná waitingQueueSize obsahuje maximální počet klientů ve frontě.

Konstruktor třídy bude vyžadovat jediný parametr. Bude to právě maximální počet klientů, kteří budou čekat ve frontě:

public ClientDispatcher(int waitingQueueSize) {
    this.waitingQueueSize = waitingQueueSize;
}

Implementace funkcí

Začneme implementovat metody z rozhraní IClientDispatcher:

@Override
public boolean hasClientInQueue() {
    return !waitingQueue.isEmpty();
}

@Override
public Client getClientFromQueue() {
    return waitingQueue.poll();
}

@Override
public boolean addClientToQueue(Client client) {
    if (waitingQueue.size() < waitingQueueSize) {
        waitingQueue.add(client);
        semaphore.release();
        return true;
    }

    return false;
}

První dvě metody mají implementaci jednoduchou a není třeba je komentovat. U funkce pro přidání klienta do fronty musíme nejdříve zjistit, zda-li fronta pojme dalšího klienta. Pokud ho pojme, uvolní se semafor a vrátí se true, jinak se vrátí false a nic víc se nestane.

Výkonný kód vlákna

Nyní implementujeme nejdůležitější metodu, run(), ve které se bude odehrávat veškerá logika:

@Override
public void run() {
    while(!interupt) {
        while(waitingQueue.isEmpty() && !interupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }

        if (interupt) {
            clientsToRemove.addAll(waitingQueue);
        } else {
        final int count = waitingQueue.size();
            waitingQueue.iterator().forEach(client -> {
                try {
                    client.writer.write(("count: " + count + "\n").getBytes());
                    client.writer.flush();
                } catch (IOException e) {
                    clientsToRemove.add(client);
                }
            });
        }

        waitingQueue.removeAll(clientsToRemove);
        for (Client client : clientsToRemove) {
            client.close();
        }
        clientsToRemove.clear();

        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException ignored) {}
    }
}

Jako první se nadefinuje nekonečná smyčka, která bude závislá na proměnné interupt. Následuje další smyčka, která bude závislá na semaforu. Vždycky je lepší mít čekání na semaforu ve smyčce než v jedné podmínce. Rozdíl mezi kódem:

if (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

a kódem:

while (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

je sice pouze v jednom slově (if a while), ale významově je rozdíl velký. Během čekání na semaforu se může vyvolat ona InterruptedException výjimka. Pokud bychom měli čekání na semaforu pomocí if, tak by vlákno začalo zbytečně vykonávat výkonný kód. Proto je důležité čekat na semaforu ve smyčce. Ve smyčce se kontrolují dvě věci:

  1. obsazenost fronty
  2. příznak interupt

Pokud se vlákno vzbudí na semaforu a fronta bude prázdná, nebo příznak interupt bude false, tak se vlákno opět uspí. Je důležité, aby byl příznak interupt přítomný. Jinak bychom nemohli ukončit vlákno při vypínání serveru.

Když opustíme čekání vlákna na semaforu, následuje vyhodnocení, zda-li se bude vlákno ukončovat či nikoliv. Pokud má nastat ukončení vlákna, tak se všichni klienti z fronty vloží to kolekce pro odstranění klientů z fronty. V případě standardního průchodu se pošle všem klientům jednoduchá zpráva. Pokud se zprávu nepodaří doručit, vloží se klient do kolekce pro odstranění klientů z fronty, protože klient nejspíš neudržel spojení.

Ve finále se ukončí spojení se všemi uživateli, kteří byli v kolekci pro odstranění klientů, a počká se definovaný čas a celý cyklus začne od začátku.

Ukončení vlákna

Nakonec implementujeme metodu shutdown(), kterou nám předepisuje rozhraní IThreadControl:

@Override
public void shutdown() {
    interupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) { }

}

V této metodě uděláme tři věci:

  1. nastavíme příznak interupt na true
  2. uvolníme semafor
  3. počkáme, až se vlákno ukončí

Uvolněním semaforu se spustí vlákno dispatchera. Díky tomu, že jsme nastavili příznak interupt na true, přidají se všichni klienti z fronty na seznam adeptů pro ukončení spojení. Po ukončení spojení s klienty a odebrání z fronty se již nesplní podmínka v nekonečné smyčce a vlákno se bezpečně ukončí.

Propojení logiky

Ve druhé části dnešního článku použijeme client dispatcher ve třídě ConnectionManager. Pro začátek přidáme novou třídní konstantu typu IClientDispatcher a do konstruktoru třídy ConnectionManager parametr stejného typu, kterým konstantu inicializujeme:

public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool,
    int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.pool = pool;
    this.maxClients = maxClients;
}

Dále dokončíme implementaci metody insertClientToListOrQueue(). Upravíme connectionClosedListener tak, aby se server pokusil získat z fronty čekajícího klienta a přidal ho mezi aktivní klienty:

client.setConnectionClosedListener(() - > {
    clients.remove(client);
    if (clientDispatcher.hasClientInQueue()) {
        this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue());
    }
});

Místo druhého TODO naimplementujeme vložení klienta do fronty:

if (!clientDispatcher.addClientToQueue(client)) {
    client.close();
}

Zde využijeme návratové hodnoty metody addClientToQueue(), která vrátí true, pokud klienta vloží do fronty. Pokud je fronta plná, vrátí false a jako reakci na plnou frontu odpojíme klienta.

Nyní je potřeba již jen spustit vlákno dispatchera. O spuštění se postaráme v metodě onServerStart() třídy ConnectionManager, kde zavoláme:

clientDispatcher.start();

V metodě onServerStop() ukončíme dispatchera:

clientDispatcher.shutdown();

Nad dispatcherem zavoláme metodu shutdown(), kterou se nastaví příznak a probudí vlákno. Po chvíli se vlákno dispatchera ukončí.

Nakonec vytvoříme továrnu na výrobu dispatchera a zaregistrujeme ji. Vytvoříme tedy rozhraní IClientDispatcherFactory, které bude mít jedinou metodu getClientDispatcher(), která bude v parametru přijímat maximální počet klientů ve frontě.

public interface IClientDispatcherFactory {
    IClientDispatcher getClientDispatcher(int waitingQueueSize);
}

Implementace tohoto rozhraní bude velice jednoduchá. Vytvoříme tedy třídu ClientDispatcherFactory, která bude toto rozhraní implementovat a implementujeme jedinou metodu getClientDispatcher():

public class ClientDispatcherFactory implements IClientDispatcherFactory {
    @Override
    public IClientDispatcher getClientDispatcher(int waitingQueueSize) {
        return new ClientDispatcher(waitingQueueSize);
    }
}

Továrnu zaregistrujeme ve třídě ServerModule obvyklým způsobem:

bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);

Vše je téměř v pořádku, až na továrnu správce spojení ConnectionManagerFactory. Třídě ConnectionManager jsme změnili signaturu konstruktoru přidáním parametru typu IClientDispatcher. Vytvoříme tedy v této továrně novou instanční konstantu typu IClientDispatcherFactory. Tuto konstantu bude dostávat továrna správce spojení v konstruktoru:

private final IClientDispatcherFactory clientDispatcherFactory;

@Inject
ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) {
    this.clientDispatcherFactory = clientDispatcherFactory;
}

Nyní nám nic nebrání upravit metodu getConnectionManager(). Pro vytvoření nové instance správce spojení využijeme právě továrnu client dispatchera:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);

Tím bychom měli hotovou správu klientů čekajících ve frontě. Příště, v lekci Java server - Zapisovací vlákno, vytvoříme vlákno, které bude asynchronně odesílat zprávy ze serveru ke klientům.


 

Stáhnout

Staženo 2x (130.02 kB)
Aplikace je včetně zdrojových kódů v jazyce Java

 

 

Článek pro vás napsal Petr Štechmüller
Avatar
Jak se ti líbí článek?
Ještě nikdo nehodnotil, buď první!
Autor se věnuje primárně programování v Jave, ale nebojí se ani webových technologií.
Miniatura
Předchozí článek
Java server - Správce spojení
Miniatura
Všechny články v sekci
Server pro klientské aplikace v Javě
Miniatura
Následující článek
Java server - Zapisovací vlákno
Aktivity (2)

 

 

Komentáře

Děláme co je v našich silách, aby byly zdejší diskuze co nejkvalitnější. Proto do nich také mohou přispívat pouze registrovaní členové. Pro zapojení do diskuze se přihlas. Pokud ještě nemáš účet, zaregistruj se, je to zdarma.

Zatím nikdo nevložil komentář - buď první!