package com.soulplatform.sdk.communication.messages.data.ws;

import com.google.gson.Gson;
import com.soulplatform.sdk.auth.data.AuthDataStorage;
import com.soulplatform.sdk.common.data.ws.ConnectionState;
import com.soulplatform.sdk.common.data.ws.WebSocket;
import com.soulplatform.sdk.common.data.ws.WebSocketListener;
import com.soulplatform.sdk.common.data.ws.impl.raw.ConnectionRestorer;
import com.soulplatform.sdk.common.di.WebSocketModuleKt;
import com.soulplatform.sdk.common.domain.Logger;
import com.soulplatform.sdk.common.domain.SoulLogger;
import com.soulplatform.sdk.common.error.ConnectionException;
import com.soulplatform.sdk.communication.chats.domain.model.Chat;
import com.soulplatform.sdk.communication.messages.data.MessageWrapper;
import com.soulplatform.sdk.communication.messages.data.MessagesSource;
import com.soulplatform.sdk.communication.messages.data.model.AcknowledgmentMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.EventPayload;
import com.soulplatform.sdk.communication.messages.data.model.FailedMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.MessagePayload;
import com.soulplatform.sdk.communication.messages.data.model.MessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.MessagesMapperExtKt;
import com.soulplatform.sdk.communication.messages.data.model.Payload;
import com.soulplatform.sdk.communication.messages.data.model.SystemMessageRaw;
import com.soulplatform.sdk.communication.messages.data.model.UserMessageRaw;
import com.soulplatform.sdk.communication.messages.domain.model.GetMessagesParams;
import com.soulplatform.sdk.communication.messages.domain.model.messages.Message;
import com.soulplatform.sdk.communication.messages.domain.model.messages.UserMessage;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import ir.p;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/* compiled from: WSMessagesSource.kt */
/* loaded from: classes3.dex */
public final class WSMessagesSource implements MessagesSource {
    private final AuthDataStorage authStorage;
    private final ConnectionRestorer connectionRestorer;
    private final BehaviorSubject<ConnectionState> connectionStateSubject;
    private Disposable disconnectWsDisposable;
    private final Gson gson;
    private final HistoryRetriever historyRetriever;
    private final PublishSubject<MessageWrapper> messagesSubject;
    private final PublishSubject<SocketEvent> responseSubject;
    private final WebSocket webSocket;

    /* compiled from: WSMessagesSource.kt */
    /* loaded from: classes3.dex */
    private final class WSListener implements WebSocketListener {
        public WSListener() {
        }

        private final void handleIncomingEvent(EventPayload eventPayload) {
            MessageRaw event = eventPayload.getEvent();
            if (!(event instanceof AcknowledgmentMessageRaw ? true : event instanceof FailedMessageRaw)) {
                WSMessagesSource.this.messagesSubject.onNext(new MessageWrapper(eventPayload.getChannel(), MessagesMapperExtKt.mapToMessage(WSMessagesSource.this.authStorage.getUserId(), event)));
                return;
            }
            PublishSubject publishSubject = WSMessagesSource.this.responseSubject;
            kotlin.jvm.internal.l.e(event, "null cannot be cast to non-null type com.soulplatform.sdk.communication.messages.data.model.SystemMessageRaw");
            publishSubject.onNext(new SocketEvent((SystemMessageRaw) event));
        }

        private final void handleIncomingMessage(MessagePayload messagePayload) {
            WSMessagesSource.this.messagesSubject.onNext(new MessageWrapper(messagePayload.getChannel(), MessagesMapperExtKt.mapToMessage(WSMessagesSource.this.authStorage.getUserId(), messagePayload.getMessage())));
        }

        @Override // com.soulplatform.sdk.common.data.ws.WebSocketListener
        public void onConnectionStateChanged(ConnectionState state) {
            kotlin.jvm.internal.l.g(state, "state");
            if (state instanceof ConnectionState.DISCONNECTED) {
                PublishSubject publishSubject = WSMessagesSource.this.responseSubject;
                Throwable error = ((ConnectionState.DISCONNECTED) state).getError();
                if (error == null) {
                    error = new IllegalStateException("WS disconnected");
                }
                publishSubject.onNext(new SocketEvent(error));
            }
            WSMessagesSource.this.connectionStateSubject.onNext(state);
        }

        @Override // com.soulplatform.sdk.common.data.ws.WebSocketListener
        public void onMessageReceived(String message) {
            kotlin.jvm.internal.l.g(message, "message");
            Payload payload = (Payload) WSMessagesSource.this.gson.fromJson(message, Payload.class);
            if (payload instanceof MessagePayload) {
                handleIncomingMessage((MessagePayload) payload);
            } else if (payload instanceof EventPayload) {
                handleIncomingEvent((EventPayload) payload);
            }
        }
    }

    public WSMessagesSource(Gson gson, AuthDataStorage authStorage, WebSocket webSocket, ConnectionRestorer connectionRestorer, HistoryRetriever historyRetriever) {
        kotlin.jvm.internal.l.g(gson, "gson");
        kotlin.jvm.internal.l.g(authStorage, "authStorage");
        kotlin.jvm.internal.l.g(webSocket, "webSocket");
        kotlin.jvm.internal.l.g(connectionRestorer, "connectionRestorer");
        kotlin.jvm.internal.l.g(historyRetriever, "historyRetriever");
        this.gson = gson;
        this.authStorage = authStorage;
        this.webSocket = webSocket;
        this.connectionRestorer = connectionRestorer;
        this.historyRetriever = historyRetriever;
        PublishSubject<MessageWrapper> create = PublishSubject.create();
        kotlin.jvm.internal.l.f(create, "create<MessageWrapper>()");
        this.messagesSubject = create;
        PublishSubject<SocketEvent> create2 = PublishSubject.create();
        kotlin.jvm.internal.l.f(create2, "create<SocketEvent>()");
        this.responseSubject = create2;
        BehaviorSubject<ConnectionState> create3 = BehaviorSubject.create();
        kotlin.jvm.internal.l.f(create3, "create<ConnectionState>()");
        this.connectionStateSubject = create3;
        webSocket.addListener(new WSListener());
    }

    private final ObservableTransformer<SocketEvent, SystemMessageRaw> getResponseOrThrowError() {
        return new ObservableTransformer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.d
            @Override // io.reactivex.ObservableTransformer
            public final ObservableSource apply(Observable observable) {
                ObservableSource m112getResponseOrThrowError$lambda7;
                m112getResponseOrThrowError$lambda7 = WSMessagesSource.m112getResponseOrThrowError$lambda7(observable);
                return m112getResponseOrThrowError$lambda7;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: getResponseOrThrowError$lambda-7, reason: not valid java name */
    public static final ObservableSource m112getResponseOrThrowError$lambda7(Observable it) {
        kotlin.jvm.internal.l.g(it, "it");
        return it.flatMap(new Function() { // from class: com.soulplatform.sdk.communication.messages.data.ws.k
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                ObservableSource m113getResponseOrThrowError$lambda7$lambda6;
                m113getResponseOrThrowError$lambda7$lambda6 = WSMessagesSource.m113getResponseOrThrowError$lambda7$lambda6((SocketEvent) obj);
                return m113getResponseOrThrowError$lambda7$lambda6;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: getResponseOrThrowError$lambda-7$lambda-6, reason: not valid java name */
    public static final ObservableSource m113getResponseOrThrowError$lambda7$lambda6(SocketEvent socketEvent) {
        kotlin.jvm.internal.l.g(socketEvent, "socketEvent");
        if (socketEvent.isError()) {
            return Observable.error(socketEvent.getError());
        }
        SystemMessageRaw event = socketEvent.getEvent();
        kotlin.jvm.internal.l.d(event);
        return Observable.just(event);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: observeMessages$lambda-0, reason: not valid java name */
    public static final void m114observeMessages$lambda0(WSMessagesSource this$0, Disposable disposable) {
        kotlin.jvm.internal.l.g(this$0, "this$0");
        Disposable disposable2 = this$0.disconnectWsDisposable;
        if (disposable2 != null) {
            disposable2.dispose();
        }
        this$0.connectionRestorer.start();
        this$0.webSocket.connect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: observeMessages$lambda-1, reason: not valid java name */
    public static final void m115observeMessages$lambda1(WSMessagesSource this$0) {
        kotlin.jvm.internal.l.g(this$0, "this$0");
        if (this$0.messagesSubject.hasObservers()) {
            return;
        }
        this$0.scheduleWsDisconnect();
    }

    private final void performMessageSending(String str, Message message) {
        Object eventPayload;
        MessageRaw messageRaw = MessagesMapperExtKt.toMessageRaw(message);
        if (messageRaw instanceof UserMessageRaw) {
            eventPayload = new MessagePayload(str, messageRaw);
        } else {
            if (!(messageRaw instanceof SystemMessageRaw)) {
                throw new IllegalArgumentException("Can't send message: " + message);
            }
            eventPayload = new EventPayload(str, messageRaw);
        }
        String payloadJson = this.gson.toJson(eventPayload);
        WebSocket webSocket = this.webSocket;
        kotlin.jvm.internal.l.f(payloadJson, "payloadJson");
        webSocket.send(payloadJson);
    }

    private final void scheduleWsDisconnect() {
        SoulLogger.INSTANCE.d(WebSocketModuleKt.TAG_WS, "Disconnection scheduled");
        this.disconnectWsDisposable = Observable.timer(5L, TimeUnit.SECONDS).subscribe(new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.g
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.m116scheduleWsDisconnect$lambda8(WSMessagesSource.this, (Long) obj);
            }
        }, new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.i
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.m117scheduleWsDisconnect$lambda9((Throwable) obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: scheduleWsDisconnect$lambda-8, reason: not valid java name */
    public static final void m116scheduleWsDisconnect$lambda8(WSMessagesSource this$0, Long l10) {
        kotlin.jvm.internal.l.g(this$0, "this$0");
        this$0.disconnect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: scheduleWsDisconnect$lambda-9, reason: not valid java name */
    public static final void m117scheduleWsDisconnect$lambda9(Throwable th2) {
        Logger.DefaultImpls.e$default(SoulLogger.INSTANCE, WebSocketModuleKt.TAG_WS, null, "Disconnecting by timer failed", th2, 2, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: sendMessage$lambda-3, reason: not valid java name */
    public static final CompletableSource m118sendMessage$lambda3(final Message message, final WSMessagesSource this$0, final String channelName) {
        kotlin.jvm.internal.l.g(message, "$message");
        kotlin.jvm.internal.l.g(this$0, "this$0");
        kotlin.jvm.internal.l.g(channelName, "$channelName");
        return (message instanceof UserMessage ? this$0.waitForServerResponse((UserMessage) message) : Completable.complete()).doOnSubscribe(new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.h
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.m119sendMessage$lambda3$lambda2(WSMessagesSource.this, channelName, message, (Disposable) obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: sendMessage$lambda-3$lambda-2, reason: not valid java name */
    public static final void m119sendMessage$lambda3$lambda2(WSMessagesSource this$0, String channelName, Message message, Disposable disposable) {
        kotlin.jvm.internal.l.g(this$0, "this$0");
        kotlin.jvm.internal.l.g(channelName, "$channelName");
        kotlin.jvm.internal.l.g(message, "$message");
        this$0.performMessageSending(channelName, message);
    }

    private final Completable waitForServerResponse(UserMessage userMessage) {
        final String id2 = userMessage.getId();
        Completable ignoreElement = this.responseSubject.compose(getResponseOrThrowError()).filter(new Predicate() { // from class: com.soulplatform.sdk.communication.messages.data.ws.l
            @Override // io.reactivex.functions.Predicate
            public final boolean test(Object obj) {
                boolean m120waitForServerResponse$lambda4;
                m120waitForServerResponse$lambda4 = WSMessagesSource.m120waitForServerResponse$lambda4(id2, (SystemMessageRaw) obj);
                return m120waitForServerResponse$lambda4;
            }
        }).map(new Function() { // from class: com.soulplatform.sdk.communication.messages.data.ws.j
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                p m121waitForServerResponse$lambda5;
                m121waitForServerResponse$lambda5 = WSMessagesSource.m121waitForServerResponse$lambda5((SystemMessageRaw) obj);
                return m121waitForServerResponse$lambda5;
            }
        }).timeout(10L, TimeUnit.SECONDS, Observable.error(new ConnectionException.ServerNotRespondingException(null, 1, null))).firstOrError().ignoreElement();
        kotlin.jvm.internal.l.f(ignoreElement, "responseSubject\n        …         .ignoreElement()");
        return ignoreElement;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: waitForServerResponse$lambda-4, reason: not valid java name */
    public static final boolean m120waitForServerResponse$lambda4(String messageId, SystemMessageRaw it) {
        kotlin.jvm.internal.l.g(messageId, "$messageId");
        kotlin.jvm.internal.l.g(it, "it");
        return ((it instanceof AcknowledgmentMessageRaw) && kotlin.jvm.internal.l.b(((AcknowledgmentMessageRaw) it).getAcknowledgmentData().getId(), messageId)) || ((it instanceof FailedMessageRaw) && kotlin.jvm.internal.l.b(((FailedMessageRaw) it).getFailedData().getId(), messageId));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: waitForServerResponse$lambda-5, reason: not valid java name */
    public static final p m121waitForServerResponse$lambda5(SystemMessageRaw it) {
        kotlin.jvm.internal.l.g(it, "it");
        if (it instanceof FailedMessageRaw) {
            throw new IllegalArgumentException(((FailedMessageRaw) it).getFailedData().getError());
        }
        return p.f39788a;
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public void disconnect() {
        Disposable disposable = this.disconnectWsDisposable;
        if (disposable != null) {
            disposable.dispose();
        }
        this.connectionRestorer.stop();
        this.webSocket.disconnect();
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Single<List<Message>> getHistory(String channelName, GetMessagesParams params) {
        kotlin.jvm.internal.l.g(channelName, "channelName");
        kotlin.jvm.internal.l.g(params, "params");
        return this.historyRetriever.getHistory(channelName, params);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Single<Message> getMessage(String channelName, String messageId) {
        kotlin.jvm.internal.l.g(channelName, "channelName");
        kotlin.jvm.internal.l.g(messageId, "messageId");
        return this.historyRetriever.getMessage(channelName, messageId);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Single<Map<String, Message>> getMessages(String channelName, List<String> messageIds) {
        kotlin.jvm.internal.l.g(channelName, "channelName");
        kotlin.jvm.internal.l.g(messageIds, "messageIds");
        return this.historyRetriever.getMessages(channelName, messageIds);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Observable<ConnectionState> observeConnectionState() {
        return this.connectionStateSubject;
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Observable<MessageWrapper> observeMessages() {
        Observable<MessageWrapper> doFinally = this.messagesSubject.doOnSubscribe(new Consumer() { // from class: com.soulplatform.sdk.communication.messages.data.ws.f
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                WSMessagesSource.m114observeMessages$lambda0(WSMessagesSource.this, (Disposable) obj);
            }
        }).doFinally(new Action() { // from class: com.soulplatform.sdk.communication.messages.data.ws.e
            @Override // io.reactivex.functions.Action
            public final void run() {
                WSMessagesSource.m115observeMessages$lambda1(WSMessagesSource.this);
            }
        });
        kotlin.jvm.internal.l.f(doFinally, "messagesSubject\n        …          }\n            }");
        return doFinally;
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Completable sendMessage(Chat chat, Message message) {
        kotlin.jvm.internal.l.g(chat, "chat");
        kotlin.jvm.internal.l.g(message, "message");
        return sendMessage(chat.getChannelName(), message);
    }

    @Override // com.soulplatform.sdk.communication.messages.data.MessagesSource
    public Completable sendMessage(final String channelName, final Message message) {
        kotlin.jvm.internal.l.g(channelName, "channelName");
        kotlin.jvm.internal.l.g(message, "message");
        Completable defer = Completable.defer(new Callable() { // from class: com.soulplatform.sdk.communication.messages.data.ws.m
            @Override // java.util.concurrent.Callable
            public final Object call() {
                CompletableSource m118sendMessage$lambda3;
                m118sendMessage$lambda3 = WSMessagesSource.m118sendMessage$lambda3(Message.this, this, channelName);
                return m118sendMessage$lambda3;
            }
        });
        kotlin.jvm.internal.l.f(defer, "defer {\n            if (…ame, message) }\n        }");
        return defer;
    }
}
