Statistiques
| Branche: | Tag: | Révision :

dockonsurf / modules / dos_input.py @ d8a6314e

Historique | Voir | Annoter | Télécharger (22,04 ko)

1
"""Functions to deal with DockOnSurf input files.
2

3
Functions
4
try_command: Tries to run a command and logs exceptions (expected and not).
5
str2lst: Converts a string of integers, and groups of them, to a list.
6
check_expect_val: Checks whether an option lies within its expected values.
7
read_input: Sets up the variables of DockOnSurf by reading from an input file.
8
get_run_type: Gets 'run_type' value and checks that its value is acceptable.
9
get_code: Gets 'code' value and check that its value is acceptable.
10
get_batch_q_sys: Gets 'batch_q_sys' value and check that its value is
11
acceptable.
12
get_relaunch_err: Gets 'relaunch_err' value and check that its value is
13
acceptable.
14
get_max_qw: Gets 'max_qw' value and check that its value is acceptable.
15
get_special_atoms: Gets 'special_atoms' value and check that its value is
16
acceptable.
17
get_isol_inp_file: Gets 'isol_inp_file' value and check that its value is
18
acceptable.
19
get_cluster_magns: Gets 'cluster_magns' value and check that its value is
20
acceptable.
21
get_num_conformers: Gets 'num_conformers' value and check that its value is
22
acceptable.
23
get_num_prom_cand: Gets 'num_prom_cand' value and check that its value is
24
acceptable.
25
get_iso_rmsd: Gets 'iso_rmsd' value and check that its value is acceptable.
26
get_screen_inp_file: Gets 'screen_inp_file' value and check that its value is
27
acceptable.
28
get_sites: Gets 'sites' value and check that its value is acceptable.
29
get_molec_ads_ctrs: Gets 'molec_ads_ctrs' value and check that its value is
30
acceptable.
31
get_try_disso: Gets 'try_disso' value and check that its value is acceptable.
32
get_pts_per_angle: Gets 'pts_per_angle' value and check that its value is
33
acceptable.
34
get_coll_thrsld: Gets 'coll_thrsld' value and check that its value is
35
acceptable.
36
get_screen_rmsd: Gets 'screen_rmsd' value and check that its value is
37
acceptable.
38
get_coll_bottom_z: Gets 'coll_bottom_z' value and check that its value is
39
acceptable.
40
get_refine_inp_file: Gets 'refine_inp_file' value and check that its value is
41
acceptable.
42
get_energy_cutoff: Gets 'energy_cutoff' value and check that its value is
43
acceptable.
44
"""
45
import os.path
46
import logging
47
from configparser import ConfigParser, NoSectionError, NoOptionError, \
48
    MissingSectionHeaderError, DuplicateOptionError
49

    
50
# Logger used by every function of this module.
logger = logging.getLogger('DockOnSurf')

# Parser that holds the options read from the input file. '#' starts an
# inline comment and 'empty_lines_in_values' is switched off.
dos_inp = ConfigParser(empty_lines_in_values=False,
                       inline_comment_prefixes='#')

# Extra spellings understood as booleans, added to the ones the parser already
# knows. Note that the empty string is registered as True.
# NOTE(review): no BOOLEAN_STATES attribute is assigned on the instance, so
# the entries appear to be added to the mapping shared through the parser
# class - confirm this is intended.
new_answers = {'n': False, 'none': False, 'nay': False,
               'y': True, '': True, 'aye': True, 'sure': True}
for answer, val in new_answers.items():
    dos_inp.BOOLEAN_STATES[answer] = val

# Every spelling that the parser converts to False, in registration order.
turn_false_answers = [spelling for spelling, state
                      in dos_inp.BOOLEAN_STATES.items() if state is False]

# Templates of the error messages, filled in with the '%' operator.
no_sect_err = "Section '%s' not found on input file"
no_opt_err = "Option '%s' not found on section '%s'"
num_error = "'%s' value must be a %s"
unexp_error = "An unexpected error occurred"
66

    
67

    
68
def try_command(command, expct_error_types: list, *args, **kwargs):
    """Try to run a command and record exceptions (expected and not) on a log.

    @param command: method or function, the command to be executed.
    @param expct_error_types: sequence of couples, every couple holds an
    exception type (eg. ValueError, TypeError, etc.) to be caught and the
    message to record on the log explaining the exception. The exception type
    must accept the message as its only argument, since it is instantiated
    that way.
    *args and **kwargs: arguments and keyword-arguments passed to 'command'.
    @return whatever 'command' returns when it does not raise.
    @raise an exception of the first matching type in 'expct_error_types',
    carrying the custom message and chained to the original exception, when
    an expected exception occurs. Any other exception derived from Exception
    is recorded on the log with its traceback and re-raised untouched.
    """
    try:
        return command(*args, **kwargs)
    except Exception as e:
        for expct_err in expct_error_types:
            err_type, err_msg = expct_err[0], expct_err[1]
            if isinstance(e, err_type):
                logger.error(err_msg)
                # 'from e' keeps the original exception reachable as the
                # cause, so the real origin of the failure is not lost.
                raise err_type(err_msg) from e
        # No expected type matched: log the traceback and let the original
        # exception propagate unchanged.
        logger.exception(unexp_error)
        raise
104

    
105

    
106
def str2lst(cmplx_str):  # TODO: enable deeper level of nested lists
    """Converts a string of integers, and groups of them, to a list.

    @param cmplx_str: str, string of integers and groups of them enclosed by
    parentheses-like characters.
    - Group enclosers: '()' '[]' and '{}'. Opening and closing characters of
      different kinds are treated alike, eg. '(1 2]' is a valid group.
    - Integer separators: ',' ';' and whitespace.
    - Nested groups are not allowed: '3 ((6 7) 8) 4'.

    @return list, list of integers, or list of integers in the case they were
    grouped. First, the singlets are placed, and then the groups in input order.
    @raise AttributeError: if 'cmplx_str' is not a str.
    @raise ValueError: if an element is not an integer, or if the enclosers
    are nested, unbalanced or enclose nothing.

    eg. '128,(135 138;141] 87 {45, 68}' -> [128, 87, [135, 138, 141], [45, 68]]
    """
    error_msg = "Function argument should be a str,sequence of integer " \
                "numbers separated by ',' ';' or ' '." \
                "\nThey can be grouped in parentheses-like enclosers: '()', " \
                "'[]' or {}. Nested groups are not allowed. \n" \
                "eg. 128,(135 138;141) 87 {45, 68}"

    def invalid(err_type):
        # Records the problem on the log and builds the exception to raise.
        logger.error(error_msg)
        return err_type(error_msg)

    if not isinstance(cmplx_str, str):
        raise invalid(AttributeError)

    # Separators become blanks and every encloser becomes a '(' or ')' token
    # surrounded by blanks, so that inputs such as '(4 5)(6 7)' or '(5)' are
    # split into clean tokens.
    token_table = str.maketrans({',': ' ', ';': ' ',
                                 '(': ' ( ', '[': ' ( ', '{': ' ( ',
                                 ')': ' ) ', ']': ' ) ', '}': ' ) '})

    singlets = []
    groups = []
    open_group = None  # list being filled while inside an encloser
    for token in cmplx_str.translate(token_table).split():
        if token == '(':
            if open_group is not None:  # nested group
                raise invalid(ValueError)
            open_group = []
        elif token == ')':
            if not open_group:  # nothing to close, or empty group
                raise invalid(ValueError)
            groups.append(open_group)
            open_group = None
        else:
            try:
                number = int(token)
            except ValueError:
                raise invalid(ValueError) from None
            if open_group is None:
                singlets.append(number)
            else:
                open_group.append(number)

    if open_group is not None:  # encloser never closed
        raise invalid(ValueError)

    return singlets + groups
168

    
169

    
170
def check_expect_val(value, expect_vals):
    """Checks whether an option lies within its expected values.

    The test is a containment one: the check succeeds as soon as any of the
    expected values is found inside 'value' (for strings, as a substring).

    @param value: The variable whose content is checked.
    @param expect_vals: list, list of values allowed for the present option.
    @raise ValueError: if none of the expected values is found in 'value'.
    @return True if at least one expected value is found in 'value'.
    """
    # Every expected value is tested before the outcome is examined.
    found = [allowed in value for allowed in expect_vals]
    if True in found:
        return True

    adeq_val_err = "'%s' is not an adequate value.\n" \
                   "Adequate values: %s"
    message = adeq_val_err % (value, expect_vals)
    logger.error(message)
    raise ValueError(message)
187

    
188

    
189
def get_run_type():
    """Gets 'run_type' value and checks that its value is acceptable.

    The lower-cased 'run_type' option of the 'Global' section is validated
    with check_expect_val and then searched for every known keyword, so more
    than one keyword can be given at once.

    @return tuple of three booleans (isolated, screening, refinement).
    'adsorption' turns on screening and refinement, 'full' turns on all three.
    """
    run_type_vals = ['isolated', 'screening', 'refinement', 'adsorption',
                     'full']
    run_type = dos_inp.get('Global', 'run_type').lower()
    check_expect_val(run_type, run_type_vals)

    def requested(*keywords):
        # True when any of the keywords shows up in the option value.
        return any(keyword in run_type for keyword in keywords)

    isolated = requested('isolated', 'full')
    screening = requested('screening', 'adsorption', 'full')
    refinement = requested('refinement', 'adsorption', 'full')
    return isolated, screening, refinement
208

    
209

    
210
def get_code():
    """Gets 'code' value and checks that its value is acceptable.

    @return str, the lower-cased 'code' option of the 'Global' section, once
    check_expect_val has accepted it against the supported codes ('cp2k').
    """
    supported_codes = ['cp2k']
    chosen_code = dos_inp.get('Global', 'code').lower()
    check_expect_val(chosen_code, supported_codes)
    return chosen_code
215

    
216

    
217
def get_batch_q_sys():
    """Gets 'batch_q_sys' value and checks that its value is acceptable.

    @return str, the lower-cased 'batch_q_sys' option of the 'Global'
    section, once check_expect_val has accepted it against the supported
    systems ('sge').
    """
    supported_systems = ['sge']
    chosen_system = dos_inp.get('Global', 'batch_q_sys').lower()
    check_expect_val(chosen_system, supported_systems)
    return chosen_system
223

    
224

    
225
def get_relaunch_err():
    """Gets 'relaunch_err' value and checks that its value is acceptable.

    The option is read from the 'Global' section and defaults to "False" when
    absent. The value is lower-cased before being returned, as the other
    string getters of this module do, so that the returned text is the same
    one that was validated.

    @return False if the value is one of the spellings read as False,
    otherwise the lower-cased value.
    @raise ValueError: (from check_expect_val) if the value is not accepted.
    """
    relaunch_err_vals = ['geo_not_conv', 'false']
    relaunch_err = dos_inp.get('Global', 'relaunch_err',
                               fallback="False").lower()
    if relaunch_err in turn_false_answers:
        return False
    check_expect_val(relaunch_err, relaunch_err_vals)
    return relaunch_err
234

    
235

    
236
def get_max_qw():
    """Gets 'max_qw' value and checks that its value is acceptable.

    @return int, the 'max_qw' option of the 'Global' section (3 if absent).
    @raise ValueError: if the value is not an integer or is lower than 1.
    """
    not_pos_int = num_error % ('max_qw', 'positive integer')
    expected = [(ValueError, not_pos_int)]
    max_qw = try_command(dos_inp.getint, expected,
                         'Global', 'max_qw', fallback=3)
    if max_qw < 1:
        logger.error(not_pos_int)
        raise ValueError(not_pos_int)
    return max_qw
245

    
246

    
247
def get_special_atoms():
    """Gets 'special_atoms' value and checks that its value is acceptable.

    The option is read from the 'Global' section and defaults to "False" when
    absent. Its expected format is a series of couples enclosed by
    parentheses, eg. '(Fe1 Fe) (O1 O)': the first item of each couple is a
    label and the second one a chemical symbol known to ase.

    @return False if the value is one of the spellings read as False,
    otherwise a list of 2-tuples of str (label, element).
    @raise ValueError: if the format is not the expected one, if the second
    item of a couple is not a chemical symbol, if the first item is itself a
    chemical symbol, or if a label is used more than once.
    """
    from ase.data import chemical_symbols

    spec_at_err = '\'special_atoms\' does not have an adequate format.\n' \
                  'Adequate format: (Fe1 Fe) (O1 O)'
    special_atoms = dos_inp.get('Global', 'special_atoms', fallback="False")
    if special_atoms.lower() in turn_false_answers:
        return False

    # Converts the string into a list of tuples. The slice discards whatever
    # follows the last ')'.
    lst_tple = [tuple(pair.replace("(", "").split())
                for pair in special_atoms.split(")")[:-1]]

    # The shape of every couple is checked before any of them is indexed, so
    # that a malformed couple is always reported with the format message.
    if not lst_tple or any(len(tup) != 2 for tup in lst_tple):
        logger.error(spec_at_err)
        raise ValueError(spec_at_err)

    labels = [tup[0] for tup in lst_tple]
    for i, (label, element) in enumerate(lst_tple):
        if element.capitalize() not in chemical_symbols:
            elem_err = "The second element of the couple should be an " \
                       "actual element of the periodic table"
            logger.error(elem_err)
            raise ValueError(elem_err)
        if label.capitalize() in chemical_symbols:
            elem_err = "The first element of the couple is already an " \
                       "actual element of the periodic table, "
            logger.error(elem_err)
            raise ValueError(elem_err)
        # Only the couples placed after the current one need to be compared.
        if label in labels[i + 1:]:
            label_err = f'You have specified the label {label} to ' \
                        f'more than one special atom'
            logger.error(label_err)
            raise ValueError(label_err)
    return lst_tple
286

    
287

    
288
def get_isol_inp_file():
    """Gets 'isol_inp_file' value and checks that its value is acceptable.

    @return str, the 'isol_inp_file' option of the 'Isolated' section.
    @raise FileNotFoundError: if the value is not the path of an existing
    file.
    """
    isol_inp_file = dos_inp.get('Isolated', 'isol_inp_file')
    if os.path.isfile(isol_inp_file):
        return isol_inp_file
    missing_msg = f'File {isol_inp_file} not found'
    logger.error(missing_msg)
    raise FileNotFoundError(missing_msg)
294

    
295

    
296
def get_cluster_magns():
    """Gets 'cluster_magns' value and checks that its value is acceptable.

    The option is read from the 'Isolated' section and defaults to 'energy'
    when absent. Several magnitudes can be given, separated by ',' ';' or
    whitespace.

    @return list of str, the lower-cased magnitudes in input order.
    @raise ValueError: (from check_expect_val) if a magnitude is not accepted.
    """
    clust_magns_vals = ['energy', 'moi']
    cluster_magns_str = dos_inp.get('Isolated', 'cluster_magns',
                                    fallback='energy')
    # str.replace returns a new string, so its result has to be kept for the
    # separators to take effect. split() without arguments also copes with
    # repeated blanks.
    cluster_magns_str = cluster_magns_str.replace(',', ' ').replace(';', ' ')
    cluster_magns = [m.lower() for m in cluster_magns_str.split()]
    for m in cluster_magns:
        check_expect_val(m, clust_magns_vals)
    return cluster_magns
306

    
307

    
308
def get_num_conformers():
    """Gets 'num_conformers' value and checks that its value is acceptable.

    @return int, the 'num_conformers' option of the 'Isolated' section (100
    if absent).
    @raise ValueError: if the value is not an integer or is lower than 1.
    """
    not_pos_int = num_error % ('num_conformers', 'positive integer')
    expected = [(ValueError, not_pos_int)]
    num_conformers = try_command(dos_inp.getint, expected,
                                 'Isolated', 'num_conformers', fallback=100)
    if num_conformers < 1:
        logger.error(not_pos_int)
        raise ValueError(not_pos_int)
    return num_conformers
316

    
317

    
318
def get_num_prom_cand():
    """Gets 'num_prom_cand' value and checks that its value is acceptable.

    @return int, the 'num_prom_cand' option of the 'Isolated' section (3 if
    absent).
    @raise ValueError: if the value is not an integer or is lower than 1.
    """
    not_pos_int = num_error % ('num_prom_cand', 'positive integer')
    expected = [(ValueError, not_pos_int)]
    num_prom_cand = try_command(dos_inp.getint, expected,
                                'Isolated', 'num_prom_cand', fallback=3)
    if num_prom_cand < 1:
        logger.error(not_pos_int)
        raise ValueError(not_pos_int)
    return num_prom_cand
326

    
327

    
328
def get_iso_rmsd():
    """Gets 'iso_rmsd' value and checks that its value is acceptable.

    @return float, the 'iso_rmsd' option of the 'Isolated' section (0.05 if
    absent).
    @raise ValueError: if the value is not a number or is not greater than 0.
    """
    not_pos_float = num_error % ('iso_rmsd', 'positive decimal number')
    expected = [(ValueError, not_pos_float)]
    iso_rmsd = try_command(dos_inp.getfloat, expected,
                           'Isolated', 'iso_rmsd', fallback=0.05)
    if iso_rmsd <= 0.0:
        logger.error(not_pos_float)
        raise ValueError(not_pos_float)
    return iso_rmsd
336

    
337

    
338
def get_screen_inp_file():
    """Gets 'screen_inp_file' value and checks that its value is acceptable.

    @return str, the 'screen_inp_file' option of the 'Screening' section.
    @raise FileNotFoundError: if the value is not the path of an existing
    file.
    """
    screen_inp_file = dos_inp.get('Screening', 'screen_inp_file')
    if os.path.isfile(screen_inp_file):
        return screen_inp_file
    missing_msg = f'File {screen_inp_file} not found'
    logger.error(missing_msg)
    raise FileNotFoundError(missing_msg)
344

    
345

    
346
def get_sites():
    """Gets 'sites' value and checks that its value is acceptable.

    The 'sites' option of the 'Screening' section is converted with str2lst
    and every atom number found, alone or inside a group, is checked not to
    be negative.

    @return list of int and/or lists of int, as produced by str2lst.
    @raise ValueError: if the conversion fails, if an element is neither an
    int nor a list, or if an atom number is negative.
    @raise AttributeError: if str2lst raises it.
    """
    err_msg = 'The value of sites should be a list of atom numbers ' \
              '(ie. positive integers) or groups of atom numbers ' \
              'grouped by parentheses-like enclosers. \n' \
              'eg. 128,(135 138;141) 87 {45, 68}'
    expected = [(ValueError, err_msg), (AttributeError, err_msg)]
    # Convert the string into a list of lists
    sites = try_command(str2lst, expected, dos_inp.get('Screening', 'sites'))

    for site in sites:
        # A lone atom is handled as a group of one; anything that is neither
        # an int nor a list is rejected.
        if type(site) is list:
            atoms = site
        elif type(site) is int:
            atoms = [site]
        else:
            atoms = None
        if atoms is None or any(atom < 0 for atom in atoms):
            logger.error(err_msg)
            raise ValueError(err_msg)

    return sites
371

    
372

    
373
def get_molec_ads_ctrs():
    """Gets 'molec_ads_ctrs' value and checks that its value is acceptable.

    The 'molec_ads_ctrs' option of the 'Screening' section is converted with
    str2lst and every atom number found, alone or inside a group, is checked
    not to be negative.

    @return list of int and/or lists of int, as produced by str2lst.
    @raise ValueError: if the conversion fails, if an element is neither an
    int nor a list, or if an atom number is negative.
    @raise AttributeError: if str2lst raises it.
    """
    err_msg = 'The value of molec_ads_ctrs should be a list of atom' \
              ' numbers (ie. positive integers) or groups of atom ' \
              'numbers enclosed by parentheses-like characters. \n' \
              'eg. 128,(135 138;141) 87 {45, 68}'
    expected = [(ValueError, err_msg), (AttributeError, err_msg)]
    # Convert the string into a list of lists
    molec_ads_ctrs = try_command(str2lst, expected,
                                 dos_inp.get('Screening', 'molec_ads_ctrs'))

    for ctr in molec_ads_ctrs:
        # A lone atom is handled as a group of one; anything that is neither
        # an int nor a list is rejected.
        if type(ctr) is list:
            atoms = ctr
        elif type(ctr) is int:
            atoms = [ctr]
        else:
            atoms = None
        if atoms is None or any(atom < 0 for atom in atoms):
            logger.error(err_msg)
            raise ValueError(err_msg)

    return molec_ads_ctrs
399

    
400

    
401
def get_try_disso():
    """Gets 'try_disso' value and checks that its value is acceptable.

    @return bool, the 'try_disso' option of the 'Screening' section (False if
    absent).
    @raise ValueError: if the value cannot be read as a boolean.
    """
    err_msg = "try_disso should be have a boolean value (True or False)"
    expected = [(ValueError, err_msg)]
    return try_command(dos_inp.getboolean, expected,
                       'Screening', 'try_disso', fallback=False)
407

    
408

    
409
def get_pts_per_angle():
    """Gets 'pts_per_angle' value and checks that its value is acceptable.

    The value is read from the 'sample_points_per_angle' option of the
    'Screening' section and defaults to 3 when absent. As for the other
    integer options of this module, values lower than 1 are rejected, in
    agreement with the 'positive integer' wording of the error message.

    @return int, the number of sample points per angle.
    @raise ValueError: if the value is not an integer or is lower than 1.
    """
    err_msg = num_error % ('sample_points_per_angle',
                           'positive integer')
    pts_per_angle = try_command(dos_inp.getint,
                                [(ValueError, err_msg)],
                                'Screening', 'sample_points_per_angle',
                                fallback=3)
    if pts_per_angle < 1:
        logger.error(err_msg)
        raise ValueError(err_msg)

    return pts_per_angle
418

    
419

    
420
def get_coll_thrsld():
    """Gets 'coll_thrsld' value and checks that its value is acceptable.

    @return float, the 'collision_threshold' option of the 'Screening'
    section (1.2 if absent).
    @raise ValueError: if the value is not a number or is not greater than 0.
    """
    not_pos_float = num_error % ('collision_threshold',
                                 'positive decimal number')
    expected = [(ValueError, not_pos_float)]
    coll_thrsld = try_command(dos_inp.getfloat, expected,
                              'Screening', 'collision_threshold',
                              fallback=1.2)
    if coll_thrsld <= 0:
        logger.error(not_pos_float)
        raise ValueError(not_pos_float)
    return coll_thrsld
432

    
433

    
434
def get_screen_rmsd():
    """Gets 'screen_rmsd' value and checks that its value is acceptable.

    @return float, the 'screen_rmsd' option of the 'Screening' section (0.05
    if absent).
    @raise ValueError: if the value is not a number or is not greater than 0.
    """
    not_pos_float = num_error % ('screen_rmsd', 'positive decimal number')
    expected = [(ValueError, not_pos_float)]
    screen_rmsd = try_command(dos_inp.getfloat, expected,
                              'Screening', 'screen_rmsd', fallback=0.05)
    if screen_rmsd <= 0:
        logger.error(not_pos_float)
        raise ValueError(not_pos_float)
    return screen_rmsd
444

    
445

    
446
def get_coll_bottom_z():
    """Gets 'coll_bottom_z' value and checks that its value is acceptable.

    The value is read from the 'collision_bottom_z' option of the 'Screening'
    section and defaults to "False" when absent.

    @return False if the value is one of the spellings read as False,
    otherwise the value converted to float.
    @raise ValueError: if the value cannot be converted to float.
    """
    not_float = num_error % ('collision_bottom_z', 'decimal number')
    raw_value = dos_inp.get('Screening', 'collision_bottom_z',
                            fallback="False")
    if raw_value.lower() in turn_false_answers:
        return False
    return try_command(float, [(ValueError, not_float)], raw_value)
457

    
458

    
459
def get_refine_inp_file():
    """Gets 'refine_inp_file' value and checks that its value is acceptable.

    @return str, the 'refine_inp_file' option of the 'Refinement' section.
    @raise FileNotFoundError: if the value is not the path of an existing
    file.
    """
    refine_inp_file = dos_inp.get('Refinement', 'refine_inp_file')
    if os.path.isfile(refine_inp_file):
        return refine_inp_file
    missing_msg = f'File {refine_inp_file} not found'
    logger.error(missing_msg)
    raise FileNotFoundError(missing_msg)
466

    
467

    
468
def get_energy_cutoff():
    """Gets 'energy_cutoff' value and checks that its value is acceptable.

    @return float, the 'energy_cutoff' option of the 'Refinement' section
    (0.5 if absent). Zero is accepted.
    @raise ValueError: if the value is not a number or is lower than 0.
    """
    not_pos_float = num_error % ('energy_cutoff', 'positive decimal number')
    expected = [(ValueError, not_pos_float)]
    energy_cutoff = try_command(dos_inp.getfloat, expected,
                                'Refinement', 'energy_cutoff', fallback=0.5)
    if energy_cutoff < 0:
        logger.error(not_pos_float)
        raise ValueError(not_pos_float)
    return energy_cutoff
477

    
478

    
479
def _require_options(section, mand_opts):
    """Checks that 'section' exists and holds every option of 'mand_opts'.

    @param section: str, name of the section of the input file.
    @param mand_opts: list of str, options that must be present in it.
    @raise NoSectionError: if the section is missing.
    @raise NoOptionError: if one of the options is missing.
    """
    if not dos_inp.has_section(section):
        logger.error(no_sect_err % section)
        raise NoSectionError(section)
    for opt in mand_opts:
        if not dos_inp.has_option(section, opt):
            logger.error(no_opt_err % (opt, section))
            raise NoOptionError(opt, section)


def read_input(in_file):
    """Sets up the variables of DockOnSurf by reading from an input file.

    The 'Global' section is always read. The 'Isolated', 'Screening' and
    'Refinement' sections are only read when 'run_type' asks for them.

    @param in_file: The path of the input file, handed to ConfigParser.read.
    @return dict, the name of every variable read and its checked value.
    @raise FileNotFoundError: if the input file could not be read at all.
    @raise MissingSectionHeaderError, DuplicateOptionError: if the file is
    malformed.
    @raise NoSectionError, NoOptionError: if a required section or a
    mandatory option is missing.
    Exceptions raised by the individual getters are propagated as well.
    """
    try:
        read_files = dos_inp.read(in_file)
    except MissingSectionHeaderError:
        logger.error('There are options in the input file without a Section '
                     'header')
        raise
    except DuplicateOptionError:
        logger.error('There is an option in the input file that has been '
                     'specified more than once, possibly due to the lack of a '
                     'Section header')
        raise

    # ConfigParser.read skips the files it cannot open and returns the list
    # of the ones it parsed: an empty list means nothing was read.
    if not read_files:
        not_read_msg = f'File {in_file} not found'
        logger.error(not_read_msg)
        raise FileNotFoundError(not_read_msg)

    return_vars = {}

    # Global
    _require_options('Global', ['run_type', 'code', 'batch_q_sys'])

    # Gets which sections are to be carried out
    isolated, screening, refinement = get_run_type()
    return_vars['isolated'] = isolated
    return_vars['screening'] = screening
    return_vars['refinement'] = refinement
    return_vars['code'] = get_code()
    return_vars['batch_q_sys'] = get_batch_q_sys()

    # Facultative options (Default/Fallback value present)
    return_vars['relaunch_err'] = get_relaunch_err()
    return_vars['max_qw'] = get_max_qw()
    return_vars['special_atoms'] = get_special_atoms()

    # Isolated
    if isolated:
        _require_options('Isolated', ['isol_inp_file'])
        return_vars['isol_inp_file'] = get_isol_inp_file()

        # Facultative options (Default/Fallback value present)
        return_vars['cluster_magns'] = get_cluster_magns()
        return_vars['num_conformers'] = get_num_conformers()
        return_vars['num_prom_cand'] = get_num_prom_cand()
        return_vars['iso_rmsd'] = get_iso_rmsd()

    # Screening
    if screening:
        _require_options('Screening',
                         ['sites', 'molec_ads_ctrs', 'screen_inp_file'])
        return_vars['screen_inp_file'] = get_screen_inp_file()
        return_vars['sites'] = get_sites()
        return_vars['molec_ads_ctrs'] = get_molec_ads_ctrs()

        # Facultative options (Default value present)
        return_vars['try_disso'] = get_try_disso()
        return_vars['sample_points_per_angle'] = get_pts_per_angle()
        return_vars['collision_threshold'] = get_coll_thrsld()
        return_vars['screen_rmsd'] = get_screen_rmsd()
        return_vars['collision_bottom_z'] = get_coll_bottom_z()

    # Refinement
    if refinement:
        _require_options('Refinement', ['refine_inp_file'])
        return_vars['refine_inp_file'] = get_refine_inp_file()

        # Facultative options (Default value present)
        return_vars['energy_cutoff'] = get_energy_cutoff()

    return_vars_str = "\n\t".join([str(key) + ": " + str(val)
                                   for key, val in return_vars.items()])
    logger.info(
        f'Correctly read {in_file} parameters: \n\n\t{return_vars_str}\n')

    return return_vars
595

    
596

    
597
if __name__ == "__main__":
    # Run as a script: the first command-line argument is taken as the path
    # of the input file and the dictionary read from it is printed.
    import sys

    print(read_input(*sys.argv[1:2]))