Vydělávej až 160.000 Kč měsíčně! Akreditované rekvalifikační kurzy s garancí práce od 0 Kč. Více informací.
Hledáme nové posily do ITnetwork týmu. Podívej se na volné pozice a přidej se do nejagilnější firmy na trhu - Více informací.

Lekce 18 - Java chat - Klient - Spojení se serverem 2. část

V minulé lekci, Java chat - Klient - Spojení se serverem 1. část, jsme si mimo jiné vytvořili rozhraní pro klientský komunikátor.

V dnešním Java tutoriálu jej implementujeme.

Klientský komunikátor

V balíčku service vytvoříme novou třídu ClientCommunicationService a necháme ji implementovat rozhraní IClientCommunicationService:

public class ClientCommunicationService implements IClientCommunicationService {

}

Proměnné a konstanty

Do třídy přidáme následující proměnné a konstanty:

private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null);
    private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED);
    private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>();
    private final StringProperty host = new SimpleStringProperty(this, "host", null);
    private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1);
    private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null);
    private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY);
    private final Queue<Request> requests = new LinkedBlockingQueue<>();

    private ReaderThread readerThread;
    private WriterThread writerThread;

Všechny konstanty jsou samopopisující, takže k jejich popisu nebudu dávat žádný komentář. Za zmínku stojí konstanta socket, která je zabalena do třídy ObjectProperty. Tím získáváme možnost pozorovat změnu hodnoty. Zajímavá je taky fronta requests, pomocí které budeme realizovat komunikaci typu request-responce. Třídu Request vytvoříme později. Proměnné readerThread a writerThread budou obsahovat čtecí a zapisovací vlákno. Tyto proměnné inicializujeme až při pokusu o vytvoření nového spojení.

Konstruktor

Konstruktor třídy nebude vyžadovat žádný parametr. V konstruktoru se nastaví listener na socket a vytvoří se binding na název serveru, který se bude mít formát: "název:port":

public ClientCommunicationService() {
    socket.addListener(this::socketListener);
    connectedServerName.bind(Bindings.createStringBinding(() -> String.format("%s:%d", host.get(), port.get()), host, port, connectionState));
}

Posluchač změny stavu socketu

Vytvoříme privátní metodu socketListener(), kterou jsme registrovali v konstruktoru. V této metodě budeme inicializovat/rušit čtecí/zapisovací vlákno:

private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) {
        if (newSocket == null) {
            readerThread = null;
            writerThread = null;
            unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
            return;
        }

        try {
            readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect);
            writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect);

            readerThread.start();
            writerThread.start();
            registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
        } catch (IOException e) {
            System.out.println("Vyskytl se problém při vytváření komunikace se serverem.");
        }
    }

Metoda se skládá ze dvou částí. V horní části se zpracovává případ, kdy bylo z nějakého důvodu spojení ukončeno a je potřeba odstranit instance starého čtecího/zapiso­vacího vlákna. Ve zbytku metody se předpokládá, že proměnná newSocket obsahuje nový validní socket, který se vytvořil na základě nově vytvořeného spojení. Vytvoří se nové instance čtecího a zapisovacího vlákna a metodou start() se spustí. Metodu (un)registerMessageObserver() si vysvětlíme, až ji budeme implementovat.

Delegace zpracování přijatých zpráv

Do třídy přidáme další instanční konstantu, která bude obsahovat anonymní funkci starající se o rozeslání přijatých zpráv zaregistrovaným pozorovatelům:

private final OnDataReceivedListener listener = message -> {
        if (message.isResponce()) {
            final Request poll = requests.poll();
            if (poll != null) {
                poll.onResponce(message);
            }
            return;
        }

        final List<OnDataReceivedListener> listenerList = listeners.get(message.getType());
        if (listenerList == null) {
            return;
        }

        for (OnDataReceivedListener listener : listenerList) {
            listener.onDataReceived(message);
        }
    };

Na začátku metody se podíváme, zda-li je přijatá zpráva odpověď na nějaký požadavek. Pokud ano, vyzvedne se a zavolá se obsluha zprávy z fronty requests. Pokud se jedná o běžnou zprávu, získáme z mapy listeners všechny listenery a postupně jim oznámíme, že mají zpracovat přijatou zprávu.

Přihlášení/od­hlášení odběru zpráv

Další metody, které musíme podle rozhraní implementovat, jsou metody pro přihlášení a odhlášení odběru zpráv. Tyto metody budou modifikovat mapu listeners:

@Override
public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>());

    listenerList.add(listener);
}

@Override
public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.get(messageType);
    if (listenerList == null) {
        return;
    }

    listenerList.remove(listener);
}

Při registraci posluchače využijeme metodu computeIfAbsent(), která se podívá do mapy a pokud na zadaném klíči neexistuje hodnota, tak ji vytvoří.

Navázání spojení

Konečně se dostáváme k nejdůležitějším metodám celého komunikátoru. Začneme implementaci metody connect(), kde poprvé použijeme třídu CompletableFuture:

@Override
public CompletableFuture <Void> connect(String host, int port) {
 if (isConnected()) {
  throw new RuntimeException("Spojení je již vytvořeno.");
 }

 changeState(ConnectionState.CONNECTING);

 return CompletableFuture.supplyAsync(() -> {
   final Socket socket = new Socket();
   try {
    socket.connect(new InetSocketAddress(host, port), 3000);
    return socket;
   } catch (IOException e) {
    return null;
   }
  }, ThreadPool.COMMON_EXECUTOR)
  .thenApplyAsync(socket -> {
   this.socket.set(socket);
   if (socket != null) {
    this.host.set(host);
    this.port.set(port);
   } else {
    changeState(ConnectionState.DISCONNECTED);
    this.host.set(null);
    this.port.set(-1);
   }
   if (socket == null) {
    throw new RuntimeException("Spojení se nepodařilo vytvořit.");
   }

   return null;
  }, ThreadPool.JAVAFX_EXECUTOR);
}

Metoda je opět rozdělena do logických celků. V první části se podíváme, zda-li jsme již připojení k serveru. Pokud jsme připojeni, vyhodíme výjimku. RuntimeException nebudeme muset ani ošetřovat, pouze se nám zapíše do konzole. Důležité je, že aplikace se neukončí. Metodou changeState() informujeme okolí, že se pokoušíme připojit k serveru.

V druhé části metody vytvoříme budoucnost, ve které se pokusíme vytvořit samotné spojení se serverem voláním metody socket.connect(). Konstantou ThreadPool.COMMON_EXECUTOR nastavíme, že připojení se má provést v samostatném vlákně. Pokud se úspěšně spojíme se serverem, vrátíme socket. Metodou thenApplyAsync() "transformujeme" socket na výsledek.

Ve třetí části uložíme socket voláním příkazu this.socket.set(socket). Tím se mimo jiné zavolá changeListener a vytvoří/odstraní se čtecí a zapisovací vlákno. Celá třetí část se musí odehrát v JavaFX vlákně, protože na některé pozorovatelné konstanty později navěsíme grafické komponenty, které jak jsem již říkal minule, se mohou aktualizovat pouze v JavaFX vlákně, jinak se vyhodí výjimka.

Ukončení spojení

Spojení budeme ukončovat metodou disconnect(). Metoda bude mít za úkol řádně ukončit čtecí a zapisovací vlákno:

public CompletableFuture<Boolean> disconnect() {
    if (!isConnected()) {
        return CompletableFuture.completedFuture(false);
    }

    return CompletableFuture.supplyAsync(() -> {
        try {
            socket.get().close();
            readerThread.shutdown();
            writerThread.shutdown();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }, ThreadPool.COMMON_EXECUTOR)
        .thenApplyAsync(success -> {
            if (success) {
                this.socket.set(null);
                changeState(ConnectionState.DISCONNECTED);
            }

            return success;
        }, ThreadPool.JAVAFX_EXECUTOR);
}

Pokud se úspěšně ukončí spojení, tak se pomocí příkazu this.socket.set(null) vymaže čtecí a zapisovací vlákno a komunikátor se dostane do stavu DISCONNECTED.

Odesílání zpráv

Zprávy budeme odesílat dvojího typu:

  • bez čekání na výsledek
  • s čekáním na výsledek

Metoda bez čekání na výsledek bude velmi jednoduchá. Vezme zprávu, předá ji zapisovacímu vláknu a o nic víc se nestará:

public synchronized void sendMessage(IMessage message) {
    if (writerThread != null) {
        writerThread.addMessageToQueue(message);
    }
}

Odeslání zprávy s čekáním na výsledek má jeden problém, který musíme vyřešit. Tím problémem je ono čekání na odpověď ze serveru:

public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) {
    return CompletableFuture.supplyAsync(() -> {
        sendMessage(message);
        return null;
    })
    .thenCompose(ignored -> {
        Request request = new Request();
        requests.add(request);
        return request.getFuture();
    });
}

Metoda, která bude odesílat zprávu s odpovědí, bude vracet budoucnost, ve které odpověď přijde. Nejdříve se standartně odešle zpráva a pak se zavolá nová metoda thenCompose(). Tato metoda vlastně říká, že výsledek budoucnosti získáme z jiné instance třídy CompletableFuture. Tuto jinou instanci získáme voláním metody getFuture() nad instancí třídy Request, kterou si za okamžik nadefinujeme.

Request-response zprávy

Vytvoříme podpůrnou třídu, která nám zajistí čekání na odpověď ze serveru. Vytvoříme tedy novou třídu Request a vše by najednou mělo začít dávat smysl:

class Request {

    private final Object lock = new Object();

    private boolean waiting = true;
    private IMessage responce;

    CompletableFuture<IMessage> getFuture() {
        return CompletableFuture.supplyAsync(() -> {
            while (waiting) {
                synchronized (lock) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {}
                }
            }
            return responce;
        });
    }

    void onResponce(IMessage message) {
        this.responce = message;
        waiting = false;
        synchronized (lock) {
            lock.notify();
        }
    }
}

Třída má pouze dvě metody: getFuture() a onResponce(). První metoda vytvoří budoucnost, ve které se vlákno uspí voláním metody wait(). Jediný, kdo může tuto budoucnost probudit, je metoda onResponce(), která je zavolána, když je přijata odpoveď ze serveru. Tímto jednoduchým trikem vytvoříme dojem komunikace request-response.

Na konec jen doplním implementaci zbylých metod, které vyžaduje rozhraní:

@Override
public ConnectionState getConnectionState() {
    return connectionState.get();
}

@Override
public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() {
    return connectionState.getReadOnlyProperty();
}

@Override
public String getConnectedServerName() {
    return connectedServerName.get();
}

To by pro tuto delší lekci vše.

Příště, v lekci Java chat - Klient - Spojení se serverem 3. část, již navážeme spojení se serverem.


 

Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.

Stáhnout

Stažením následujícího souboru souhlasíš s licenčními podmínkami

Staženo 13x (120.98 kB)
Aplikace je včetně zdrojových kódů v jazyce Java

 

Předchozí článek
Java chat - Klient - Spojení se serverem 1. část
Všechny články v sekci
Server pro klientské aplikace v Javě
Přeskočit článek
(nedoporučujeme)
Java chat - Klient - Spojení se serverem 3. část
Článek pro vás napsal Petr Štechmüller
Avatar
Uživatelské hodnocení:
Ještě nikdo nehodnotil, buď první!
Autor se věnuje primárně programování v Javě, ale nebojí se ani webových technologií.
Aktivity