Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.axoniq.platform.framework.application.MeasuringExecutorServiceDecorator;
import io.axoniq.platform.framework.application.RSocketThreadDumpResponder;
import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient;
import io.axoniq.platform.framework.client.ClientSettingsService;
import io.axoniq.platform.framework.client.PlatformClientConnectionService;
import io.axoniq.platform.framework.client.RSocketHandlerRegistrar;
import io.axoniq.platform.framework.client.ServerProcessorReporter;
import io.axoniq.platform.framework.client.SetupPayloadCreator;
Expand Down Expand Up @@ -74,8 +74,8 @@ public void enhance(ComponentRegistry registry) {
}
registry
.registerComponent(ComponentDefinition
.ofType(ClientSettingsService.class)
.withBuilder(c -> new ClientSettingsService()))
.ofType(PlatformClientConnectionService.class)
.withBuilder(c -> new PlatformClientConnectionService()))
.registerComponent(ComponentDefinition
.ofType(ProcessorMetricsRegistry.class)
.withBuilder(c -> new ProcessorMetricsRegistry()))
Expand Down Expand Up @@ -117,7 +117,7 @@ public void enhance(ComponentRegistry registry) {
c.getComponent(SetupPayloadCreator.class),
c.getComponent(RSocketHandlerRegistrar.class),
c.getComponent(RSocketPayloadEncodingStrategy.class),
c.getComponent(ClientSettingsService.class),
c.getComponent(PlatformClientConnectionService.class),
determineInstanceName(c)))

.onStart(Phase.EXTERNAL_CONNECTIONS, AxoniqConsoleRSocketClient::start))
Expand All @@ -126,7 +126,7 @@ public void enhance(ComponentRegistry registry) {
.withBuilder(c -> new ServerProcessorReporter(
c.getComponent(AxoniqConsoleRSocketClient.class),
c.getComponent(ProcessorReportCreator.class),
c.getComponent(ClientSettingsService.class),
c.getComponent(PlatformClientConnectionService.class),
c.getComponent(AxoniqPlatformConfiguration.class)))
// The start handler will allow for eager creation
.onStart(Phase.EXTERNAL_CONNECTIONS, c -> {
Expand All @@ -136,7 +136,7 @@ public void enhance(ComponentRegistry registry) {
.withBuilder(c -> new ApplicationMetricReporter(
c.getComponent(AxoniqConsoleRSocketClient.class),
c.getComponent(ApplicationReportCreator.class),
c.getComponent(ClientSettingsService.class),
c.getComponent(PlatformClientConnectionService.class),
c.getComponent(AxoniqPlatformConfiguration.class)))
// The start handler will allow for eager creation
.onStart(Phase.EXTERNAL_CONNECTIONS, c -> {
Expand All @@ -145,7 +145,7 @@ public void enhance(ComponentRegistry registry) {
.ofType(HandlerMetricsRegistry.class)
.withBuilder(c -> new HandlerMetricsRegistry(
c.getComponent(AxoniqConsoleRSocketClient.class),
c.getComponent(ClientSettingsService.class),
c.getComponent(PlatformClientConnectionService.class),
c.getComponent(AxoniqPlatformConfiguration.class))))
.registerComponent(ComponentDefinition
.ofType(ApplicationThreadDumpProvider.class)
Expand Down Expand Up @@ -195,10 +195,10 @@ public void enhance(ComponentRegistry registry) {
Objects.requireNonNull(original);
ReflectionKt.setPropertyValue(delegate, "queryExecutorServiceFactory", (ExecutorServiceFactory<DistributedQueryBusConfiguration>) (configuration, queue) -> {
var built = original.createExecutorService(configuration, queue);
return new MeasuringExecutorServiceDecorator(
BusType.QUERY,
built,
cc.getComponent(ApplicationMetricRegistry.class));
return new MeasuringExecutorServiceDecorator(
BusType.QUERY,
built,
cc.getComponent(ApplicationMetricRegistry.class));
});
} catch (Exception e) {
logger.error("Failed to instruct DistributedQueryBusConfiguration, e)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ import io.axoniq.platform.framework.api.Routes
import io.axoniq.platform.framework.AxoniqPlatformConfiguration
import io.axoniq.platform.framework.api.ClientStatus
import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient
import io.axoniq.platform.framework.client.ClientSettingsObserver
import io.axoniq.platform.framework.client.ClientSettingsService
import io.axoniq.platform.framework.client.PlatformClientConnectionObserver
import io.axoniq.platform.framework.client.PlatformClientConnectionService
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class ApplicationMetricReporter(
private val client: AxoniqConsoleRSocketClient,
private val reportCreator: ApplicationReportCreator,
private val clientSettingsService: ClientSettingsService,
private val platformClientConnectionService: PlatformClientConnectionService,
private val properties: AxoniqPlatformConfiguration,
) : ClientSettingsObserver {
) : PlatformClientConnectionObserver {
private var reportTask: ScheduledFuture<*>? = null
private val logger = KotlinLogging.logger { }
private val executor = properties.reportingTaskExecutor

init {
clientSettingsService.subscribeToSettings(this)
platformClientConnectionService.subscribeToSettings(this)
}

override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) {
override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {
if (!clientStatus.enabled || reportTask != null) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import kotlin.math.pow
*
* Establishing a connection works as follows:
* - The client will send a setup payload to the server, containing the authentication information
* - The client will retrieve the settings from the server, and update the [ClientSettingsService] with the new settings
* - The client will retrieve the settings from the server, and update the [PlatformClientConnectionService] with the new settings
* - The client will start sending heartbeats to the server, and will check if it receives heartbeats from the server
*
* The server is in control of these settings. Of course, the user can manipulate these as well themselves.
Expand All @@ -63,7 +63,7 @@ class AxoniqConsoleRSocketClient(
private val setupPayloadCreator: SetupPayloadCreator,
private val registrar: RSocketHandlerRegistrar,
private val encodingStrategy: RSocketPayloadEncodingStrategy,
private val clientSettingsService: ClientSettingsService,
private val platformClientConnectionService: PlatformClientConnectionService,
private val instanceName: String,
) {
private val environmentId: String = properties.environmentId
Expand Down Expand Up @@ -93,18 +93,18 @@ class AxoniqConsoleRSocketClient(
private var suppressConnectMessage = false

init {
clientSettingsService.subscribeToSettings(heartbeatOrchestrator)
platformClientConnectionService.subscribeToSettings(heartbeatOrchestrator)

// Server can send updated settings if necessary
registrar.registerHandlerWithPayload(Routes.Management.SETTINGS, ClientSettingsV2::class.java) {
clientSettingsService.updateSettings(it)
platformClientConnectionService.onConnected(it)
}

// Server sends client status updates
registrar.registerHandlerWithPayload(Routes.Management.STATUS, ClientStatusUpdate::class.java) {
logger.debug("Received status update from Axoniq Platform. New status: {}", it.newStatus)
status = it.newStatus
clientSettingsService.updateClientStatus(status)
platformClientConnectionService.updateClientStatus(status)
}

// Server can send log requests
Expand Down Expand Up @@ -172,7 +172,7 @@ class AxoniqConsoleRSocketClient(
.flatMap { socket ->
rsocket = socket
retrieveSettings().map { settings ->
clientSettingsService.updateSettings(settings)
platformClientConnectionService.onConnected(settings)
if (!suppressConnectMessage) {
logger.info("Connection to Axoniq Platform set up successfully! This instance's name: $instanceName, settings: $settings")
suppressConnectMessage = true
Expand All @@ -181,7 +181,10 @@ class AxoniqConsoleRSocketClient(
socket
}
}
.doOnError { disposeCurrentConnection() }
.doOnError { e ->
disposeCurrentConnection()
platformClientConnectionService.notifyUnreachable(classifyConnectionError(e))
}
.doFinally { synchronized(connectionLock) { pendingConnection = null } }
.cache()
}
Expand Down Expand Up @@ -296,7 +299,7 @@ class AxoniqConsoleRSocketClient(
if (currentRSocket != null) {
rsocket = null
currentRSocket.dispose()
clientSettingsService.clearSettings()
platformClientConnectionService.clearSettings()
}
}

Expand All @@ -310,7 +313,7 @@ class AxoniqConsoleRSocketClient(
private const val BACKOFF_FACTOR = 2.0
}

private inner class HeartbeatOrchestrator : ClientSettingsObserver {
private inner class HeartbeatOrchestrator : PlatformClientConnectionObserver {
private var heartbeatSendTask: ScheduledFuture<*>? = null
private var heartbeatCheckTask: ScheduledFuture<*>? = null
private var lastReceivedHeartbeat = Instant.now()
Expand All @@ -323,7 +326,7 @@ class AxoniqConsoleRSocketClient(
}
}

override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) {
override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {
this.heartbeatSendTask?.cancel(true)
this.heartbeatCheckTask?.cancel(true)
lastReceivedHeartbeat = Instant.now()
Expand Down Expand Up @@ -366,6 +369,19 @@ class AxoniqConsoleRSocketClient(
}
}

private fun classifyConnectionError(e: Throwable): PlatformClientConnectionObserver.UnreachableReason {
return when {
e.message?.contains("invalid authentication", ignoreCase = true) == true ->
PlatformClientConnectionObserver.UnreachableReason.INVALID_AUTHENTICATION
e.message?.contains("Access Denied", ignoreCase = true) == true ->
PlatformClientConnectionObserver.UnreachableReason.INVALID_AUTHENTICATION
e is java.net.ConnectException || e.cause is java.net.ConnectException ->
PlatformClientConnectionObserver.UnreachableReason.NO_CONNECTION
else ->
PlatformClientConnectionObserver.UnreachableReason.OTHER
}
}

private fun retrieveSettings(): Mono<ClientSettingsV2> {
return rsocket!!
.requestResponse(encodingStrategy.encode("", createRoutingMetadata(Routes.Management.SETTINGS_V2)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,33 @@ import io.axoniq.platform.framework.api.ClientStatus
/**
* Observes the established connection and the settings provided by the server.
* The [onDisconnected] method is called when the connection is lost, or just before new settings
* are being updated to provide cleanup. The [onConnectionUpdate] method is called when the connection is
* are being updated to provide cleanup. The [onConnected] method is called when the connection is
* established or the settings are updated
*/
interface ClientSettingsObserver {
interface PlatformClientConnectionObserver {
/**
* Called when the connection is established, the settings are updated, or the client's status changes.
* @param settings the settings provided by the server
*/
fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2)
fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2)

/**
* Called when the connection is lost, or just before new settings are being updated to provide cleanup.
*/
fun onDisconnected()

/**
* Called when the connection can not be established. Will be called every time the client tries to connect, but fails. The reason for the failure is provided as a parameter.
*
* @param reason The reason for being unreachable.
*/
fun onUnreachable(reason: UnreachableReason) {
// Default implementation does nothing, as not all observers need to react to this event
}

enum class UnreachableReason {
INVALID_AUTHENTICATION,
NO_CONNECTION,
OTHER,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CopyOnWriteArrayList

/**
* Service that holds the client settings. See [ClientSettingsObserver] for more information.
* Service that holds the client settings. See [PlatformClientConnectionObserver] for more information.
*/
class ClientSettingsService {
private val observers = CopyOnWriteArrayList<ClientSettingsObserver>()
class PlatformClientConnectionService {
private val observers = CopyOnWriteArrayList<PlatformClientConnectionObserver>()
private var clientStatus: ClientStatus = ClientStatus.PENDING
private var settings: ClientSettingsV2? = null
private val logger = KotlinLogging.logger { }
Expand All @@ -34,30 +34,35 @@ class ClientSettingsService {
logger.debug { "Clearing client settings" }
if (settings != null) {
settings = null
observers.forEach { it.onDisconnected() }
}
observers.forEach { it.onDisconnected() }
}

fun subscribeToSettings(observer: ClientSettingsObserver) {
fun subscribeToSettings(observer: PlatformClientConnectionObserver) {
logger.debug { "Subscribing to client settings $observer" }
this.observers.add(observer)
if (settings != null) {
observer.onConnectionUpdate(clientStatus, settings!!)
observer.onConnected(clientStatus, settings!!)
}
}

fun updateClientStatus(clientStatus: ClientStatus) {
logger.debug { "Client status changed to $clientStatus" }
this.clientStatus = clientStatus
if (settings != null) {
observers.forEach { it.onConnectionUpdate(clientStatus, settings!!) }
observers.forEach { it.onConnected(clientStatus, settings!!) }
}
}

fun updateSettings(settings: ClientSettingsV2) {
fun notifyUnreachable(reason: PlatformClientConnectionObserver.UnreachableReason) {
logger.debug { "Notifying observers of unreachable: $reason" }
observers.forEach { it.onUnreachable(reason) }
}

fun onConnected(settings: ClientSettingsV2) {
clearSettings()
logger.debug { "Client settings changed to $settings" }
this.settings = settings
observers.forEach { it.onConnectionUpdate(clientStatus, settings) }
observers.forEach { it.onConnected(clientStatus, settings) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@ import io.axoniq.platform.framework.AxoniqPlatformConfiguration
import io.axoniq.platform.framework.api.ClientStatus
import io.axoniq.platform.framework.eventprocessor.ProcessorReportCreator
import io.github.oshai.kotlinlogging.KotlinLogging
import reactor.core.Disposable
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class ServerProcessorReporter(
private val client: AxoniqConsoleRSocketClient,
private val processorReportCreator: ProcessorReportCreator,
private val clientSettingsService: ClientSettingsService,
private val platformClientConnectionService: PlatformClientConnectionService,
private val properties: AxoniqPlatformConfiguration
) : ClientSettingsObserver {
) : PlatformClientConnectionObserver {
private var reportTask: ScheduledFuture<*>? = null
private val logger = KotlinLogging.logger { }
private val executor = properties.reportingTaskExecutor

init {
clientSettingsService.subscribeToSettings(this)
platformClientConnectionService.subscribeToSettings(this)
}

override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) {
override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {
if (!clientStatus.enabled || reportTask != null) {
return
}
Expand Down
Loading