Vydělávej až 160.000 Kč měsíčně! Akreditované rekvalifikační kurzy s garancí práce od 0 Kč. Více informací.
Hledáme nové posily do ITnetwork týmu. Podívej se na volné pozice a přidej se do nejagilnější firmy na trhu - Více informací.

Lekce 7 - Java server - Zapisovací vlákno

V minulé lekci, Java server - Client dispatcher, jsme se zabývali čekací frontou s klienty.

Dnes implementujeme odesílání zpráv klientovi pomocí samostatného vlákna. Dosáhneme tak pocitu lepší odezvy serveru.

Zapisovací vlákno

Až dosud jsme odesílali zprávy ze serveru ve vlákně, které zrovna zpracovávalo příchozí data. Tomu se říká synchronní zpracování. Problém v tomto případě je, že se jedná o I/O operaci, což je časově náročná činnost. Mnohem lepší řešení je zpracovávat veškeré tyto činnosti v samostatném vlákně. My si za tímto účelem vytvoříme vlákno, které bude mít jediný úkol: odeslat zprávy ze serveru klientovi.

V balíčku core vytvoříme nový balíček writer, ve kterém budeme vytvářet třídy týkající se zapisovacího vlákna. Opět začneme vytvořením rozhraní, kterým nadefinujeme metody. Vytvoříme rozhraní IWriterThread a necháme ho dědit od rozhraní IThreadControl. Rozhraní bude mít jedinou metodu sendMessage(). Metoda bude přijímat dva parametry: ObjectOutputStream, do kterého se zpráva zapíše, a Object , který je samotná zpráva:

public interface IWriterThread extends IThreadControl {
    void sendMessage(ObjectOutputStream writer, Object message);
}

Dále vytvoříme třídu WriterThread, která bude dědit ze třídy Thread a implementovat výše definované rozhraní:

public class WriterThread extends Thread implements IWriterThread {
}

V třídě si nadefinujeme dvě instanční konstanty a dvě proměnné:

private final Semaphore semaphore = new Semaphore(0);
private final Queue<QueueTuple> messageQueue = new ConcurrentLinkedQueue<>();
private boolean working = false;
private boolean interrupt = false;

Princip zde přítomného semaforu je stejný jako v předchozí lekci. Vlákno bude na semaforu spát, dokud nedostane signál, že má pracovat. Fronta zpráv messageQueue obsahuje zprávy, které se budou posílat. Zatím je typovaná na neznámý typ QueueTuple, který si záhy vytvoříme. Proměnná working říká, zda-li vlákno pracuje, nebo spí na semaforu. Proměnná interrupt má stejný účel jako v předchozích lekcích, kdy jsme vytvářeli třídy, které dědí od třídy Thread.

Konstruktor třídy necháme tentokrát prázdný. Pouze za účelem lepší orientace dáme vláknu lepší název. V konstruktoru zavoláme konstruktor třídy Thread s jedním přetížením, které jako parametr bere název vlákna:

public WriterThread() {
    super("WriterThread");
}

Metody vlákna

Rozhraní nám říká, že musíme implementovat dvě metody: sendMessage() a shutdown(). Podíváme se na první z nich. Odeslání zprávy nebude nic jiného, než přidání zprávy do fronty zpráv a v případě, že vlákno nepracuje, tak ho probudíme a nastavíme příznak working na hodnotu true:

@Override
public void sendMessage(ObjectOutputStream writer, Object message) {
    messageQueue.add(new QueueTuple(outputStream, message));
    if (!working) {
        working = true;
        semaphore.release();
    }
}

Metoda shutdown() bude stejná jako v minulé lekci:

@Override
public void shutdown() {
    interrupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) {}
}

Nastavíme příznak interrupt na hodnotu true, uvolníme semafor a počkáme, až se zapisovací vlákno ukončí.

Výkonný kód vlákna

Nyní implementujeme samotný kód, který se postará o odeslání zpráv v našem vlákně. Metoda run() vypadá následovně:

@Override
public void run() {
    while(!interrupt) {
        while(messageQueue.isEmpty() && !interrupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }
        working = true;
        while(!messageQueue.isEmpty()) {
            final QueueTuple entry = messageQueue.poll();
            try {
                entry.writer.writeLine(entry.message);
                entry.writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        working = false;
    }
}

Kód je velmi podobný tomu, co jsme implementovali u client dispatchera. Máme zde nekonečnou smyčku závisející na proměnné interrupt. Následuje další nekonečná smyčka pro čekání vlákna na semaforu. Proč je čekání vlákna ve smyčce jsme rozebírali opět v minulé lekci. V další smyčce se iteruje přes frontu zpráv a postupně se odešlou všechny zprávy, které se nashromáždily. Když ve frontě nejsou žádné zprávy, jde vlákno opět odpočívat na semafor.

Zbývá již jen nadefinovat třídu QueueTuple. Jedná se o jednoduchou přepravku, která bude obsahovat dvě konstanty. Není náhoda, že tyto konstanty se získají v metodě sendMessage(). Kód celé třídy je zde:

private static final class QueueTuple {
    final Object message;
    final ObjectOutputStream writer;

    private QueueTuple(ObjectOutputStream writer, Object message) {
        this.message = message;
        this.writer = writer;
    }
}

Třídu umístěte do třídy WriterThread, protože nikde jinde ji používat nebudeme.

Propojení logiky

Ve třídě ConnectionManager vytvoříme instanční konstantu zapisovacího vlákna, kterou budeme inicializovat v konstruktoru z parametru:

private final IWriterThread writerThread;
public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread,
    ExecutorService pool, int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.writerThread = writerThread;
    this.pool = pool;
    this.maxClients = maxClients;
}

Spuštění vlákna provedeme v metodě onServerStart() ve správci spojení hned po zapnutí client dispatchera:

writerThread.start();

Ukončení vlákna provedeme opět v metodě onServerStop() po vypnutí client dispatchera:

writerThread.shutdown();

Dále musíme upravit třídu Client, aby měla přístup k zapisovacímu vláknu. Vytvoříme tedy i ve třídě Client instanční konstantu zapisovacího vlákna, ale budeme ji inicializovat v konstruktoru, který ji získá jako parametr:

Client(Socket socket, IWriterThread writerThread) throws IOException {
    this.socket = socket;
    writer = new ObjectOutputStream(socket.getOutputStream());
    this.writerThread = writerThread;
}

Vytvoření instance třídy Client při příchozím spojení nechám na laskavém čtenáři.

Nakonec musíme upravit továrnu na výrobu správce spojení, protože jsme opět upravili konstruktor třídy ConnectionManager. V továrně přidáme novou instanční konstantu typu IWriterThread, kterou budeme inicializovat v konstruktoru pomocí parametru:

private final IWriterThread writerThread;

@Inject
public ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory,
    IWriterThread writerThread) {
    this.clientDispatcherFactory = clientDispatcherFactory;
    this.writerThread = writerThread;
}

Vytvoření instance by již nemělo dělat problémy:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), writerThread, pool, maxClients);

To by bylo z dnešní lekce vše. Funkčnost jsme nezměnili, pouze vylepšili odezvu serveru.

V následujícím kvízu, Kvíz - Správa klientů a spojení, zapisovací vlákno v Javě, si vyzkoušíme nabyté zkušenosti z předchozích lekcí.


 

Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.

Stáhnout

Stažením následujícího souboru souhlasíš s licenčními podmínkami

Staženo 18x (138.86 kB)
Aplikace je včetně zdrojových kódů v jazyce Java

 

Předchozí článek
Java server - Client dispatcher
Všechny články v sekci
Server pro klientské aplikace v Javě
Přeskočit článek
(nedoporučujeme)
Kvíz - Správa klientů a spojení, zapisovací vlákno v Javě
Článek pro vás napsal Petr Štechmüller
Avatar
Uživatelské hodnocení:
1 hlasů
Autor se věnuje primárně programování v Javě, ale nebojí se ani webových technologií.
Aktivity