Révision d0677cdf

b/dfs_bench_seq_write_4GB.py
1
#!/usr/bin/python
2

  
3
import sys
4
import tempfile
5
import time
6
from execo_g5k import *
7
from execo import *
8

  
9
OUTDIR="/home/dponsard/DFS_EXECO/RESULT"
10

  
11

  
12
def usage():
13
  print "Usage %s job site dfs_type" % sys.argv[0]
14
  sys.exit(1)
15

  
16

  
17
def start_benchmark(client_nodes,bench_size=4096):
18
  prepare=Remote("sync && echo 3 > /proc/sys/vm/drop_caches && echo 3 > /proc/sys/vm/drop_caches", client_nodes, connexion_params={'user': 'root'})
19
  bench=Remote("TMP_DIR=`mktemp -d /data/BENCH.XXXX` ;dd if=/dev/zero of=${TMP_DIR}/write.dd bs=1M count=%d; sync" % bench_size, client_nodes, connexion_params={'user': 'root'})
20

  
21
  prepare.run()
22
  bench.run()
23

  
24
  cumul_speed = 0
25
  for p in bench.processes():
26
    cumul_speed += bench_size/float(p.end_date()-p.start_date())
27

  
28
  print "Benchmark speed is "+str(cumul_speed)
29

  
30
  print "Benchmark output"
31
  for p in bench.processes():
32
    print p.stdout() 
33

  
34
  return cumul_speed
35

  
36

  
37
def create_dfs5k_conf(dfs_type,site,job,master_node,client_nodes,data_nodes):
38
  try:
39
    f=tempfile.NamedTemporaryFile(prefix="dfs5k_%s_%s_%s-" % (dfs_type,site,job), delete=False)
40
  except Exception as e: 
41
    print "Failed to open /tmp/dfs5k.conf"
42
    print e
43
    sys.exit(1)   
44
   
45
  if dfs_type == "ceph" or dfs_type=="lustre":
46
    f.write("""name: DFS5k
47
options:
48
dataDir: /tmp
49
master: root@%s
50
clients:
51
""" % master_node.address)
52

  
53
    for client_node in client_nodes:
54
      f.write("  root@%s\n" % client_node.address)
55

  
56
    f.write("dataNodes:\n")
57
    for data_node in data_nodes:
58
      f.write("  root@%s\n" % data_node.address)
59
  
60
  elif dfs_type == "glusterfs":
61
    f.write("""name: DFS5k
62
options: stripe %d
63
master: root@%s
64
clients:
65
""" % (len(data_nodes),master_node.address))
66

  
67
    for client_node in client_nodes:
68
      f.write("  root@%s\n" % client_node.address)
69

  
70
    f.write("dataNodes:\n")
71
    for data_node in data_nodes:
72
      f.write("  root@%s:/tmp\n" % data_node.address)
73
  
74
  else:
75
    print "Error: Unsupported dfs type %s" % dfs_type
76
    sys.exit(1)
77

  
78
  f.close() 
79
  return f
80

  
81

  
82
def start_localfs_bench(site,job,node,bench_size=4096):
83
  prepare=SshProcess("sync && echo 3 > /proc/sys/vm/drop_caches && echo 3 > /proc/sys/vm/drop_caches", node, connexion_params={'user': 'root'})
84
  bench=SshProcess("dd if=/dev/zero of=/tmp/write.dd bs=1M count=%d && sync" % bench_size, node, connexion_params={'user': 'root'})
85
  prepare.run()
86
  start=time.time()
87
  bench.run()
88
  end=time.time()
89
  print "Benchmark output"
90
  print bench.stdout()
91
  print "Cleanning /tmp"
92
  clean=SshProcess("rm -rf /tmp/write.dd",node, connexion_params={'user': 'root'})
93
  clean.run()
94
  speed = bench_size/float(end-start)
95
  print "Benchmark speed is "+str(speed)
96
  return speed
97

  
98
def clean_nodes_before(dfs_type,site,dfs_conf,nodes):
99
  print "Cleaning /data"
100
  Remote("rm -rf /data/* 2&>/dev/null", nodes, connexion_params={'user': 'root'}).run()
101
  
102
  print "Umounting DFS5k"
103
  print "Umounting DFS5k"
104
  SshProcess("dfs5k -d -a umount -s %s -f %s" % (dfs_type,dfs_conf.name) , Host(site)).run()
105
  
106
  if dfs_type != "lustre":
107
	print "UnDeploying DFS5k" 
108
	SshProcess("dfs5k -d -a undeploy -s %s -f %s" % (dfs_type,dfs_conf.name) , Host(site)).run()
109
  else:
110
	print "UnDeploying DFS5k" 
111
	p=SshProcess("dfs5k -d -a undeploy -s %s -f %s" % (dfs_type,dfs_conf.name) , Host(site), close_stdin=False).start()
112
        p._process.stdin.write("y\n")
113
        p._process.stdin.flush()
114
        p.wait()
115

  
116
  print "Cleaning /tmp and /data"
117
  Remote("rm -rf /tmp/* /data/* 2&>/dev/null", nodes, connexion_params={'user': 'root'}).run()
118
  return
119

  
120
def clean_nodes_after(dfs_type,site,dfs_conf,nodes):
121
  print "Cleaning /data"
122
  Remote("rm -rf /data/* 2&>/dev/null", nodes, connexion_params={'user': 'root'}).run()
123

  
124
  print "Umounting DFS5k"
125
  SshProcess("dfs5k -d -a umount -s %s -f %s" % (dfs_type,dfs_conf.name) , Host(site)).run()
126

  
127
  print "UnDeploying DFS5k"
128
  SshProcess("dfs5k -d -a undeploy -s %s -f %s" % (dfs_type,dfs_conf.name) , Host(site)).run()
129

  
130
  print "Cleaning /tmp and /data"
131
  Remote("rm -rf /tmp/* /data/* 2&>/dev/null", nodes, connexion_params={'user': 'root'}).run()
132
  return
133

  
134

  
135

  
136
def start_dfs_bench(dfs_type,site,job,master_node,client_nodes,data_nodes):
137

  
138
  f = create_dfs5k_conf(dfs_type,site,job,master_node,client_nodes,data_nodes)
139
  print "Copying dfs5k configuration file" 
140
  Process("scp %s %s:/tmp/" % (f.name,site)).run()
141
  
142
  print "Copying SSH key to master node" 
143
  SshProcess("scp ~/.ssh/id_rsa root@%s:.ssh/" % master_node.address, Host(site)).run()
144
 
145
  clean_nodes_before(dfs_type, site, f, data_nodes+client_nodes+[master_node])
146

  
147
  print "Deploying DFS5k" 
148
  SshProcess("dfs5k -d -a deploy -s %s -f %s" % (dfs_type,f.name) , Host(site)).run()
149

  
150
  print "Mounting DFS5k" 
151
  print "Deploying DFS5k"
152
  SshProcess("dfs5k -d -a mount -s %s -f %s" % (dfs_type,f.name) , Host(site)).run()
153
  
154
  print "Performing benchmark %s" % dfs_type
155
  print "Master node is %s" % master_node
156
  print "## %d Data nodes are %s" % (len(data_nodes), data_nodes)
157
  print "## %d Client nodes are %s" % (len(client_nodes), client_nodes)
158
  
159
  speed = start_benchmark(client_nodes)
160
  
161
  clean_nodes_after(dfs_type, site, f, data_nodes+client_nodes+[master_node])
162
 
163
  return speed
164

  
165

  
166
def prepare_nodes(job,site,dfs_type):
167
  nodes = []
168
  try:
169
    if get_oar_job_info(job,site)["state"]!="Running":
170
      print "Cannot deploy on job %s @ %s, job not started" % (job,site)
171
    else: 
172
      nodes += get_oar_job_nodes(job,site)
173
  except Exception as e:
174
    print "Cannot get info on job %s @ %s" % (job,site)
175
    print e
176
  try:
177
    print "Deploying on %s" % nodes
178
    if dfs_type=="lustre":
179
      print "Deploying on %s  environment centos-x64-lustre for %s" % (nodes, dfs_type)
180
      (ok_nodes, failed_nodes) = deploy(Deployment(nodes,env_name="centos-x64-lustre"), node_connexion_params={'user': 'root'})
181
    else:
182
      print "Deploying on %s  environment squeeze-x64-nfs for %s" % (nodes, dfs_type)
183
      (ok_nodes, failed_nodes) = deploy(Deployment(nodes,env_name="squeeze-x64-nfs"))
184
    print "Deployed on %s " % ok_nodes
185

  
186
  except Exception as e:
187
    print "Failed to deploy"
188
    print e
189
    raise e
190
    sys.exit(1)
191
  else: 
192
    #print "Installing iozone"
193
    #Remote("apt-get update && apt-get install -y iozone3",ok_nodes, connexion_params={'user': 'root'}).run()
194
    return list(ok_nodes)
195

  
196
if len(sys.argv) < 4:
197
  usage()
198

  
199
job = int(sys.argv[1])
200
site = sys.argv[2]
201
dfs_type= sys.argv[3]
202

  
203
f_result = open(OUTDIR+"/results_%s_%s_%s.txt" % (job,site,dfs_type),"w")
204

  
205
nodes=prepare_nodes(job,site,dfs_type)
206
#nodes=["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P"]
207
print nodes
208

  
209
master_node=nodes[0]
210

  
211
#Write first line of the table
212
line="X"
213
for i in range(2,len(nodes)-1):
214
  line+=" %d" % i
215
line+="\n"
216
f_result.write(line)
217

  
218
print "#### START LOCALFS BENCH #####"
219
speed = start_localfs_bench(site,job,master_node)
220
line="0"
221
for i in range(2,len(nodes)-1):
222
  line+=" %d" % int(speed)
223
line+="\n"
224
f_result.write(line)
225

  
226
for n_clients in [1,2,4,6,8,10,12,14,16,18,20]:
227

  
228
  f_result.write("%d" % n_clients)
229

  
230
  available_nodes = nodes[1:]
231

  
232
  client_nodes=available_nodes[:n_clients]
233
  available_nodes=available_nodes[n_clients:]
234

  
235
  for n_datanodes in range(2,len(available_nodes)+1):
236
    data_nodes=available_nodes[:n_datanodes]
237

  
238
    print "\n\n"
239
    print "#### START DFS BENCH #####"
240
    print master_node,client_nodes,data_nodes
241
    print "#### ############### #####"
242

  
243
    speed = start_dfs_bench(dfs_type,site,job,master_node,client_nodes,data_nodes)
244
    f_result.write(" %d" % int(speed))
245
    f_result.flush()
246

  
247

  
248
  f_result.write("\n")
249
  f_result.flush()
250

  
251
f_result.write("\n\n\n")
252
f_result.write(str(nodes))
253
f_result.close()
254

  

Formats disponibles : Unified diff