Révision 2be92b2c modules/calculation.py
b/modules/calculation.py | ||
---|---|---|
39 | 39 |
ase.io.write(f'{run_type}/conf_{i}/{coord_file}', conf) |
40 | 40 |
|
41 | 41 |
|
42 |
def get_jobs_status_sge(job_ids): # TODO more elegant
|
|
42 |
def get_jobs_status(job_ids, stat_cmd, stat_dict):
|
|
43 | 43 |
"""Returns a list of job status for a list of job ids. |
44 | 44 |
|
45 | 45 |
@param job_ids: list of all jobs to be checked their status. |
46 |
@param stat_cmd: Command to check job status. |
|
47 |
@param stat_dict: Dictionary with pairs of job status (r, p, f) and the |
|
48 |
pattern it matches in the output of the stat_cmd. |
|
46 | 49 |
@return: list of status for every job. |
47 | 50 |
""" |
48 |
from gridtk.tools import qstat |
|
49 |
run_chk = 'usage 1' |
|
51 |
from subprocess import PIPE, Popen |
|
50 | 52 |
status_list = [] |
51 | 53 |
for job in job_ids: |
52 |
if run_chk in qstat(job): |
|
54 |
stat_order = stat_cmd % job |
|
55 |
stat_msg = Popen(stat_order, shell=True, |
|
56 |
stdout=PIPE).communicate()[0].decode('utf-8').strip() |
|
57 |
if stat_dict['r'] == stat_msg: |
|
53 | 58 |
status_list.append('r') |
54 |
elif len(qstat(job)) > 0: |
|
55 |
status_list.append('q') |
|
56 |
else: |
|
57 |
status_list.append('f') |
|
58 |
return status_list |
|
59 |
|
|
60 |
|
|
61 |
def sub_sge(run_type, sub_script, max_qw, name): |
|
62 |
"""Submits jobs to the sge queuing system with the provided script |
|
63 |
|
|
64 |
@param run_type: Type of calculation. 'isolated', 'screening', 'refinement' |
|
65 |
@param sub_script: script for the job submission. |
|
66 |
@param max_qw: Maximum number of simultaneous jobs waiting to be executed. |
|
67 |
@param name: name of the project |
|
68 |
""" |
|
69 |
from shutil import copy |
|
70 |
from time import sleep |
|
71 |
from gridtk.tools import qsub # TODO CHANGE TO DRMAA |
|
72 |
subm_jobs = [] |
|
73 |
init_dir = os.getcwd() |
|
74 |
for conf in os.listdir(run_type): |
|
75 |
i = conf.split('_')[1] |
|
76 |
while get_jobs_status_sge(subm_jobs).count('q') >= max_qw: |
|
77 |
sleep(30) |
|
78 |
copy(sub_script, f"{run_type}/{conf}") |
|
79 |
os.chdir(f"{run_type}/{conf}") |
|
80 |
job_name = f'{name[:5].capitalize()}{run_type[:3].capitalize()}{i}' |
|
81 |
subm_jobs.append(qsub(sub_script, name=job_name)) |
|
82 |
os.chdir(init_dir) |
|
83 |
|
|
84 |
logger.info('All jobs have been submitted, waiting for them to finish.') |
|
85 |
while not all([stat == 'f' for stat in get_jobs_status_sge(subm_jobs)]): |
|
86 |
sleep(30) |
|
87 |
logger.info('All jobs have finished.') |
|
88 |
|
|
89 |
|
|
90 |
def get_jobs_status_lsf(job_ids): |
|
91 |
"""Returns a list of job status for a list of job ids. |
|
92 |
|
|
93 |
@param job_ids: list of all jobs to be checked their status. |
|
94 |
@return: list of status for every job. |
|
95 |
""" |
|
96 |
run_chk = 'usage 1' # TODO Implement with proper command |
|
97 |
status_list = [] |
|
98 |
running = False # TODO Implement with proper command |
|
99 |
queued = False # TODO Implement with proper command |
|
100 |
finished = False # TODO Implement with proper command |
|
101 |
for job in job_ids: |
|
102 |
if running: # TODO Implement with proper command |
|
103 |
status_list.append('r') |
|
104 |
elif queued: # TODO Implement with proper command |
|
105 |
status_list.append('q') |
|
106 |
elif finished: |
|
59 |
elif stat_dict['p'] == stat_msg: |
|
60 |
status_list.append('p') |
|
61 |
elif stat_dict['f'] == stat_msg: |
|
107 | 62 |
status_list.append('f') |
108 | 63 |
else: |
109 |
raise ValueError
|
|
64 |
logger.warning(f'Unrecognized job status: {job}')
|
|
110 | 65 |
return status_list |
111 | 66 |
|
112 | 67 |
|
113 |
def sub_lsf(run_type, sub_script, max_qw, name): |
|
114 |
"""Submits jobs to the lsf queuing system with the provided script |
|
68 |
def submit_jobs(run_type, sub_cmd, sub_script, stat_cmd, stat_dict, max_pend, |
|
69 |
name): |
|
70 |
"""Submits jobs to a custom queuing system with the provided script |
|
115 | 71 |
|
116 | 72 |
@param run_type: Type of calculation. 'isolated', 'screening', 'refinement' |
73 |
@param sub_cmd: The command used to submit jobs. |
|
117 | 74 |
@param sub_script: script for the job submission. |
118 |
@param max_qw: Maximum number of simultaneous jobs waiting to be executed. |
|
119 |
@param name: name of the project |
|
75 |
@param stat_cmd: Command to check job status. |
|
76 |
@param stat_dict: Dictionary with pairs of job status (r, p, f) and the |
|
77 |
pattern it matches in the output of the stat_cmd. |
|
78 |
@param max_pend: Maximum number of simultaneous jobs waiting to be executed. |
|
79 |
@param name: name of the project. |
|
120 | 80 |
""" |
121 | 81 |
from shutil import copy |
122 | 82 |
from time import sleep |
83 |
from subprocess import PIPE, Popen |
|
123 | 84 |
subm_jobs = [] |
124 | 85 |
init_dir = os.getcwd() |
125 | 86 |
for conf in os.listdir(run_type): |
126 | 87 |
i = conf.split('_')[1] |
127 |
while get_jobs_status_lsf(subm_jobs).count('q') >= max_qw: |
|
88 |
while get_jobs_status(subm_jobs, stat_cmd, stat_dict).count('p') \ |
|
89 |
>= max_pend: |
|
128 | 90 |
sleep(30) |
129 | 91 |
copy(sub_script, f"{run_type}/{conf}") |
130 | 92 |
os.chdir(f"{run_type}/{conf}") |
131 | 93 |
job_name = f'{name[:5].capitalize()}{run_type[:3].capitalize()}{i}' |
132 |
# subm_jobs.append(qsub(sub_script, name=job_name)) # TODO LSF Command |
|
94 |
sub_order = sub_cmd % (job_name, sub_script) |
|
95 |
subm_msg = Popen(sub_order, shell=True, stdout=PIPE).communicate()[0] |
|
96 |
job_id = None |
|
97 |
for word in subm_msg.decode("utf-8").split(): |
|
98 |
try: |
|
99 |
job_id = int(word) |
|
100 |
break |
|
101 |
except ValueError: |
|
102 |
continue |
|
103 |
subm_jobs.append(job_id) |
|
133 | 104 |
os.chdir(init_dir) |
134 | 105 |
|
135 | 106 |
logger.info('All jobs have been submitted, waiting for them to finish.') |
136 |
while not all([stat == 'f' for stat in get_jobs_status_sge(subm_jobs)]): |
|
107 |
while not all([stat == 'f' for stat in |
|
108 |
get_jobs_status(subm_jobs, stat_cmd, stat_dict)]): |
|
137 | 109 |
sleep(30) |
138 | 110 |
logger.info('All jobs have finished.') |
139 | 111 |
|
... | ... | |
171 | 143 |
# elif: inp_vars['code'] == 'Other codes here' |
172 | 144 |
|
173 | 145 |
if inp_vars['batch_q_sys'] == 'sge': |
174 |
sub_sge(run_type, inp_vars['subm_script'], inp_vars['max_qw'], |
|
175 |
inp_vars['project_name']) |
|
176 |
elif inp_vars['batch_q_sys'] == 'lsf': # TODO implement lsf |
|
177 |
sub_lsf(run_type, inp_vars['subm_script'], inp_vars['max_qw'], |
|
178 |
inp_vars['project_name']) |
|
146 |
stat_cmd = "qstat | grep %s | awk '{print $5}'" |
|
147 |
stat_dict = {'r': 'r', 'p': 'qw', 'f': ''} |
|
148 |
submit_jobs(run_type, 'qsub -N %s %s', inp_vars['subm_script'], |
|
149 |
stat_cmd, stat_dict, inp_vars['max_qw'], |
|
150 |
inp_vars['project_name']) |
|
151 |
elif inp_vars['batch_q_sys'] == 'lsf': |
|
152 |
stat_cmd = "bjobs -w | grep %s | awk ''" |
|
153 |
submit_jobs(run_type, 'bsub', inp_vars['subm_script'], |
|
154 |
inp_vars['max_qw'], inp_vars['project_name']) |
|
179 | 155 |
elif inp_vars['batch_q_sys'] == 'local': # TODO implement local |
180 | 156 |
pass # run_local |
181 | 157 |
elif inp_vars['batch_q_sys'] == 'none': |
Formats disponibles : Unified diff