package com.soulplatform.sdk.communication.messages.data.ws;

import com.google.gson.Gson;
import com.soulplatform.sdk.auth.data.AuthDataStorage;
import com.soulplatform.sdk.common.data.ws.ConnectionState;
import com.soulplatform.sdk.common.data.ws.WebSocket;
import com.soulplatform.sdk.common.data.ws.WebSocketListener;
import com.soulplatform.sdk.common.data.ws.impl.raw.ConnectionRestorer;
import com.soulplatform.sdk.common.di.WebSocketModuleKt;
import com.soulplatform.sdk.common.domain.Logger;
import com.soulplatform.sdk.common.domain.SoulLogger;
import com.soulplatform.sdk.common.error.ConnectionException;
import com.soulplatform.sdk.communication.messages.data.MessageWrapper;
import com.soulplatform.sdk.communication.messages.data.MessagesSource;
import com.soulplatform.sdk.communication.messages.data.model.AcknowledgmentMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.EventPayload;
import com.soulplatform.sdk.communication.messages.data.model.FailedMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.MessagePayload;
import com.soulplatform.sdk.communication.messages.data.model.MessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.MessagesMapperExtKt;
import com.soulplatform.sdk.communication.messages.data.model.Payload;
import com.soulplatform.sdk.communication.messages.data.model.SystemMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.UserMessageRaw;
import com.soulplatform.sdk.communication.messages.domain.model.GetMessagesParams;
import com.soulplatform.sdk.communication.messages.domain.model.messages.Message;
import com.soulplatform.sdk.communication.messages.domain.model.messages.UserMessage;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;

/* compiled from: WSMessagesSource.kt */
/* loaded from: classes3.dex */
public final class WSMessagesSource implements MessagesSource {
    private final AuthDataStorage authStorage;
    private final ConnectionRestorer connectionRestorer;
    private final BehaviorSubject<ConnectionState> connectionStateSubject;
    private Disposable disconnectWsDisposable;
    private final Gson gson;
    private final HistoryRetriever historyRetriever;
    private final PublishSubject<MessageWrapper> messagesSubject;
    private final PublishSubject<SocketEvent> responseSubject;
    private final WebSocket webSocket;

    /* compiled from: WSMessagesSource.kt */
    /* loaded from: classes3.dex */
    private final class WSListener implements WebSocketListener {
        public WSListener() {
        }

        private final void handleIncomingEvent(EventPayload eventPayload) {
            MessageRaw event = eventPayload.getEvent();
            if (!(event instanceof AcknowledgmentMessageRaw ? true : event instanceof FailedMessageRaw)) {
                WSMessagesSource.this.messagesSubject.onNext(new MessageWrapper(eventPayload.getChatId(), MessagesMapperExtKt.mapToMessage(WSMessagesSource.this.authStorage.getUserId(), event)));
                return;
            }
            PublishSubject publishSubject = WSMessagesSource.this.responseSubject;
            kotlin.jvm.internal.j.e(event, "null cannot be cast to non-null type com.soulplatform.sdk.communication.messages.data.model.SystemMessageRaw");
            publishSubject.onNext(new SocketEvent((SystemMessageRaw) event));
        }

        private final void handleIncomingMessage(MessagePayload messagePayload) {
            WSMessagesSource.this.messagesSubject.onNext(new MessageWrapper(messagePayload.getChatId(), MessagesMapperExtKt.mapToMessage(WSMessagesSource.this.authStorage.getUserId(), messagePayload.getMessage())));
        }

        @Override // com.soulplatform.sdk.common.data.ws.WebSocketListener
        public void onConnectionStateChanged(ConnectionState state) {
            kotlin.jvm.internal.j.g(state, "state");
            if (state instanceof ConnectionState.DISCONNECTED) {
                PublishSubject publishSubject = WSMessagesSource.this.responseSubject;
                Throwable error = ((ConnectionState.DISCONNECTED) state).getError();
                if (error == null) {
                    error = new IllegalStateException("WS disconnected");
                }
                publishSubject.onNext(new SocketEvent(error));
            }
            WSMessagesSource.this.connectionStateSubject.onNext(state);
        }

        @Override // com.soulplatform.sdk.common.data.ws.WebSocketListener
        public void onMessageReceived(String message) {
            kotlin.jvm.internal.j.g(message, "message");
            Payload payload = (Payload) WSMessagesSource.this.gson.fromJson(message, Payload.class);
            if (payload instanceof MessagePayload) {
                handleIncomingMessage((MessagePayload) payload);
            } else if (payload instanceof EventPayload) {
                handleIncomingEvent((EventPayload) payload);
            }
        }
    }

    public WSMessagesSource(Gson gson, AuthDataStorage authStorage, WebSocket webSocket, ConnectionRestorer connectionRestorer, HistoryRetriever historyRetriever) {
        kotlin.jvm.internal.j.g(gson, "gson");
        kotlin.jvm.internal.j.g(authStorage, "authStorage");
        kotlin.jvm.internal.j.g(webSocket, "webSocket");
        kotlin.jvm.internal.j.g(connectionRestorer, "connectionRestorer");
        kotlin.jvm.internal.j.g(historyRetriever, "historyRetriever");
        this.gson = gson;
        this.authStorage = authStorage;
        this.webSocket = webSocket;
        this.connectionRestorer = connectionRestorer;
        this.historyRetriever = historyRetriever;
        PublishSubject<MessageWrapper> create = PublishSubject.create();
        kotlin.jvm.internal.j.f(create, "create<MessageWrapper>()");
        this.messagesSubject = create;
        PublishSubject<SocketEvent> create2 = PublishSubject.create();
        kotlin.jvm.internal.j.f(create2, "create<SocketEvent>()");
        this.responseSubject = create2;
        BehaviorSubject<ConnectionState> create3 = BehaviorSubject.create();
        kotlin.jvm.internal.j.f(create3, "create<ConnectionState>()");
        this.connectionStateSubject = create3;
        webSocket.addListener(new WSListener());
    }

    private final ObservableTransformer<SocketEvent, SystemMessageRaw> getResponseOrThrowError() {
        return new ObservableTransformer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.c
            @Override // io.reactivex.ObservableTransformer
            public final ObservableSource apply(Observable observable) {
                ObservableSource responseOrThrowError$lambda$7;
                responseOrThrowError$lambda$7 = WSMessagesSource.getResponseOrThrowError$lambda$7(observable);
                return responseOrThrowError$lambda$7;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final ObservableSource getResponseOrThrowError$lambda$7(Observable it) {
        kotlin.jvm.internal.j.g(it, "it");
        final WSMessagesSource$getResponseOrThrowError$1$1 wSMessagesSource$getResponseOrThrowError$1$1 = new Function1<SocketEvent, ObservableSource<? extends SystemMessageRaw>>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$getResponseOrThrowError$1$1
            @Override // kotlin.jvm.functions.Function1
            public final ObservableSource<? extends SystemMessageRaw> invoke(SocketEvent socketEvent) {
                kotlin.jvm.internal.j.g(socketEvent, "socketEvent");
                if (socketEvent.isError()) {
                    return Observable.error(socketEvent.getError());
                }
                SystemMessageRaw event = socketEvent.getEvent();
                kotlin.jvm.internal.j.d(event);
                return Observable.just(event);
            }
        };
        return it.flatMap(new Function() { // from class: com.soulplatform.sdk.communication.messages.data.ws.j
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                ObservableSource responseOrThrowError$lambda$7$lambda$6;
                responseOrThrowError$lambda$7$lambda$6 = WSMessagesSource.getResponseOrThrowError$lambda$7$lambda$6(Function1.this, obj);
                return responseOrThrowError$lambda$7$lambda$6;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final ObservableSource getResponseOrThrowError$lambda$7$lambda$6(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        return (ObservableSource) tmp0.invoke(obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void observeMessages$lambda$0(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        tmp0.invoke(obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void observeMessages$lambda$1(WSMessagesSource this$0) {
        kotlin.jvm.internal.j.g(this$0, "this$0");
        if (this$0.messagesSubject.hasObservers()) {
            return;
        }
        this$0.scheduleWsDisconnect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void performMessageSending(String str, Message message) {
        Object eventPayload;
        MessageRaw messageRaw = MessagesMapperExtKt.toMessageRaw(message);
        if (messageRaw instanceof UserMessageRaw) {
            eventPayload = new MessagePayload(str, messageRaw);
        } else {
            if (!(messageRaw instanceof SystemMessageRaw)) {
                throw new IllegalArgumentException("Can't send message: " + message);
            }
            eventPayload = new EventPayload(str, messageRaw);
        }
        String payloadJson = this.gson.toJson(eventPayload);
        WebSocket webSocket = this.webSocket;
        kotlin.jvm.internal.j.f(payloadJson, "payloadJson");
        webSocket.send(payloadJson);
    }

    private final void scheduleWsDisconnect() {
        SoulLogger.INSTANCE.d(WebSocketModuleKt.TAG_WS, "Disconnection scheduled");
        Observable<Long> timer = Observable.timer(5L, TimeUnit.SECONDS);
        final Function1<Long, Unit> function1 = new Function1<Long, Unit>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$scheduleWsDisconnect$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(Long l10) {
                invoke2(l10);
                return Unit.f41326a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(Long l10) {
                WSMessagesSource.this.disconnect();
            }
        };
        Consumer<? super Long> consumer = new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.k
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.scheduleWsDisconnect$lambda$8(Function1.this, obj);
            }
        };
        final WSMessagesSource$scheduleWsDisconnect$2 wSMessagesSource$scheduleWsDisconnect$2 = new Function1<Throwable, Unit>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$scheduleWsDisconnect$2
            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(Throwable th2) {
                invoke2(th2);
                return Unit.f41326a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(Throwable th2) {
                Logger.DefaultImpls.e$default(SoulLogger.INSTANCE, WebSocketModuleKt.TAG_WS, null, "Disconnecting by timer failed", th2, 2, null);
            }
        };
        this.disconnectWsDisposable = timer.subscribe(consumer, new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.l
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.scheduleWsDisconnect$lambda$9(Function1.this, obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void scheduleWsDisconnect$lambda$8(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        tmp0.invoke(obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void scheduleWsDisconnect$lambda$9(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        tmp0.invoke(obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final CompletableSource sendMessage$lambda$3(final Message message, final WSMessagesSource this$0, final String chatId) {
        kotlin.jvm.internal.j.g(message, "$message");
        kotlin.jvm.internal.j.g(this$0, "this$0");
        kotlin.jvm.internal.j.g(chatId, "$chatId");
        Completable waitForServerResponse = message instanceof UserMessage ? this$0.waitForServerResponse((UserMessage) message) : Completable.complete();
        final Function1<Disposable, Unit> function1 = new Function1<Disposable, Unit>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$sendMessage$1$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(Disposable disposable) {
                invoke2(disposable);
                return Unit.f41326a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(Disposable disposable) {
                WSMessagesSource.this.performMessageSending(chatId, message);
            }
        };
        return waitForServerResponse.doOnSubscribe(new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.h
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.sendMessage$lambda$3$lambda$2(Function1.this, obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void sendMessage$lambda$3$lambda$2(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        tmp0.invoke(obj);
    }

    private final Completable waitForServerResponse(UserMessage userMessage) {
        final String id2 = userMessage.getId();
        Observable<R> compose = this.responseSubject.compose(getResponseOrThrowError());
        final Function1<SystemMessageRaw, Boolean> function1 = new Function1<SystemMessageRaw, Boolean>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$waitForServerResponse$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @Override // kotlin.jvm.functions.Function1
            public final Boolean invoke(SystemMessageRaw it) {
                kotlin.jvm.internal.j.g(it, "it");
                boolean z10 = true;
                boolean z11 = (it instanceof AcknowledgmentMessageRaw) && kotlin.jvm.internal.j.b(((AcknowledgmentMessageRaw) it).getAcknowledgmentData().getId(), id2);
                boolean z12 = (it instanceof FailedMessageRaw) && kotlin.jvm.internal.j.b(((FailedMessageRaw) it).getFailedData().getId(), id2);
                if (!z11 && !z12) {
                    z10 = false;
                }
                return Boolean.valueOf(z10);
            }
        };
        Observable filter = compose.filter(new Predicate() { // from class: com.soulplatform.sdk.communication.messages.data.ws.f
            @Override // io.reactivex.functions.Predicate
            public final boolean test(Object obj) {
                boolean waitForServerResponse$lambda$4;
                waitForServerResponse$lambda$4 = WSMessagesSource.waitForServerResponse$lambda$4(Function1.this, obj);
                return waitForServerResponse$lambda$4;
            }
        });
        final WSMessagesSource$waitForServerResponse$2 wSMessagesSource$waitForServerResponse$2 = new Function1<SystemMessageRaw, Unit>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$waitForServerResponse$2
            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(SystemMessageRaw systemMessageRaw) {
                invoke2(systemMessageRaw);
                return Unit.f41326a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(SystemMessageRaw it) {
                kotlin.jvm.internal.j.g(it, "it");
                if (it instanceof FailedMessageRaw) {
                    throw new IllegalArgumentException(((FailedMessageRaw) it).getFailedData().getError());
                }
            }
        };
        Completable ignoreElement = filter.map(new Function() { // from class: com.soulplatform.sdk.communication.messages.data.ws.g
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                Unit waitForServerResponse$lambda$5;
                waitForServerResponse$lambda$5 = WSMessagesSource.waitForServerResponse$lambda$5(Function1.this, obj);
                return waitForServerResponse$lambda$5;
            }
        }).timeout(10L, TimeUnit.SECONDS, Observable.error(new ConnectionException.ServerNotRespondingException(null, 1, null))).firstOrError().ignoreElement();
        kotlin.jvm.internal.j.f(ignoreElement, "messageId = message.id\n …         .ignoreElement()");
        return ignoreElement;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final boolean waitForServerResponse$lambda$4(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        return ((Boolean) tmp0.invoke(obj)).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Unit waitForServerResponse$lambda$5(Function1 tmp0, Object obj) {
        kotlin.jvm.internal.j.g(tmp0, "$tmp0");
        return (Unit) tmp0.invoke(obj);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public void disconnect() {
        Disposable disposable = this.disconnectWsDisposable;
        if (disposable != null) {
            disposable.dispose();
        }
        this.connectionRestorer.stop();
        this.webSocket.disconnect();
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Single<List<Message>> getHistory(String chatId, GetMessagesParams params) {
        kotlin.jvm.internal.j.g(chatId, "chatId");
        kotlin.jvm.internal.j.g(params, "params");
        return this.historyRetriever.getHistory(chatId, params);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Single<Map<String, Message>> getMessages(String chatId, List<String> messageIds) {
        kotlin.jvm.internal.j.g(chatId, "chatId");
        kotlin.jvm.internal.j.g(messageIds, "messageIds");
        return this.historyRetriever.getMessages(chatId, messageIds);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Observable<ConnectionState> observeConnectionState() {
        return this.connectionStateSubject;
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Observable<MessageWrapper> observeMessages() {
        PublishSubject<MessageWrapper> publishSubject = this.messagesSubject;
        final Function1<Disposable, Unit> function1 = new Function1<Disposable, Unit>() { // from class: com.soulplatform.sdk.communication.messages.data.ws.WSMessagesSource$observeMessages$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(Disposable disposable) {
                invoke2(disposable);
                return Unit.f41326a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(Disposable disposable) {
                Disposable disposable2;
                ConnectionRestorer connectionRestorer;
                WebSocket webSocket;
                disposable2 = WSMessagesSource.this.disconnectWsDisposable;
                if (disposable2 != null) {
                    disposable2.dispose();
                }
                connectionRestorer = WSMessagesSource.this.connectionRestorer;
                connectionRestorer.start();
                webSocket = WSMessagesSource.this.webSocket;
                webSocket.connect();
            }
        };
        Observable<MessageWrapper> doFinally = publishSubject.doOnSubscribe(new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.d
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.observeMessages$lambda$0(Function1.this, obj);
            }
        }).doFinally(new Action() { // from class: com.soulplatform.sdk.communication.messages.data.ws.e
            @Override // io.reactivex.functions.Action
            public final void run() {
                WSMessagesSource.observeMessages$lambda$1(WSMessagesSource.this);
            }
        });
        kotlin.jvm.internal.j.f(doFinally, "override fun observeMess…    }\n            }\n    }");
        return doFinally;
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Completable sendMessage(final String chatId, final Message message) {
        kotlin.jvm.internal.j.g(chatId, "chatId");
        kotlin.jvm.internal.j.g(message, "message");
        Completable defer = Completable.defer(new Callable() { // from class: com.soulplatform.sdk.communication.messages.data.ws.i
            @Override // java.util.concurrent.Callable
            public final Object call() {
                CompletableSource sendMessage$lambda$3;
                sendMessage$lambda$3 = WSMessagesSource.sendMessage$lambda$3(Message.this, this, chatId);
                return sendMessage$lambda$3;
            }
        });
        kotlin.jvm.internal.j.f(defer, "defer {\n            if (…tId, message) }\n        }");
        return defer;
    }
}
