/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.forecast.task;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.forecast.constant.ForecastCommonMessages;
import org.opensearch.forecast.indices.ForecastIndex;
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.model.ForecastTask;
import org.opensearch.forecast.model.ForecastTaskType;
import org.opensearch.forecast.model.Forecaster;
import org.opensearch.forecast.settings.ForecastSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.function.BiCheckedFunction;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.function.ResponseTransformer;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.DateRange;
import org.opensearch.timeseries.model.TaskState;
import org.opensearch.timeseries.model.TaskType;
import org.opensearch.timeseries.model.TimeSeriesTask;
import org.opensearch.timeseries.task.TaskCacheManager;
import org.opensearch.timeseries.task.TaskManager;
import org.opensearch.timeseries.transport.JobResponse;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Client;

public class ForecastTaskManager
extends TaskManager<TaskCacheManager, ForecastTaskType, ForecastTask, ForecastIndex, ForecastIndexManagement> {
    private final Logger logger = LogManager.getLogger(ForecastTaskManager.class);

    public ForecastTaskManager(TaskCacheManager forecastTaskCacheManager, Client client, NamedXContentRegistry xContentRegistry, ForecastIndexManagement forecastIndices, ClusterService clusterService, Settings settings, ThreadPool threadPool, NodeStateManager nodeStateManager) {
        super(forecastTaskCacheManager, clusterService, client, ForecastIndex.STATE.getIndexName(), ForecastTaskType.REALTIME_TASK_TYPES, Collections.emptyList(), ForecastTaskType.RUN_ONCE_TASK_TYPES, forecastIndices, nodeStateManager, AnalysisType.FORECAST, xContentRegistry, "forecaster_id", ForecastSettings.MAX_OLD_TASK_DOCS_PER_FORECASTER, settings, threadPool, "opensearch-forecast-results*", "forecast-threadpool", ForecastSettings.DELETE_FORECAST_RESULT_WHEN_DELETE_FORECASTER, TaskState.INACTIVE);
    }

    @Override
    public void initRealtimeTaskCacheAndCleanupStaleCache(String forecasterId, Config forecaster, TransportService transportService, ActionListener<Boolean> listener) {
        try {
            if (this.taskCacheManager.getRealtimeTaskCache(forecasterId) != null) {
                listener.onResponse((Object)false);
                return;
            }
            this.getAndExecuteOnLatestConfigLevelTask(forecasterId, ForecastTaskType.REALTIME_TASK_TYPES, forecastTaskOptional -> {
                if (forecastTaskOptional.isEmpty()) {
                    this.logger.debug("Can't find realtime task for forecaster {}, init realtime task cache directly", (Object)forecasterId);
                    ExecutorFunction function = () -> this.createNewTask(forecaster, null, false, forecaster.getUser(), this.clusterService.localNode().getId(), TaskState.CREATED, ActionListener.wrap(r -> {
                        this.logger.info("Recreate realtime task successfully for forecaster {}", (Object)forecasterId);
                        this.taskCacheManager.initRealtimeTaskCache(forecasterId, forecaster.getIntervalInMilliseconds());
                        listener.onResponse((Object)true);
                    }, e -> {
                        this.logger.error("Failed to recreate realtime task for forecaster " + forecasterId, (Throwable)e);
                        listener.onFailure(e);
                    }));
                    this.recreateRealtimeTaskBeforeExecuting(function, listener);
                    return;
                }
                this.logger.info("Init realtime task cache for forecaster {}", (Object)forecasterId);
                this.taskCacheManager.initRealtimeTaskCache(forecasterId, forecaster.getIntervalInMilliseconds());
                listener.onResponse((Object)true);
            }, transportService, false, listener);
        }
        catch (Exception e) {
            this.logger.error("Failed to init realtime task cache for " + forecasterId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    public void updateForecastTask(String taskId, Map<String, Object> updatedFields) {
        this.updateForecastTask(taskId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(response -> {
            if (response.status() == RestStatus.OK) {
                this.logger.debug("Updated forecast task successfully: {}, task id: {}", (Object)response.status(), (Object)taskId);
            } else {
                this.logger.error("Failed to update forecast task {}, status: {}", (Object)taskId, (Object)response.status());
            }
        }, e -> this.logger.error("Failed to update task: " + taskId, (Throwable)e)));
    }

    public void updateForecastTask(String taskId, Map<String, Object> updatedFields, ActionListener<UpdateResponse> listener) {
        UpdateRequest updateRequest = new UpdateRequest(ForecastIndex.STATE.getIndexName(), taskId);
        HashMap<String, Object> updatedContent = new HashMap<String, Object>();
        updatedContent.putAll(updatedFields);
        updatedContent.put("last_update_time", Instant.now().toEpochMilli());
        updateRequest.doc(updatedContent);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.update(updateRequest, listener);
    }

    private void recreateRealtimeTaskBeforeExecuting(ExecutorFunction function, ActionListener<Boolean> listener) {
        if (((ForecastIndexManagement)this.indexManagement).doesStateIndexExist()) {
            function.execute();
        } else {
            ((ForecastIndexManagement)this.indexManagement).initStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                if (r.isAcknowledged()) {
                    this.logger.info("Created {} with mappings.", (Object)ForecastIndex.STATE.getIndexName());
                    function.execute();
                } else {
                    String error = String.format(Locale.ROOT, "Create index %S not acknowledged by OpenSearch core", ForecastIndex.STATE.getIndexName());
                    this.logger.warn(error);
                    listener.onFailure((Exception)new OpenSearchStatusException(error, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }
            }, e -> {
                if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                    function.execute();
                } else {
                    this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                    listener.onFailure(e);
                }
            }));
        }
    }

    @Override
    public void cleanChildTasksAndResultsOfDeletedTask() {
        if (!this.taskCacheManager.hasDeletedTask()) {
            return;
        }
        this.threadPool.schedule(() -> {
            String taskId = this.taskCacheManager.pollDeletedTask();
            if (taskId == null) {
                return;
            }
            DeleteByQueryRequest deleteForecastResultsRequest = new DeleteByQueryRequest(new String[]{"opensearch-forecast-results*"});
            deleteForecastResultsRequest.setQuery((QueryBuilder)new TermsQueryBuilder("task_id", new String[]{taskId}));
            this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteForecastResultsRequest, ActionListener.wrap(res -> {
                this.logger.debug("Successfully deleted forecast results of task " + taskId);
                DeleteByQueryRequest deleteChildTasksRequest = new DeleteByQueryRequest(new String[]{ForecastIndex.STATE.getIndexName()});
                deleteChildTasksRequest.setQuery((QueryBuilder)new TermsQueryBuilder("parent_task_id", new String[]{taskId}));
                this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteChildTasksRequest, ActionListener.wrap(r -> {
                    this.logger.debug("Successfully deleted child tasks of task " + taskId);
                    this.cleanChildTasksAndResultsOfDeletedTask();
                }, e -> this.logger.error("Failed to delete child tasks of task " + taskId, (Throwable)e)));
            }, ex -> this.logger.error("Failed to delete forecast results for task " + taskId, (Throwable)ex)));
        }, TimeValue.timeValueSeconds((long)DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), "ad-batch-task-threadpool");
    }

    @Override
    public void startHistorical(Config config, DateRange dateRange, User user, TransportService transportService, ActionListener<JobResponse> listener) {
    }

    @Override
    protected TaskType getTaskType(Config config, DateRange dateRange, boolean runOnce) {
        if (runOnce) {
            return config.isHighCardinality() ? ForecastTaskType.RUN_ONCE_FORECAST_HC_FORECASTER : ForecastTaskType.RUN_ONCE_FORECAST_SINGLE_STREAM;
        }
        return config.isHighCardinality() ? ForecastTaskType.REALTIME_FORECAST_HC_FORECASTER : ForecastTaskType.REALTIME_FORECAST_SINGLE_STREAM;
    }

    @Override
    protected <T> void createNewTask(Config config, DateRange dateRange, boolean runOnce, User user, String coordinatingNode, TaskState initialState, ActionListener<T> listener) {
        ResponseTransformer<IndexResponse, Object> responseTransformer;
        ForecastTask forecastTask;
        String userName = user == null ? null : user.getName();
        Instant now = Instant.now();
        String taskType = this.getTaskType(config, dateRange, runOnce).name();
        ForecastTask.Builder forecastTaskBuilder = (ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)((ForecastTask.Builder)new ForecastTask.Builder().configId(config.getId())).forecaster((Forecaster)config).isLatest(true)).taskType(taskType)).executionStartTime(now)).state(initialState.name())).lastUpdateTime(now)).startedBy(userName)).coordinatingNode(coordinatingNode)).user(user);
        if (initialState == TaskState.INIT_TEST) {
            forecastTask = forecastTaskBuilder.build();
            responseTransformer = indexResponse -> forecastTask;
        } else {
            forecastTask = ((ForecastTask.Builder)((ForecastTask.Builder)forecastTaskBuilder.taskProgress(Float.valueOf(0.0f))).initProgress(Float.valueOf(0.0f))).dateRange(dateRange).build();
            responseTransformer = indexResponse -> new JobResponse(indexResponse.getId());
        }
        this.createTaskDirectly(forecastTask, r -> this.onIndexConfigTaskResponse((IndexResponse)r, forecastTask, (IndexResponse response, ActionListener<T> delegatedListener) -> this.cleanOldConfigTaskDocs((IndexResponse)response, forecastTask, responseTransformer, delegatedListener), listener), listener);
    }

    @Override
    public <T> void cleanConfigCache(TimeSeriesTask task, TransportService transportService, ExecutorFunction function, ActionListener<T> listener) {
        function.execute();
    }

    @Override
    protected boolean isHistoricalHCTask(TimeSeriesTask task) {
        return false;
    }

    @Override
    protected <T> void onIndexConfigTaskResponse(IndexResponse response, ForecastTask forecastTask, BiConsumer<IndexResponse, ActionListener<T>> function, ActionListener<T> listener) {
        if (response == null || response.getResult() != DocWriteResponse.Result.CREATED) {
            String errorMsg = ExceptionUtil.getShardsFailure(response);
            listener.onFailure((Exception)new OpenSearchStatusException(errorMsg, response.status(), new Object[0]));
            return;
        }
        forecastTask.setTaskId(response.getId());
        ActionListener delegatedListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
            this.handleTaskException(forecastTask, (Exception)e);
            if (e instanceof DuplicateTaskException) {
                listener.onFailure((Exception)new OpenSearchStatusException(ForecastCommonMessages.FORECASTER_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
            } else {
                listener.onFailure(e);
            }
        });
        if (function != null) {
            function.accept(response, delegatedListener);
        }
    }

    @Override
    protected <T> void runBatchResultAction(IndexResponse response, ForecastTask tsTask, ResponseTransformer<IndexResponse, T> responseTransformer, ActionListener<T> listener) {
        throw new UnsupportedOperationException("Forecast does not support back testing yet.");
    }

    @Override
    protected BiCheckedFunction<XContentParser, String, ForecastTask, IOException> getTaskParser() {
        return ForecastTask::parse;
    }

    @Override
    public void createRunOnceTaskAndCleanupStaleTasks(String configId, Config config, TransportService transportService, ActionListener<ForecastTask> listener) {
        ForecastTaskType taskType = config.isHighCardinality() ? ForecastTaskType.RUN_ONCE_FORECAST_HC_FORECASTER : ForecastTaskType.RUN_ONCE_FORECAST_SINGLE_STREAM;
        try {
            if (((ForecastIndexManagement)this.indexManagement).doesStateIndexExist()) {
                this.getAndExecuteOnLatestConfigLevelTask(config.getId(), Arrays.asList(taskType), task -> {
                    if (!task.isPresent() || ((ForecastTask)task.get()).isDone()) {
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(config, null, true, config.getUser(), TaskState.INIT_TEST, listener);
                    } else {
                        listener.onFailure((Exception)new OpenSearchStatusException("run once is on-going", RestStatus.BAD_REQUEST, new Object[0]));
                    }
                }, transportService, true, listener);
            } else {
                ((ForecastIndexManagement)this.indexManagement).initStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                    if (r.isAcknowledged()) {
                        this.logger.info("Created {} with mappings.", (Object)this.stateIndex);
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(config, null, true, config.getUser(), TaskState.INIT_TEST, listener);
                    } else {
                        String error = String.format(Locale.ROOT, "Create index %S not acknowledged by OpenSearch core", this.stateIndex);
                        this.logger.warn(error);
                        listener.onFailure((Exception)new OpenSearchStatusException(error, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                    }
                }, e -> {
                    if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(config, null, true, config.getUser(), TaskState.INIT_TEST, listener);
                    } else {
                        this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                        listener.onFailure(e);
                    }
                }));
            }
        }
        catch (Exception e2) {
            this.logger.error("Failed to start detector " + config.getId(), (Throwable)e2);
            listener.onFailure(e2);
        }
    }

    @Override
    public List<ForecastTaskType> getTaskTypes(DateRange dateRange, boolean runOnce) {
        if (runOnce) {
            return ForecastTaskType.RUN_ONCE_TASK_TYPES;
        }
        return ForecastTaskType.REALTIME_TASK_TYPES;
    }

    private <T> void resetRunOnceConfigTaskState(List<TimeSeriesTask> runOnceTasks, ExecutorFunction function, TransportService transportService, ActionListener<T> listener) {
        if (ParseUtils.isNullOrEmpty(runOnceTasks)) {
            function.execute();
            return;
        }
        ForecastTask forecastTask = (ForecastTask)runOnceTasks.get(0);
        this.resetTaskStateAsStopped(forecastTask, function, transportService, listener);
    }

    @Override
    protected <T> void resetLatestConfigTaskState(List<ForecastTask> tasks, Consumer<List<ForecastTask>> function, TransportService transportService, ActionListener<T> listener) {
        ArrayList<TimeSeriesTask> runningRealtimeTasks = new ArrayList<TimeSeriesTask>();
        ArrayList<TimeSeriesTask> runningRunOnceTasks = new ArrayList<TimeSeriesTask>();
        for (TimeSeriesTask timeSeriesTask : tasks) {
            if (timeSeriesTask.isHistoricalEntityTask() || timeSeriesTask.isDone()) continue;
            if (timeSeriesTask.isRealTimeTask()) {
                runningRealtimeTasks.add(timeSeriesTask);
                continue;
            }
            if (!timeSeriesTask.isRunOnceTask()) continue;
            runningRunOnceTasks.add(timeSeriesTask);
        }
        this.resetRunOnceConfigTaskState(runningRunOnceTasks, () -> this.resetRealtimeConfigTaskState(runningRealtimeTasks, () -> function.accept(tasks), transportService, listener), transportService, listener);
    }
}

