root / src / R / bot.R @ b412345d
Historique | Voir | Annoter | Télécharger (11,63 ko)
1 |
###################################### |
---|---|
2 |
# DISTRIBUTED COMPUTING TASKS ENGINE # |
3 |
###################################### |
4 |
|
5 |
get_nb_proc = function ## Empirical function to find the number of core of the current computing node. |
6 |
### This function uses \emph{/proc/cpuinfo} file for Debian and \emph{sysctl} for macosx. This fucntion could be extended or overrided by the user to adapt it to his own system. |
7 |
() { |
8 |
if (Sys.info()[["nodename"]] == "cremone") { |
9 |
return(12) |
10 |
} |
11 |
dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'core id' | wc -l",intern=TRUE)) |
12 |
if (dyn_nb_proc == 0) { |
13 |
dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'cpuid level' | wc -l",intern=TRUE)) |
14 |
if (dyn_nb_proc == 0) { |
15 |
dyn_nb_proc = as.integer(system("sysctl -a | grep machdep.cpu.core_count | cut -d ' ' -f 2",intern=TRUE)) |
16 |
if (dyn_nb_proc == 0) { |
17 |
return(1) |
18 |
} |
19 |
} |
20 |
} |
21 |
return(dyn_nb_proc) |
22 |
### The number of core that the current computing node owns. |
23 |
} |
24 |
|
25 |
run_engine = structure(function (## Execute bag of tasks parallely, on as many cores as the current computing node owns. |
26 |
### This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData). |
27 |
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter. |
28 |
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}. |
29 |
debug=FALSE, ##<< If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process. |
30 |
starter_name="~/.start_best_effort_jobs", ##<< Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}. |
31 |
rm_starter=TRUE, ##<< If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks. |
32 |
log_dir="log", ##<< Path to the \emph{log} directory. |
33 |
bot_cache_dir = "cache", ##<< the directory where task results are cached |
34 |
nb_proc=NULL, ##<< If not NULL fix the number of core on which tasks must be computed. |
35 |
... ##<< Other arguments that will be passed to \emph{task_processor}. |
36 |
){ |
37 |
if (!file.exists(log_dir)) { |
38 |
dir.create(log_dir, recursive = TRUE) |
39 |
} |
40 |
if (!file.exists(bot_cache_dir)) { |
41 |
dir.create(bot_cache_dir, recursive = TRUE) |
42 |
} |
43 |
print(paste("#tasks: ", length(tasks))) |
44 |
forked_part = function(){ |
45 |
stats = list() |
46 |
stats$proc_id = proc_id |
47 |
stats$UID = UID |
48 |
set.seed(proc_id + UID) |
49 |
if (!debug) { |
50 |
Sys.sleep(floor(runif(1,1,30))) |
51 |
} |
52 |
hostname = system("hostname", intern=TRUE) |
53 |
stats$hostname = hostname |
54 |
sink(paste(log_dir, "/proc_id_hostname_", proc_id, "_", hostname, ".log", sep=""), type =c("output", "message"), split = TRUE) |
55 |
need_rerun = TRUE |
56 |
nb_loop = 0 |
57 |
while (need_rerun){ |
58 |
need_rerun = FALSE |
59 |
for (task in sample(tasks)) { |
60 |
# Check mandatory task attribute |
61 |
if (is.null(task$out_filename)) { |
62 |
print("ERROR! Attribute task$out_filename is mandatory.") |
63 |
exit(1) |
64 |
} |
65 |
# Check if task is already done or currently processed |
66 |
lock_filename = paste(bot_cache_dir, "/", task$out_filename, ".lock", sep="") |
67 |
save_filename = paste(bot_cache_dir, "/", task$out_filename, ".RData", sep="") |
68 |
lock_filename_bis = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) ,".lock4no", sep="") |
69 |
for_nothing_filename = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) , ".RData4no", sep="") |
70 |
stats$start_date = as.integer(format(Sys.time(), "%s")) |
71 |
if (nb_loop==0 & file.exists(lock_filename)) { |
72 |
print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " is locked... skipping.", sep="")) |
73 |
need_rerun = TRUE |
74 |
} else if (file.exists(save_filename)) { |
75 |
print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " exists... skipping.", sep="")) |
76 |
} else { |
77 |
need_rerun = TRUE |
78 |
print(paste("[proc_", proc_id , "] ", date(), " taking lock on ", lock_filename, " and computing...", sep="")) |
79 |
save(stats, file=lock_filename) |
80 |
save(stats, file=lock_filename_bis) |
81 |
task_result = task_processor(task, ...) |
82 |
stats$stop_date = as.integer(format(Sys.time(), "%s")) |
83 |
if (!file.exists(save_filename)) { |
84 |
save(task_result, stats, file=save_filename) |
85 |
file.remove(lock_filename) |
86 |
file.remove(lock_filename_bis) |
87 |
} else { |
88 |
print(paste("[proc_", proc_id , "] ", stats$stop_date, " ", save_filename, " already exists... So it have been computed for nothing.", sep="")) |
89 |
save(task_result, stats, file=for_nothing_filename) |
90 |
file.remove(lock_filename) |
91 |
file.remove(lock_filename_bis) |
92 |
} |
93 |
} |
94 |
} |
95 |
nb_loop = nb_loop + 1 |
96 |
} |
97 |
if (rm_starter) { |
98 |
print("all task have been processed. Removing ~/.start_best_effort_jobs..." ) |
99 |
file.remove(starter_name) |
100 |
} else { |
101 |
print("all task have been processed." ) |
102 |
} |
103 |
sink() |
104 |
} |
105 |
UID = round(runif(1,1,1000000)) |
106 |
if (debug) { |
107 |
nb_proc = 1 |
108 |
proc_id = 0 |
109 |
print("running engine witout forking (DEBUG MODE)...") |
110 |
forked_part() |
111 |
} else { |
112 |
if (is.null(nb_proc)) { |
113 |
nb_proc = get_nb_proc() |
114 |
} |
115 |
print(paste("running engine over ", nb_proc, " proc(s)...", sep="")) |
116 |
pids = c() |
117 |
for (proc_id in 1:nb_proc) { |
118 |
# Here we fork! |
119 |
pids = c(pids,fork(forked_part)) |
120 |
} |
121 |
# wait until each childs finishe, then display their exit status |
122 |
for (pid in pids) { |
123 |
wait(pid) |
124 |
} |
125 |
} |
126 |
# Nothing |
127 |
}, ex=function(){ |
128 |
|
129 |
# We define a basic task_processor |
130 |
sum_a_b = function(task) { |
131 |
return(task$a + task$b) |
132 |
} |
133 |
|
134 |
# We define 9 tasks |
135 |
tasks = list() |
136 |
for (a in 1:3) { |
137 |
for (b in 4:6) { |
138 |
tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) |
139 |
} |
140 |
} |
141 |
|
142 |
# We execute the 3 tasks |
143 |
run_engine(tasks, sum_a_b) |
144 |
|
145 |
# We collect 9 task results |
146 |
for (a in 1:3) { |
147 |
for (b in 4:6) { |
148 |
out_filename = paste("sum_a_b", a, b, sep="_") |
149 |
out_filename = paste("cache/", out_filename, ".RData", sep="") |
150 |
load(out_filename) |
151 |
print(task_result) |
152 |
} |
153 |
} |
154 |
|
155 |
# Better way to do that |
156 |
apply(t(tasks), 2, function(task) { |
157 |
out_filename = task[[1]]$out_filename |
158 |
out_filename = paste("cache/", out_filename, ".RData", sep="") |
159 |
load(out_filename) |
160 |
print(task_result) |
161 |
}) |
162 |
|
163 |
# Viewing statistics about the campain. |
164 |
bot_stats() |
165 |
|
166 |
}) |
167 |
|
168 |
|
169 |
botapply = structure(function(## A function to use bot features in an apply fashion. |
170 |
### With bot apply you could write your independant loop in an apply fashion, results will be collected ans returned when all tasks will be done. |
171 |
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter. |
172 |
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}. |
173 |
bot_cache_dir = "cache", ##<< the directory where task results are cached |
174 |
... ##<< Other arguments that will be passed to \emph{run_engine}. |
175 |
) { |
176 |
print(bot_cache_dir) |
177 |
run_engine(tasks, task_processor, bot_cache_dir=bot_cache_dir, ...) |
178 |
ret = apply(t(tasks), 2, function(task) { |
179 |
out_filename = task[[1]]$out_filename |
180 |
out_filename = paste(bot_cache_dir, "/", out_filename, ".RData", sep="") |
181 |
task_result = NULL |
182 |
load(out_filename) |
183 |
return(task_result) |
184 |
}) |
185 |
return(ret) |
186 |
# It returns the list of compurted tasks |
187 |
}, ex=function(){ |
188 |
botapply( |
189 |
list( |
190 |
list(a=1, b=10, out_filename="task1"), |
191 |
list(a=2, b=20, out_filename="task2"), |
192 |
list(a=3, b=30, out_filename="task3"), |
193 |
list(a=4, b=40, out_filename="task4")), |
194 |
function(task) { |
195 |
return(task$a + task$b)}) |
196 |
|
197 |
# botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)}) |
198 |
|
199 |
}) |
200 |
|
201 |
bot_stats = function(## It compute and display statistique about a campain. |
202 |
### This function browses bot_cache_dir directory and collects information about tasks. Next, it display gantt chart, tasks chart and how computing element are used. Finally it prints on outpout some stats about the campain. |
203 |
bot_cache_dir = "cache", ##<< the directory where task results are cached |
204 |
WHITH4NO = FALSE ##<< TRUE if you want to include redundante submission in the stats. |
205 |
) { |
206 |
foo = apply(t(list.files(bot_cache_dir, "*RData")), 2, function(file) { |
207 |
load(paste(bot_cache_dir, "/", file, sep="")) |
208 |
stats |
209 |
}) |
210 |
stats = data.frame(t(matrix(unlist(foo), length(foo[[1]]))), stringsAsFactors=FALSE) |
211 |
names(stats) = names(foo[[1]]) |
212 |
stats$efficient = 1 |
213 |
|
214 |
if (WHITH4NO) { |
215 |
foo2 = apply(t(list.files(bot_cache_dir, "*4no")), 2, function(file) { |
216 |
load(paste(bot_cache_dir, "/", file, sep="")) |
217 |
stats |
218 |
}) |
219 |
stats2 = data.frame(t(matrix(unlist(foo2), length(foo2[[1]]))), stringsAsFactors=FALSE) |
220 |
names(stats2) = names(foo2[[1]]) |
221 |
stats2$efficient = 2 |
222 |
stats = rbind(stats, stats2) |
223 |
} |
224 |
|
225 |
stats$start_date = as.integer(stats$start_date)/60 |
226 |
stats$stop_date = as.integer(stats$stop_date)/60 |
227 |
zero = min(stats$start_date) |
228 |
stats$start_date = stats$start_date - zero |
229 |
stats$stop_date = stats$stop_date - zero |
230 |
stats$duration = stats$stop_date - stats$start_date |
231 |
|
232 |
stats$core = paste(stats$hostname, stats$proc_id, sep="_") |
233 |
cores = sort(unique(stats$core)) |
234 |
|
235 |
x11(width=16, height=9) |
236 |
layout(matrix(1:3, nrow=1), respect=TRUE) |
237 |
|
238 |
stats = stats[ order(stats$start_date), ] |
239 |
plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(stats[,1])), main=paste("Gantt Chart for", bot_cache_dir), xlab="Time (min)", ylab= "Task") |
240 |
arrows( stats$start_date, 1:length(stats[,1]), stats$stop_date, 1:length(stats[,1]), 0, 0, col=stats$efficient) |
241 |
|
242 |
stats = stats[ order(stats$duration, decreasing=TRUE),] |
243 |
plot(0,0,col=0, xlim=c(0, max(stats$duration)), ylim=c(0, length(stats[,1])), main=paste("Task duration for", bot_cache_dir), xlab="Time (min)", ylab= "Task") |
244 |
arrows( rep(0, length(stats[,1])), 1:length(stats[,1]), stats$duration, 1:length(stats[,1]), 0, 0, , col=stats$efficient) |
245 |
|
246 |
plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(cores)), main=paste("Task repartition over computing elements for", bot_cache_dir), xlab="Time (min)", ylab= "Computing Element") |
247 |
ys = apply(t(stats$core), 2, function(core){which(cores == core)}) |
248 |
arrows( stats$start_date, ys, stats$stop_date, ys, 0, 0, col=stats$efficient) |
249 |
|
250 |
format.timediff <- function(diff) { |
251 |
hr <- diff%/%60 |
252 |
min <- floor(diff - hr * 60) |
253 |
sec <- round(diff%%1 * 60,digits=2) |
254 |
return(paste(hr,min,sec,sep=':')) |
255 |
} |
256 |
|
257 |
cat("#cores....................", length(cores), "\n", sep="") |
258 |
cat("#tasks....................", length(stats[stats$efficient==1, 1]), "\n", sep="") |
259 |
cat("cpu.time..................", format.timediff(sum(stats[stats$efficient==1, ]$duration)), "\n", sep="") |
260 |
cat("time......................", format.timediff(max(stats[stats$efficient==1, ]$stop_date)), "\n", sep="") |
261 |
cat("speedup...................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date), "\n", sep="") |
262 |
cat("efficiency................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date)/length(cores), "\n", sep="") |
263 |
return(stats) |
264 |
# It returns the data.frame of collected informations. |
265 |
} |