Statistiques
| Branche: | Révision :

root / src / man / run_engine.Rd @ b412345d

Historique | Voir | Annoter | Télécharger (2,62 ko)

1
\name{run_engine}
2
\alias{run_engine}
3
\title{Execute a bag of tasks in parallel, on as many cores as the current computing node owns.}
4
\description{This bag-of-tasks engine forks processes on as many cores as the current computing node owns. Each sub-process picks a task at random from the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps the task_processor results into a results file (named out_filename.RData).}
5
\usage{run_engine(tasks, task_processor, debug = FALSE, starter_name = "~/.start_best_effort_jobs", 
6
    rm_starter = TRUE, log_dir = "log", bot_cache_dir = "cache", 
7
    nb_proc = NULL, ...)}
8
\arguments{
9
  \item{tasks}{A list of tasks; each task is a list of key-value pairs that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.}
10
  \item{task_processor}{A function that will be called for each task in the task list \emph{tasks}.}
11
  \item{debug}{If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.}
12
  \item{starter_name}{Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.}
13
  \item{rm_starter}{If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.}
14
  \item{log_dir}{Path to the \emph{log} directory.}
15
  \item{bot_cache_dir}{The directory where task results are cached.}
16
  \item{nb_proc}{If not NULL, fixes the number of cores on which tasks must be computed.}
17
  \item{\dots}{Other arguments that will be passed to \emph{task_processor}.}
18
}
19

    
20

    
21

    
22
\author{Florent Chuffart}
23

    
24

    
25

    
26

    
27
\examples{
28

    
29
# We define a basic task_processor
30
sum_a_b = function(task) {
31
  return(task$a + task$b)
32
}
33

    
34
# We define 9 tasks
35
tasks = list()
36
for (a in 1:3) {
37
  for (b in 4:6) {
38
    tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) 
39
  }
40
}
41

    
42
# We execute the 9 tasks
43
run_engine(tasks, sum_a_b)    
44

    
45
# We collect 9 task results
46
for (a in 1:3) {
47
  for (b in 4:6) {
48
    out_filename = paste("sum_a_b", a, b, sep="_")
49
    out_filename = paste("cache/", out_filename, ".RData", sep="")
50
    load(out_filename)
51
    print(task_result) 
52
  }
53
}
54

    
55
# Better way to do that
56
apply(t(tasks), 2, function(task) {
57
  out_filename = task[[1]]$out_filename
58
  out_filename = paste("cache/", out_filename, ".RData", sep="")
59
  load(out_filename)
60
  print(task_result) 
61
})
62

    
63
# Viewing statistics about the campaign.
64
bot_stats()
65

    
66
}