Vánoční nadílka Vánoční nadílka
Vánoční akce! Daruj lepší budoucnost blízkým nebo sobě. Až +50 % zdarma na dárkové poukazy. Více informací

Lekce 7 - Java server - Zapisovací vlákno

Java Server pro klientské aplikace Java server - Zapisovací vlákno

ONEbit hosting Unicorn College Tento obsah je dostupný zdarma v rámci projektu IT lidem. Vydávání, hosting a aktualizace umožňují jeho sponzoři.

V minulé lekci, Java server - Client dispatcher, jsme se zabývali čekací frontou s klienty. Dnes implementujeme odesílání zpráv klientovi pomocí samostatného vlákna. Dosáhneme tak pocitu lepší odezvy serveru.

Zapisovací vlákno

Až dosud jsme odesílali zprávy ze serveru ve vlákně, které zrovna zpracovávalo příchozí data. Tomu se říká synchronní zpracování. Problém v tomto případě je, že se jedná o I/O operaci, což je časově náročná činnost. Mnohem lepší řešení je zpracovávat veškeré tyto činnosti v samostatném vlákně. My si za tímto účelem vytvoříme vlákno, které bude mít jediný úkol: odeslat zprávy ze serveru klientovi.

V balíčku core vytvoříme nový balíček writer, ve kterém budeme vytvářet třídy týkající se zapisovacího vlákna. Opět začneme vytvořením rozhraní, kterým nadefinujeme metody. Vytvoříme rozhraní IWriterThread a necháme ho dědit od rozhraní IThreadControl. Rozhraní bude mít jedinou metodu sendMessage(). Metoda bude přijímat dva parametry: ObjectOutputStream, do kterého se zpráva zapíše, a Object , který je samotná zpráva:

public interface IWriterThread extends IThreadControl {
    void sendMessage(ObjectOutputStream writer, Object message);
}

Dále vytvoříme třídu WriterThread, která bude dědit ze třídy Thread a implementovat výše definované rozhraní:

public class WriterThread extends Thread implements IWriterThread {
}

V třídě si nadefinujeme dvě instanční konstanty a dvě proměnné:

private final Semaphore semaphore = new Semaphore(0);
private final Queue<QueueTuple> messageQueue = new ConcurrentLinkedQueue<>();
private boolean working = false;
private boolean interrupt = false;

Princip zde přítomného semaforu je stejný jako v předchozí lekci. Vlákno bude na semaforu spát, dokud nedostane signál, že má pracovat. Fronta zpráv messageQueue obsahuje zprávy, které se budou posílat. Zatím je typovaná na neznámý typ QueueTuple, který si záhy vytvoříme. Proměnná working říká, zda-li vlákno pracuje, nebo spí na semaforu. Proměnná interrupt má stejný účel jako v předchozích lekcích, kdy jsme vytvářeli třídy, které dědí od třídy Thread.

Konstruktor třídy necháme tentokrát prázdný. Pouze za účelem lepší orientace dáme vláknu lepší název. V konstruktoru zavoláme konstruktor třídy Thread s jedním přetížením, které jako parametr bere název vlákna:

public WriterThread() {
    super("WriterThread");
}

Metody vlákna

Rozhraní nám říká, že musíme implementovat dvě metody: sendMessage() a shutdown(). Podíváme se na první z nich. Odeslání zprávy nebude nic jiného, než přidání zprávy do fronty zpráv a v případě, že vlákno nepracuje, tak ho probudíme a nastavíme příznak working na hodnotu true:

@Override
public void sendMessage(ObjectOutputStream writer, Object message) {
    messageQueue.add(new QueueTuple(outputStream, message));
    if (!working) {
        working = true;
        semaphore.release();
    }
}

Metoda shutdown() bude stejná jako v minulé lekci:

@Override
public void shutdown() {
    interrupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) {}
}

Nastavíme příznak interrupt na hodnotu true, uvolníme semafor a počkáme, až se zapisovací vlákno ukončí.

Výkonný kód vlákna

Nyní implementujeme samotný kód, který se postará o odeslání zpráv v našem vlákně. Metoda run() vypadá následovně:

@Override
public void run() {
    while(!interrupt) {
        while(messageQueue.isEmpty() && !interrupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }
        working = true;
        while(!messageQueue.isEmpty()) {
            final QueueTuple entry = messageQueue.poll();
            try {
                entry.writer.writeLine(entry.message);
                entry.writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        working = false;
    }
}

Kód je velmi podobný tomu, co jsme implementovali u client dispatchera. Máme zde nekonečnou smyčku závisející na proměnné interrupt. Následuje další nekonečná smyčka pro čekání vlákna na semaforu. Proč je čekání vlákna ve smyčce jsme rozebírali opět v minulé lekci. V další smyčce se iteruje přes frontu zpráv a postupně se odešlou všechny zprávy, které se nashromáždily. Když ve frontě nejsou žádné zprávy, jde vlákno opět odpočívat na semafor.

Zbývá již jen nadefinovat třídu QueueTuple. Jedná se o jednoduchou přepravku, která bude obsahovat dvě konstanty. Není náhoda, že tyto konstanty se získají v metodě sendMessage(). Kód celé třídy je zde:

private static final class QueueTuple {
    final Object message;
    final ObjectOutputStream writer;

    private QueueTuple(ObjectOutputStream writer, Object message) {
        this.message = message;
        this.writer = writer;
    }
}

Třídu umístěte do třídy WriterThread, protože nikde jinde ji používat nebudeme.

Propojení logiky

Ve třídě ConnectionManager vytvoříme instanční konstantu zapisovacího vlákna, kterou budeme inicializovat v konstruktoru z parametru:

private final IWriterThread writerThread;
public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread,
    ExecutorService pool, int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.writerThread = writerThread;
    this.pool = pool;
    this.maxClients = maxClients;
}

Spuštění vlákna provedeme v metodě onServerStart() ve správci spojení hned po zapnutí client dispatchera:

writerThread.start();

Ukončení vlákna provedeme opět v metodě onServerStop() po vypnutí client dispatchera:

writerThread.shutdown();

Dále musíme upravit třídu Client, aby měla přístup k zapisovacímu vláknu. Vytvoříme tedy i ve třídě Client instanční konstantu zapisovacího vlákna, ale budeme ji inicializovat v konstruktoru, který ji získá jako parametr:

Client(Socket socket, IWriterThread writerThread) throws IOException {
    this.socket = socket;
    writer = new ObjectOutputStream(socket.getOutputStream());
    this.writerThread = writerThread;
}

Vytvoření instance třídy Client při příchozím spojení nechám na laskavém čtenáři.

Nakonec musíme upravit továrnu na výrobu správce spojení, protože jsme opět upravili konstruktor třídy ConnectionManager. V továrně přidáme novou instanční konstantu typu IWriterThread, kterou budeme inicializovat v konstruktoru pomocí parametru:

private final IWriterThread writerThread;

@Inject
public ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory,
    IWriterThread writerThread) {
    this.clientDispatcherFactory = clientDispatcherFactory;
    this.writerThread = writerThread;
}

Vytvoření instance by již nemělo dělat problémy:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), writerThread, pool, maxClients);

To by bylo z dnešní lekce vše. Funkčnost jsme nezměnili, pouze vylepšili odezvu serveru. Příště, v lekci Java server - Komunikační protokol, navrhneme komunikační protokol.


 

Stáhnout

Staženo 2x (132.33 kB)
Aplikace je včetně zdrojových kódů v jazyce Java

 

 

Článek pro vás napsal Petr Štechmüller
Avatar
Jak se ti líbí článek?
Ještě nikdo nehodnotil, buď první!
Autor se věnuje primárně programování v Jave, ale nebojí se ani webových technologií.
Miniatura
Předchozí článek
Java server - Client dispatcher
Miniatura
Všechny články v sekci
Server pro klientské aplikace v Javě
Miniatura
Následující článek
Java server - Komunikační protokol
Aktivity (2)

 

 

Komentáře

Děláme co je v našich silách, aby byly zdejší diskuze co nejkvalitnější. Proto do nich také mohou přispívat pouze registrovaní členové. Pro zapojení do diskuze se přihlas. Pokud ještě nemáš účet, zaregistruj se, je to zdarma.

Zatím nikdo nevložil komentář - buď první!