Lekce 18 - Java chat - Klient - Spojení se serverem 2. část
V minulé lekci, Java chat - Klient - Spojení se serverem 1. část, jsme si mimo jiné vytvořili rozhraní pro klientský komunikátor.
V dnešním Java tutoriálu jej implementujeme.
Klientský komunikátor
V balíčku service vytvoříme novou třídu
ClientCommunicationService a necháme ji implementovat rozhraní
IClientCommunicationService:
public class ClientCommunicationService implements IClientCommunicationService { }
Proměnné a konstanty
Do třídy přidáme následující proměnné a konstanty:
private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null); private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED); private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>(); private final StringProperty host = new SimpleStringProperty(this, "host", null); private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1); private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null); private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY); private final Queue<Request> requests = new LinkedBlockingQueue<>(); private ReaderThread readerThread; private WriterThread writerThread;
Všechny konstanty jsou samopopisující, takže k jejich popisu nebudu
dávat žádný komentář. Za zmínku stojí konstanta socket,
která je zabalena do třídy ObjectProperty. Tím získáváme
možnost pozorovat změnu hodnoty. Zajímavá je taky fronta
requests, pomocí které budeme realizovat komunikaci typu
request-responce. Třídu Request vytvoříme
později. Proměnné readerThread a writerThread budou
obsahovat čtecí a zapisovací vlákno. Tyto proměnné inicializujeme až při
pokusu o vytvoření nového spojení.
Konstruktor
Konstruktor třídy nebude vyžadovat žádný parametr. V konstruktoru se nastaví listener na socket a vytvoří se binding na název serveru, který se bude mít formát: "název:port":
public ClientCommunicationService() { socket.addListener(this::socketListener); connectedServerName.bind(Bindings.createStringBinding(() -> String.format("%s:%d", host.get(), port.get()), host, port, connectionState)); }
Posluchač změny stavu socketu
Vytvoříme privátní metodu socketListener(), kterou jsme
registrovali v konstruktoru. V této metodě budeme inicializovat/rušit
čtecí/zapisovací vlákno:
private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) { if (newSocket == null) { readerThread = null; writerThread = null; unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); return; } try { readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect); writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect); readerThread.start(); writerThread.start(); registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); } catch (IOException e) { System.out.println("Vyskytl se problém při vytváření komunikace se serverem."); } }
Metoda se skládá ze dvou částí. V horní části se zpracovává
případ, kdy bylo z nějakého důvodu spojení ukončeno a je potřeba
odstranit instance starého čtecího/zapisovacího vlákna. Ve zbytku metody
se předpokládá, že proměnná newSocket obsahuje nový validní
socket, který se vytvořil na základě nově vytvořeného spojení. Vytvoří
se nové instance čtecího a zapisovacího vlákna a metodou
start() se spustí. Metodu
(un)registerMessageObserver() si vysvětlíme, až ji budeme
implementovat.
Delegace zpracování přijatých zpráv
Do třídy přidáme další instanční konstantu, která bude obsahovat anonymní funkci starající se o rozeslání přijatých zpráv zaregistrovaným pozorovatelům:
private final OnDataReceivedListener listener = message -> { if (message.isResponce()) { final Request poll = requests.poll(); if (poll != null) { poll.onResponce(message); } return; } final List<OnDataReceivedListener> listenerList = listeners.get(message.getType()); if (listenerList == null) { return; } for (OnDataReceivedListener listener : listenerList) { listener.onDataReceived(message); } };
Na začátku metody se podíváme, zda-li je přijatá zpráva odpověď na
nějaký požadavek. Pokud ano, vyzvedne se a zavolá se obsluha zprávy z
fronty requests. Pokud se jedná o běžnou zprávu, získáme z
mapy listeners všechny listenery a postupně jim oznámíme, že
mají zpracovat přijatou zprávu.
Přihlášení/odhlášení odběru zpráv
Další metody, které musíme podle rozhraní implementovat, jsou metody pro
přihlášení a odhlášení odběru zpráv. Tyto metody budou modifikovat mapu
listeners:
@Override public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>()); listenerList.add(listener); } @Override public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.get(messageType); if (listenerList == null) { return; } listenerList.remove(listener); }
Při registraci posluchače využijeme metodu computeIfAbsent(),
která se podívá do mapy a pokud na zadaném klíči neexistuje hodnota, tak
ji vytvoří.
Navázání spojení
Konečně se dostáváme k nejdůležitějším metodám celého
komunikátoru. Začneme implementaci metody connect(), kde poprvé
použijeme třídu CompletableFuture:
@Override public CompletableFuture <Void> connect(String host, int port) { if (isConnected()) { throw new RuntimeException("Spojení je již vytvořeno."); } changeState(ConnectionState.CONNECTING); return CompletableFuture.supplyAsync(() -> { final Socket socket = new Socket(); try { socket.connect(new InetSocketAddress(host, port), 3000); return socket; } catch (IOException e) { return null; } }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(socket -> { this.socket.set(socket); if (socket != null) { this.host.set(host); this.port.set(port); } else { changeState(ConnectionState.DISCONNECTED); this.host.set(null); this.port.set(-1); } if (socket == null) { throw new RuntimeException("Spojení se nepodařilo vytvořit."); } return null; }, ThreadPool.JAVAFX_EXECUTOR); }
Metoda je opět rozdělena do logických celků. V první části se
podíváme, zda-li jsme již připojení k serveru. Pokud jsme připojeni,
vyhodíme výjimku. RuntimeException nebudeme muset ani
ošetřovat, pouze se nám zapíše do konzole. Důležité je, že aplikace se
neukončí. Metodou changeState() informujeme okolí, že se
pokoušíme připojit k serveru.
V druhé části metody vytvoříme budoucnost, ve které se pokusíme
vytvořit samotné spojení se serverem voláním metody
socket.connect(). Konstantou
ThreadPool.COMMON_EXECUTOR nastavíme, že připojení se má
provést v samostatném vlákně. Pokud se úspěšně spojíme se serverem,
vrátíme socket. Metodou thenApplyAsync() "transformujeme" socket
na výsledek.
Ve třetí části uložíme socket voláním příkazu
this.socket.set(socket). Tím se mimo jiné zavolá
changeListener a vytvoří/odstraní se čtecí a zapisovací
vlákno. Celá třetí část se musí odehrát v JavaFX vlákně, protože na
některé pozorovatelné konstanty později navěsíme grafické komponenty,
které jak jsem již říkal minule, se mohou aktualizovat pouze v JavaFX
vlákně, jinak se vyhodí výjimka.
Ukončení spojení
Spojení budeme ukončovat metodou disconnect(). Metoda bude
mít za úkol řádně ukončit čtecí a zapisovací vlákno:
public CompletableFuture<Boolean> disconnect() { if (!isConnected()) { return CompletableFuture.completedFuture(false); } return CompletableFuture.supplyAsync(() -> { try { socket.get().close(); readerThread.shutdown(); writerThread.shutdown(); } catch (IOException e) { e.printStackTrace(); return false; } return true; }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(success -> { if (success) { this.socket.set(null); changeState(ConnectionState.DISCONNECTED); } return success; }, ThreadPool.JAVAFX_EXECUTOR); }
Pokud se úspěšně ukončí spojení, tak se pomocí příkazu
this.socket.set(null) vymaže čtecí a zapisovací vlákno a
komunikátor se dostane do stavu DISCONNECTED.
Odesílání zpráv
Zprávy budeme odesílat dvojího typu:
- bez čekání na výsledek
- s čekáním na výsledek
Metoda bez čekání na výsledek bude velmi jednoduchá. Vezme zprávu, předá ji zapisovacímu vláknu a o nic víc se nestará:
public synchronized void sendMessage(IMessage message) { if (writerThread != null) { writerThread.addMessageToQueue(message); } }
Odeslání zprávy s čekáním na výsledek má jeden problém, který musíme vyřešit. Tím problémem je ono čekání na odpověď ze serveru:
public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) { return CompletableFuture.supplyAsync(() -> { sendMessage(message); return null; }) .thenCompose(ignored -> { Request request = new Request(); requests.add(request); return request.getFuture(); }); }
Metoda, která bude odesílat zprávu s odpovědí, bude vracet budoucnost,
ve které odpověď přijde. Nejdříve se standartně odešle zpráva a pak se
zavolá nová metoda thenCompose(). Tato metoda vlastně říká,
že výsledek budoucnosti získáme z jiné instance třídy
CompletableFuture. Tuto jinou instanci získáme voláním metody
getFuture() nad instancí třídy Request, kterou si
za okamžik nadefinujeme.
Request-response zprávy
Vytvoříme podpůrnou třídu, která nám zajistí čekání na odpověď
ze serveru. Vytvoříme tedy novou třídu Request a vše by
najednou mělo začít dávat smysl:
class Request { private final Object lock = new Object(); private boolean waiting = true; private IMessage responce; CompletableFuture<IMessage> getFuture() { return CompletableFuture.supplyAsync(() -> { while (waiting) { synchronized (lock) { try { lock.wait(); } catch (InterruptedException ignored) {} } } return responce; }); } void onResponce(IMessage message) { this.responce = message; waiting = false; synchronized (lock) { lock.notify(); } } }
Třída má pouze dvě metody: getFuture() a
onResponce(). První metoda vytvoří budoucnost, ve které se
vlákno uspí voláním metody wait(). Jediný, kdo může tuto
budoucnost probudit, je metoda onResponce(), která je zavolána,
když je přijata odpoveď ze serveru. Tímto jednoduchým trikem vytvoříme
dojem komunikace request-response.
Na konec jen doplním implementaci zbylých metod, které vyžaduje rozhraní:
@Override public ConnectionState getConnectionState() { return connectionState.get(); } @Override public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() { return connectionState.getReadOnlyProperty(); } @Override public String getConnectedServerName() { return connectedServerName.get(); }
To by pro tuto delší lekci vše.
Příště, v lekci Java chat - Klient - Spojení se serverem 3. část, již navážeme spojení se serverem.
Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.
Stáhnout
Stažením následujícího souboru souhlasíš s licenčními podmínkami
Staženo 15x (120.98 kB)
Aplikace je včetně zdrojových kódů v jazyce Java
