task.task-picker-class

Background

This sets the fully qualified name of the class that will be used to pick tasks off the task queue to be run.

A custom task picker class could, for example:

  • Limit the number of tasks being run concurrently.

  • Run tasks of some users before other users.

The class must implement:

package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

public interface TaskPicker {

    /**
     * Allows for selecting the next task(s) to run.
     *
     * <p>This method will be called periodically and will allow for the
     * selection of the next task to run. The context provides the list of
     * tasks which can be run along with the currently running set of tasks.
     * Implementations are to pick tasks from the list of tasks which can be
     * run and submit them to {@link PickTaskContext#submitTaskToRun}, which
     * may run the task if it is possible.</p>
     *
     * <p>On each invocation of this method, the implementations will likely
     * submit as many tasks as possible to be run (perhaps limited by some
     * constraint) rather than submit one task and wait for the next
     * invocation.</p>
     *
     * @param context the context used to inspect the queued and running
     * tasks and to submit tasks to run.
     */
    void pickNextTasksToRun(PickTaskContext context);

}

The class interacts with the PickTaskContext to get the tasks from the queue and decide whether they should be run. The context implements:

package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

import java.util.List;

import java.io.File;

public interface PickTaskContext {

    /**
     * Tasks that are currently in the queue which could be selected to run.
     *
     * <p>If a task that is in this queue is passed to submitTaskToRun then
     * it will no longer be returned in the list returned by this method.</p>
     *
     * <p>This is not the complete list of tasks that are in the queue. Instead,
     * this is the list of tasks that can be run at the same time as the tasks
     * that are currently running. An example of tasks that cannot run at the
     * same time is an instant update and a re-index on the same
     * collection.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * <p>The order is by priority, then by order in the queue.</p>
     *
     * @return the list of currently queued tasks.
     */
    public List<Task> getQueuedTasksWhichCanRun();

    /**
     * Currently running tasks.
     *
     * <p>If a task completes while the {@link TaskPicker} is running
     * the returned list may become outdated.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * @return the list of currently running tasks.
     */
    public List<Task> getRunningTasks();

    /**
     * Attempts to submit a task to be run.
     *
     * <p>It is possible that a task cannot be submitted because something else
     * is currently running the task outside of the queue system, for example
     * from the command line. This will attempt to catch those cases and return
     * false, rather than submit the task only for it to find out later that
     * it could not be run.</p>
     *
     * @param task the task to run.
     * @return true if the task could be submitted to run, false if it
     * could not (typically when another task that uses the same resources as
     * the given task is already running, e.g. submitting a RE_INDEX while an
     * INSTANT_UPDATE is already running for the same collection).
     */
    public boolean submitTaskToRun(Task task);

    /**
     * Returns the search home for the installation this is running under.
     *
     * @return search home.
     */
    public File getSearchHome();

}

The tasks will implement:

package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

import java.time.ZonedDateTime;

import com.funnelback.admin.api.service.queue.task.param.TaskParams;
import com.funnelback.admin.api.service.queue.task.tasktypes.TaskTypes;
import com.funnelback.common.task.queue.Priority;

public interface Task {

    /**
     * Gets the ID of the task.
     *
     * @return the ID of the task.
     */
    public String getId();

    /**
     * Gets the date and time the task was created.
     *
     * @return the date and time the task was created.
     */
    public ZonedDateTime getCreated();

    /**
     * Gets the last modified date and time of the task.
     *
     * @return the last modified date and time of the task.
     */
    public ZonedDateTime getLastModified();

    /**
     * Gets the type of the task.
     *
     * @return the task type.
     */
    public TaskTypes getType();

    /**
     * The task parameters, this will match what was given to the API.
     *
     * @return the task parameters.
     */
    public TaskParams getParams();

    /**
     * Gets the position of the task within the list of all tasks in the queue.
     *
     * @return the position of the task within the queue.
     */
    public int getPosition();

    /**
     * Gets the username of the user that created the task.
     *
     * @return the username of the user that created the task.
     */
    public String getAddedBy();

    /**
     * Gets the priority of the task.
     *
     * @return the priority of the task.
     */
    public Priority getPriority();

    /**
     * Gets the collection this task runs on.
     *
     * @return the collection this task runs on or null if it doesn't run on a
     * collection.
     */
    public String getCollectionTheTaskRunsOn();


}
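
As a rough illustration of how these interfaces fit together, the following Java sketch reads the running list once, then submits queued tasks until a fixed limit is reached, using the boolean returned by submitTaskToRun to count only the tasks that were actually accepted. The nested Task and PickTaskContext interfaces are simplified stand-ins so the sketch compiles on its own, and the BoundedSubmitSketch name, the MAX_CONCURRENT value and the int return value are assumptions made for illustration. A real picker would implement TaskPicker, whose method returns void.

```java
import java.util.List;

class BoundedSubmitSketch {

    // Simplified stand-ins for the interfaces shown above, reduced to the
    // methods this sketch calls. A real picker would use the real ones.
    interface Task {
        String getId();
    }

    interface PickTaskContext {
        List<Task> getQueuedTasksWhichCanRun();
        List<Task> getRunningTasks();
        boolean submitTaskToRun(Task task);
    }

    // Hypothetical limit chosen for this sketch, not a product setting.
    static final int MAX_CONCURRENT = 2;

    // Submits queued tasks until the limit is reached and returns how many
    // submissions were accepted (returned only to make the sketch testable).
    static int pickNextTasksToRun(PickTaskContext context) {
        // Read the running list once, since it may differ between calls.
        int freeSlots = MAX_CONCURRENT - context.getRunningTasks().size();
        int accepted = 0;
        for (Task task : context.getQueuedTasksWhichCanRun()) {
            if (accepted >= freeSlots) {
                break;
            }
            // A false result means the task could not be submitted, so it
            // does not use up one of the free slots.
            if (context.submitTaskToRun(task)) {
                accepted++;
            }
        }
        return accepted;
    }
}
```

Counting accepted submissions locally avoids depending on how quickly a newly submitted task shows up in getRunningTasks, at the cost of ignoring tasks that finish while the picker is still running.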

Setting the key

Set this configuration key in the server configuration.

Use the configuration key editor to add or edit the task.task-picker-class key, and set the value. This can be set to any valid String value.

Default value

By default the task picker is set to a class which will run all tasks immediately.

task.task-picker-class=com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.RunImmediatelyTaskPicker

Examples

This example implements a custom Groovy class which limits the number of running tasks based on the number of available CPUs.

The Groovy script will be placed into:

$SEARCH_HOME/lib/java/groovy/com/foo/OneTaskPerCPU.groovy

The Groovy code is:

package com.foo;

import com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared.*;
import com.funnelback.common.task.queue.Priority;

public class OneTaskPerCPU implements TaskPicker {

    private static final org.apache.logging.log4j.Logger log =
        org.apache.logging.log4j.LogManager.getLogger(OneTaskPerCPU.class);

    public void pickNextTasksToRun(PickTaskContext context) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();

        // Tasks come in order of priority, then in the order they are in the queue.
        // All RUN_ASAP tasks will come first.
        for(Task task : context.getQueuedTasksWhichCanRun()) {
            // Wrap everything in a try/catch so that we can move on to the
            // next task if something goes wrong.
            try {
                if(task.getPriority() != Priority.RUN_ASAP
                    && context.getRunningTasks().size() >= availableProcessors) {
                    // There are no more RUN_ASAP tasks and we are already
                    // running as many tasks as CPUs.
                    return;
                }

                context.submitTaskToRun(task);
            } catch (Exception e) {
                try {
                    log.trace("An error occurred when working on "
                            + "task '{}' of type '{}' with params '{}'.",
                        task.getId(),
                        task.getType(),
                        task.getParams(), // the parameters of every task have a
                                          // human-readable toString method, so
                                          // this will tell us everything we
                                          // need to know.
                        e);
                } catch (Exception e2) {
                    e2.addSuppressed(e);
                    log.info("Error logging error about a task", e2);
                }
            }
        }

    }
}

The global.cfg file will then need to contain:

task.task-picker-class=com.foo.OneTaskPerCPU

The change will take effect immediately; no restart is required.
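
The other use case mentioned in the background, running tasks of some users before other users, could be approached along the lines of the Java sketch below. It reorders the list of tasks which can run so that tasks added by preferred users are submitted first, keeping the queue's own order within each group. The nested Task and PickTaskContext interfaces are simplified stand-ins so the sketch compiles on its own. The PreferredUsersFirstSketch name, the PREFERRED_USERS set, the "admin" username and the task helper are all hypothetical, and a real picker would implement TaskPicker against the real interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PreferredUsersFirstSketch {

    // Simplified stand-ins for the interfaces shown in the background
    // section, reduced to the methods this sketch calls.
    interface Task {
        String getId();
        String getAddedBy();
    }

    interface PickTaskContext {
        List<Task> getQueuedTasksWhichCanRun();
        boolean submitTaskToRun(Task task);
    }

    // Hypothetical usernames whose tasks should be submitted first. A HashSet
    // is used so that contains(null) is safe should a username be missing.
    static final Set<String> PREFERRED_USERS =
        new HashSet<>(Arrays.asList("admin"));

    // Returns a copy of the queued tasks with preferred users' tasks first.
    // List.sort is stable, so the incoming order is kept within each group.
    static List<Task> preferredFirst(List<Task> queued) {
        List<Task> ordered = new ArrayList<>(queued);
        ordered.sort(Comparator.comparing(
            (Task t) -> !PREFERRED_USERS.contains(t.getAddedBy())));
        return ordered;
    }

    // Submits every runnable task, preferred users' tasks first.
    static void pickNextTasksToRun(PickTaskContext context) {
        for (Task task : preferredFirst(context.getQueuedTasksWhichCanRun())) {
            context.submitTaskToRun(task);
        }
    }

    // Demo-only helper for building stand-in tasks.
    static Task task(String id, String addedBy) {
        return new Task() {
            public String getId() { return id; }
            public String getAddedBy() { return addedBy; }
        };
    }
}
```

This sketch imposes no limit on the number of running tasks. In practice the reordering would probably be combined with a limit like the one in the Groovy example, since the submission order only matters when not every task can be started at once.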