dockonsurf / modules / utilities.py @ 4e82c425
Historique | Voir | Annoter | Télécharger (3,59 ko)
1 |
import logging |
---|---|
2 |
|
3 |
logger = logging.getLogger('DockOnSurf')
|
4 |
|
5 |
|
6 |
def tail(f, lines=20): |
7 |
"""Returns the specified last number of lines of a file.
|
8 |
|
9 |
@param f: The file to retrieve the last lines from.
|
10 |
@param lines: The number of lines to be retrieved.
|
11 |
@return str: The last number of lines
|
12 |
"""
|
13 |
total_lines_wanted = lines |
14 |
|
15 |
block_size = 1024
|
16 |
f.seek(0, 2) |
17 |
block_end_byte = f.tell() |
18 |
lines_to_go = total_lines_wanted |
19 |
block_number = -1
|
20 |
blocks = [] |
21 |
while lines_to_go > 0 and block_end_byte > 0: |
22 |
if block_end_byte - block_size > 0: |
23 |
f.seek(block_number * block_size, 2)
|
24 |
blocks.append(f.read(block_size)) |
25 |
else:
|
26 |
f.seek(0, 0) |
27 |
blocks.append(f.read(block_end_byte)) |
28 |
lines_found = blocks[-1].count(b'\n') |
29 |
lines_to_go -= lines_found |
30 |
block_end_byte -= block_size |
31 |
block_number -= 1
|
32 |
all_read_text = b''.join(reversed(blocks)) |
33 |
return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:]).decode() |
34 |
|
35 |
|
36 |
def check_bak(file_name): |
37 |
"""Checks if a file already exists and backs it up if so.
|
38 |
|
39 |
@param file_name: file to be checked if exists
|
40 |
"""
|
41 |
import os |
42 |
new_name = file_name |
43 |
bak_num = 0
|
44 |
while os.path.isdir(new_name) or os.path.isfile(new_name): |
45 |
bak_num += 1
|
46 |
new_name = new_name.split(".bak")[0] + f".bak{bak_num}" |
47 |
if bak_num > 0: |
48 |
os.rename(file_name, new_name) |
49 |
logger.warning(f"'{file_name}' already present. Backed it up to "
|
50 |
f"{new_name}.")
|
51 |
|
52 |
|
53 |
def try_command(command, expct_error_types: list, *args, **kwargs): |
54 |
"""Try to run a command and record exceptions (expected and not) on a log.
|
55 |
|
56 |
@param command: method or function, the command to be executed.
|
57 |
@param expct_error_types: list of tuples, every inner tuple is supposed to
|
58 |
contain an exception type (eg. ValueError, TypeError, etc.) to be caught and
|
59 |
a message to print in the log and on the screen explaining the exception.
|
60 |
Error types that are not allow to be called with a custom message as only
|
61 |
error argument are not supported.
|
62 |
The outer tuple encloses all couples of error types and their relative
|
63 |
messages.
|
64 |
*args and **kwargs: arguments and keyword-arguments of the command to be
|
65 |
executed.
|
66 |
When trying to run 'command' with its args and kwargs, if an exception
|
67 |
present on the 'error_types' occurs, its relative error message is recorded
|
68 |
on the log and a same type exception is raised with the custom message.
|
69 |
"""
|
70 |
unexp_error = "An unexpected error occurred"
|
71 |
|
72 |
err = False
|
73 |
try:
|
74 |
return_val = command(*args, **kwargs) |
75 |
except Exception as e: |
76 |
for expct_err in expct_error_types: |
77 |
if isinstance(e, expct_err[0]): |
78 |
logger.error(expct_err[1])
|
79 |
err = expct_err[0](expct_err[1]) |
80 |
break
|
81 |
else:
|
82 |
logger.exception(unexp_error) |
83 |
err = e |
84 |
else:
|
85 |
err = False
|
86 |
return return_val
|
87 |
finally:
|
88 |
if isinstance(err, BaseException): |
89 |
raise err
|
90 |
|
91 |
|
92 |
def _human_key(key): |
93 |
"""Function used as sorting strategy where numbers are sorted human-wise.
|
94 |
|
95 |
@param key:
|
96 |
@return:
|
97 |
"""
|
98 |
import re |
99 |
parts = re.split('(\d*\.\d+|\d+)', key)
|
100 |
return tuple((e.swapcase() if i % 2 == 0 else float(e)) |
101 |
for i, e in enumerate(parts)) |
102 |
|
103 |
|
104 |
def is_binary(file): |
105 |
"""Checks if a file is a text file or a binary one.
|
106 |
|
107 |
@param file:
|
108 |
@return:
|
109 |
"""
|
110 |
try:
|
111 |
with open(file, "r") as fh: |
112 |
fh.read(50)
|
113 |
except UnicodeDecodeError: |
114 |
return True |
115 |
else:
|
116 |
return False |