package ru.auto.data.network.common;

import android.support.v7.ake;
import java.util.concurrent.TimeUnit;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Ref;
import kotlin.jvm.internal.l;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import ru.auto.data.network.common.BaseSocketService;
import ru.auto.data.network.exception.SocketUnexpectedCloseException;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/* loaded from: classes8.dex */
public abstract class BaseSocketService<Params, Message> {
    private static final int CODE_SOCKET_ERROR_CLOSE = 1011;
    private static final int CODE_SOCKET_NORMAL_CLOSE = 1000;
    private static final String MESSAGE_SOCKET_ERROR_CLOSE = "Didnt recieve ping";
    private static final String MESSAGE_SOCKET_NORMAL_CLOSE = "";
    private final OkHttpClient client;
    public static final Companion Companion = new Companion(null);
    private static final String TAG = BaseSocketService.class.getSimpleName();

    /* loaded from: classes8.dex */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes8.dex */
    public final class RxWebSocketListener extends WebSocketListener {
        private Subscription connectionCheckerSubscription;
        private boolean messageOccured;
        private final SerializedSubject<Message, Message> messagesSubject;
        private final Function0<Unit> socketInactiveListener;
        final /* synthetic */ BaseSocketService this$0;
        private boolean timeoutOccured;

        public RxWebSocketListener(BaseSocketService baseSocketService, Function0<Unit> function0) {
            l.b(function0, "socketInactiveListener");
            this.this$0 = baseSocketService;
            this.socketInactiveListener = function0;
            SerializedSubject<Message, Message> serializedSubject = (SerializedSubject<Message, Message>) PublishSubject.create().toSerialized();
            l.a((Object) serializedSubject, "PublishSubject.create<Message>().toSerialized()");
            this.messagesSubject = serializedSubject;
        }

        private final void handleMessage(String str) {
            try {
                Object parseMessage = this.this$0.parseMessage(str);
                if (parseMessage != null) {
                    this.messagesSubject.onNext(parseMessage);
                }
            } catch (Exception unused) {
            }
        }

        public final Observable<Message> getMessages() {
            return this.messagesSubject;
        }

        public final Function0<Unit> getSocketInactiveListener() {
            return this.socketInactiveListener;
        }

        @Override // okhttp3.WebSocketListener
        public void onClosed(WebSocket webSocket, int i, String str) {
            ake.a(BaseSocketService.TAG, "Socket: closed");
            Subscription subscription = this.connectionCheckerSubscription;
            if (subscription != null) {
                subscription.unsubscribe();
            }
            if (i != 1000 || this.timeoutOccured) {
                this.messagesSubject.onError(new SocketUnexpectedCloseException());
            } else {
                this.messagesSubject.onCompleted();
            }
        }

        @Override // okhttp3.WebSocketListener
        public void onFailure(WebSocket webSocket, Throwable th, Response response) {
            String str = BaseSocketService.TAG;
            StringBuilder sb = new StringBuilder();
            sb.append("Socket: failure = ");
            sb.append(th != null ? th.getMessage() : null);
            ake.a(str, sb.toString());
            Subscription subscription = this.connectionCheckerSubscription;
            if (subscription != null) {
                subscription.unsubscribe();
            }
            this.messagesSubject.onError(th);
        }

        @Override // okhttp3.WebSocketListener
        public void onMessage(WebSocket webSocket, String str) {
            ake.a(BaseSocketService.TAG, "Socket: message = " + str);
            if (str != null) {
                handleMessage(str);
            }
            this.messageOccured = true;
        }

        @Override // okhttp3.WebSocketListener
        public void onOpen(WebSocket webSocket, Response response) {
            ake.a(BaseSocketService.TAG, "Socket: open");
            this.connectionCheckerSubscription = Observable.interval(2L, TimeUnit.MINUTES).subscribeOn(Schedulers.computation()).subscribe(new Action1<Long>() { // from class: ru.auto.data.network.common.BaseSocketService$RxWebSocketListener$onOpen$1
                @Override // rx.functions.Action1
                public final void call(Long l) {
                    boolean z;
                    z = BaseSocketService.RxWebSocketListener.this.messageOccured;
                    if (z) {
                        BaseSocketService.RxWebSocketListener.this.messageOccured = false;
                    } else {
                        BaseSocketService.RxWebSocketListener.this.timeoutOccured = true;
                        BaseSocketService.RxWebSocketListener.this.getSocketInactiveListener().invoke();
                    }
                }
            });
        }
    }

    public BaseSocketService(OkHttpClient okHttpClient) {
        l.b(okHttpClient, "client");
        this.client = okHttpClient;
    }

    private final Request createConnectionRequest(Params params) {
        Request build = new Request.Builder().url(getConnectionUrl(params)).build();
        l.a((Object) build, "Request.Builder()\n      …\n                .build()");
        return build;
    }

    public final OkHttpClient getClient() {
        return this.client;
    }

    public abstract String getConnectionUrl(Params params);

    /* JADX WARN: Type inference failed for: r1v1, types: [T, okhttp3.WebSocket] */
    public final Observable<Message> getMessages(Params params) {
        final Request createConnectionRequest = createConnectionRequest(params);
        final Ref.ObjectRef objectRef = new Ref.ObjectRef();
        objectRef.a = (WebSocket) 0;
        final RxWebSocketListener rxWebSocketListener = new RxWebSocketListener(this, new BaseSocketService$getMessages$socketListener$1(objectRef));
        Observable<Message> doOnUnsubscribe = rxWebSocketListener.getMessages().doOnSubscribe(new Action0() { // from class: ru.auto.data.network.common.BaseSocketService$getMessages$1
            /* JADX WARN: Type inference failed for: r1v2, types: [T, okhttp3.WebSocket] */
            @Override // rx.functions.Action0
            public final void call() {
                objectRef.a = BaseSocketService.this.getClient().newWebSocket(createConnectionRequest, rxWebSocketListener);
            }
        }).doOnUnsubscribe(new Action0() { // from class: ru.auto.data.network.common.BaseSocketService$getMessages$2
            @Override // rx.functions.Action0
            public final void call() {
                WebSocket webSocket = (WebSocket) Ref.ObjectRef.this.a;
                if (webSocket != null) {
                    webSocket.close(1000, "");
                }
            }
        });
        l.a((Object) doOnUnsubscribe, "socketListener.getMessag…_CLOSE)\n                }");
        return doOnUnsubscribe;
    }

    public abstract Message parseMessage(String str);
}
