Lekce 18 - Java chat - Klient - Spojení se serverem 2. část
V minulé lekci, Java chat - Klient - Spojení se serverem 1. část, jsme si mimo jiné vytvořili rozhraní pro klientský komunikátor.
V dnešním Java tutoriálu jej implementujeme.
Klientský komunikátor
V balíčku service
vytvoříme novou třídu
ClientCommunicationService
a necháme ji implementovat rozhraní
IClientCommunicationService
:
public class ClientCommunicationService implements IClientCommunicationService { }
Proměnné a konstanty
Do třídy přidáme následující proměnné a konstanty:
private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null); private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED); private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>(); private final StringProperty host = new SimpleStringProperty(this, "host", null); private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1); private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null); private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY); private final Queue<Request> requests = new LinkedBlockingQueue<>(); private ReaderThread readerThread; private WriterThread writerThread;
Všechny konstanty jsou samopopisující, takže k jejich popisu nebudu
dávat žádný komentář. Za zmínku stojí konstanta socket
,
která je zabalena do třídy ObjectProperty
. Tím získáváme
možnost pozorovat změnu hodnoty. Zajímavá je taky fronta
requests
, pomocí které budeme realizovat komunikaci typu
request-responce. Třídu Request
vytvoříme
později. Proměnné readerThread
a writerThread
budou
obsahovat čtecí a zapisovací vlákno. Tyto proměnné inicializujeme až při
pokusu o vytvoření nového spojení.
Konstruktor
Konstruktor třídy nebude vyžadovat žádný parametr. V konstruktoru se nastaví listener na socket a vytvoří se binding na název serveru, který se bude mít formát: "název:port":
public ClientCommunicationService() { socket.addListener(this::socketListener); connectedServerName.bind(Bindings.createStringBinding(() -> String.format("%s:%d", host.get(), port.get()), host, port, connectionState)); }
Posluchač změny stavu socketu
Vytvoříme privátní metodu socketListener()
, kterou jsme
registrovali v konstruktoru. V této metodě budeme inicializovat/rušit
čtecí/zapisovací vlákno:
private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) { if (newSocket == null) { readerThread = null; writerThread = null; unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); return; } try { readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect); writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect); readerThread.start(); writerThread.start(); registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); } catch (IOException e) { System.out.println("Vyskytl se problém při vytváření komunikace se serverem."); } }
Metoda se skládá ze dvou částí. V horní části se zpracovává
případ, kdy bylo z nějakého důvodu spojení ukončeno a je potřeba
odstranit instance starého čtecího/zapisovacího vlákna. Ve zbytku metody
se předpokládá, že proměnná newSocket
obsahuje nový validní
socket, který se vytvořil na základě nově vytvořeného spojení. Vytvoří
se nové instance čtecího a zapisovacího vlákna a metodou
start()
se spustí. Metodu
(un)registerMessageObserver()
si vysvětlíme, až ji budeme
implementovat.
Delegace zpracování přijatých zpráv
Do třídy přidáme další instanční konstantu, která bude obsahovat anonymní funkci starající se o rozeslání přijatých zpráv zaregistrovaným pozorovatelům:
private final OnDataReceivedListener listener = message -> { if (message.isResponce()) { final Request poll = requests.poll(); if (poll != null) { poll.onResponce(message); } return; } final List<OnDataReceivedListener> listenerList = listeners.get(message.getType()); if (listenerList == null) { return; } for (OnDataReceivedListener listener : listenerList) { listener.onDataReceived(message); } };
Na začátku metody se podíváme, zda-li je přijatá zpráva odpověď na
nějaký požadavek. Pokud ano, vyzvedne se a zavolá se obsluha zprávy z
fronty requests
. Pokud se jedná o běžnou zprávu, získáme z
mapy listeners
všechny listenery a postupně jim oznámíme, že
mají zpracovat přijatou zprávu.
Přihlášení/odhlášení odběru zpráv
Další metody, které musíme podle rozhraní implementovat, jsou metody pro
přihlášení a odhlášení odběru zpráv. Tyto metody budou modifikovat mapu
listeners
:
@Override public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>()); listenerList.add(listener); } @Override public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.get(messageType); if (listenerList == null) { return; } listenerList.remove(listener); }
Při registraci posluchače využijeme metodu computeIfAbsent()
,
která se podívá do mapy a pokud na zadaném klíči neexistuje hodnota, tak
ji vytvoří.
Navázání spojení
Konečně se dostáváme k nejdůležitějším metodám celého
komunikátoru. Začneme implementaci metody connect()
, kde poprvé
použijeme třídu CompletableFuture
:
@Override public CompletableFuture <Void> connect(String host, int port) { if (isConnected()) { throw new RuntimeException("Spojení je již vytvořeno."); } changeState(ConnectionState.CONNECTING); return CompletableFuture.supplyAsync(() -> { final Socket socket = new Socket(); try { socket.connect(new InetSocketAddress(host, port), 3000); return socket; } catch (IOException e) { return null; } }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(socket -> { this.socket.set(socket); if (socket != null) { this.host.set(host); this.port.set(port); } else { changeState(ConnectionState.DISCONNECTED); this.host.set(null); this.port.set(-1); } if (socket == null) { throw new RuntimeException("Spojení se nepodařilo vytvořit."); } return null; }, ThreadPool.JAVAFX_EXECUTOR); }
Metoda je opět rozdělena do logických celků. V první části se
podíváme, zda-li jsme již připojení k serveru. Pokud jsme připojeni,
vyhodíme výjimku. RuntimeException
nebudeme muset ani
ošetřovat, pouze se nám zapíše do konzole. Důležité je, že aplikace se
neukončí. Metodou changeState()
informujeme okolí, že se
pokoušíme připojit k serveru.
V druhé části metody vytvoříme budoucnost, ve které se pokusíme
vytvořit samotné spojení se serverem voláním metody
socket.connect()
. Konstantou
ThreadPool.COMMON_EXECUTOR
nastavíme, že připojení se má
provést v samostatném vlákně. Pokud se úspěšně spojíme se serverem,
vrátíme socket. Metodou thenApplyAsync()
"transformujeme" socket
na výsledek.
Ve třetí části uložíme socket voláním příkazu
this.socket.set(socket)
. Tím se mimo jiné zavolá
changeListener
a vytvoří/odstraní se čtecí a zapisovací
vlákno. Celá třetí část se musí odehrát v JavaFX vlákně, protože na
některé pozorovatelné konstanty později navěsíme grafické komponenty,
které jak jsem již říkal minule, se mohou aktualizovat pouze v JavaFX
vlákně, jinak se vyhodí výjimka.
Ukončení spojení
Spojení budeme ukončovat metodou disconnect()
. Metoda bude
mít za úkol řádně ukončit čtecí a zapisovací vlákno:
public CompletableFuture<Boolean> disconnect() { if (!isConnected()) { return CompletableFuture.completedFuture(false); } return CompletableFuture.supplyAsync(() -> { try { socket.get().close(); readerThread.shutdown(); writerThread.shutdown(); } catch (IOException e) { e.printStackTrace(); return false; } return true; }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(success -> { if (success) { this.socket.set(null); changeState(ConnectionState.DISCONNECTED); } return success; }, ThreadPool.JAVAFX_EXECUTOR); }
Pokud se úspěšně ukončí spojení, tak se pomocí příkazu
this.socket.set(null)
vymaže čtecí a zapisovací vlákno a
komunikátor se dostane do stavu DISCONNECTED
.
Odesílání zpráv
Zprávy budeme odesílat dvojího typu:
- bez čekání na výsledek
- s čekáním na výsledek
Metoda bez čekání na výsledek bude velmi jednoduchá. Vezme zprávu, předá ji zapisovacímu vláknu a o nic víc se nestará:
public synchronized void sendMessage(IMessage message) { if (writerThread != null) { writerThread.addMessageToQueue(message); } }
Odeslání zprávy s čekáním na výsledek má jeden problém, který musíme vyřešit. Tím problémem je ono čekání na odpověď ze serveru:
public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) { return CompletableFuture.supplyAsync(() -> { sendMessage(message); return null; }) .thenCompose(ignored -> { Request request = new Request(); requests.add(request); return request.getFuture(); }); }
Metoda, která bude odesílat zprávu s odpovědí, bude vracet budoucnost,
ve které odpověď přijde. Nejdříve se standartně odešle zpráva a pak se
zavolá nová metoda thenCompose()
. Tato metoda vlastně říká,
že výsledek budoucnosti získáme z jiné instance třídy
CompletableFuture
. Tuto jinou instanci získáme voláním metody
getFuture()
nad instancí třídy Request
, kterou si
za okamžik nadefinujeme.
Request-response zprávy
Vytvoříme podpůrnou třídu, která nám zajistí čekání na odpověď
ze serveru. Vytvoříme tedy novou třídu Request
a vše by
najednou mělo začít dávat smysl:
class Request { private final Object lock = new Object(); private boolean waiting = true; private IMessage responce; CompletableFuture<IMessage> getFuture() { return CompletableFuture.supplyAsync(() -> { while (waiting) { synchronized (lock) { try { lock.wait(); } catch (InterruptedException ignored) {} } } return responce; }); } void onResponce(IMessage message) { this.responce = message; waiting = false; synchronized (lock) { lock.notify(); } } }
Třída má pouze dvě metody: getFuture()
a
onResponce()
. První metoda vytvoří budoucnost, ve které se
vlákno uspí voláním metody wait()
. Jediný, kdo může tuto
budoucnost probudit, je metoda onResponce()
, která je zavolána,
když je přijata odpoveď ze serveru. Tímto jednoduchým trikem vytvoříme
dojem komunikace request-response.
Na konec jen doplním implementaci zbylých metod, které vyžaduje rozhraní:
@Override public ConnectionState getConnectionState() { return connectionState.get(); } @Override public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() { return connectionState.getReadOnlyProperty(); } @Override public String getConnectedServerName() { return connectedServerName.get(); }
To by pro tuto delší lekci vše.
Příště, v lekci Java chat - Klient - Spojení se serverem 3. část, již navážeme spojení se serverem.
Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.
Stáhnout
Stažením následujícího souboru souhlasíš s licenčními podmínkami
Staženo 14x (120.98 kB)
Aplikace je včetně zdrojových kódů v jazyce Java