/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.feedback;

import java.io.OutputStream;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.flink.agents.runtime.logger.FeedbackLogger;
import org.apache.flink.util.IOUtils;

public class Checkpoints<T>
implements AutoCloseable {
    private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory;
    private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap();

    public Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) {
        this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory);
    }

    public void startLogging(long checkpointId, OutputStream outputStream) {
        FeedbackLogger<T> logger = this.feedbackLoggerFactory.get();
        logger.startLogging(outputStream);
        this.uncompletedCheckpoints.put(checkpointId, logger);
    }

    public void append(T element) {
        for (FeedbackLogger<T> logger : this.uncompletedCheckpoints.values()) {
            logger.append(element);
        }
    }

    public void commitCheckpointsUntil(long checkpointId) {
        NavigableMap<Long, FeedbackLogger<T>> completedCheckpoints = this.uncompletedCheckpoints.headMap(checkpointId, true);
        completedCheckpoints.values().forEach(FeedbackLogger::commit);
        completedCheckpoints.clear();
    }

    @Override
    public void close() {
        IOUtils.closeAllQuietly(this.uncompletedCheckpoints.values());
        this.uncompletedCheckpoints.clear();
    }
}

