Lekce 9 - Java server - Event bus
V minulé lekci, Java server - Komunikační protokol, jsme navrhli svůj komunikační protokol a konečně komunikaci serveru s klientem úspěšně otestovali.
Dnes vytvoříme jednoduchou event bus, pomocí které budeme propagovat události napříč celým serverem.
Event bus
Jedná se o mechanismus, který dovoluje komunikaci mezi dvěma různým komponentami, aniž by o sobě věděly. Jedna komponenta vyprodukuje událost a je jí jedno, kolik poslouchajících komponent tuto událost zachytí a zareaguje na ní.
Návrh rozhraní
V balíčku core
vytvoříme nový balíček s
názvem event
. Všechny třídy, které nyní
vytvoříme, se budou nacházet v tomto balíčku, pokud neuvedu jinak.
Nejdříve si navrhneme rozhraní a poté je implementujeme. Rozhraní
vytvoříme následující:
IEvent
- rozhraní představující vzniklou událostIEventHandler
- rozhraní reagující na vzniklou událostIEventBus
- rozhraní umožňující správu událostí
Rozhraní IEvent
bude obsahovat jedinou metodu
getEventType()
, která bude vracet jedinečný identifikátor
vzniklé události:
public interface IEvent { String getEventType(); }
Rozhraní IEventHandler
také bude obsahovat
jednu metodu handleEvent()
, kterou budou muset překrýt všichni,
kteří budou chtít poslouchat danou událost:
@FunctionalInterface public interface IEventHandler { void handleEvent(IEvent event); }
Nakonec rozhraní IEventBus
, které obsahuje
metody pro přihlášení/odhlášení odběru událostí a metodu pro
vyvolání události:
public interface IEventBus { void registerEventHandler(String messageType, IEventHandler listener); void unregisterEventHandler(String messageType, IEventHandler listener); void publishEvent(IEvent event); }
Implementace
Jediné rozhraní, které budeme implementovat v balíčku core
,
je rozhraní IEventBus
, pomocí stejnojmenné
třídy EventBus
:
@Singleton public class EventBus implements IEventBus { private final Map<String, List<IEventHandler>> listenerMap = new HashMap<>(); @Override public void registerEventHandler(String messageType, IEventHandler listener) { List<IEventHandler> listeners = listenerMap .computeIfAbsent(messageType, k -> new ArrayList<>()); listeners.add(listener); } @Override public void unregisterEventHandler(String messageType, IEventHandler listener) { final List<IEventHandler> listeners = listenerMap .getOrDefault(messageType, Collections.emptyList()); listeners.remove(listener); } @Override public void publishEvent(IEvent event) { final List<IEventHandler> handlers = listenerMap .getOrDefault(event.getEventType(), Collections.emptyList()); handlers.forEach(handler -> handler.handleEvent(event)); } }
Třída obsahuje jednu třídní konstantu listenerMap
, do
které se budou ukládat posluchače jednotlivých událostí. Mapa jako klíč
používá String
, který budeme získávat
pomocí metody getEventType()
z rozhraní
IEvent
. Hodnota v mapě je kolekce posluchačů,
aby se k jedné události mohlo zaregistrovat více posluchačů.
Nakonec zaregistrujeme event bus do modulu pro guice:
bind(IEventBus.class).to(EventBus.class);
Použití
Pro zatím se omezíme na publikování událostí, konkrétně:
- uživatel se připojil
- uživatel se odpojil
- server přijal zprávu od uživatele
Konzumování těchto událostí necháme na budoucí implementaci.
Informaci, že se uživatel připojil/odpojil, může publikovat třída
ConnectionManager
, protože ta jediná se to
dozví. Přidáme tedy do balíčku connection
dvě třídy představující událost připojení a odpojení:
ClientConnectedEvent
:
public class ClientConnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-connected"; private final Client client; ClientConnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
A ClientDisconnectedEvent
:
public class ClientDisconnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-disonnected"; private final Client client; ClientDisconnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
Do třídy ConnectionManager
přidáme novou
třídní konstantu typu IEventBus
a necháme ji
inicializovat v konstruktoru z parametru:
private final IEventBus eventBus; @Inject public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread, IEventBus eventBus, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.writerThread = writerThread; this.eventBus = eventBus; this.pool = pool; this.maxClients = maxClients; }
Samotné události budeme vyvolávat v metodě
insertClientToListOrQueue()
. Událost, že klient se odpojil,
publikujeme hned poté, co odstraníme klienta ze seznamu klientů:
client.setConnectionClosedListener(() -> { clients.remove(client); // Vytvoření nové události eventBus.publishEvent(new ClientDisconnectedEvent(client)); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
Událost o připojeném klientovi publikujeme po vložení klienta do threadpoolu:
pool.submit(client);
eventBus.publishEvent(new ClientConnectedEvent(client));
Poslední událost, kterou budeme publikovat, je událost přijaté zprávy od klienta. Nejdříve vytvoříme třídu, kterou budeme reprezentovat danou událost:
public class MessageReceivedEvent implements IEvent { private final IMessage receivedMessage; private final Client client; MessageReceivedEvent(IMessage receivedMessage, Client client) { this.receivedMessage = receivedMessage; this.client = client; } @Override public String getEventType() { return receivedMessage.getType(); } public IMessage getReceivedMessage() { return receivedMessage; } public Client getClient() { return client; } }
Nyní se přesuneme do třídy Client
. Opět
zde vytvoříme třídní konstantu typu IEventBus
a inicializujeme ji v konstruktoru pomocí parametru:
private final IEventBus eventBus; Client(Socket socket, IWriterThread writerThread, IEventBus eventBus) throws IOException { this.socket = socket; writer = new ObjectOutputStream(socket.getOutputStream()); this.writerThread = writerThread; this.eventBus = eventBus; }
Nakonec v metodě run()
když přijmeme zprávu, tak ji
publikujeme:
IMessage received; while ((received = (IMessage) reader.readObject()) != null) { eventBus.publishEvent(new MessageReceivedEvent(received, this)); }
Tím bychom měli hotové publikování událostí napříč celým serverem.
V příští lekci, Java server - Systém pluginů, naimplementujeme systém pluginů pro snadnou rozšiřitelnost serveru o novou funkcionalitu.
Měl jsi s čímkoli problém? Stáhni si vzorovou aplikaci níže a porovnej ji se svým projektem, chybu tak snadno najdeš.
Stáhnout
Stažením následujícího souboru souhlasíš s licenčními podmínkami
Staženo 20x (143.33 kB)
Aplikace je včetně zdrojových kódů v jazyce Java