diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index e03d59d..2a8964b 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -24,7 +24,7 @@ import io.axoniq.platform.framework.application.MeasuringExecutorServiceDecorator; import io.axoniq.platform.framework.application.RSocketThreadDumpResponder; import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient; -import io.axoniq.platform.framework.client.ClientSettingsService; +import io.axoniq.platform.framework.client.PlatformClientConnectionService; import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; import io.axoniq.platform.framework.client.ServerProcessorReporter; import io.axoniq.platform.framework.client.SetupPayloadCreator; @@ -74,8 +74,8 @@ public void enhance(ComponentRegistry registry) { } registry .registerComponent(ComponentDefinition - .ofType(ClientSettingsService.class) - .withBuilder(c -> new ClientSettingsService())) + .ofType(PlatformClientConnectionService.class) + .withBuilder(c -> new PlatformClientConnectionService())) .registerComponent(ComponentDefinition .ofType(ProcessorMetricsRegistry.class) .withBuilder(c -> new ProcessorMetricsRegistry())) @@ -117,7 +117,7 @@ public void enhance(ComponentRegistry registry) { c.getComponent(SetupPayloadCreator.class), c.getComponent(RSocketHandlerRegistrar.class), c.getComponent(RSocketPayloadEncodingStrategy.class), - c.getComponent(ClientSettingsService.class), + c.getComponent(PlatformClientConnectionService.class), determineInstanceName(c))) .onStart(Phase.EXTERNAL_CONNECTIONS, AxoniqConsoleRSocketClient::start)) @@ -126,7 +126,7 @@ public void enhance(ComponentRegistry registry) { .withBuilder(c -> new ServerProcessorReporter( c.getComponent(AxoniqConsoleRSocketClient.class), c.getComponent(ProcessorReportCreator.class), - c.getComponent(ClientSettingsService.class), + c.getComponent(PlatformClientConnectionService.class), c.getComponent(AxoniqPlatformConfiguration.class))) // The start handler will allow for eager creation .onStart(Phase.EXTERNAL_CONNECTIONS, c -> { @@ -136,7 +136,7 @@ public void enhance(ComponentRegistry registry) { .withBuilder(c -> new ApplicationMetricReporter( c.getComponent(AxoniqConsoleRSocketClient.class), c.getComponent(ApplicationReportCreator.class), - c.getComponent(ClientSettingsService.class), + c.getComponent(PlatformClientConnectionService.class), c.getComponent(AxoniqPlatformConfiguration.class))) // The start handler will allow for eager creation .onStart(Phase.EXTERNAL_CONNECTIONS, c -> { @@ -145,7 +145,7 @@ public void enhance(ComponentRegistry registry) { .ofType(HandlerMetricsRegistry.class) .withBuilder(c -> new HandlerMetricsRegistry( c.getComponent(AxoniqConsoleRSocketClient.class), - c.getComponent(ClientSettingsService.class), + c.getComponent(PlatformClientConnectionService.class), c.getComponent(AxoniqPlatformConfiguration.class)))) .registerComponent(ComponentDefinition .ofType(ApplicationThreadDumpProvider.class) @@ -195,10 +195,10 @@ public void enhance(ComponentRegistry registry) { Objects.requireNonNull(original); ReflectionKt.setPropertyValue(delegate, "queryExecutorServiceFactory", (ExecutorServiceFactory) (configuration, queue) -> { var built = original.createExecutorService(configuration, queue); - return new MeasuringExecutorServiceDecorator( - BusType.QUERY, - built, - cc.getComponent(ApplicationMetricRegistry.class)); + return new MeasuringExecutorServiceDecorator( + BusType.QUERY, + built, + cc.getComponent(ApplicationMetricRegistry.class)); }); } catch (Exception e) { logger.error("Failed to instruct DistributedQueryBusConfiguration, e)"); diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt index 2f5fbc6..e53386f 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt @@ -21,8 +21,8 @@ import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.AxoniqPlatformConfiguration import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient -import io.axoniq.platform.framework.client.ClientSettingsObserver -import io.axoniq.platform.framework.client.ClientSettingsService +import io.axoniq.platform.framework.client.PlatformClientConnectionObserver +import io.axoniq.platform.framework.client.PlatformClientConnectionService import io.github.oshai.kotlinlogging.KotlinLogging import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit @@ -30,18 +30,18 @@ import java.util.concurrent.TimeUnit class ApplicationMetricReporter( private val client: AxoniqConsoleRSocketClient, private val reportCreator: ApplicationReportCreator, - private val clientSettingsService: ClientSettingsService, + private val platformClientConnectionService: PlatformClientConnectionService, private val properties: AxoniqPlatformConfiguration, -) : ClientSettingsObserver { +) : PlatformClientConnectionObserver { private var reportTask: ScheduledFuture<*>? = null private val logger = KotlinLogging.logger { } private val executor = properties.reportingTaskExecutor init { - clientSettingsService.subscribeToSettings(this) + platformClientConnectionService.subscribeToSettings(this) } - override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) { if (!clientStatus.enabled || reportTask != null) { return } diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt index 79eec02..292b961 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt @@ -51,7 +51,7 @@ import kotlin.math.pow * * Establishing a connection works as follows: * - The client will send a setup payload to the server, containing the authentication information - * - The client will retrieve the settings from the server, and update the [ClientSettingsService] with the new settings + * - The client will retrieve the settings from the server, and update the [PlatformClientConnectionService] with the new settings * - The client will start sending heartbeats to the server, and will check if it receives heartbeats from the server * * The server is in control of these settings. Of course, the user can manipulate these as well themselves. @@ -63,7 +63,7 @@ class AxoniqConsoleRSocketClient( private val setupPayloadCreator: SetupPayloadCreator, private val registrar: RSocketHandlerRegistrar, private val encodingStrategy: RSocketPayloadEncodingStrategy, - private val clientSettingsService: ClientSettingsService, + private val platformClientConnectionService: PlatformClientConnectionService, private val instanceName: String, ) { private val environmentId: String = properties.environmentId @@ -93,18 +93,18 @@ class AxoniqConsoleRSocketClient( private var suppressConnectMessage = false init { - clientSettingsService.subscribeToSettings(heartbeatOrchestrator) + platformClientConnectionService.subscribeToSettings(heartbeatOrchestrator) // Server can send updated settings if necessary registrar.registerHandlerWithPayload(Routes.Management.SETTINGS, ClientSettingsV2::class.java) { - clientSettingsService.updateSettings(it) + platformClientConnectionService.onConnected(it) } // Server sends client status updates registrar.registerHandlerWithPayload(Routes.Management.STATUS, ClientStatusUpdate::class.java) { logger.debug("Received status update from Axoniq Platform. New status: {}", it.newStatus) status = it.newStatus - clientSettingsService.updateClientStatus(status) + platformClientConnectionService.updateClientStatus(status) } // Server can send log requests @@ -172,7 +172,7 @@ class AxoniqConsoleRSocketClient( .flatMap { socket -> rsocket = socket retrieveSettings().map { settings -> - clientSettingsService.updateSettings(settings) + platformClientConnectionService.onConnected(settings) if (!suppressConnectMessage) { logger.info("Connection to Axoniq Platform set up successfully! This instance's name: $instanceName, settings: $settings") suppressConnectMessage = true @@ -181,7 +181,10 @@ class AxoniqConsoleRSocketClient( socket } } - .doOnError { disposeCurrentConnection() } + .doOnError { e -> + disposeCurrentConnection() + platformClientConnectionService.notifyUnreachable(classifyConnectionError(e)) + } .doFinally { synchronized(connectionLock) { pendingConnection = null } } .cache() } @@ -296,7 +299,7 @@ class AxoniqConsoleRSocketClient( if (currentRSocket != null) { rsocket = null currentRSocket.dispose() - clientSettingsService.clearSettings() + platformClientConnectionService.clearSettings() } } @@ -310,7 +313,7 @@ class AxoniqConsoleRSocketClient( private const val BACKOFF_FACTOR = 2.0 } - private inner class HeartbeatOrchestrator : ClientSettingsObserver { + private inner class HeartbeatOrchestrator : PlatformClientConnectionObserver { private var heartbeatSendTask: ScheduledFuture<*>? = null private var heartbeatCheckTask: ScheduledFuture<*>? = null private var lastReceivedHeartbeat = Instant.now() @@ -323,7 +326,7 @@ class AxoniqConsoleRSocketClient( } } - override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) { this.heartbeatSendTask?.cancel(true) this.heartbeatCheckTask?.cancel(true) lastReceivedHeartbeat = Instant.now() @@ -366,6 +369,19 @@ class AxoniqConsoleRSocketClient( } } + private fun classifyConnectionError(e: Throwable): PlatformClientConnectionObserver.UnreachableReason { + return when { + e.message?.contains("invalid authentication", ignoreCase = true) == true -> + PlatformClientConnectionObserver.UnreachableReason.INVALID_AUTHENTICATION + e.message?.contains("Access Denied", ignoreCase = true) == true -> + PlatformClientConnectionObserver.UnreachableReason.INVALID_AUTHENTICATION + e is java.net.ConnectException || e.cause is java.net.ConnectException -> + PlatformClientConnectionObserver.UnreachableReason.NO_CONNECTION + else -> + PlatformClientConnectionObserver.UnreachableReason.OTHER + } + } + private fun retrieveSettings(): Mono { return rsocket!! .requestResponse(encodingStrategy.encode("", createRoutingMetadata(Routes.Management.SETTINGS_V2))) diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionObserver.kt similarity index 64% rename from framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt rename to framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionObserver.kt index 43850ed..5cfd63d 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionObserver.kt @@ -22,18 +22,33 @@ import io.axoniq.platform.framework.api.ClientStatus /** * Observes the established connection and the settings provided by the server. * The [onDisconnected] method is called when the connection is lost, or just before new settings - * are being updated to provide cleanup. The [onConnectionUpdate] method is called when the connection is + * are being updated to provide cleanup. The [onConnected] method is called when the connection is * established or the settings are updated */ -interface ClientSettingsObserver { +interface PlatformClientConnectionObserver { /** * Called when the connection is established, the settings are updated, or the client's status changes. * @param settings the settings provided by the server */ - fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) + fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) /** * Called when the connection is lost, or just before new settings are being updated to provide cleanup. */ fun onDisconnected() + + /** + * Called when the connection can not be established. Will be called every time the client tries to connect, but fails. The reason for the failure is provided as a parameter. + * + * @param reason The reason for being unreachable. + */ + fun onUnreachable(reason: UnreachableReason) { + // Default implementation does nothing, as not all observers need to react to this event + } + + enum class UnreachableReason { + INVALID_AUTHENTICATION, + NO_CONNECTION, + OTHER, + } } \ No newline at end of file diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionService.kt similarity index 67% rename from framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt rename to framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionService.kt index 9171b5e..cb1152a 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/PlatformClientConnectionService.kt @@ -22,10 +22,10 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.util.concurrent.CopyOnWriteArrayList /** - * Service that holds the client settings. See [ClientSettingsObserver] for more information. + * Service that holds the client settings. See [PlatformClientConnectionObserver] for more information. */ -class ClientSettingsService { - private val observers = CopyOnWriteArrayList() +class PlatformClientConnectionService { + private val observers = CopyOnWriteArrayList() private var clientStatus: ClientStatus = ClientStatus.PENDING private var settings: ClientSettingsV2? = null private val logger = KotlinLogging.logger { } @@ -34,15 +34,15 @@ class ClientSettingsService { logger.debug { "Clearing client settings" } if (settings != null) { settings = null - observers.forEach { it.onDisconnected() } } + observers.forEach { it.onDisconnected() } } - fun subscribeToSettings(observer: ClientSettingsObserver) { + fun subscribeToSettings(observer: PlatformClientConnectionObserver) { logger.debug { "Subscribing to client settings $observer" } this.observers.add(observer) if (settings != null) { - observer.onConnectionUpdate(clientStatus, settings!!) + observer.onConnected(clientStatus, settings!!) } } @@ -50,14 +50,19 @@ class ClientSettingsService { logger.debug { "Client status changed to $clientStatus" } this.clientStatus = clientStatus if (settings != null) { - observers.forEach { it.onConnectionUpdate(clientStatus, settings!!) } + observers.forEach { it.onConnected(clientStatus, settings!!) } } } - fun updateSettings(settings: ClientSettingsV2) { + fun notifyUnreachable(reason: PlatformClientConnectionObserver.UnreachableReason) { + logger.debug { "Notifying observers of unreachable: $reason" } + observers.forEach { it.onUnreachable(reason) } + } + + fun onConnected(settings: ClientSettingsV2) { clearSettings() logger.debug { "Client settings changed to $settings" } this.settings = settings - observers.forEach { it.onConnectionUpdate(clientStatus, settings) } + observers.forEach { it.onConnected(clientStatus, settings) } } } \ No newline at end of file diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt index 7984373..2c0a7f5 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt @@ -22,25 +22,24 @@ import io.axoniq.platform.framework.AxoniqPlatformConfiguration import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.eventprocessor.ProcessorReportCreator import io.github.oshai.kotlinlogging.KotlinLogging -import reactor.core.Disposable import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit class ServerProcessorReporter( private val client: AxoniqConsoleRSocketClient, private val processorReportCreator: ProcessorReportCreator, - private val clientSettingsService: ClientSettingsService, + private val platformClientConnectionService: PlatformClientConnectionService, private val properties: AxoniqPlatformConfiguration -) : ClientSettingsObserver { +) : PlatformClientConnectionObserver { private var reportTask: ScheduledFuture<*>? = null private val logger = KotlinLogging.logger { } private val executor = properties.reportingTaskExecutor init { - clientSettingsService.subscribeToSettings(this) + platformClientConnectionService.subscribeToSettings(this) } - override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) { if (!clientStatus.enabled || reportTask != null) { return } diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt index eb5f104..559adde 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt @@ -32,8 +32,8 @@ import io.axoniq.platform.framework.api.metrics.Metric import io.axoniq.platform.framework.api.metrics.MetricTargetType import io.axoniq.platform.framework.api.metrics.StatisticReport import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient -import io.axoniq.platform.framework.client.ClientSettingsObserver -import io.axoniq.platform.framework.client.ClientSettingsService +import io.axoniq.platform.framework.client.PlatformClientConnectionObserver +import io.axoniq.platform.framework.client.PlatformClientConnectionService import io.axoniq.platform.framework.computeIfAbsentWithRetry import io.github.oshai.kotlinlogging.KotlinLogging import io.micrometer.core.instrument.Timer @@ -45,9 +45,9 @@ import java.util.concurrent.TimeUnit class HandlerMetricsRegistry( private val axoniqConsoleRSocketClient: AxoniqConsoleRSocketClient, - private val clientSettingsService: ClientSettingsService, + private val platformClientConnectionService: PlatformClientConnectionService, private val properties: AxoniqPlatformConfiguration -) : ClientSettingsObserver { +) : PlatformClientConnectionObserver { private val logger = KotlinLogging.logger { } private var reportTask: ScheduledFuture<*>? = null private val meterRegistry = SimpleMeterRegistry() @@ -63,10 +63,10 @@ class HandlerMetricsRegistry( ) init { - clientSettingsService.subscribeToSettings(this) + platformClientConnectionService.subscribeToSettings(this) } - override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) { if (!clientStatus.enabled || reportTask != null) { return } diff --git a/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientIntegrationTest.kt b/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientIntegrationTest.kt index e4d3288..61bfc0b 100644 --- a/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientIntegrationTest.kt +++ b/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientIntegrationTest.kt @@ -18,6 +18,7 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.AxoniqPlatformConfiguration import io.axoniq.platform.framework.api.ClientSettingsV2 +import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.api.CommandBusInformation import io.axoniq.platform.framework.api.EventStoreInformation import io.axoniq.platform.framework.api.ModuleVersion @@ -30,8 +31,10 @@ import io.mockk.mockk import org.awaitility.Awaitility.await import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit class AxoniqConsoleRSocketClientIntegrationTest { @@ -72,6 +75,38 @@ class AxoniqConsoleRSocketClientIntegrationTest { assertFalse(client.isConnected()) } + @Test + fun `notifies observers of INVALID_AUTHENTICATION when setup is rejected`() { + mockServer.rejectSetup = true + val settingsService = PlatformClientConnectionService() + val observer = TestObserver() + settingsService.subscribeToSettings(observer) + + client = buildClient(platformClientConnectionService = settingsService) + client.start() + + await().atMost(5, TimeUnit.SECONDS).until { observer.unreachableReasons.isNotEmpty() } + assertTrue(observer.unreachableReasons.all { + it == PlatformClientConnectionObserver.UnreachableReason.INVALID_AUTHENTICATION + }) + } + + @Test + fun `notifies observers of NO_CONNECTION when server is unreachable`() { + val settingsService = PlatformClientConnectionService() + val observer = TestObserver() + settingsService.subscribeToSettings(observer) + + // Point to a port where nothing is listening + client = buildClient(platformClientConnectionService = settingsService, port = 1) + client.start() + + await().atMost(5, TimeUnit.SECONDS).until { observer.unreachableReasons.isNotEmpty() } + assertTrue(observer.unreachableReasons.all { + it == PlatformClientConnectionObserver.UnreachableReason.NO_CONNECTION + }) + } + @Test fun `reconnects after server closes connection`() { client = buildClient() @@ -110,14 +145,17 @@ class AxoniqConsoleRSocketClientIntegrationTest { // ---- helpers ---- - private fun buildClient(): AxoniqConsoleRSocketClient { + private fun buildClient( + platformClientConnectionService: PlatformClientConnectionService = PlatformClientConnectionService(), + port: Int = mockServer.port, + ): AxoniqConsoleRSocketClient { val encodingStrategy = CborJackson2EncodingStrategy() val setupPayloadCreator = mockk() every { setupPayloadCreator.createReport() } returns minimalSetupPayload() val config = AxoniqPlatformConfiguration("test-env", "test-token", "test-app") .host("localhost") - .port(mockServer.port) + .port(port) .secure(false) return AxoniqConsoleRSocketClient( @@ -125,7 +163,7 @@ class AxoniqConsoleRSocketClientIntegrationTest { setupPayloadCreator = setupPayloadCreator, registrar = RSocketHandlerRegistrar(encodingStrategy), encodingStrategy = encodingStrategy, - clientSettingsService = ClientSettingsService(), + platformClientConnectionService = platformClientConnectionService, instanceName = "test-instance" ) } @@ -147,4 +185,14 @@ class AxoniqConsoleRSocketClientIntegrationTest { versions = Versions(frameworkVersion = "test", moduleVersions = emptyList()), upcasters = emptyList(), ) + + private class TestObserver : PlatformClientConnectionObserver { + val unreachableReasons = CopyOnWriteArrayList() + + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {} + override fun onDisconnected() {} + override fun onUnreachable(reason: PlatformClientConnectionObserver.UnreachableReason) { + unreachableReasons.add(reason) + } + } } diff --git a/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientToxiproxyIntegrationTest.kt b/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientToxiproxyIntegrationTest.kt index b508658..8627798 100644 --- a/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientToxiproxyIntegrationTest.kt +++ b/framework-client-messaging/src/test/kotlin/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClientToxiproxyIntegrationTest.kt @@ -223,7 +223,7 @@ class AxoniqConsoleRSocketClientToxiproxyIntegrationTest { setupPayloadCreator = setupPayloadCreator, registrar = RSocketHandlerRegistrar(encodingStrategy), encodingStrategy = encodingStrategy, - clientSettingsService = ClientSettingsService(), + platformClientConnectionService = PlatformClientConnectionService(), instanceName = "test-instance" ) } diff --git a/pom.xml b/pom.xml index 47396ea..a7f9cd8 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 2.2.20 2.0.0 - 5.1.0-RC2 + 5.1.0-SNAPSHOT 3.5.7 @@ -234,6 +234,19 @@ + + + central-snapshots + https://central.sonatype.com/repository/maven-snapshots/ + + false + + + true + + + + scm:git:git://github.com/AxonIQ/platform-framework-client.git scm:git:git@github.com:AxonIQ/platform-framework-client.git