IT rekvalifikace s garancí práce. Seniorní programátoři vydělávají až 160 000 Kč/měsíc a rekvalifikace je prvním krokem. Zjisti, jak na to!
Hledáme nové posily do ITnetwork týmu. Podívej se na volné pozice a přidej se do nejagilnější firmy na trhu - Více informací.

Lekce 6 - Java server - Client dispatcher

V minulé lekci, Java server - Správce spojení, jsme implementovali správce spojení.

Dnes se podíváme na správu klientů, kteří byli přesunuti do čekací fronty, protože byly naplněny maximální kapacity serveru.

Client dispatcher

Třída bude mít jednoduchý úkol. Bude se snažit udržet aktivní spojení mezi serverem a klientem. V definovaném intervalu bude procházet jednotlivé klienty v čekací frontě a odešle jim (prozatím) jednoduchou textovou zprávu s informací o počtu klientů ve frontě. V případě, že se zprávu nepodaří doručit, ukončí se spojení s klientem a klient se vyřadí z čekací fronty. Celý tento proces se bude dít pouze, pokud ve frontě budou nějací klienti.

Funkce

Funkce jsem již popsal v odstavci výše, nyní si je přehledně sepíšeme v bodech:

  • vložení klienta do fronty
  • získání klienta z fronty
  • dotaz, zda-li je ve frontě klient

V balíčku core založíme nový balíček s názvem dispatcher, ve kterém vytvoříme rozhraní IClientDispatcher. Rozhraní bude definovat funkce dispatchera:

public interface IClientDispatcher extends IThreadControl {
    boolean hasClientInQueue();
    Client getClientFromQueue();
    boolean addClientToQueue(Client client);
}

Rozhraní dědí od IThreadControl, abychom mohli ovládat vlákno, ve kterém dispatcher poběží.

Implementace třídy

Třídu, která bude implementovat rozhraní, nazveme jednoduše ClientDispatcher, necháme ji dědit od třídy Thread a implementovat rozhraní IClientDispatcher:

public class ClientDispatcher extends Thread implements IClientDispatcher

Nadefinujeme si jednu třídní konstantu, která bude reprezentovat interval, ve kterém se bude opakovat komunikace s klienty ve frontě:

private static final int SLEEP_TIME = 5000;

Následují instanční proměnné:

private boolean interupt = false;
private final Semaphore semaphore = new Semaphore(0);
private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>();
private final Collection<Client> clientsToRemove = new ArrayList<>();
private final int waitingQueueSize;

Proměnná interrupt bude řídit vlákno. Dokud bude mít hodnotu false, vlákno bude běžet. Semafor zde bude mít řídící funkci. Dokud ve frontě nebudou žádní klienti, vlákno bude na semaforu čekat. Jakmile se připojený klient dostane do fronty, vlákno projde přes semafor a bude dělat svoji práci. Po odebrání všech klientů z fronty se vlákno opět uspí na semaforu. Následují dvě kolekce. Ve waitingQueue se budou uchovávat klienti a clientsToRemove bude obsahovat klienty, kteří ukončili spojení a je potřeba je odebrat z fronty. Proměnná waitingQueueSize obsahuje maximální počet klientů ve frontě.

Konstruktor třídy bude vyžadovat jediný parametr. Bude to právě maximální počet klientů, kteří budou čekat ve frontě:

public ClientDispatcher(int waitingQueueSize) {
    this.waitingQueueSize = waitingQueueSize;
}

Implementace funkcí

Začneme implementovat metody z rozhraní IClientDispatcher:

@Override
public boolean hasClientInQueue() {
    return !waitingQueue.isEmpty();
}

@Override
public Client getClientFromQueue() {
    return waitingQueue.poll();
}

@Override
public boolean addClientToQueue(Client client) {
    if (waitingQueue.size() < waitingQueueSize) {
        waitingQueue.add(client);
        semaphore.release();
        return true;
    }

    return false;
}

První dvě metody mají implementaci jednoduchou a není třeba je komentovat. U funkce pro přidání klienta do fronty musíme nejdříve zjistit, zda-li fronta pojme dalšího klienta. Pokud ho pojme, uvolní se semafor a vrátí se true, jinak se vrátí false a nic víc se nestane.

Výkonný kód vlákna

Nyní implementujeme nejdůležitější metodu, run(), ve které se bude odehrávat veškerá logika:

@Override
public void run() {
    while(!interupt) {
        while(waitingQueue.isEmpty() && !interupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }

        if (interupt) {
            clientsToRemove.addAll(waitingQueue);
        } else {
        final int count = waitingQueue.size();
            waitingQueue.iterator().forEach(client -> {
                try {
                    client.writer.write(("count: " + count + "\n").getBytes());
                    client.writer.flush();
                } catch (IOException e) {
                    clientsToRemove.add(client);
                }
            });
        }

        waitingQueue.removeAll(clientsToRemove);
        for (Client client : clientsToRemove) {
            client.close();
        }
        clientsToRemove.clear();

        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException ignored) {}
    }
}

Jako první se nadefinuje nekonečná smyčka, která bude závislá na proměnné interupt. Následuje další smyčka, která bude závislá na semaforu. Vždycky je lepší mít čekání na semaforu ve smyčce než v jedné podmínce. Rozdíl mezi kódem:

if (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

a kódem:

while (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

je sice pouze v jednom slově (if a while), ale významově je rozdíl velký. Během čekání na semaforu se může vyvolat ona InterruptedException výjimka. Pokud bychom měli čekání na semaforu pomocí if, tak by vlákno začalo zbytečně vykonávat výkonný kód. Proto je důležité čekat na semaforu ve smyčce. Ve smyčce se kontrolují dvě věci:

  1. obsazenost fronty
  2. příznak interupt

Pokud se vlákno vzbudí na semaforu a fronta bude prázdná, nebo příznak interupt bude false, tak se vlákno opět uspí. Je důležité, aby byl příznak interupt přítomný. Jinak bychom nemohli ukončit vlákno při vypínání serveru.

Když opustíme čekání vlákna na semaforu, následuje vyhodnocení, zda-li se bude vlákno ukončovat či nikoliv. Pokud má nastat ukončení vlákna, tak se všichni klienti z fronty vloží do kolekce pro odstranění klientů z fronty. V případě standardního průchodu se pošle všem klientům jednoduchá zpráva. Pokud se zprávu nepodaří doručit, vloží se klient do kolekce pro odstranění klientů z fronty, protože klient nejspíš neudržel spojení.

Ve finále se ukončí spojení se všemi uživateli, kteří byli v kolekci pro odstranění klientů, a počká se definovaný čas a celý cyklus začne od začátku.

Ukončení vlákna

Nakonec implementujeme metodu shutdown(), kterou nám předepisuje rozhraní IThreadControl:

@Override
public void shutdown() {
    interupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) { }

}

V této metodě uděláme tři věci:

  1. nastavíme příznak interupt na true
  2. uvolníme semafor
  3. počkáme, až se vlákno ukončí

Uvolněním semaforu se spustí vlákno dispatchera. Díky tomu, že jsme nastavili příznak interupt na true, přidají se všichni klienti z fronty na seznam adeptů pro ukončení spojení. Po ukončení spojení s klienty a odebrání z fronty se již nesplní podmínka v nekonečné smyčce a vlákno se bezpečně ukončí.

Propojení logiky

Ve druhé části dnešního článku použijeme client dispatcher ve třídě ConnectionManager. Pro začátek přidáme novou třídní konstantu typu IClientDispatcher a do konstruktoru třídy ConnectionManager parametr stejného typu, kterým konstantu inicializujeme:

public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool,
    int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.pool = pool;
    this.maxClients = maxClients;
}

Dále dokončíme implementaci metody insertClientToListOrQueue(). Upravíme connectionClosedListener tak, aby se server pokusil získat z fronty čekajícího klienta a přidal ho mezi aktivní klienty:

client.setConnectionClosedListener(() - > {
    clients.remove(client);
    if (clientDispatcher.hasClientInQueue()) {
        this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue());
    }
});

Místo druhého TODO naimplementujeme vložení klienta do fronty:

if (!clientDispatcher.addClientToQueue(client)) {
    client.close();
}

Zde využijeme návratové hodnoty metody addClientToQueue(), která vrátí true, pokud klienta vloží do fronty. Pokud je fronta plná, vrátí false a jako reakci na plnou frontu odpojíme klienta.

Nyní je potřeba již jen spustit vlákno dispatchera. O spuštění se postaráme v metodě onServerStart() třídy ConnectionManager, kde zavoláme:

clientDispatcher.start();

V metodě onServerStop() ukončíme dispatchera:

clientDispatcher.shutdown();

Nad dispatcherem zavoláme metodu shutdown(), kterou se nastaví příznak a probudí vlákno. Po chvíli se vlákno dispatchera ukončí.

Nakonec vytvoříme továrnu na výrobu dispatchera a zaregistrujeme ji. Vytvoříme tedy rozhraní IClientDispatcherFactory, které bude mít jedinou metodu getClientDispatcher(), která bude v parametru přijímat maximální počet klientů ve frontě.

public interface IClientDispatcherFactory {
    IClientDispatcher getClientDispatcher(int waitingQueueSize);
}

Implementace tohoto rozhraní bude velice jednoduchá. Vytvoříme tedy třídu ClientDispatcherFactory, která bude toto rozhraní implementovat a implementujeme jedinou metodu getClientDispatcher():

public class ClientDispatcherFactory implements IClientDispatcherFactory {
    @Override
    public IClientDispatcher getClientDispatcher(int waitingQueueSize) {
        return new ClientDispatcher(waitingQueueSize);
    }
}

Továrnu zaregistrujeme ve třídě ServerModule obvyklým způsobem:

bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);

Vše je téměř v pořádku, až na továrnu správce spojení ConnectionManagerFactory. Třídě ConnectionManager jsme změnili signaturu konstruktoru přidáním parametru typu IClientDispatcher. Vytvoříme tedy v této továrně novou instanční konstantu typu IClientDispatcherFactory. Tuto konstantu bude dostávat továrna správce spojení v konstruktoru:

private final IClientDispatcherFactory clientDispatcherFactory;

@Inject
ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) {
    this.clientDispatcherFactory = clientDispatcherFactory;
}

Nyní nám nic nebrání upravit metodu getConnectionManager(). Pro vytvoření nové instance správce spojení využijeme právě továrnu client dispatchera:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);

Tím bychom měli hotovou správu klientů čekajících ve frontě.

Příště, v lekci Java server - Zapisovací vlákno, vytvoříme vlákno, které bude asynchronně odesílat zprávy ze serveru ke klientům.


 

Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.

Stáhnout

Stažením následujícího souboru souhlasíš s licenčními podmínkami

Staženo 20x (156.7 kB)
Aplikace je včetně zdrojových kódů v jazyce Java

 

Předchozí článek
Java server - Správce spojení
Všechny články v sekci
Server pro klientské aplikace v Javě
Přeskočit článek
(nedoporučujeme)
Java server - Zapisovací vlákno
Článek pro vás napsal Petr Štechmüller
Avatar
Uživatelské hodnocení:
1 hlasů
Autor se věnuje primárně programování v Javě, ale nebojí se ani webových technologií.
Aktivity