/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.proton.streams.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.impl.ProtonDeliveryImpl;
import io.vertx.proton.streams.ProtonSubscriber;
import io.vertx.proton.streams.ProtonSubscriberOptions;
import io.vertx.proton.streams.Tracker;
import io.vertx.proton.streams.impl.TrackerImpl;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.reactivestreams.Subscription;

/**
 * {@link ProtonSubscriber} implementation that sends the message carried by every
 * {@link Tracker} it receives over a {@link ProtonSender} created on the supplied connection.
 *
 * <p>The sender is created in the constructor (with auto-drained disabled) but is only opened
 * once {@link #onSubscribe(Subscription)} has been called. Demand is signalled to the upstream
 * subscription from the sender's send-queue-drain handler, based on the sender's credit.
 *
 * <p>All interaction with the sender is scheduled through the connection's {@link Context}.
 */
public class ProtonSubscriberImpl
implements ProtonSubscriber<Tracker> {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonSubscriberImpl.class);

    /** Upstream subscription; set by the first successful {@link #onSubscribe(Subscription)}. */
    private Subscription sub;

    // Assigned exactly once in the constructor, hence final.
    private final Context connCtx;
    private final ProtonConnectionImpl conn;
    private final ProtonSender sender;

    /** Set once a subscription has been accepted; any later subscription is cancelled. */
    private final AtomicBoolean subscribed = new AtomicBoolean();
    /** Set once onError/onComplete has been received; later onNext signals are ignored. */
    private final AtomicBoolean completed = new AtomicBoolean();
    /** Guards against cancelling the upstream subscription more than once. */
    private final AtomicBoolean cancelledSub = new AtomicBoolean();

    /** Whether the end of the connection should cancel the upstream subscription. */
    private boolean emitOnConnectionEnd = true;

    /**
     * Number of elements requested from upstream that have not yet been handed to the sender.
     * Only modified by the drain handler and by the task that {@link #onNext(Tracker)} schedules
     * on the connection context.
     */
    private long outstandingRequests = 0L;

    /**
     * Creates a subscriber for the given address using default {@link ProtonSubscriberOptions}.
     *
     * @param address the address passed to {@code conn.createSender}
     * @param conn the connection on which the sender is created
     */
    public ProtonSubscriberImpl(String address, ProtonConnectionImpl conn) {
        this(address, conn, new ProtonSubscriberOptions());
    }

    /**
     * Creates a subscriber for the given address.
     *
     * @param address the address passed to {@code conn.createSender}
     * @param conn the connection on which the sender is created; its context is used for all
     *        sender interaction
     * @param options subscriber options; a non-null link name is copied to the link options
     */
    public ProtonSubscriberImpl(String address, ProtonConnectionImpl conn, ProtonSubscriberOptions options) {
        this.connCtx = conn.getContext();
        this.conn = conn;
        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        String linkName = options.getLinkName();
        if (linkName != null) {
            linkOptions.setLinkName(linkName);
        }
        this.sender = conn.createSender(address, linkOptions);
        // Draining is not automatic; demand is driven by requestForNewCredit instead.
        this.sender.setAutoDrained(false);
    }

    /**
     * Accepts the first subscription and cancels any subsequent one. For the accepted
     * subscription, the connection end handler and the sender's drain/detach/close/open handlers
     * are registered on the connection context, and the sender is then opened.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is null
     */
    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "A subscription must be supplied");
        if (this.subscribed.getAndSet(true)) {
            LOG.trace("Only a single Subscription is supported and already subscribed, cancelling new subscriber.");
            subscription.cancel();
            return;
        }
        this.sub = subscription;
        this.connCtx.runOnContext(x -> {
            this.conn.addEndHandler(v -> {
                // The flag is read when the handler fires, so later setter calls are honoured.
                if (this.emitOnConnectionEnd) {
                    this.cancelSub();
                }
            });
            this.sender.sendQueueDrainHandler(this::requestForNewCredit);
            // Remote detach/close: stop the upstream and mirror the operation locally.
            this.sender.detachHandler(res -> {
                this.cancelSub();
                this.sender.detach();
            });
            this.sender.closeHandler(res -> {
                this.cancelSub();
                this.sender.close();
            });
            this.sender.openHandler(res -> LOG.trace("Attach received"));
            this.sender.open();
        });
    }

    /**
     * Requests from upstream the part of the link's credit that is not already covered by
     * outstanding requests. Does nothing once the stream has completed or been cancelled.
     */
    private void requestForNewCredit(ProtonSender link) {
        if (this.completed.get() || this.cancelledSub.get()) {
            return;
        }
        long credit = link.getCredit();
        long newRequests = credit - this.outstandingRequests;
        if (newRequests > 0L) {
            this.outstandingRequests += newRequests;
            this.sub.request(newRequests);
        }
    }

    /** Cancels the upstream subscription, at most once. */
    private void cancelSub() {
        if (!this.cancelledSub.getAndSet(true)) {
            this.sub.cancel();
        }
    }

    /**
     * Schedules sending of the tracker's message on the connection context, unless a terminal
     * signal has already been received. The callback passed to {@code sender.send} invokes the
     * tracker's handler, if one is set.
     *
     * <p>The tracker is cast to {@link TrackerImpl} inside the scheduled task, so any other
     * {@link Tracker} implementation fails there with a {@link ClassCastException}.
     *
     * @param tracker the element to send
     * @throws NullPointerException if {@code tracker} is null
     */
    public void onNext(Tracker tracker) {
        Objects.requireNonNull(tracker, "An element must be supplied when calling onNext");
        if (this.completed.get()) {
            return;
        }
        this.connCtx.runOnContext(x -> {
            // One previously requested element has now arrived.
            --this.outstandingRequests;
            TrackerImpl env = (TrackerImpl)tracker;
            ProtonDelivery delivery = this.sender.send(tracker.message(), d -> {
                Handler<Tracker> h = env.handler();
                if (h != null) {
                    h.handle(env);
                }
            });
            env.setDelivery((ProtonDeliveryImpl)delivery);
        });
    }

    /**
     * Handles the upstream error signal: the error is logged at trace level and the sender is
     * closed (first terminal signal only).
     *
     * @param t the error signalled by upstream
     * @throws NullPointerException if {@code t} is null
     */
    public void onError(Throwable t) {
        Objects.requireNonNull(t, "An error must be supplied when calling onError");
        // Record the cause instead of dropping it silently; it is not otherwise propagated.
        LOG.trace("Error signalled by the publisher, closing the sender.", t);
        this.terminate();
    }

    /** Handles the upstream completion signal by closing the sender (first terminal signal only). */
    public void onComplete() {
        this.terminate();
    }

    /**
     * Marks the stream as completed and, if this is the first terminal signal, clears the
     * sender's drain/detach/close handlers and closes the sender on the connection context.
     */
    private void terminate() {
        if (this.completed.getAndSet(true)) {
            return;
        }
        this.connCtx.runOnContext(x -> {
            // Clear the handlers first so that closing does not trigger further callbacks here.
            this.sender.sendQueueDrainHandler(null);
            this.sender.detachHandler(null);
            this.sender.closeHandler(null);
            this.sender.close();
        });
    }

    /** Sets the source on the underlying sender and returns this subscriber. */
    @Override
    public ProtonSubscriber<Tracker> setSource(Source source) {
        this.sender.setSource(source);
        return this;
    }

    /** Returns the source currently set on the underlying sender. */
    @Override
    public Source getSource() {
        return this.sender.getSource();
    }

    /** Sets the target on the underlying sender and returns this subscriber. */
    @Override
    public ProtonSubscriber<Tracker> setTarget(Target target) {
        this.sender.setTarget(target);
        return this;
    }

    /** Returns the target currently set on the underlying sender. */
    @Override
    public Target getTarget() {
        return this.sender.getTarget();
    }

    /** Returns the remote source reported by the underlying sender. */
    public Source getRemoteSource() {
        return this.sender.getRemoteSource();
    }

    /** Returns the remote target reported by the underlying sender. */
    public Target getRemoteTarget() {
        return this.sender.getRemoteTarget();
    }

    /** Returns whether the end of the connection cancels the upstream subscription. */
    public boolean isEmitOnConnectionEnd() {
        return this.emitOnConnectionEnd;
    }

    /**
     * Sets whether the end of the connection cancels the upstream subscription.
     *
     * @param emitOnConnectionEnd {@code true} to cancel the subscription when the connection ends
     */
    public void setEmitOnConnectionEnd(boolean emitOnConnectionEnd) {
        this.emitOnConnectionEnd = emitOnConnectionEnd;
    }

    /** Returns the underlying sender link. */
    public ProtonSender getLink() {
        return this.sender;
    }
}

