package com.heimuheimu.naivecache.memcached.binary.channel;

import com.heimuheimu.naivecache.constant.BeanStatusEnum;
import com.heimuheimu.naivecache.memcached.binary.command.Command;
import com.heimuheimu.naivecache.memcached.binary.command.OptimizedCommand;
import com.heimuheimu.naivecache.memcached.binary.response.ResponsePacket;
import com.heimuheimu.naivecache.memcached.binary.response.ResponsePacketReader;
import com.heimuheimu.naivecache.memcached.exception.TimeoutException;
import com.heimuheimu.naivecache.memcached.monitor.SocketMonitorFactory;
import com.heimuheimu.naivecache.net.BuildSocketException;
import com.heimuheimu.naivecache.net.SocketBuilder;
import com.heimuheimu.naivecache.net.SocketConfiguration;
import com.heimuheimu.naivemonitor.monitor.SocketMonitor;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/heimuheimu/naivecache/memcached/binary/channel/MemcachedChannel.class */
public class MemcachedChannel implements Closeable {
    private static final Logger MEMCACHED_CONNECTION_LOG = LoggerFactory.getLogger("NAIVECACHE_MEMCACHED_CONNECTION_LOG");
    private static final Logger LOG = LoggerFactory.getLogger(MemcachedChannel.class);
    private final String host;
    private final Socket socket;
    private final SocketMonitor socketMonitor;
    private volatile BeanStatusEnum state = BeanStatusEnum.UNINITIALIZED;
    private final LinkedBlockingQueue<Command> commandQueue = new LinkedBlockingQueue<>();
    private IoTask ioTask = null;
    private volatile long continuousTimeoutExceptionTimes = 0;
    private volatile long lastTimeoutExceptionTime = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/heimuheimu/naivecache/memcached/binary/channel/MemcachedChannel$IoTask.class */
    public class IoTask extends Thread {
        private final ResponsePacketReader reader;
        private final int sendBufferSize;
        private volatile boolean stopSignal = false;
        private int mergedPacketSize = 0;
        private final ArrayList<Command> mergedCommandList = new ArrayList<>();
        private final LinkedList<Command> waitingQueue = new LinkedList<>();

        public IoTask(Integer num) throws IOException {
            this.sendBufferSize = num != null ? num.intValue() : 65536;
            this.reader = new ResponsePacketReader(MemcachedChannel.this.socketMonitor, MemcachedChannel.this.socket.getInputStream());
        }

        private void sendMergedPacket(OutputStream outputStream) throws IOException {
            if (this.mergedCommandList.size() <= 1) {
                if (this.mergedCommandList.size() == 1) {
                    Command command = this.mergedCommandList.get(0);
                    outputStream.write(command.getRequestByteArray());
                    MemcachedChannel.this.socketMonitor.onWritten(r0.length);
                    if (command.hasResponsePacket()) {
                        this.waitingQueue.add(command);
                    }
                    resetMergedPacket();
                    return;
                }
                return;
            }
            byte[] bArr = new byte[this.mergedPacketSize];
            int i = 0;
            ArrayList arrayList = new ArrayList();
            Iterator<Command> it = this.mergedCommandList.iterator();
            while (it.hasNext()) {
                Command next = it.next();
                boolean z = false;
                if (next instanceof OptimizedCommand) {
                    Iterator it2 = arrayList.iterator();
                    while (true) {
                        if (!it2.hasNext()) {
                            break;
                        } else if (((OptimizedCommand) it2.next()).optimize((OptimizedCommand) next)) {
                            z = true;
                            break;
                        }
                    }
                }
                if (!z) {
                    byte[] requestByteArray = next.getRequestByteArray();
                    System.arraycopy(requestByteArray, 0, bArr, i, requestByteArray.length);
                    i += requestByteArray.length;
                    if (next.hasResponsePacket()) {
                        this.waitingQueue.add(next);
                    }
                    if (next instanceof OptimizedCommand) {
                        arrayList.add((OptimizedCommand) next);
                    }
                }
            }
            outputStream.write(bArr, 0, i);
            MemcachedChannel.this.socketMonitor.onWritten(i);
            resetMergedPacket();
        }

        private void addToMergedPacket(Command command) {
            this.mergedCommandList.add(command);
            this.mergedPacketSize += command.getRequestByteArray().length;
        }

        private void resetMergedPacket() {
            this.mergedCommandList.clear();
            this.mergedPacketSize = 0;
        }

        private void releaseWaitingCommand() {
            while (true) {
                Command poll = this.waitingQueue.poll();
                if (poll == null) {
                    return;
                } else {
                    poll.close();
                }
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                OutputStream outputStream = MemcachedChannel.this.socket.getOutputStream();
                while (!this.stopSignal) {
                    Command command = (Command) MemcachedChannel.this.commandQueue.take();
                    if (command != null) {
                        byte[] requestByteArray = command.getRequestByteArray();
                        if (this.mergedPacketSize + requestByteArray.length < this.sendBufferSize) {
                            addToMergedPacket(command);
                            if (MemcachedChannel.this.commandQueue.size() == 0) {
                                sendMergedPacket(outputStream);
                                outputStream.flush();
                            }
                        } else {
                            sendMergedPacket(outputStream);
                            if (MemcachedChannel.this.commandQueue.size() == 0) {
                                outputStream.write(requestByteArray);
                                MemcachedChannel.this.socketMonitor.onWritten(requestByteArray.length);
                                if (command.hasResponsePacket()) {
                                    this.waitingQueue.add(command);
                                }
                            } else {
                                addToMergedPacket(command);
                            }
                            outputStream.flush();
                        }
                    }
                    while (true) {
                        if (this.waitingQueue.size() > 0) {
                            Command peek = this.waitingQueue.peek();
                            ResponsePacket read = this.reader.read();
                            if (read == null) {
                                MemcachedChannel.MEMCACHED_CONNECTION_LOG.info("End of the input stream has been reached. Host: `{}`", MemcachedChannel.this.host);
                                MemcachedChannel.this.close();
                                releaseWaitingCommand();
                                break;
                            } else {
                                peek.receiveResponsePacket(read);
                                if (!peek.hasResponsePacket()) {
                                    this.waitingQueue.poll();
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                MemcachedChannel.MEMCACHED_CONNECTION_LOG.error("[IoTask] MemcachedChannel need to be closed due to: `IOException: {}`. Host: `{}`.", e.getMessage(), MemcachedChannel.this.host);
                MemcachedChannel.LOG.error("[IoTask] MemcachedChannel need to be closed: `IoException`. Host: `" + MemcachedChannel.this.host + "`. " + MemcachedChannel.this.socket, e);
                MemcachedChannel.this.close();
                releaseWaitingCommand();
            } catch (InterruptedException e2) {
                releaseWaitingCommand();
            } catch (Exception e3) {
                MemcachedChannel.MEMCACHED_CONNECTION_LOG.error("[IoTask] MemcachedChannel need to be closed due to: `{}`. Host: `{}`.", e3.getMessage(), MemcachedChannel.this.host);
                MemcachedChannel.LOG.error("[IoTask] MemcachedChannel need to be closed: `Unexpected error`. Host: `" + MemcachedChannel.this.host + "`. " + MemcachedChannel.this.socket, e3);
                MemcachedChannel.this.close();
                releaseWaitingCommand();
            }
        }
    }

    public MemcachedChannel(String str, SocketConfiguration socketConfiguration) throws IllegalArgumentException, BuildSocketException {
        this.host = str;
        this.socket = SocketBuilder.create(str, socketConfiguration);
        this.socketMonitor = SocketMonitorFactory.get(str);
    }

    public List<ResponsePacket> send(Command command, long j) throws NullPointerException, IllegalStateException, TimeoutException {
        if (command == null) {
            throw new NullPointerException("Memcached command could not be null. Host: `" + this.host + "`. " + this.socket);
        }
        if (this.state != BeanStatusEnum.NORMAL) {
            throw new IllegalStateException("MemcachedChannel is not initialized or has been closed. State: `" + this.state + "`. Host: `" + this.host + "`. " + this.socket);
        }
        this.commandQueue.add(command);
        try {
            return command.getResponsePacketList(j);
        } catch (TimeoutException e) {
            if (System.currentTimeMillis() - this.lastTimeoutExceptionTime < 1000) {
                this.continuousTimeoutExceptionTimes++;
            } else {
                this.continuousTimeoutExceptionTimes = 1L;
            }
            this.lastTimeoutExceptionTime = System.currentTimeMillis();
            if (this.continuousTimeoutExceptionTimes > 50) {
                MEMCACHED_CONNECTION_LOG.error("MemcachedChannel need to be closed due to: `Too many timeout exceptions[{}]`. Host: `{}`.", Long.valueOf(this.continuousTimeoutExceptionTimes), this.host);
                close();
            }
            throw e;
        }
    }

    public boolean isActive() {
        return this.state == BeanStatusEnum.NORMAL;
    }

    public synchronized void init() {
        if (this.state == BeanStatusEnum.UNINITIALIZED) {
            try {
                if (!this.socket.isConnected() || this.socket.isClosed()) {
                    MEMCACHED_CONNECTION_LOG.error("Initialize MemcachedChannel failed. Socket is not connected or has been closed. Host: `{}`.", this.host);
                    close();
                } else {
                    this.state = BeanStatusEnum.NORMAL;
                    long currentTimeMillis = System.currentTimeMillis();
                    SocketConfiguration config = SocketBuilder.getConfig(this.socket);
                    String str = this.host + "/" + this.socket.getLocalPort();
                    this.ioTask = new IoTask(config.getSendBufferSize());
                    this.ioTask.setName("naivecache-memcached-io-" + str);
                    this.ioTask.setDaemon(true);
                    this.ioTask.start();
                    MEMCACHED_CONNECTION_LOG.info("MemcachedChannel has been initialized. Cost: {}ms. Host: `{}`. Local port: `{}`. Config: `{}`.", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.host, Integer.valueOf(this.socket.getLocalPort()), config});
                }
            } catch (Exception e) {
                MEMCACHED_CONNECTION_LOG.error("Initialize MemcachedChannel failed. Unexpected error: `{}`. Host: `{}`.", e.getMessage(), this.host);
                LOG.error("Initialize MemcachedChannel failed. Unexpected error. Host: `" + this.host + "`. " + this.socket, e);
                close();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.state != BeanStatusEnum.CLOSED) {
            long currentTimeMillis = System.currentTimeMillis();
            this.state = BeanStatusEnum.CLOSED;
            try {
                this.socket.close();
                this.ioTask.stopSignal = true;
                this.ioTask.interrupt();
                MEMCACHED_CONNECTION_LOG.info("MemcachedChannel has been closed. Cost: {}ms. Host: `{}`.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.host);
            } catch (Exception e) {
                MEMCACHED_CONNECTION_LOG.error("Close MemcachedChannel failed. Unexpected error: `{}`. Host: `{}`.", e.getMessage(), this.host);
                LOG.error("Close MemcachedChannel failed. Unexpected error. Host: `" + this.host + "`. " + this.socket, e);
            }
        }
    }

    public String toString() {
        return "MemcachedChannel{host='" + this.host + "', socket=" + this.socket + ", state=" + this.state + ", continuousTimeoutExceptionTimes=" + this.continuousTimeoutExceptionTimes + ", lastTimeoutExceptionTime=" + this.lastTimeoutExceptionTime + '}';
    }
}
