Révision b412345d
b/src/DESCRIPTION | ||
---|---|---|
1 |
Package: bot |
|
2 |
Maintainer: Florent Chuffart <florent.chuffart@ens-lyon.fr> |
|
3 |
Author: Florent Chuffart |
|
4 |
Version: 0.9.1 |
|
5 |
License: CeCILL |
|
6 |
Title: bot |
|
7 |
Depends: fork |
|
8 |
Description: BoT (stands for Bag of Tasks) is an R package that distributes independent tasks over many cores and many computing nodes. Because BoT relies only on process forking and on task locking over the file system, it is compatible with most computing infrastructures (multicore, clusters, grids, clouds). Using BoT, each task is a set of parameters associated with a user-defined function built on an R process. The next step consists in forking this R process for each core of the computing node. Finally, the forked set of tasks is randomized and executed in parallel. When a task starts, a distributed lock is taken. This avoids redundant task execution. When a task ends, its result is dumped into a file. BoT is used to compute NGS data in the SiGHT project context (ERC-StG2011-281359). BoT has been tested on two infrastructures: the Grid'5000 experimental testbed (https://www.grid5000.fr) and the PSMN computing center of ENS de Lyon (http://www.ens-lyon.fr/PSMN). For more information on how to use bot, have a look at the examples in the help pages ?bot::run_engine and ?bot::botapply. |
|
9 |
URL: http://www.ens-lyon.fr/LBMC/gisv/index.php/en/protocols/bioinformatics http://www.ens-lyon.fr/LBMC/gisv |
|
10 |
|
|
11 |
|
|
12 |
|
b/src/NAMESPACE | ||
---|---|---|
1 |
export(run_engine, bot_stats, botapply) |
b/src/R/bot.R | ||
---|---|---|
1 |
###################################### |
|
2 |
# DISTRIBUTED COMPUTING TASKS ENGINE # |
|
3 |
###################################### |
|
4 |
|
|
5 |
get_nb_proc = function ## Empirical function to find the number of cores of the current computing node.
### This function counts the processors listed in \emph{/proc/cpuinfo} (Linux) and falls back on \emph{sysctl} (Mac OS X). It could be extended or overridden by the user to adapt it to his own system.
() {
  # Hard-coded answer for one specific host name.
  if (Sys.info()[["nodename"]] == "cremone") {
    return(12)
  }

  # Run one shell pipeline and turn its output into a single integer.
  # A probe that fails, prints nothing or prints something that is not a
  # number counts as 0, so that the next probe is tried instead of raising
  # an "argument is of length zero" / "missing value" error in the caller.
  count_from = function(cmd) {
    out = tryCatch(
      suppressWarnings(as.integer(system(cmd, intern=TRUE, ignore.stderr=TRUE))),
      error = function(e) integer(0))
    if (length(out) == 0 || is.na(out[1])) {
      return(0L)
    }
    return(out[1])
  }

  # Probes are tried in order; the first one giving a positive count wins.
  probes = c(
    "cat /proc/cpuinfo | grep 'core id' | wc -l",
    "cat /proc/cpuinfo | grep 'cpuid level' | wc -l",
    "sysctl -a | grep machdep.cpu.core_count | cut -d ' ' -f 2")
  for (cmd in probes) {
    dyn_nb_proc = count_from(cmd)
    if (dyn_nb_proc > 0) {
      return(dyn_nb_proc)
    }
  }

  # No probe succeeded: assume a single core.
  return(1)
### The number of cores that the current computing node owns (1 when it cannot be determined).
}
|
24 |
|
|
25 |
run_engine = structure(function (## Execute a bag of tasks in parallel, on as many cores as the current computing node owns.
### This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData).
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
debug=FALSE, ##<< If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.
starter_name="~/.start_best_effort_jobs", ##<< Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.
rm_starter=TRUE, ##<< If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.
log_dir="log", ##<< Path to the \emph{log} directory.
bot_cache_dir = "cache", ##<< the directory where task results are cached
nb_proc=NULL, ##<< If not NULL, fixes the number of cores on which tasks must be computed.
... ##<< Other arguments that will be passed to \emph{task_processor}.
){
  # Check the mandatory task attribute once, before any directory is created
  # or any process is forked. Signalling an R error here (instead of exiting
  # the process from inside the worker) keeps the caller's session alive in
  # debug mode, where the worker runs in the current process.
  for (task in tasks) {
    if (is.null(task$out_filename)) {
      stop("Attribute task$out_filename is mandatory.", call.=FALSE)
    }
  }

  if (!file.exists(log_dir)) {
    dir.create(log_dir, recursive = TRUE)
  }
  if (!file.exists(bot_cache_dir)) {
    dir.create(bot_cache_dir, recursive = TRUE)
  }
  print(paste("#tasks: ", length(tasks)))

  # Worker body. It reads proc_id and UID from the enclosing environment;
  # both are assigned below, before the worker is started.
  forked_part = function(){
    stats = list()
    stats$proc_id = proc_id
    stats$UID = UID
    set.seed(proc_id + UID)
    if (!debug) {
      # Random start-up delay so that workers do not all start at once.
      Sys.sleep(floor(runif(1,1,30)))
    }
    hostname = system("hostname", intern=TRUE)
    stats$hostname = hostname
    # Duplicate standard output into a per-worker log file. sink() handles a
    # single stream per call, so only "output" is diverted.
    sink(paste(log_dir, "/proc_id_hostname_", proc_id, "_", hostname, ".log", sep=""), type="output", split=TRUE)
    # Restore the output stream even if task_processor raises an error.
    on.exit(sink(), add=TRUE)

    need_rerun = TRUE
    nb_loop = 0
    # Loop over the whole bag until a full pass finds nothing left to do.
    while (need_rerun){
      need_rerun = FALSE
      for (task in sample(tasks)) {
        lock_filename = paste(bot_cache_dir, "/", task$out_filename, ".lock", sep="")
        save_filename = paste(bot_cache_dir, "/", task$out_filename, ".RData", sep="")
        # Per-worker file names, used when the task ends up being computed
        # by several workers.
        lock_filename_bis = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) ,".lock4no", sep="")
        for_nothing_filename = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) , ".RData4no", sep="")
        # Seconds since the epoch (portable, unlike format(..., "%s")).
        stats$start_date = as.integer(Sys.time())
        if (nb_loop == 0 && file.exists(lock_filename)) {
          # Locks are only honoured during the first pass; on later passes a
          # task that is still locked and has no result is computed anyway.
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " is locked... skipping.", sep=""))
          need_rerun = TRUE
        } else if (file.exists(save_filename)) {
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " exists... skipping.", sep=""))
        } else {
          need_rerun = TRUE
          print(paste("[proc_", proc_id , "] ", date(), " taking lock on ", lock_filename, " and computing...", sep=""))
          # NOTE(review): testing for the lock and creating it are two
          # separate steps, so two workers may still take the same task.
          save(stats, file=lock_filename)
          save(stats, file=lock_filename_bis)
          task_result = task_processor(task, ...)
          stats$stop_date = as.integer(Sys.time())
          if (file.exists(save_filename)) {
            # Another worker delivered the result in the meantime: keep this
            # one aside as a redundant ("for nothing") computation.
            print(paste("[proc_", proc_id , "] ", stats$stop_date, " ", save_filename, " already exists... So it have been computed for nothing.", sep=""))
            save(task_result, stats, file=for_nothing_filename)
          } else {
            save(task_result, stats, file=save_filename)
          }
          file.remove(lock_filename)
          file.remove(lock_filename_bis)
        }
      }
      nb_loop = nb_loop + 1
    }

    if (rm_starter) {
      print(paste("all task have been processed. Removing ", starter_name, "...", sep=""))
      # Every worker reaches this point; only remove the file if it is
      # still there.
      if (file.exists(starter_name)) {
        file.remove(starter_name)
      }
    } else {
      print("all task have been processed." )
    }
    invisible(NULL)
  }

  UID = round(runif(1,1,1000000))
  if (debug) {
    nb_proc = 1
    proc_id = 0
    print("running engine witout forking (DEBUG MODE)...")
    forked_part()
  } else {
    if (is.null(nb_proc)) {
      nb_proc = get_nb_proc()
    }
    print(paste("running engine over ", nb_proc, " proc(s)...", sep=""))
    pids = c()
    for (proc_id in seq_len(nb_proc)) {
      # Here we fork!
      pids = c(pids, fork(forked_part))
    }
    # Wait until every child has finished.
    for (pid in pids) {
      wait(pid)
    }
  }
  # Nothing is returned: results are available in bot_cache_dir.
  invisible(NULL)
}, ex=function(){

  # We define a basic task_processor
  sum_a_b = function(task) {
    return(task$a + task$b)
  }

  # We define 9 tasks
  tasks = list()
  for (a in 1:3) {
    for (b in 4:6) {
      tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_"))
    }
  }

  # We execute the 9 tasks
  run_engine(tasks, sum_a_b)

  # We collect 9 task results
  for (a in 1:3) {
    for (b in 4:6) {
      out_filename = paste("sum_a_b", a, b, sep="_")
      out_filename = paste("cache/", out_filename, ".RData", sep="")
      load(out_filename)
      print(task_result)
    }
  }

  # Better way to do that
  apply(t(tasks), 2, function(task) {
    out_filename = task[[1]]$out_filename
    out_filename = paste("cache/", out_filename, ".RData", sep="")
    load(out_filename)
    print(task_result)
  })

  # Viewing statistics about the campaign.
  bot_stats()

})
|
167 |
|
|
168 |
|
|
169 |
botapply = structure(function(## A function to use bot features in an apply fashion.
### With botapply, an independent loop can be written in an apply fashion: results are collected and returned once all tasks are done.
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
bot_cache_dir = "cache", ##<< the directory where task results are cached
... ##<< Other arguments that will be passed to \emph{run_engine}.
) {
  print(bot_cache_dir)
  run_engine(tasks, task_processor, bot_cache_dir=bot_cache_dir, ...)

  # Read back the cached result of one task. `column` is one column of the
  # 1 x n list-matrix built by t(tasks), i.e. a list holding a single task.
  fetch_result <- function(column) {
    result_path <- paste0(bot_cache_dir, "/", column[[1]]$out_filename, ".RData")
    # Load into a scratch environment; task_result is NULL when the file
    # does not contain an object of that name.
    loaded <- new.env()
    load(result_path, envir = loaded)
    loaded$task_result
  }

  # The collected results, one per task, in the order of `tasks`
  # (apply() may simplify them into a vector or a matrix).
  apply(t(tasks), 2, fetch_result)
}, ex=function(){
  botapply(
    list(
      list(a=1, b=10, out_filename="task1"),
      list(a=2, b=20, out_filename="task2"),
      list(a=3, b=30, out_filename="task3"),
      list(a=4, b=40, out_filename="task4")),
    function(task) {
      return(task$a + task$b)})

  # botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)})

})
|
200 |
|
|
201 |
bot_stats = function(## Compute and display statistics about a campaign.
### This function browses the bot_cache_dir directory and collects information about tasks. Next, it displays a Gantt chart, a task duration chart and how computing elements are used. Finally it prints some statistics about the campaign.
bot_cache_dir = "cache", ##<< the directory where task results are cached
WHITH4NO = FALSE ##<< TRUE if you want to include redundant submissions in the stats.
) {
  # Load the `stats` list stored in every file of bot_cache_dir whose name
  # matches `pattern` (a regular expression) and bind them into one
  # data.frame, one row per file, tagged with `efficient`.
  # Returns NULL when no file provides a `stats` object.
  collect_stats = function(pattern, efficient) {
    files = list.files(bot_cache_dir, pattern=pattern, full.names=TRUE)
    records = lapply(files, function(file) {
      loaded = new.env()
      load(file, envir=loaded)
      loaded$stats
    })
    records = Filter(Negate(is.null), records)
    if (length(records) == 0) {
      return(NULL)
    }
    # unlist() flattens every field to character (hostname is a string), so
    # all columns are character at this point; dates are converted below.
    df = data.frame(t(matrix(unlist(records), length(records[[1]]))), stringsAsFactors=FALSE)
    names(df) = names(records[[1]])
    df$efficient = efficient
    df
  }

  # Anchored patterns: "\\.RData$" must not pick up the ".RData4no" files of
  # redundant computations, and "\\.RData4no$" must not pick up leftover
  # ".lock4no" files (which carry no stop_date).
  stats = collect_stats("\\.RData$", 1)
  if (is.null(stats)) {
    stop("No task result (.RData file) found in ", bot_cache_dir, call.=FALSE)
  }
  if (WHITH4NO) {
    stats2 = collect_stats("\\.RData4no$", 2)
    if (!is.null(stats2)) {
      stats = rbind(stats, stats2)
    }
  }

  # Dates are stored in seconds; express them in minutes, relative to the
  # earliest task start.
  stats$start_date = as.integer(stats$start_date)/60
  stats$stop_date = as.integer(stats$stop_date)/60
  zero = min(stats$start_date)
  stats$start_date = stats$start_date - zero
  stats$stop_date = stats$stop_date - zero
  stats$duration = stats$stop_date - stats$start_date

  # A computing element is one worker process on one host.
  stats$core = paste(stats$hostname, stats$proc_id, sep="_")
  cores = sort(unique(stats$core))
  nb_tasks = nrow(stats)
  task_rows = seq_len(nb_tasks)

  # dev.new() opens the default graphics device of the platform.
  dev.new(width=16, height=9)
  layout(matrix(1:3, nrow=1), respect=TRUE)

  # 1. Gantt chart: one horizontal line per task, ordered by start date.
  #    Colour 1 marks effective results, colour 2 redundant ones.
  stats = stats[ order(stats$start_date), ]
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, nb_tasks), main=paste("Gantt Chart for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
  arrows(stats$start_date, task_rows, stats$stop_date, task_rows, length=0, angle=0, col=stats$efficient)

  # 2. Task durations, longest first.
  stats = stats[ order(stats$duration, decreasing=TRUE),]
  plot(0,0,col=0, xlim=c(0, max(stats$duration)), ylim=c(0, nb_tasks), main=paste("Task duration for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
  arrows(rep(0, nb_tasks), task_rows, stats$duration, task_rows, length=0, angle=0, col=stats$efficient)

  # 3. Occupation of every computing element over time.
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(cores)), main=paste("Task repartition over computing elements for", bot_cache_dir), xlab="Time (min)", ylab= "Computing Element")
  ys = match(stats$core, cores)
  arrows(stats$start_date, ys, stats$stop_date, ys, length=0, angle=0, col=stats$efficient)

  # Format a duration given in minutes as "hours:minutes:seconds".
  format_timediff = function(diff) {
    hr = diff%/%60
    min = floor(diff - hr * 60)
    sec = round(diff%%1 * 60,digits=2)
    return(paste(hr,min,sec,sep=':'))
  }

  # Summary figures only take effective (non redundant) results into account.
  done = stats[stats$efficient==1, ]
  cpu_time = sum(done$duration)
  elapsed = max(done$stop_date)
  cat("#cores....................", length(cores), "\n", sep="")
  cat("#tasks....................", nrow(done), "\n", sep="")
  cat("cpu.time..................", format_timediff(cpu_time), "\n", sep="")
  cat("time......................", format_timediff(elapsed), "\n", sep="")
  cat("speedup...................", cpu_time/elapsed, "\n", sep="")
  cat("efficiency................", cpu_time/elapsed/length(cores), "\n", sep="")
  return(stats)
### The data.frame of collected information, one row per result file, ordered by decreasing duration.
}
b/src/man/bot-package.Rd | ||
---|---|---|
1 |
\name{bot-package} |
|
2 |
\alias{bot-package} |
|
3 |
\alias{bot} |
|
4 |
\docType{package} |
|
5 |
\title{bot} |
|
6 |
\description{BoT (stands for Bag Of Tasks) is an R package that distributes independent tasks over many cores and many computing nodes. Because BoT relies only on process forking and on task locking over the file system, it is compatible with most computing infrastructures (multicore, clusters, grids, clouds). Using BoT, each task is a set of parameters associated with a user-defined function built on an R process. The next step consists in forking this R process for each core of the computing node. Finally, the forked set of tasks is randomized and executed in parallel. When a task starts, a distributed lock is taken. This avoids redundant task execution. When a task ends, its result is dumped into a file. BoT is used to compute NGS data in the SiGHT project context (ERC-StG2011-281359). BoT has been tested on two infrastructures: the Grid'5000 experimental testbed (https://www.grid5000.fr) and the PSMN computing center of ENS de Lyon (http://www.ens-lyon.fr/PSMN). For more information on how to use bot, have a look at the example in the help page ?bot::run_engine.} |
|
7 |
\details{ |
|
8 |
\tabular{ll}{Package: \tab bot\cr |
|
9 |
Maintainer: \tab Florent Chuffart <florent.chuffart@ens-lyon.fr>\cr |
|
10 |
Author: \tab Florent Chuffart\cr |
|
11 |
Version: \tab 0.9.1\cr |
|
12 |
License: \tab CeCILL\cr |
|
13 |
Title: \tab bot\cr |
|
14 |
Depends: \tab fork\cr |
|
15 |
URL: \tab http://www.ens-lyon.fr/LBMC/gisv/index.php/en/protocols/bioinformatics http://www.ens-lyon.fr/LBMC/gisv\cr} |
|
16 |
} |
|
17 |
\author{Florent Chuffart} |
|
18 |
|
|
19 |
\keyword{ package } |
|
20 |
|
|
21 |
|
b/src/man/bot_stats.Rd | ||
---|---|---|
1 |
\name{bot_stats} |
|
2 |
\alias{bot_stats} |
|
3 |
\title{Compute and display statistics about a campaign.} |
|
4 |
\description{This function browses the bot_cache_dir directory and collects information about tasks. Next, it displays a Gantt chart, a task duration chart and how computing elements are used. Finally it prints some statistics about the campaign.} |
|
5 |
\usage{bot_stats(bot_cache_dir = "cache", WHITH4NO = FALSE)} |
|
6 |
\arguments{ |
|
7 |
\item{bot_cache_dir}{the directory where task results are cached} |
|
8 |
\item{WHITH4NO}{TRUE if you want to include redundant submissions in the stats.} |
|
9 |
} |
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
\author{Florent Chuffart} |
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
|
18 |
|
b/src/man/botapply.Rd | ||
---|---|---|
1 |
\name{botapply} |
|
2 |
\alias{botapply} |
|
3 |
\title{A function to use bot features in an apply fashion.} |
|
4 |
\description{With botapply, an independent loop can be written in an apply fashion: results are collected and returned once all tasks are done.} |
|
5 |
\usage{botapply(tasks, task_processor, bot_cache_dir = "cache", ...)} |
|
6 |
\arguments{ |
|
7 |
\item{tasks}{A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.} |
|
8 |
\item{task_processor}{A function that will be called for each task in the task list \emph{tasks}.} |
|
9 |
\item{bot_cache_dir}{the directory where task results are cached} |
|
10 |
\item{\dots}{Other arguments that will be passed to \emph{run_engine}.} |
|
11 |
} |
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
\author{Florent Chuffart} |
|
16 |
|
|
17 |
|
|
18 |
|
|
19 |
|
|
20 |
\examples{ |
|
21 |
botapply( |
|
22 |
list( |
|
23 |
list(a=1, b=10, out_filename="task1"), |
|
24 |
list(a=2, b=20, out_filename="task2"), |
|
25 |
list(a=3, b=30, out_filename="task3"), |
|
26 |
list(a=4, b=40, out_filename="task4")), |
|
27 |
function(task) { |
|
28 |
return(task$a + task$b)}) |
|
29 |
|
|
30 |
# botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)}) |
|
31 |
|
|
32 |
} |
b/src/man/get_nb_proc.Rd | ||
---|---|---|
1 |
\name{get_nb_proc} |
|
2 |
\alias{get_nb_proc} |
|
3 |
\title{Empirical function to find the number of cores of the current computing node.} |
|
4 |
\description{This function uses the \emph{/proc/cpuinfo} file for Debian and \emph{sysctl} for Mac OS X. This function could be extended or overridden by the user to adapt it to his own system.} |
|
5 |
\usage{get_nb_proc()} |
|
6 |
|
|
7 |
\value{The number of cores that the current computing node owns.} |
|
8 |
|
|
9 |
\author{Florent Chuffart} |
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
b/src/man/run_engine.Rd | ||
---|---|---|
1 |
\name{run_engine} |
|
2 |
\alias{run_engine} |
|
3 |
\title{Execute a bag of tasks in parallel, on as many cores as the current computing node owns.} |
|
4 |
\description{This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData).} |
|
5 |
\usage{run_engine(tasks, task_processor, debug = FALSE, starter_name = "~/.start_best_effort_jobs", |
|
6 |
rm_starter = TRUE, log_dir = "log", bot_cache_dir = "cache", |
|
7 |
nb_proc = NULL, ...)} |
|
8 |
\arguments{ |
|
9 |
\item{tasks}{A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.} |
|
10 |
\item{task_processor}{A function that will be called for each task in the task list \emph{tasks}.} |
|
11 |
\item{debug}{If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.} |
|
12 |
\item{starter_name}{Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.} |
|
13 |
\item{rm_starter}{If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.} |
|
14 |
\item{log_dir}{Path to the \emph{log} directory.} |
|
15 |
\item{bot_cache_dir}{the directory where task results are cached} |
|
16 |
\item{nb_proc}{If not NULL, fixes the number of cores on which tasks must be computed.} |
|
17 |
\item{\dots}{Other arguments that will be passed to \emph{task_processor}.} |
|
18 |
} |
|
19 |
|
|
20 |
|
|
21 |
|
|
22 |
\author{Florent Chuffart} |
|
23 |
|
|
24 |
|
|
25 |
|
|
26 |
|
|
27 |
\examples{ |
|
28 |
|
|
29 |
# We define a basic task_processor |
|
30 |
sum_a_b = function(task) { |
|
31 |
return(task$a + task$b) |
|
32 |
} |
|
33 |
|
|
34 |
# We define 9 tasks |
|
35 |
tasks = list() |
|
36 |
for (a in 1:3) { |
|
37 |
for (b in 4:6) { |
|
38 |
tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) |
|
39 |
} |
|
40 |
} |
|
41 |
|
|
42 |
# We execute the 9 tasks |
|
43 |
run_engine(tasks, sum_a_b) |
|
44 |
|
|
45 |
# We collect 9 task results |
|
46 |
for (a in 1:3) { |
|
47 |
for (b in 4:6) { |
|
48 |
out_filename = paste("sum_a_b", a, b, sep="_") |
|
49 |
out_filename = paste("cache/", out_filename, ".RData", sep="") |
|
50 |
load(out_filename) |
|
51 |
print(task_result) |
|
52 |
} |
|
53 |
} |
|
54 |
|
|
55 |
# Better way to do that |
|
56 |
apply(t(tasks), 2, function(task) { |
|
57 |
out_filename = task[[1]]$out_filename |
|
58 |
out_filename = paste("cache/", out_filename, ".RData", sep="") |
|
59 |
load(out_filename) |
|
60 |
print(task_result) |
|
61 |
}) |
|
62 |
|
|
63 |
# Viewing statistics about the campaign. |
|
64 |
bot_stats() |
|
65 |
|
|
66 |
} |
Formats disponibles : Unified diff