Révision 2be92b2c modules/calculation.py

b/modules/calculation.py
39 39
        ase.io.write(f'{run_type}/conf_{i}/{coord_file}', conf)
40 40

  
41 41

  
42
def get_jobs_status_sge(job_ids):  # TODO more elegant
42
def get_jobs_status(job_ids, stat_cmd, stat_dict):
43 43
    """Returns a list of job status for a list of job ids.
44 44

  
45 45
    @param job_ids: list of ids of the jobs whose status is to be checked.
46
    @param stat_cmd: Command to check job status.
47
    @param stat_dict: Dictionary mapping each job status (r, p, f) to the
48
        pattern it matches in the output of the stat_cmd.
46 49
    @return: list with the status of every job.
47 50
    """
48
    from gridtk.tools import qstat
49
    run_chk = 'usage         1'
51
    from subprocess import PIPE, Popen
50 52
    status_list = []
51 53
    for job in job_ids:
52
        if run_chk in qstat(job):
54
        stat_order = stat_cmd % job
55
        stat_msg = Popen(stat_order, shell=True,
56
                         stdout=PIPE).communicate()[0].decode('utf-8').strip()
57
        if stat_dict['r'] == stat_msg:
53 58
            status_list.append('r')
54
        elif len(qstat(job)) > 0:
55
            status_list.append('q')
56
        else:
57
            status_list.append('f')
58
    return status_list
59

  
60

  
61
def sub_sge(run_type, sub_script, max_qw, name):
62
    """Submits jobs to the sge queuing system with the provided script
63

  
64
    @param run_type: Type of calculation. 'isolated', 'screening', 'refinement'
65
    @param sub_script: script for the job submission.
66
    @param max_qw: Maximum number of simultaneous jobs waiting to be executed.
67
    @param name: name of the project
68
    """
69
    from shutil import copy
70
    from time import sleep
71
    from gridtk.tools import qsub  # TODO CHANGE TO DRMAA
72
    subm_jobs = []
73
    init_dir = os.getcwd()
74
    for conf in os.listdir(run_type):
75
        i = conf.split('_')[1]
76
        while get_jobs_status_sge(subm_jobs).count('q') >= max_qw:
77
            sleep(30)
78
        copy(sub_script, f"{run_type}/{conf}")
79
        os.chdir(f"{run_type}/{conf}")
80
        job_name = f'{name[:5].capitalize()}{run_type[:3].capitalize()}{i}'
81
        subm_jobs.append(qsub(sub_script, name=job_name))
82
        os.chdir(init_dir)
83

  
84
    logger.info('All jobs have been submitted, waiting for them to finish.')
85
    while not all([stat == 'f' for stat in get_jobs_status_sge(subm_jobs)]):
86
        sleep(30)
87
    logger.info('All jobs have finished.')
88

  
89

  
90
def get_jobs_status_lsf(job_ids):
91
    """Returns a list of job status for a list of job ids.
92

  
93
    @param job_ids: list of all jobs to be checked their status.
94
    @return: list of status for every job.
95
    """
96
    run_chk = 'usage         1'  # TODO Implement with proper command
97
    status_list = []
98
    running = False  # TODO Implement with proper command
99
    queued = False  # TODO Implement with proper command
100
    finished = False  # TODO Implement with proper command
101
    for job in job_ids:
102
        if running:  # TODO Implement with proper command
103
            status_list.append('r')
104
        elif queued:  # TODO Implement with proper command
105
            status_list.append('q')
106
        elif finished:
59
        elif stat_dict['p'] == stat_msg:
60
            status_list.append('p')
61
        elif stat_dict['f'] == stat_msg:
107 62
            status_list.append('f')
108 63
        else:
109
            raise ValueError
64
            logger.warning(f'Unrecognized job status: {job}')
110 65
    return status_list
111 66

  
112 67

  
113
def sub_lsf(run_type, sub_script, max_qw, name):
114
    """Submits jobs to the lsf queuing system with the provided script
68
def submit_jobs(run_type, sub_cmd, sub_script, stat_cmd, stat_dict, max_pend,
69
                name):
70
    """Submits jobs to a custom queuing system with the provided script
115 71

  
116 72
    @param run_type: Type of calculation. 'isolated', 'screening', 'refinement'
73
    @param sub_cmd: The command used to submit jobs.
117 74
    @param sub_script: script for the job submission.
118
    @param max_qw: Maximum number of simultaneous jobs waiting to be executed.
119
    @param name: name of the project
75
    @param stat_cmd: Command to check job status.
76
    @param stat_dict: Dictionary mapping each job status (r, p, f) to the
77
        pattern it matches in the output of the stat_cmd.
78
    @param max_pend: Maximum number of simultaneous jobs waiting to be executed.
79
    @param name: name of the project.
120 80
    """
121 81
    from shutil import copy
122 82
    from time import sleep
83
    from subprocess import PIPE, Popen
123 84
    subm_jobs = []
124 85
    init_dir = os.getcwd()
125 86
    for conf in os.listdir(run_type):
126 87
        i = conf.split('_')[1]
127
        while get_jobs_status_lsf(subm_jobs).count('q') >= max_qw:
88
        while get_jobs_status(subm_jobs, stat_cmd, stat_dict).count('p') \
89
                >= max_pend:
128 90
            sleep(30)
129 91
        copy(sub_script, f"{run_type}/{conf}")
130 92
        os.chdir(f"{run_type}/{conf}")
131 93
        job_name = f'{name[:5].capitalize()}{run_type[:3].capitalize()}{i}'
132
        # subm_jobs.append(qsub(sub_script, name=job_name))  # TODO LSF Command
94
        sub_order = sub_cmd % (job_name, sub_script)
95
        subm_msg = Popen(sub_order, shell=True, stdout=PIPE).communicate()[0]
96
        job_id = None
97
        for word in subm_msg.decode("utf-8").split():
98
            try:
99
                job_id = int(word)
100
                break
101
            except ValueError:
102
                continue
103
        subm_jobs.append(job_id)
133 104
        os.chdir(init_dir)
134 105

  
135 106
    logger.info('All jobs have been submitted, waiting for them to finish.')
136
    while not all([stat == 'f' for stat in get_jobs_status_sge(subm_jobs)]):
107
    while not all([stat == 'f' for stat in
108
                   get_jobs_status(subm_jobs, stat_cmd, stat_dict)]):
137 109
        sleep(30)
138 110
    logger.info('All jobs have finished.')
139 111

  
......
171 143
    # elif: inp_vars['code'] == 'Other codes here'
172 144

  
173 145
    if inp_vars['batch_q_sys'] == 'sge':
174
        sub_sge(run_type, inp_vars['subm_script'], inp_vars['max_qw'],
175
                inp_vars['project_name'])
176
    elif inp_vars['batch_q_sys'] == 'lsf':  # TODO implement lsf
177
        sub_lsf(run_type, inp_vars['subm_script'], inp_vars['max_qw'],
178
                inp_vars['project_name'])
146
        stat_cmd = "qstat | grep %s | awk '{print $5}'"
147
        stat_dict = {'r': 'r', 'p': 'qw', 'f': ''}
148
        submit_jobs(run_type, 'qsub -N %s %s', inp_vars['subm_script'],
149
                    stat_cmd, stat_dict, inp_vars['max_qw'],
150
                    inp_vars['project_name'])
151
    elif inp_vars['batch_q_sys'] == 'lsf':
152
        stat_cmd = "bjobs -w | grep %s | awk ''"
153
        submit_jobs(run_type, 'bsub', inp_vars['subm_script'],
154
                    inp_vars['max_qw'], inp_vars['project_name'])
179 155
    elif inp_vars['batch_q_sys'] == 'local':  # TODO implement local
180 156
        pass  # run_local
181 157
    elif inp_vars['batch_q_sys'] == 'none':

Formats disponibles : Unified diff