Statistiques
| Branche: | Révision :

root / src / R / bot.R @ master

Historique | Voir | Annoter | Télécharger (11,63 ko)

1
######################################
2
# DISTRIBUTED COMPUTING TASKS ENGINE #
3
######################################
4

    
5
get_nb_proc = function ## Empirical function to find the number of core of the current computing node.
6
### This function uses \emph{/proc/cpuinfo} file for Debian and \emph{sysctl} for macosx. This fucntion could be extended or overrided by the user to adapt it to his own system.
7
() {
8
  if (Sys.info()[["nodename"]] == "cremone") {
9
    return(12)
10
  }
11
  dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'core id' | wc -l",intern=TRUE))
12
  if (dyn_nb_proc == 0) {
13
    dyn_nb_proc = as.integer(system("cat /proc/cpuinfo | grep 'cpuid level' | wc -l",intern=TRUE))
14
    if (dyn_nb_proc == 0) {
15
      dyn_nb_proc = as.integer(system("sysctl -a | grep machdep.cpu.core_count | cut -d ' ' -f 2",intern=TRUE))
16
      if (dyn_nb_proc == 0) {
17
        return(1)
18
      }
19
    }
20
  }
21
  return(dyn_nb_proc)
22
### The number of core that the current computing node owns.
23
}
24

    
25
run_engine = structure(function (## Execute bag of tasks parallely, on as many cores as the current computing node owns.
26
### This bag of tasks engine forks processes on as many cores as the current computing node owns. Each sub-process takes a task randomly in the list of tasks. For each task, it starts by taking a lock on this task (creating a file named out_filename.lock). Next, it executes the task_processor (a function) using the corresponding set of parameters (task). When this execution is completed, it dumps task_processor results into a results file (named out_filename.RData).
27
tasks,  ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
28
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
29
debug=FALSE, ##<< If \emph{TRUE} no process will be forked, the list of tasks will be executed in the current process.
30
starter_name="~/.start_best_effort_jobs", ##<< Path to file that will be deleted after the execution of all tasks if \emph{rm_starter} is set to \emph{TRUE}.
31
rm_starter=TRUE, ##<< If \emph{TRUE} the file \emph{starter_name} will be deleted after the execution of all tasks.
32
log_dir="log", ##<< Path to the \emph{log} directory.
33
bot_cache_dir = "cache",   ##<< the directory where task results are cached
34
nb_proc=NULL, ##<< If not NULL fix the number of core on which tasks must be computed.
35
... ##<< Other arguments that will be passed to \emph{task_processor}.
36
){  
37
  if (!file.exists(log_dir)) {
38
    dir.create(log_dir, recursive = TRUE)
39
  }
40
  if (!file.exists(bot_cache_dir)) {
41
    dir.create(bot_cache_dir, recursive = TRUE)    
42
  }
43
  print(paste("#tasks: ", length(tasks)))
44
  forked_part = function(){
45
    stats = list()
46
    stats$proc_id = proc_id
47
    stats$UID = UID
48
    set.seed(proc_id + UID)
49
    if (!debug) {
50
      Sys.sleep(floor(runif(1,1,30)))
51
    }
52
    hostname = system("hostname", intern=TRUE)
53
    stats$hostname = hostname
54
    sink(paste(log_dir, "/proc_id_hostname_", proc_id, "_", hostname, ".log", sep=""), type =c("output", "message"), split = TRUE)    
55
    need_rerun = TRUE
56
    nb_loop = 0
57
    while (need_rerun){
58
        need_rerun = FALSE
59
        for (task in sample(tasks)) {
60
        # Check mandatory task attribute
61
        if (is.null(task$out_filename)) {
62
          print("ERROR! Attribute task$out_filename is mandatory.")
63
          exit(1)
64
        }
65
        # Check if task is already done or currently processed        
66
        lock_filename = paste(bot_cache_dir, "/", task$out_filename, ".lock", sep="")
67
        save_filename = paste(bot_cache_dir, "/", task$out_filename, ".RData", sep="")
68
        lock_filename_bis = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) ,".lock4no", sep="")
69
        for_nothing_filename = paste(bot_cache_dir, "/", task$out_filename, "_", (proc_id + UID) , ".RData4no", sep="")
70
        stats$start_date = as.integer(format(Sys.time(), "%s"))
71
        if (nb_loop==0 & file.exists(lock_filename)) {
72
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " is locked... skipping.", sep=""))
73
          need_rerun = TRUE
74
        } else if (file.exists(save_filename)) {
75
          print(paste("[proc_", proc_id , "] ", date(), " ", save_filename, " exists... skipping.", sep=""))
76
        } else {
77
          need_rerun = TRUE
78
          print(paste("[proc_", proc_id , "] ", date(), " taking lock on ", lock_filename, " and computing...", sep=""))
79
          save(stats, file=lock_filename)
80
          save(stats, file=lock_filename_bis)
81
          task_result = task_processor(task, ...)          
82
          stats$stop_date = as.integer(format(Sys.time(), "%s"))
83
          if (!file.exists(save_filename)) {
84
            save(task_result, stats, file=save_filename)
85
            file.remove(lock_filename) 
86
            file.remove(lock_filename_bis) 
87
          } else {
88
            print(paste("[proc_", proc_id , "] ", stats$stop_date, " ", save_filename, " already exists... So it have been computed for nothing.", sep=""))              
89
            save(task_result, stats, file=for_nothing_filename)
90
            file.remove(lock_filename) 
91
            file.remove(lock_filename_bis) 
92
          }
93
        }
94
      }
95
      nb_loop = nb_loop + 1
96
    }
97
    if (rm_starter) {
98
      print("all task have been processed. Removing ~/.start_best_effort_jobs..." )
99
      file.remove(starter_name)
100
    } else {
101
      print("all task have been processed." )
102
    }
103
    sink()
104
  }
105
  UID = round(runif(1,1,1000000))
106
  if (debug) {
107
    nb_proc = 1
108
    proc_id = 0
109
    print("running engine witout forking (DEBUG MODE)...")
110
    forked_part()
111
  } else {
112
    if (is.null(nb_proc)) {
113
      nb_proc = get_nb_proc()      
114
    }
115
    print(paste("running engine over ", nb_proc, " proc(s)...", sep=""))
116
    pids = c()
117
    for (proc_id in 1:nb_proc) {
118
      # Here we fork!
119
      pids = c(pids,fork(forked_part))
120
    }
121
    # wait until each childs finishe, then display their exit status
122
    for (pid in pids) {
123
      wait(pid) 
124
    }
125
  } 
126
  # Nothing
127
}, ex=function(){
128
  
129
  # We define a basic task_processor
130
  sum_a_b = function(task) {
131
    return(task$a + task$b)
132
  }
133
  
134
  # We define 9 tasks
135
  tasks = list()
136
  for (a in 1:3) {
137
    for (b in 4:6) {
138
      tasks[[length(tasks) + 1]] = list(a=a, b=b, out_filename=paste("sum_a_b", a, b, sep="_")) 
139
    }
140
  }
141

    
142
  # We execute the 3 tasks
143
  run_engine(tasks, sum_a_b)    
144

    
145
  # We collect 9 task results
146
  for (a in 1:3) {
147
    for (b in 4:6) {
148
      out_filename = paste("sum_a_b", a, b, sep="_")
149
      out_filename = paste("cache/", out_filename, ".RData", sep="")
150
      load(out_filename)
151
      print(task_result) 
152
    }
153
  }
154
  
155
  # Better way to do that
156
  apply(t(tasks), 2, function(task) {
157
    out_filename = task[[1]]$out_filename
158
    out_filename = paste("cache/", out_filename, ".RData", sep="")
159
    load(out_filename)
160
    print(task_result) 
161
  })
162
  
163
  # Viewing statistics about the campain.
164
  bot_stats()
165

    
166
})  
167

    
168

    
169
botapply = structure(function(## A function to use bot features in an apply fashion.
170
### With bot apply you could write your independant loop in an apply fashion, results will be collected ans returned when all tasks will be done.
171
tasks,  ##<< A list of tasks, each task is a list of key values that will be passed as arguments to the task_processor. Note that task$out_filename is a mandatory parameter.
172
task_processor, ##<< A function that will be called for each task in the task list \emph{tasks}.
173
bot_cache_dir = "cache",   ##<< the directory where task results are cached
174
... ##<< Other arguments that will be passed to \emph{run_engine}.
175
) {
176
  print(bot_cache_dir)
177
  run_engine(tasks, task_processor, bot_cache_dir=bot_cache_dir, ...)
178
  ret = apply(t(tasks), 2, function(task) {
179
    out_filename = task[[1]]$out_filename
180
    out_filename = paste(bot_cache_dir, "/", out_filename, ".RData", sep="")
181
    task_result = NULL
182
    load(out_filename)
183
    return(task_result) 
184
  })
185
  return(ret)
186
# It returns the list of compurted tasks
187
}, ex=function(){
188
  botapply(
189
    list(
190
      list(a=1, b=10, out_filename="task1"), 
191
      list(a=2, b=20, out_filename="task2"), 
192
      list(a=3, b=30, out_filename="task3"), 
193
      list(a=4, b=40, out_filename="task4")),  
194
    function(task) {
195
      return(task$a + task$b)})
196

    
197
  # botapply(list(list(a=1, b=10, out_filename="task1")),function(task) {return(task$a + task$b)})
198
      
199
})
200

    
201
bot_stats = function(## It compute and display statistique about a campain.
202
### This function browses bot_cache_dir directory and collects information about tasks. Next, it display gantt chart, tasks chart and how computing element are used. Finally it prints on outpout some stats about the campain.
203
bot_cache_dir = "cache",   ##<< the directory where task results are cached
204
WHITH4NO = FALSE  ##<< TRUE if you want to include redundante submission in the stats.
205
) {
206
  foo = apply(t(list.files(bot_cache_dir, "*RData")), 2, function(file) {
207
    load(paste(bot_cache_dir, "/", file, sep=""))  
208
    stats
209
    })
210
  stats = data.frame(t(matrix(unlist(foo), length(foo[[1]]))), stringsAsFactors=FALSE)
211
  names(stats) = names(foo[[1]])
212
  stats$efficient = 1
213

    
214
  if (WHITH4NO) {
215
    foo2 = apply(t(list.files(bot_cache_dir, "*4no")), 2, function(file) {
216
    load(paste(bot_cache_dir, "/", file, sep=""))  
217
    stats
218
    })
219
    stats2 = data.frame(t(matrix(unlist(foo2), length(foo2[[1]]))), stringsAsFactors=FALSE)
220
    names(stats2) = names(foo2[[1]])
221
    stats2$efficient = 2
222
    stats = rbind(stats, stats2)
223
  }
224

    
225
  stats$start_date = as.integer(stats$start_date)/60
226
  stats$stop_date = as.integer(stats$stop_date)/60
227
  zero = min(stats$start_date)
228
  stats$start_date = stats$start_date - zero
229
  stats$stop_date = stats$stop_date - zero
230
  stats$duration = stats$stop_date - stats$start_date
231

    
232
  stats$core = paste(stats$hostname, stats$proc_id, sep="_")
233
  cores = sort(unique(stats$core))
234

    
235
  x11(width=16, height=9)
236
  layout(matrix(1:3, nrow=1), respect=TRUE)
237

    
238
  stats = stats[ order(stats$start_date), ]
239
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(stats[,1])), main=paste("Gantt Chart for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
240
  arrows( stats$start_date, 1:length(stats[,1]), stats$stop_date, 1:length(stats[,1]), 0, 0, col=stats$efficient)
241

    
242
  stats = stats[ order(stats$duration, decreasing=TRUE),]
243
  plot(0,0,col=0, xlim=c(0, max(stats$duration)), ylim=c(0, length(stats[,1])), main=paste("Task duration for", bot_cache_dir), xlab="Time (min)", ylab= "Task")
244
  arrows( rep(0, length(stats[,1])), 1:length(stats[,1]), stats$duration, 1:length(stats[,1]), 0, 0, , col=stats$efficient)
245

    
246
  plot(0,0,col=0, xlim=c(0, max(stats$stop_date)), ylim=c(0, length(cores)), main=paste("Task repartition over computing elements for", bot_cache_dir), xlab="Time (min)", ylab= "Computing Element")
247
  ys = apply(t(stats$core), 2, function(core){which(cores == core)})
248
  arrows( stats$start_date, ys, stats$stop_date, ys, 0, 0, col=stats$efficient)
249

    
250
  format.timediff <- function(diff) {
251
      hr <- diff%/%60
252
      min <- floor(diff - hr * 60)
253
      sec <- round(diff%%1 * 60,digits=2)
254
      return(paste(hr,min,sec,sep=':'))
255
  }
256

    
257
  cat("#cores....................", length(cores), "\n", sep="")
258
  cat("#tasks....................", length(stats[stats$efficient==1, 1]), "\n", sep="")
259
  cat("cpu.time..................", format.timediff(sum(stats[stats$efficient==1, ]$duration)), "\n", sep="")
260
  cat("time......................", format.timediff(max(stats[stats$efficient==1, ]$stop_date)), "\n", sep="")
261
  cat("speedup...................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date), "\n", sep="")
262
  cat("efficiency................", sum(stats[stats$efficient==1, ]$duration)/max(stats[stats$efficient==1, ]$stop_date)/length(cores), "\n", sep="")
263
  return(stats)
264
  # It returns the data.frame of collected informations.
265
}