root / src / R / bot.R @ master
Historique | Voir | Annoter | Télécharger (11,63 ko)
1 | b412345d | Florent Chuffart | ###################################### |
---|---|---|---|
2 | b412345d | Florent Chuffart | # DISTRIBUTED COMPUTING TASKS ENGINE # |
3 | b412345d | Florent Chuffart | ###################################### |
4 | b412345d | Florent Chuffart | |
get_nb_proc = function ## Empirical function to find the number of cores of the current computing node.
### This function counts entries of the \emph{/proc/cpuinfo} file (Linux) and falls back to \emph{sysctl} (Mac OS X). A probe that fails, prints nothing or prints something that is not a number counts as 0 instead of raising an error. This function could be extended or overridden by the user to adapt it to his own system.
() {
  # Hard-coded answer for one specific host name.
  if (Sys.info()[["nodename"]] == "cremone") {
    return(12)
  }
  # Run a shell command and read the first line of its output as a count.
  # Anything unusable (no output, not a number, command failure) yields 0 so
  # that the next probe can be tried.
  probe = function(cmd) {
    out = tryCatch(
      suppressWarnings(system(cmd, intern=TRUE, ignore.stderr=TRUE)),
      error = function(e) character(0)
    )
    n = suppressWarnings(as.integer(out[1]))
    if (length(n) != 1 || is.na(n) || n < 1) {
      return(0L)
    }
    return(n)
  }
  # Probes are tried in order; the first one giving a positive count wins.
  probes = c(
    "cat /proc/cpuinfo | grep 'core id' | wc -l",
    "cat /proc/cpuinfo | grep 'cpuid level' | wc -l",
    "sysctl -n machdep.cpu.core_count"
  )
  for (cmd in probes) {
    dyn_nb_proc = probe(cmd)
    if (dyn_nb_proc > 0) {
      return(dyn_nb_proc)
    }
  }
  # No probe succeeded: assume a single core.
  return(1)
### The number of cores that the current computing node owns (1 if it cannot be determined).
}
24 | b412345d | Florent Chuffart | |
run_engine = structure(function (## Execute bag of tasks parallely, on as many cores as the current computing node owns.
### This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process walks through the list of tasks in random order. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock in bot_cache_dir). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData) and releases the lock. On its first pass a sub-process skips locked tasks; on the following passes it computes every task that still has no results file, even if it is locked. A result computed while another sub-process already saved it is stored in a file ending with .RData4no.
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
debug=FALSE, ##<< If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.
starter_name="~/.start_best_effort_jobs", ##<< Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.
rm_starter=TRUE, ##<< If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.
log_dir="log", ##<< Path to the \emph{log} directory.
bot_cache_dir = "cache", ##<< the directory where task results are cached
nb_proc=NULL, ##<< If not NULL fix the number of core on which tasks must be computed.
... ##<< Other arguments that will be passed to \emph{task_processor}.
){
  # Check the mandatory task attribute once, before anything is forked, so
  # that the caller gets a regular R error.
  for (task in tasks) {
    if (is.null(task$out_filename)) {
      stop("Attribute task$out_filename is mandatory.")
    }
  }
  if (!file.exists(log_dir)) {
    dir.create(log_dir, recursive = TRUE)
  }
  if (!file.exists(bot_cache_dir)) {
    dir.create(bot_cache_dir, recursive = TRUE)
  }
  print(paste("#tasks: ", length(tasks)))

  # Body executed by each worker. It reads proc_id and UID from the enclosing
  # environment: they are set just before the call / the fork.
  forked_part = function(){
    stats = list()
    stats$proc_id = proc_id
    stats$UID = UID
    set.seed(proc_id + UID)
    if (!debug) {
      # Random start delay (presumably to avoid that all workers grab their
      # first lock at the same time).
      Sys.sleep(floor(runif(1,1,30)))
    }
    hostname = system("hostname", intern=TRUE)
    stats$hostname = hostname
    # Output is duplicated into a per-worker log file; on.exit guarantees
    # that the diversion is removed even if a task fails.
    sink(paste(log_dir, "/proc_id_hostname_", proc_id, "_", hostname, ".log", sep=""), type = "output", split = TRUE)
    on.exit(sink(), add = TRUE)
    need_rerun = TRUE
    nb_loop = 0
    # Loop over the whole bag until a pass finds every task already saved.
    while (need_rerun){
      need_rerun = FALSE
      for (task in sample(tasks)) {
        lock_filename = paste(bot_cache_dir, "/", task$out_filename, ".lock", sep="")
        save_filename = paste(bot_cache_dir, "/", task$out_filename, ".RData", sep="")
        # Per-worker files, used when the same task is computed more than once.
        lock_filename_bis = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) ,".lock4no", sep="")
        for_nothing_filename = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) , ".RData4no", sep="")
        # Seconds since the epoch.
        stats$start_date = as.integer(Sys.time())
        if (nb_loop == 0 && file.exists(lock_filename)) {
          # First pass only: leave locked tasks to their owner, come back later.
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " is locked... skipping.", sep=""))
          need_rerun = TRUE
        } else if (file.exists(save_filename)) {
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " exists... skipping.", sep=""))
        } else {
          need_rerun = TRUE
          print(paste("[proc_", proc_id , "] ", date(), " taking lock on ", lock_filename, " and computing...", sep=""))
          save(stats, file=lock_filename)
          save(stats, file=lock_filename_bis)
          tryCatch({
            task_result = task_processor(task, ...)
            stats$stop_date = as.integer(Sys.time())
            if (!file.exists(save_filename)) {
              save(task_result, stats, file=save_filename)
            } else {
              # Somebody else saved the result while we were computing.
              print(paste("[proc_", proc_id , "] ", stats$stop_date, " ", save_filename, " already exists... So it have been computed for nothing.", sep=""))
              save(task_result, stats, file=for_nothing_filename)
            }
          }, finally = {
            # Release the locks whatever happened; only remove what is still
            # there, another worker may already have removed the shared lock.
            locks = c(lock_filename, lock_filename_bis)
            file.remove(locks[file.exists(locks)])
          })
        }
      }
      nb_loop = nb_loop + 1
    }
    if (rm_starter) {
      print(paste("all task have been processed. Removing ", starter_name, "...", sep=""))
      # Every worker reaches this point, only the first one finds the file.
      if (file.exists(starter_name)) {
        file.remove(starter_name)
      }
    } else {
      print("all task have been processed." )
    }
  }

  UID = round(runif(1,1,1000000))
  if (debug) {
    nb_proc = 1
    proc_id = 0
    print("running engine witout forking (DEBUG MODE)...")
    forked_part()
  } else {
    if (is.null(nb_proc)) {
      nb_proc = get_nb_proc()
    }
    print(paste("running engine over ", nb_proc, " proc(s)...", sep=""))
    pids = c()
    for (proc_id in seq_len(nb_proc)) {
      # Here we fork!
      pids = c(pids,fork(forked_part))
    }
    # Wait until every child has finished.
    for (pid in pids) {
      wait(pid)
    }
  }
  return(invisible(NULL))
  # Nothing
}, ex=function(){

  # We define a basic task_processor
  sum_a_b = function(task) {
    return(task$a + task$b)
  }

  # We define 9 tasks
  tasks = list()
  for (a in 1:3) {
    for (b in 4:6) {
      tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_"))
    }
  }

  # We execute the 9 tasks
  run_engine(tasks, sum_a_b)

  # We collect 9 task results
  for (a in 1:3) {
    for (b in 4:6) {
      out_filename = paste("sum_a_b", a, b, sep="_")
      out_filename = paste("cache/", out_filename, ".RData", sep="")
      load(out_filename)
      print(task_result)
    }
  }

  # Better way to do that
  apply(t(tasks), 2, function(task) {
    out_filename = task[[1]]$out_filename
    out_filename = paste("cache/", out_filename, ".RData", sep="")
    load(out_filename)
    print(task_result)
  })

  # Viewing statistics about the campaign.
  bot_stats()

})
167 | b412345d | Florent Chuffart | |
168 | b412345d | Florent Chuffart | |
botapply = structure(function(## A function to use bot features in an apply fashion.
### With botapply you can write your independent loop in an apply fashion: tasks are executed by \emph{run_engine}, then results are collected from \emph{bot_cache_dir} and returned when all tasks are done.
tasks, ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
bot_cache_dir = "cache", ##<< the directory where task results are cached
... ##<< Other arguments that will be passed to \emph{run_engine}.
) {
  print(bot_cache_dir)
  run_engine(tasks, task_processor, bot_cache_dir=bot_cache_dir, ...)
  # Nothing to collect: apply() over an empty set would call the collector
  # on a dummy element and fail while loading a file that does not exist.
  if (length(tasks) == 0) {
    return(list())
  }
  # t(tasks) is a one-row list matrix, so each column handed to the collector
  # is a list of length one holding the task.
  ret = apply(t(tasks), 2, function(task) {
    out_filename = task[[1]]$out_filename
    out_filename = paste(bot_cache_dir, "/", out_filename, ".RData", sep="")
    # Load into a private environment so that the content of the file cannot
    # overwrite local variables; task_result is NULL if the file has none.
    loaded = new.env()
    load(out_filename, envir=loaded)
    return(loaded$task_result)
  })
  return(ret)
  # It returns the computed task results, in the order of tasks, as
  # assembled by apply() (an empty list when tasks is empty).
}, ex=function(){
  botapply(
    list(
      list(a=1, b=10, out_filename="task1"),
      list(a=2, b=20, out_filename="task2"),
      list(a=3, b=30, out_filename="task3"),
      list(a=4, b=40, out_filename="task4")),
    function(task) {
      return(task$a + task$b)})

  # botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)})

})
200 | b412345d | Florent Chuffart | |
bot_stats = function(## It computes and displays statistics about a campaign.
### This function browses the bot_cache_dir directory and collects the \emph{stats} saved with each task result. Next, it displays a Gantt chart, a task duration chart and how computing elements are used. Finally it prints some statistics about the campaign. Dates are expressed in minutes since the start of the earliest collected task. It stops with an error if the directory holds no task result.
bot_cache_dir = "cache", ##<< the directory where task results are cached
WHITH4NO = FALSE ##<< TRUE if you want to include redundant submissions in the stats.
) {
  # Collect the stats list stored in every file of bot_cache_dir whose name
  # matches pattern (a regular expression) as a data.frame, one row per file,
  # tagged with the given efficient value. Returns NULL when no file matches.
  collect_stats = function(pattern, efficient) {
    files = list.files(bot_cache_dir, pattern)
    if (length(files) == 0) {
      return(NULL)
    }
    records = lapply(files, function(file) {
      loaded = new.env()
      load(paste(bot_cache_dir, "/", file, sep=""), envir=loaded)
      loaded$stats
    })
    # Every field goes through a character matrix, so all columns are
    # character at this point.
    collected = data.frame(t(matrix(unlist(records), length(records[[1]]))), stringsAsFactors=FALSE)
    names(collected) = names(records[[1]])
    collected$efficient = efficient
    return(collected)
  }

  # Patterns are anchored: "RData" alone would also match *.RData4no files,
  # and "4no" alone would also match *.lock4no files.
  stats = collect_stats("\\.RData$", 1)
  if (is.null(stats)) {
    stop(paste("No task result (*.RData) found in ", bot_cache_dir, ".", sep=""))
  }
  if (WHITH4NO) {
    stats2 = collect_stats("\\.RData4no$", 2)
    if (!is.null(stats2)) {
      stats = rbind(stats, stats2)
    }
  }

  # Seconds to minutes, relative to the earliest start.
  stats$start_date = as.integer(stats$start_date)/60
  stats$stop_date = as.integer(stats$stop_date)/60
  zero = min(stats$start_date)
  stats$start_date = stats$start_date - zero
  stats$stop_date = stats$stop_date - zero
  stats$duration = stats$stop_date - stats$start_date

  # A computing element is identified by host name and process id.
  stats$core = paste(stats$hostname, stats$proc_id, sep="_")
  cores = sort(unique(stats$core))

  # Open a new window on the default graphics device.
  grDevices::dev.new(width=16, height=9)
  layout(matrix(1:3, nrow=1), respect=TRUE)
  nb_rows = nrow(stats)
  rows = seq_len(nb_rows)

  # Gantt chart: one line per task, ordered by start date.
  # arrows() with a zero head length draws plain segments; the colour
  # follows the efficient flag.
  stats = stats[ order(stats$start_date), ]
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, nb_rows), main=paste("Gantt Chart for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
  arrows(stats$start_date, rows, stats$stop_date, rows, length=0, angle=0, col=stats$efficient)

  # Task durations, longest first.
  stats = stats[ order(stats$duration, decreasing=TRUE),]
  plot(0,0,col=0, xlim=c(0, max(stats$duration)), ylim=c(0, nb_rows), main=paste("Task duration for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
  arrows(rep(0, nb_rows), rows, stats$duration, rows, length=0, angle=0, col=stats$efficient)

  # Activity of each computing element over time.
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(cores)), main=paste("Task repartition over computing elements for", bot_cache_dir), xlab="Time (min)", ylab= "Computing Element")
  ys = match(stats$core, cores)
  arrows(stats$start_date, ys, stats$stop_date, ys, length=0, angle=0, col=stats$efficient)

  # Format a duration given in minutes as hours:minutes:seconds.
  format_timediff = function(diff) {
    hr = diff%/%60
    min = floor(diff - hr * 60)
    sec = round(diff%%1 * 60,digits=2)
    return(paste(hr,min,sec,sep=':'))
  }

  # The summary only takes non redundant results into account.
  efficient_stats = stats[stats$efficient==1, ]
  cpu_time = sum(efficient_stats$duration)
  wall_time = max(efficient_stats$stop_date)
  cat("#cores....................", length(cores), "\n", sep="")
  cat("#tasks....................", nrow(efficient_stats), "\n", sep="")
  cat("cpu.time..................", format_timediff(cpu_time), "\n", sep="")
  cat("time......................", format_timediff(wall_time), "\n", sep="")
  cat("speedup...................", cpu_time/wall_time, "\n", sep="")
  cat("efficiency................", cpu_time/wall_time/length(cores), "\n", sep="")
  return(stats)
  # It returns the data.frame of collected information, sorted by decreasing duration.
}