Statistiques
| Branche: | Révision :

root / src / R / bot.R @ master

Historique | Voir | Annoter | Télécharger (11,63 ko)

1 b412345d Florent Chuffart
######################################
2 b412345d Florent Chuffart
# DISTRIBUTED COMPUTING TASKS ENGINE #
3 b412345d Florent Chuffart
######################################
4 b412345d Florent Chuffart
5 b412345d Florent Chuffart
get_nb_proc = function ## Empirical function to find the number of core of the current computing node.
6 b412345d Florent Chuffart
### This function uses \emph{/proc/cpuinfo} file for Debian and \emph{sysctl} for macosx. This fucntion could be extended or overrided by the user to adapt it to his own system.
7 b412345d Florent Chuffart
() {
8 b412345d Florent Chuffart
  if (Sys.info()[["nodename"]] == "cremone") {
9 b412345d Florent Chuffart
    return(12)
10 b412345d Florent Chuffart
  }
11 b412345d Florent Chuffart
  dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'core id' | wc -l",intern=TRUE))
12 b412345d Florent Chuffart
  if (dyn_nb_proc == 0) {
13 b412345d Florent Chuffart
    dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'cpuid level' | wc -l",intern=TRUE))
14 b412345d Florent Chuffart
    if (dyn_nb_proc == 0) {
15 b412345d Florent Chuffart
      dyn_nb_proc = as.integer(system("sysctl -a | grep machdep.cpu.core_count | cut -d ' ' -f 2",intern=TRUE))
16 b412345d Florent Chuffart
      if (dyn_nb_proc == 0) {
17 b412345d Florent Chuffart
        return(1)
18 b412345d Florent Chuffart
      }
19 b412345d Florent Chuffart
    }
20 b412345d Florent Chuffart
  }
21 b412345d Florent Chuffart
  return(dyn_nb_proc)
22 b412345d Florent Chuffart
### The number of core that the current computing node owns.
23 b412345d Florent Chuffart
}
24 b412345d Florent Chuffart
25 b412345d Florent Chuffart
run_engine = structure(function (## Execute bag of tasks parallely, on as many cores as the current computing node owns.
26 b412345d Florent Chuffart
### This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData).
27 b412345d Florent Chuffart
tasks,  ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
28 b412345d Florent Chuffart
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
29 b412345d Florent Chuffart
debug=FALSE, ##<< If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.
30 b412345d Florent Chuffart
starter_name="~/.start_best_effort_jobs", ##<< Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.
31 b412345d Florent Chuffart
rm_starter=TRUE, ##<< If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.
32 b412345d Florent Chuffart
log_dir="log", ##<< Path to the \emph{log} directory.
33 b412345d Florent Chuffart
bot_cache_dir = "cache",   ##<< the directory where task results are cached
34 b412345d Florent Chuffart
nb_proc=NULL, ##<< If not NULL fix the number of core on which tasks must be computed.
35 b412345d Florent Chuffart
... ##<< Other arguments that will be passed to \emph{task_processor}.
36 b412345d Florent Chuffart
){  
37 b412345d Florent Chuffart
  if (!file.exists(log_dir)) {
38 b412345d Florent Chuffart
    dir.create(log_dir, recursive = TRUE)
39 b412345d Florent Chuffart
  }
40 b412345d Florent Chuffart
  if (!file.exists(bot_cache_dir)) {
41 b412345d Florent Chuffart
    dir.create(bot_cache_dir, recursive = TRUE)    
42 b412345d Florent Chuffart
  }
43 b412345d Florent Chuffart
  print(paste("#tasks: ", length(tasks)))
44 b412345d Florent Chuffart
  forked_part = function(){
45 b412345d Florent Chuffart
    stats = list()
46 b412345d Florent Chuffart
    stats$proc_id = proc_id
47 b412345d Florent Chuffart
    stats$UID = UID
48 b412345d Florent Chuffart
    set.seed(proc_id + UID)
49 b412345d Florent Chuffart
    if (!debug) {
50 b412345d Florent Chuffart
      Sys.sleep(floor(runif(1,1,30)))
51 b412345d Florent Chuffart
    }
52 b412345d Florent Chuffart
    hostname = system("hostname", intern=TRUE)
53 b412345d Florent Chuffart
    stats$hostname = hostname
54 b412345d Florent Chuffart
    sink(paste(log_dir, "/proc_id_hostname_", proc_id, "_", hostname, ".log", sep=""), type =c("output", "message"), split = TRUE)    
55 b412345d Florent Chuffart
    need_rerun = TRUE
56 b412345d Florent Chuffart
    nb_loop = 0
57 b412345d Florent Chuffart
    while (need_rerun){
58 b412345d Florent Chuffart
        need_rerun = FALSE
59 b412345d Florent Chuffart
        for (task in sample(tasks)) {
60 b412345d Florent Chuffart
        # Check mandatory task attribute
61 b412345d Florent Chuffart
        if (is.null(task$out_filename)) {
62 b412345d Florent Chuffart
          print("ERROR! Attribute task$out_filename is mandatory.")
63 b412345d Florent Chuffart
          exit(1)
64 b412345d Florent Chuffart
        }
65 b412345d Florent Chuffart
        # Check if task is already done or currently processed        
66 b412345d Florent Chuffart
        lock_filename = paste(bot_cache_dir, "/", task$out_filename, ".lock", sep="")
67 b412345d Florent Chuffart
        save_filename = paste(bot_cache_dir, "/", task$out_filename, ".RData", sep="")
68 b412345d Florent Chuffart
        lock_filename_bis = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) ,".lock4no", sep="")
69 b412345d Florent Chuffart
        for_nothing_filename = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) , ".RData4no", sep="")
70 b412345d Florent Chuffart
        stats$start_date = as.integer(format(Sys.time(), "%s"))
71 b412345d Florent Chuffart
        if (nb_loop==0 & file.exists(lock_filename)) {
72 b412345d Florent Chuffart
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " is locked... skipping.", sep=""))
73 b412345d Florent Chuffart
          need_rerun = TRUE
74 b412345d Florent Chuffart
        } else if (file.exists(save_filename)) {
75 b412345d Florent Chuffart
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " exists... skipping.", sep=""))
76 b412345d Florent Chuffart
        } else {
77 b412345d Florent Chuffart
          need_rerun = TRUE
78 b412345d Florent Chuffart
          print(paste("[proc_", proc_id , "] ", date(), " taking lock on ", lock_filename, " and computing...", sep=""))
79 b412345d Florent Chuffart
          save(stats, file=lock_filename)
80 b412345d Florent Chuffart
          save(stats, file=lock_filename_bis)
81 b412345d Florent Chuffart
          task_result = task_processor(task, ...)          
82 b412345d Florent Chuffart
          stats$stop_date = as.integer(format(Sys.time(), "%s"))
83 b412345d Florent Chuffart
          if (!file.exists(save_filename)) {
84 b412345d Florent Chuffart
            save(task_result, stats, file=save_filename)
85 b412345d Florent Chuffart
            file.remove(lock_filename) 
86 b412345d Florent Chuffart
            file.remove(lock_filename_bis) 
87 b412345d Florent Chuffart
          } else {
88 b412345d Florent Chuffart
            print(paste("[proc_", proc_id , "] ", stats$stop_date, " ", save_filename, " already exists... So it have been computed for nothing.", sep=""))              
89 b412345d Florent Chuffart
            save(task_result, stats, file=for_nothing_filename)
90 b412345d Florent Chuffart
            file.remove(lock_filename) 
91 b412345d Florent Chuffart
            file.remove(lock_filename_bis) 
92 b412345d Florent Chuffart
          }
93 b412345d Florent Chuffart
        }
94 b412345d Florent Chuffart
      }
95 b412345d Florent Chuffart
      nb_loop = nb_loop + 1
96 b412345d Florent Chuffart
    }
97 b412345d Florent Chuffart
    if (rm_starter) {
98 b412345d Florent Chuffart
      print("all task have been processed. Removing ~/.start_best_effort_jobs..." )
99 b412345d Florent Chuffart
      file.remove(starter_name)
100 b412345d Florent Chuffart
    } else {
101 b412345d Florent Chuffart
      print("all task have been processed." )
102 b412345d Florent Chuffart
    }
103 b412345d Florent Chuffart
    sink()
104 b412345d Florent Chuffart
  }
105 b412345d Florent Chuffart
  UID = round(runif(1,1,1000000))
106 b412345d Florent Chuffart
  if (debug) {
107 b412345d Florent Chuffart
    nb_proc = 1
108 b412345d Florent Chuffart
    proc_id = 0
109 b412345d Florent Chuffart
    print("running engine witout forking (DEBUG MODE)...")
110 b412345d Florent Chuffart
    forked_part()
111 b412345d Florent Chuffart
  } else {
112 b412345d Florent Chuffart
    if (is.null(nb_proc)) {
113 b412345d Florent Chuffart
      nb_proc = get_nb_proc()      
114 b412345d Florent Chuffart
    }
115 b412345d Florent Chuffart
    print(paste("running engine over ", nb_proc, " proc(s)...", sep=""))
116 b412345d Florent Chuffart
    pids = c()
117 b412345d Florent Chuffart
    for (proc_id in 1:nb_proc) {
118 b412345d Florent Chuffart
      # Here we fork!
119 b412345d Florent Chuffart
      pids = c(pids,fork(forked_part))
120 b412345d Florent Chuffart
    }
121 b412345d Florent Chuffart
    # wait until each childs finishe, then display their exit status
122 b412345d Florent Chuffart
    for (pid in pids) {
123 b412345d Florent Chuffart
      wait(pid) 
124 b412345d Florent Chuffart
    }
125 b412345d Florent Chuffart
  } 
126 b412345d Florent Chuffart
  # Nothing
127 b412345d Florent Chuffart
}, ex=function(){
128 b412345d Florent Chuffart
  
129 b412345d Florent Chuffart
  # We define a basic task_processor
130 b412345d Florent Chuffart
  sum_a_b = function(task) {
131 b412345d Florent Chuffart
    return(task$a + task$b)
132 b412345d Florent Chuffart
  }
133 b412345d Florent Chuffart
  
134 b412345d Florent Chuffart
  # We define 9 tasks
135 b412345d Florent Chuffart
  tasks = list()
136 b412345d Florent Chuffart
  for (a in 1:3) {
137 b412345d Florent Chuffart
    for (b in 4:6) {
138 b412345d Florent Chuffart
      tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) 
139 b412345d Florent Chuffart
    }
140 b412345d Florent Chuffart
  }
141 b412345d Florent Chuffart
142 b412345d Florent Chuffart
  # We execute the 3 tasks
143 b412345d Florent Chuffart
  run_engine(tasks, sum_a_b)    
144 b412345d Florent Chuffart
145 b412345d Florent Chuffart
  # We collect 9 task results
146 b412345d Florent Chuffart
  for (a in 1:3) {
147 b412345d Florent Chuffart
    for (b in 4:6) {
148 b412345d Florent Chuffart
      out_filename = paste("sum_a_b", a, b, sep="_")
149 b412345d Florent Chuffart
      out_filename = paste("cache/", out_filename, ".RData", sep="")
150 b412345d Florent Chuffart
      load(out_filename)
151 b412345d Florent Chuffart
      print(task_result) 
152 b412345d Florent Chuffart
    }
153 b412345d Florent Chuffart
  }
154 b412345d Florent Chuffart
  
155 b412345d Florent Chuffart
  # Better way to do that
156 b412345d Florent Chuffart
  apply(t(tasks), 2, function(task) {
157 b412345d Florent Chuffart
    out_filename = task[[1]]$out_filename
158 b412345d Florent Chuffart
    out_filename = paste("cache/", out_filename, ".RData", sep="")
159 b412345d Florent Chuffart
    load(out_filename)
160 b412345d Florent Chuffart
    print(task_result) 
161 b412345d Florent Chuffart
  })
162 b412345d Florent Chuffart
  
163 b412345d Florent Chuffart
  # Viewing statistics about the campain.
164 b412345d Florent Chuffart
  bot_stats()
165 b412345d Florent Chuffart
166 b412345d Florent Chuffart
})  
167 b412345d Florent Chuffart
168 b412345d Florent Chuffart
169 b412345d Florent Chuffart
botapply = structure(function(## A function to use bot features in an apply fashion.
170 b412345d Florent Chuffart
### With bot apply you could write your independant loop in an apply fashion, results will be collected ans returned when all tasks will be done.
171 b412345d Florent Chuffart
tasks,  ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
172 b412345d Florent Chuffart
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
173 b412345d Florent Chuffart
bot_cache_dir = "cache",   ##<< the directory where task results are cached
174 b412345d Florent Chuffart
... ##<< Other arguments that will be passed to \emph{run_engine}.
175 b412345d Florent Chuffart
) {
176 b412345d Florent Chuffart
  print(bot_cache_dir)
177 b412345d Florent Chuffart
  run_engine(tasks, task_processor, bot_cache_dir=bot_cache_dir, ...)
178 b412345d Florent Chuffart
  ret = apply(t(tasks), 2, function(task) {
179 b412345d Florent Chuffart
    out_filename = task[[1]]$out_filename
180 b412345d Florent Chuffart
    out_filename = paste(bot_cache_dir, "/", out_filename, ".RData", sep="")
181 b412345d Florent Chuffart
    task_result = NULL
182 b412345d Florent Chuffart
    load(out_filename)
183 b412345d Florent Chuffart
    return(task_result) 
184 b412345d Florent Chuffart
  })
185 b412345d Florent Chuffart
  return(ret)
186 b412345d Florent Chuffart
# It returns the list of compurted tasks
187 b412345d Florent Chuffart
}, ex=function(){
188 b412345d Florent Chuffart
  botapply(
189 b412345d Florent Chuffart
    list(
190 b412345d Florent Chuffart
      list(a=1, b=10, out_filename="task1"), 
191 b412345d Florent Chuffart
      list(a=2, b=20, out_filename="task2"), 
192 b412345d Florent Chuffart
      list(a=3, b=30, out_filename="task3"), 
193 b412345d Florent Chuffart
      list(a=4, b=40, out_filename="task4")),  
194 b412345d Florent Chuffart
    function(task) {
195 b412345d Florent Chuffart
      return(task$a + task$b)})
196 b412345d Florent Chuffart
197 b412345d Florent Chuffart
  # botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)})
198 b412345d Florent Chuffart
      
199 b412345d Florent Chuffart
})
200 b412345d Florent Chuffart
201 b412345d Florent Chuffart
bot_stats = function(## It compute and display statistique about a campain.
202 b412345d Florent Chuffart
### This function browses bot_cache_dir directory and collects information about tasks. Next, it display gantt chart, tasks chart and how computing element are used. Finally it prints on outpout some stats about the campain.
203 b412345d Florent Chuffart
bot_cache_dir = "cache",   ##<< the directory where task results are cached
204 b412345d Florent Chuffart
WHITH4NO = FALSE  ##<< TRUE if you want to include redundante submission in the stats.
205 b412345d Florent Chuffart
) {
206 b412345d Florent Chuffart
  foo = apply(t(list.files(bot_cache_dir, "*RData")), 2, function(file) {
207 b412345d Florent Chuffart
    load(paste(bot_cache_dir, "/", file, sep=""))  
208 b412345d Florent Chuffart
    stats
209 b412345d Florent Chuffart
    })
210 b412345d Florent Chuffart
  stats = data.frame(t(matrix(unlist(foo), length(foo[[1]]))), stringsAsFactors=FALSE)
211 b412345d Florent Chuffart
  names(stats) = names(foo[[1]])
212 b412345d Florent Chuffart
  stats$efficient = 1
213 b412345d Florent Chuffart
214 b412345d Florent Chuffart
  if (WHITH4NO) {
215 b412345d Florent Chuffart
    foo2 = apply(t(list.files(bot_cache_dir, "*4no")), 2, function(file) {
216 b412345d Florent Chuffart
    load(paste(bot_cache_dir, "/", file, sep=""))  
217 b412345d Florent Chuffart
    stats
218 b412345d Florent Chuffart
    })
219 b412345d Florent Chuffart
    stats2 = data.frame(t(matrix(unlist(foo2), length(foo2[[1]]))), stringsAsFactors=FALSE)
220 b412345d Florent Chuffart
    names(stats2) = names(foo2[[1]])
221 b412345d Florent Chuffart
    stats2$efficient = 2
222 b412345d Florent Chuffart
    stats = rbind(stats, stats2)
223 b412345d Florent Chuffart
  }
224 b412345d Florent Chuffart
225 b412345d Florent Chuffart
  stats$start_date = as.integer(stats$start_date)/60
226 b412345d Florent Chuffart
  stats$stop_date = as.integer(stats$stop_date)/60
227 b412345d Florent Chuffart
  zero = min(stats$start_date)
228 b412345d Florent Chuffart
  stats$start_date = stats$start_date - zero
229 b412345d Florent Chuffart
  stats$stop_date = stats$stop_date - zero
230 b412345d Florent Chuffart
  stats$duration = stats$stop_date - stats$start_date
231 b412345d Florent Chuffart
232 b412345d Florent Chuffart
  stats$core = paste(stats$hostname, stats$proc_id, sep="_")
233 b412345d Florent Chuffart
  cores = sort(unique(stats$core))
234 b412345d Florent Chuffart
235 b412345d Florent Chuffart
  x11(width=16, height=9)
236 b412345d Florent Chuffart
  layout(matrix(1:3, nrow=1), respect=TRUE)
237 b412345d Florent Chuffart
238 b412345d Florent Chuffart
  stats = stats[ order(stats$start_date), ]
239 b412345d Florent Chuffart
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(stats[,1])), main=paste("Gantt Chart for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
240 b412345d Florent Chuffart
  arrows( stats$start_date, 1:length(stats[,1]), stats$stop_date, 1:length(stats[,1]), 0, 0, col=stats$efficient)
241 b412345d Florent Chuffart
242 b412345d Florent Chuffart
  stats = stats[ order(stats$duration, decreasing=TRUE),]
243 b412345d Florent Chuffart
  plot(0,0,col=0, xlim=c(0, max(stats$duration)), ylim=c(0, length(stats[,1])), main=paste("Task duration for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
244 b412345d Florent Chuffart
  arrows( rep(0, length(stats[,1])), 1:length(stats[,1]), stats$duration, 1:length(stats[,1]), 0, 0, , col=stats$efficient)
245 b412345d Florent Chuffart
246 b412345d Florent Chuffart
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(cores)), main=paste("Task repartition over computing elements for", bot_cache_dir), xlab="Time (min)", ylab= "Computing Element")
247 b412345d Florent Chuffart
  ys = apply(t(stats$core), 2, function(core){which(cores == core)})
248 b412345d Florent Chuffart
  arrows( stats$start_date, ys, stats$stop_date, ys, 0, 0, col=stats$efficient)
249 b412345d Florent Chuffart
250 b412345d Florent Chuffart
  format.timediff <- function(diff) {
251 b412345d Florent Chuffart
      hr <- diff%/%60
252 b412345d Florent Chuffart
      min <- floor(diff - hr * 60)
253 b412345d Florent Chuffart
      sec <- round(diff%%1 * 60,digits=2)
254 b412345d Florent Chuffart
      return(paste(hr,min,sec,sep=':'))
255 b412345d Florent Chuffart
  }
256 b412345d Florent Chuffart
257 b412345d Florent Chuffart
  cat("#cores....................", length(cores), "\n", sep="")
258 b412345d Florent Chuffart
  cat("#tasks....................", length(stats[stats$efficient==1, 1]), "\n", sep="")
259 b412345d Florent Chuffart
  cat("cpu.time..................", format.timediff(sum(stats[stats$efficient==1, ]$duration)), "\n", sep="")
260 b412345d Florent Chuffart
  cat("time......................", format.timediff(max(stats[stats$efficient==1, ]$stop_date)), "\n", sep="")
261 b412345d Florent Chuffart
  cat("speedup...................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date), "\n", sep="")
262 b412345d Florent Chuffart
  cat("efficiency................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date)/length(cores), "\n", sep="")
263 b412345d Florent Chuffart
  return(stats)
264 b412345d Florent Chuffart
  # It returns the data.frame of collected informations.
265 b412345d Florent Chuffart
}