Lekce 7 - Java server - Zapisovací vlákno
V minulé lekci, Java server - Client dispatcher, jsme se zabývali čekací frontou s klienty.
Dnes implementujeme odesílání zpráv klientovi pomocí samostatného vlákna. Dosáhneme tak pocitu lepší odezvy serveru.
Zapisovací vlákno
Až dosud jsme odesílali zprávy ze serveru ve vlákně, které zrovna zpracovávalo příchozí data. Tomu se říká synchronní zpracování. Problém v tomto případě je, že se jedná o I/O operaci, což je časově náročná činnost. Mnohem lepší řešení je zpracovávat veškeré tyto činnosti v samostatném vlákně. My si za tímto účelem vytvoříme vlákno, které bude mít jediný úkol: odeslat zprávy ze serveru klientovi.
V balíčku core
vytvoříme nový balíček
writer
, ve kterém budeme vytvářet třídy týkající se
zapisovacího vlákna. Opět začneme vytvořením rozhraní, kterým
nadefinujeme metody. Vytvoříme rozhraní
IWriterThread
a necháme ho dědit od rozhraní
IThreadControl
. Rozhraní bude mít jedinou metodu
sendMessage()
. Metoda bude přijímat dva parametry:
ObjectOutputStream
, do kterého se zpráva
zapíše, a Object
, který je samotná
zpráva:
public interface IWriterThread extends IThreadControl { void sendMessage(ObjectOutputStream writer, Object message); }
Dále vytvoříme třídu WriterThread
, která
bude dědit ze třídy Thread
a implementovat
výše definované rozhraní:
public class WriterThread extends Thread implements IWriterThread { }
V třídě si nadefinujeme dvě instanční konstanty a dvě proměnné:
private final Semaphore semaphore = new Semaphore(0); private final Queue<QueueTuple> messageQueue = new ConcurrentLinkedQueue<>(); private boolean working = false; private boolean interrupt = false;
Princip zde přítomného semaforu je stejný jako v předchozí lekci.
Vlákno bude na semaforu spát, dokud nedostane signál, že má pracovat.
Fronta zpráv messageQueue
obsahuje zprávy, které se budou
posílat. Zatím je typovaná na neznámý typ QueueTuple
, který
si záhy vytvoříme. Proměnná working
říká, zda-li vlákno
pracuje, nebo spí na semaforu. Proměnná interrupt
má stejný
účel jako v předchozích lekcích, kdy jsme vytvářeli třídy, které
dědí od třídy Thread
.
Konstruktor třídy necháme tentokrát prázdný. Pouze za účelem lepší
orientace dáme vláknu lepší název. V konstruktoru zavoláme konstruktor
třídy Thread
s jedním přetížením, které
jako parametr bere název vlákna:
public WriterThread() { super("WriterThread"); }
Metody vlákna
Rozhraní nám říká, že musíme implementovat dvě metody:
sendMessage()
a shutdown()
. Podíváme se na první z
nich. Odeslání zprávy nebude nic jiného, než přidání zprávy do fronty
zpráv a v případě, že vlákno nepracuje, tak ho probudíme a nastavíme
příznak working
na hodnotu true
:
@Override public void sendMessage(ObjectOutputStream writer, Object message) { messageQueue.add(new QueueTuple(outputStream, message)); if (!working) { working = true; semaphore.release(); } }
Metoda shutdown()
bude stejná jako v minulé lekci:
@Override public void shutdown() { interrupt = true; semaphore.release(); try { join(); } catch (InterruptedException ignored) {} }
Nastavíme příznak interrupt
na hodnotu
true
, uvolníme semafor a počkáme, až se
zapisovací vlákno ukončí.
Výkonný kód vlákna
Nyní implementujeme samotný kód, který se postará o odeslání zpráv v
našem vlákně. Metoda run()
vypadá následovně:
@Override public void run() { while(!interrupt) { while(messageQueue.isEmpty() && !interrupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} } working = true; while(!messageQueue.isEmpty()) { final QueueTuple entry = messageQueue.poll(); try { entry.writer.writeLine(entry.message); entry.writer.flush(); } catch (IOException e) { e.printStackTrace(); } } working = false; } }
Kód je velmi podobný tomu, co jsme implementovali u client dispatchera.
Máme zde nekonečnou smyčku závisející na proměnné
interrupt
. Následuje další nekonečná smyčka pro čekání
vlákna na semaforu. Proč je čekání vlákna ve smyčce jsme rozebírali
opět v minulé lekci. V další smyčce se iteruje přes frontu zpráv a
postupně se odešlou všechny zprávy, které se nashromáždily. Když ve
frontě nejsou žádné zprávy, jde vlákno opět odpočívat na semafor.
Zbývá již jen nadefinovat třídu
QueueTuple
. Jedná se o jednoduchou přepravku,
která bude obsahovat dvě konstanty. Není náhoda, že tyto konstanty se
získají v metodě sendMessage()
. Kód celé třídy je zde:
private static final class QueueTuple { final Object message; final ObjectOutputStream writer; private QueueTuple(ObjectOutputStream writer, Object message) { this.message = message; this.writer = writer; } }
Třídu umístěte do třídy WriterThread
,
protože nikde jinde ji používat nebudeme.
Propojení logiky
Ve třídě ConnectionManager
vytvoříme
instanční konstantu zapisovacího vlákna, kterou budeme inicializovat v
konstruktoru z parametru:
private final IWriterThread writerThread; public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.writerThread = writerThread; this.pool = pool; this.maxClients = maxClients; }
Spuštění vlákna provedeme v metodě onServerStart()
ve
správci spojení hned po zapnutí client dispatchera:
writerThread.start();
Ukončení vlákna provedeme opět v metodě onServerStop()
po
vypnutí client dispatchera:
writerThread.shutdown();
Dále musíme upravit třídu Client
, aby měla
přístup k zapisovacímu vláknu. Vytvoříme tedy i ve třídě
Client
instanční konstantu zapisovacího
vlákna, ale budeme ji inicializovat v konstruktoru, který ji získá jako
parametr:
Client(Socket socket, IWriterThread writerThread) throws IOException { this.socket = socket; writer = new ObjectOutputStream(socket.getOutputStream()); this.writerThread = writerThread; }
Vytvoření instance třídy Client
při
příchozím spojení nechám na laskavém čtenáři.
Nakonec musíme upravit továrnu na výrobu správce spojení, protože jsme
opět upravili konstruktor třídy
ConnectionManager
. V továrně přidáme novou
instanční konstantu typu IWriterThread
, kterou
budeme inicializovat v konstruktoru pomocí parametru:
private final IWriterThread writerThread; @Inject public ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory, IWriterThread writerThread) { this.clientDispatcherFactory = clientDispatcherFactory; this.writerThread = writerThread; }
Vytvoření instance by již nemělo dělat problémy:
return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), writerThread, pool, maxClients);
To by bylo z dnešní lekce vše. Funkčnost jsme nezměnili, pouze vylepšili odezvu serveru.
V následujícím kvízu, Kvíz - Správa klientů a spojení, zapisovací vlákno v Javě, si vyzkoušíme nabyté zkušenosti z předchozích lekcí.
Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.
Stáhnout
Stažením následujícího souboru souhlasíš s licenčními podmínkami
Staženo 20x (138.86 kB)
Aplikace je včetně zdrojových kódů v jazyce Java