task.task-picker-class
Background
This sets the fully qualified name of the class that will be used to pick tasks off the task queue to be run.
A custom task picker class could, for example:
- Limit the number of tasks being run concurrently.
- Run tasks of some users before other users.
The class must implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

public interface TaskPicker {

    /**
     * Allows for selecting the next task(s) to run.
     *
     * <p>This method will be called periodically and will allow for the
     * selection of the next task to run. The context provides the list of
     * tasks which can be run along with the currently running set of tasks.
     * Implementations are to pick tasks from the runnable list and submit
     * them to {@link PickTaskContext#submitTaskToRun} which may run the task
     * if it is possible.</p>
     *
     * <p>On each invocation of this method, the implementations will likely
     * submit as many tasks as possible to be run (perhaps limited by some
     * constraint) rather than submit one task and wait for the next
     * invocation.</p>
     *
     * @param context
     */
    void pickNextTasksToRun(PickTaskContext context);
}
The class will want to interact with the PickTaskContext to get the tasks from the queue and decide if they should be run. The context will implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

import java.util.List;
import java.io.File;

public interface PickTaskContext {

    /**
     * Tasks that are currently in the queue which could be selected to run.
     *
     * <p>If a task that is in this queue is passed to submitTaskToRun then
     * it will no longer be returned in the list returned by this method.</p>
     *
     * <p>This is not the complete list of tasks that are in the queue, instead
     * this is the list of tasks that can be run at the same time as the tasks
     * that are currently running. Tasks that cannot run at the same time are,
     * for example, an instant update and a re-index on the same
     * collection.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * <p>The order is by priority then by order in the queue.</p>
     *
     * @return the list of currently queued tasks.
     */
    public List<Task> getQueuedTasksWhichCanRun();

    /**
     * Currently running tasks.
     *
     * <p>If a task completes while the {@link TaskPicker} is running
     * the returned list may become outdated.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * @return the list of currently running tasks.
     */
    public List<Task> getRunningTasks();

    /**
     * Attempts to submit a task to be run.
     *
     * <p>It is possible that a task cannot be submitted because something else
     * is currently running the task outside of the queue system, for example
     * from the command line. This will attempt to catch those cases and return
     * false, rather than submit the task to then have the task later find out
     * it could not be run.</p>
     *
     * @param task the task to run.
     * @return true if the task can be submitted to run, false if it
     * could not (typically when another task that uses the same resources as
     * the given task is already running, e.g. submitting a RE_INDEX when an
     * INSTANT_UPDATE is already running for the same collection).
     */
    public boolean submitTaskToRun(Task task);

    /**
     * Returns the search home for the installation this is running under.
     *
     * @return search home.
     */
    public File getSearchHome();
}
The tasks will implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;

import java.time.ZonedDateTime;

import com.funnelback.admin.api.service.queue.task.param.TaskParams;
import com.funnelback.admin.api.service.queue.task.tasktypes.TaskTypes;
import com.funnelback.common.task.queue.Priority;

public interface Task {

    /**
     * Gets the ID of the task.
     *
     * @return the ID of the task.
     */
    public String getId();

    /**
     * Gets the date and time the task was created.
     *
     * @return the date and time the task was created.
     */
    public ZonedDateTime getCreated();

    /**
     * Gets the last modified date and time of the task.
     *
     * @return the last modified date and time of the task.
     */
    public ZonedDateTime getLastModified();

    /**
     * Gets the type of the task.
     *
     * @return the task type.
     */
    public TaskTypes getType();

    /**
     * The task parameters, this will match what was given to the API.
     *
     * @return the task parameters.
     */
    public TaskParams getParams();

    /**
     * Gets the position of the task within the list of all tasks in the queue.
     *
     * @return the position of the task in the queue.
     */
    public int getPosition();

    /**
     * Gets the username of the user that created the task.
     *
     * @return the username of the user that created the task.
     */
    public String getAddedBy();

    /**
     * Gets the priority of the task.
     *
     * @return the priority of the task.
     */
    public Priority getPriority();

    /**
     * Gets the collection this task runs on.
     *
     * @return the collection this task runs on or null if it doesn't run on a
     * collection.
     */
    public String getCollectionTheTaskRunsOn();
}
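To illustrate how the three interfaces fit together, here is a minimal sketch in plain Java of a picker with a fixed concurrency limit. The interfaces declared at the top are simplified stand-ins, reduced to the methods the sketch uses, so that it compiles on its own; a real picker would import the originals from the package shown above. The class name FixedLimitTaskPicker and its constructor argument are hypothetical. The documented example takes no constructor arguments, so a deployed picker would probably need to obtain its limit some other way.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Funnelback interfaces (assumption: reduced to
// the methods used here so the sketch is self-contained).
interface Task {
    String getId();
}

interface PickTaskContext {
    List<Task> getQueuedTasksWhichCanRun();
    List<Task> getRunningTasks();
    boolean submitTaskToRun(Task task);
}

interface TaskPicker {
    void pickNextTasksToRun(PickTaskContext context);
}

// Hypothetical picker: tries to keep at most maxConcurrent tasks running.
class FixedLimitTaskPicker implements TaskPicker {
    private final int maxConcurrent;

    FixedLimitTaskPicker(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    @Override
    public void pickNextTasksToRun(PickTaskContext context) {
        // Read each list once and work from a copy, because repeated calls
        // may not return the same list.
        int freeSlots = maxConcurrent - context.getRunningTasks().size();
        List<Task> candidates = new ArrayList<>(context.getQueuedTasksWhichCanRun());

        for (Task task : candidates) {
            if (freeSlots <= 0) {
                return;
            }
            // submitTaskToRun returns false when the task could not be
            // submitted, so a slot is only counted as used on success.
            if (context.submitTaskToRun(task)) {
                freeSlots--;
            }
        }
    }
}
```

Counting successful submissions locally avoids relying on getRunningTasks() being up to date within a single invocation. The limit is only approximate, since tasks may also finish while the picker is running.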
Setting the key
Set this configuration key in the server configuration.
Use the configuration key editor to add or edit the task.task-picker-class key, and set the value. The editor accepts any valid String value, but the value should be the fully qualified name of a class that implements TaskPicker.
Examples
This example implements a custom Groovy class that limits the number of running tasks based on the number of available CPUs.
Place the Groovy script at:
$SEARCH_HOME/lib/java/groovy/com/foo/OneTaskPerCPU.groovy
The Groovy code is:
package com.foo;

import com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared.*;
import com.funnelback.common.task.queue.Priority;

public class OneTaskPerCPU implements TaskPicker {

    private static final org.apache.logging.log4j.Logger log =
        org.apache.logging.log4j.LogManager.getLogger(OneTaskPerCPU.class);

    public void pickNextTasksToRun(PickTaskContext context) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();

        // Tasks come in order of priority, then in the order they are in the queue.
        // All RUN_ASAP tasks will come first.
        for (Task task : context.getQueuedTasksWhichCanRun()) {
            // Wrap everything in a try/catch so that we can move on to the
            // next task if something goes wrong.
            try {
                if (task.getPriority() != Priority.RUN_ASAP &&
                        context.getRunningTasks().size() >= availableProcessors) {
                    // There are no more RUN_ASAP tasks and we are already
                    // running as many tasks as CPUs.
                    return;
                }
                context.submitTaskToRun(task);
            } catch (Exception e) {
                try {
                    // The parameters of every task have a human-readable
                    // toString method, so this will tell us everything we
                    // need to know.
                    log.trace("An error occurred when working on " +
                        "task '{}' of type '{}' with params '{}'.",
                        task.getId(), task.getType(), task.getParams(), e);
                } catch (Exception e2) {
                    e2.addSuppressed(e);
                    log.info("Error logging error about a task", e2);
                }
            }
        }
    }
}
The global.cfg file will then need to contain:
task.task-picker-class=com.foo.OneTaskPerCPU
The change takes effect immediately; no restart is required.
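The Background section also mentions running the tasks of some users before those of other users. The following is a minimal sketch of that idea in plain Java, not a tested Funnelback implementation. The interfaces at the top are simplified stand-ins, reduced to the methods the sketch uses, so that it compiles on its own; a real picker would import the originals instead. The class name PreferredUsersFirstTaskPicker and the constructor-supplied set of usernames are hypothetical, and a deployed picker would probably need another way to obtain that set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-ins for the Funnelback interfaces (assumption: reduced to
// the methods used here so the sketch is self-contained).
interface Task {
    String getId();
    String getAddedBy();
}

interface PickTaskContext {
    List<Task> getQueuedTasksWhichCanRun();
    boolean submitTaskToRun(Task task);
}

interface TaskPicker {
    void pickNextTasksToRun(PickTaskContext context);
}

// Hypothetical picker: submits tasks added by preferred users before the rest.
class PreferredUsersFirstTaskPicker implements TaskPicker {
    private final Set<String> preferredUsers;

    PreferredUsersFirstTaskPicker(Set<String> preferredUsers) {
        // Copy into a HashSet so that lookups tolerate a null username.
        this.preferredUsers = new HashSet<>(preferredUsers);
    }

    @Override
    public void pickNextTasksToRun(PickTaskContext context) {
        // Work from a copy, because repeated calls may not return the same list.
        List<Task> candidates = new ArrayList<>(context.getQueuedTasksWhichCanRun());
        List<Task> others = new ArrayList<>();

        // First pass: preferred users, keeping the order the queue gave us.
        for (Task task : candidates) {
            if (preferredUsers.contains(task.getAddedBy())) {
                context.submitTaskToRun(task);
            } else {
                others.add(task);
            }
        }

        // Second pass: everyone else, again in queue order.
        for (Task task : others) {
            context.submitTaskToRun(task);
        }
    }
}
```

Note that this sketch places preferred users ahead of the queue's own priority ordering and applies no concurrency limit. In practice it would likely be combined with a limit such as the one in OneTaskPerCPU, and might let RUN_ASAP tasks through first.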