Table of Contents

Codeschnipsel für Python

Eine kleine Sammlung an Codefragmenten und Routinen für Python.

try main Block mit Exception Handling

import sys
 
if __name__ == '__main__':
    # Script entry point: run main() and turn failures into exit codes.
    try:
        main()
    except KeyboardInterrupt:
        print('\nCtrl+C detected, will abort process')
        sys.exit(0)
    except Exception as e:
        print('\nAn error occured during execution:')
        print(e)
        # Each flag needs its own membership test; a bare non-empty string
        # such as '-v' is always truthy and would make the condition constant.
        if '-v' in sys.argv or '--verbose' in sys.argv:
            # bare raise re-raises the active exception with its original traceback
            raise
        sys.exit(1)

Liste zeilenweise ausgeben

# Unpack the list into separate print() arguments and join them with newlines,
# so every element ends up on its own line.
print(*myList, sep='\n')

Erstes Listenelement oder None erhalten

# `your_list or []` substitutes an empty list when your_list is falsy (None or empty);
# next() then yields the first element, or the given default None if there is none.
next(iter(your_list or []), None)

Ternärer Operator

# Conditional expression: evaluates to 'true' when key is truthy, otherwise to 'false'.
print('The key is set to %s' % ('true' if key else 'false'))

Zeitstempel

import datetime
 
# Current local time as 'YYYY-MM-DD HH:MM:SS' (fractional seconds dropped)
ts = datetime.datetime.now().isoformat(sep=' ', timespec='seconds')
print(ts)

for else Beispiel

# The else clause of a for loop runs once the loop has finished without a break;
# there is no break here, so the message is printed after the numbers 1..4.
for i in range(1, 5):
    print(i)
else:
    print('The for loop is over.')

Temporäres Verzeichnis als Context Manager verwenden

import tempfile
 
# tmp_dir is bound to the path of the temporary directory for the duration of the block;
# TemporaryDirectory cleans the directory up again when the with block is left.
with tempfile.TemporaryDirectory() as tmp_dir:
    pass  # do stuff

Mittels Pipe übergebene Daten lesen

import sys
 
# Only consume stdin when it is not a terminal (i.e. piped or redirected),
# so the script never blocks waiting for interactive input.
if not sys.stdin.isatty():
    # skip lines starting with '#', strip surrounding whitespace from the rest
    lst = [
        entry.strip()
        for entry in sys.stdin
        if not entry.startswith('#')
    ]
    print(lst)

Dateien aus einem Verzeichnis erhalten oder aus Datei lesen

import os
from os.path import expanduser, isdir, isfile, abspath, join as pjoin
 
def get_files(path='/some/path'):
    """Collect file paths from a directory or from a list file.

    If path is a directory, return the absolute paths of the regular files
    directly inside it whose name ends with '.ext' (case-insensitive).
    If path is a regular file, treat it as a list with one entry per line and
    return the stripped lines, skipping blank lines and lines starting with '#'.
    A leading '~' in path is expanded first.

    Returns an empty list when path is neither a directory nor a file.
    """
    path = expanduser(path)

    if isdir(path):
        base = abspath(path)
        return [pjoin(base, f) for f in os.listdir(path)
                if isfile(pjoin(path, f)) and f.lower().endswith('.ext')]

    if isfile(path):
        with open(path, 'r') as lst:
            # l.split() is empty (falsy) for whitespace-only lines
            return [l.strip() for l in lst if not l.startswith('#') and l.split()]

    return []

Anführungszeichen um einen String entfernen

def unquote(string):
    """Remove matching pair of single or double quotes around string

    The string is returned unchanged when it is shorter than two characters
    (so an empty string or a lone quote character is left alone) or when its
    first and last character are not the same quote character.
    """
    # the length check prevents an IndexError on '' and keeps a single quote
    # character from being mistaken for an (empty) quoted pair
    if len(string) >= 2 and string[0] == string[-1] and string.startswith(("'", '"')):
        return string[1:-1]
    return string

Ohne Überprüfung, ob die Anführungszeichen zusammenpassen:

# Fragment meant to be used as a function body: removes one quote character (single or
# double) at the start and/or at the end of the string; each end is handled on its own,
# so the two quotes do not have to match.
return re.sub(r'^["\']|["\']$', '', string)

Formatierte Ausgabe mit Einheitenpräfix

import re
 
def unit_prefix(number):
    """Format big number by using unit prefixes

    Numbers >= 1000 are scaled to the largest fitting prefix (k, M, G).
    Exact multiples of the prefix factor are printed without a fractional
    part ('5k'), everything else as a decimal fraction ('1.5k').
    Numbers below 1000 are returned as str(number).
    """
    for factor, prefix in ((10**9, 'G'), (10**6, 'M'), (10**3, 'k')):
        if number >= factor:
            # Test divisibility arithmetically; counting '0' characters also
            # matches zeros in the middle of the number (e.g. 10005).
            if number % factor == 0:
                return '%d%s' % (number // factor, prefix)
            return str(number / float(factor)) + prefix
    return str(number)
 
# Column layout: left-aligned 20-character string, right-aligned 4-wide integer and
# right-aligned 4-wide prefixed value (unit_prefix returns a string, hence the 's' type).
print("{0:<20s} {1:>4d} files with {2:>4s} entries"
      .format(some_string, integer, unit_prefix(integer)))

String splitten anhand von Trennzeichen außerhalb von Anführungszeichen

import re
 
# Sample input: four single-quoted tokens, the last one containing a double-quoted part
line = "'hello world' 'foo' 'bar' '\"baz zab\"'"
 
delimiter = ' '
# One token is a run of: any character that is neither the delimiter nor a quote,
# or a complete "..." section, or a complete '...' section.
# NOTE(review): the delimiter is inserted into a character class unescaped - presumably it
# must not be a character that is special there (e.g. ']' or '^'); confirm before reuse.
pattern = re.compile(r'''((?:[^%s"']|"[^"]*"|'[^']*')+)''' % delimiter)
# The pattern has a capturing group, so split() also returns the matched tokens; they sit
# at the odd indices, and [1::2] keeps exactly those.
print(pattern.split(line)[1::2])

Schleife für Eingabe von Integern

def input_digit(input_msg, max_retries=4, fail_msg='Invalid input, this channel will be skipped'):
    """Prompt for an integer until one is entered or the attempts are used up.

    input_msg is shown as the prompt (followed by a space). After every input
    that int() rejects the user is asked again; once max_retries attempts have
    been made, fail_msg is printed and 0 is returned. At least one attempt is
    always made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return int(input(input_msg + ' '))
        except ValueError:
            if attempts >= max_retries:
                print(fail_msg)
                return 0
            print("Your input wasn't a number, please try again:")

Terminalgröße ermitteln

from subprocess import check_output
 
def term_size():
    """Return the integers printed by `stty size` as a list.

    term_height() and term_width() read index 0 and 1 of the result.
    """
    raw = check_output(['stty', 'size'])
    return [int(field) for field in raw.split()]
 
def term_height():
    """Return the terminal height, i.e. the first value of term_size()."""
    return term_size()[0]
 
def term_width():
    """Return the terminal width, i.e. the second value of term_size()."""
    return term_size()[1]
 
 
# shorter possibility:
import os
 
# The result can be read by attribute (.columns) or unpacked as a (width, height) pair.
os.get_terminal_size().columns
w, h = os.get_terminal_size()

Binary in $PATH finden

import sys
from os.path import abspath
# shutil.which replaces distutils.spawn.find_executable: distutils was
# deprecated by PEP 632 and is no longer part of the standard library in 3.12
from shutil import which

# Look the binary up in $PATH; abort with exit status 1 if it is missing.
exe = which('binary_name')
if not exe:
    print('[ERROR] Binary not found in your $PATH variable!')
    sys.exit(1)
else:
    # normalise the found path (the original passed an undefined name here)
    exe = abspath(exe)

Fortschrittsanzeige

progress.py
import sys
from math import ceil
from time import sleep
 
def progress(total, length=20, bar='=', empty=' ', delay=0.1):
    """Draw a demo progress bar that counts from 0 to total on one terminal line.

    total  -- number of steps (int); a value <= 0 draws a completed bar at once
    length -- width of the bar in characters
    bar    -- character used for the completed part
    empty  -- character used for the remaining part
    delay  -- seconds to sleep after each redraw (stands in for real work);
              0 disables the pause
    """
    # a non-positive total would divide by zero below; treat it as "already done"
    steps = max(total, 0)

    for i in range(steps + 1):  # add one as counting starts at 0
        if steps:
            # Integer arithmetic is exact: the last step always yields a full
            # bar and exactly 100%, which float division cannot guarantee.
            fill = i * length // steps
            percent = -(-i * 100 // steps)  # ceiling division, rounds up like ceil()
        else:
            fill, percent = length, 100
        # '\r' returns to the start of the line so the bar is redrawn in place
        sys.stdout.write("\r[%s%s] %3d%%" % (bar*fill, empty*(length-fill), percent))
        sys.stdout.flush()
        if delay:
            sleep(delay)

    print()
 
 
def progress_simple(total, bar='=', delay=0.1):
    """Draw a fixed-width (20 characters) demo progress bar from 0 to total.

    total -- number of steps (int); a value <= 0 draws a completed bar at once
    bar   -- character used for the completed part
    delay -- seconds to sleep after each redraw (stands in for real work);
             0 disables the pause
    """
    # a non-positive total would divide by zero below; treat it as "already done"
    steps = max(total, 0)

    for i in range(steps + 1):  # add one as counting starts at 0
        if steps:
            # exact integer arithmetic, see percent: ceiling division like ceil()
            fill = i * 20 // steps
            percent = -(-i * 100 // steps)
        else:
            fill, percent = 20, 100
        # '%-20s' left-aligns the filled part and pads the rest with spaces
        sys.stdout.write("\r[%-20s] %3d%%" % (bar*fill, percent))
        sys.stdout.flush()
        if delay:
            sleep(delay)

    print()
 
 
# The number of steps comes from the first command line argument;
# without an argument the demo uses 25 steps.
try:
    total = int(sys.argv[1])
except IndexError:
    total = 25
except ValueError:
    # sys.exit() is used instead of exit(): the latter is only installed by the
    # site module. A string argument is printed to stderr and yields exit status 1.
    sys.exit("The given argument is not a valid number!")
# No catch-all handler: a bare except would also swallow KeyboardInterrupt and
# SystemExit and hide the real cause behind "Unknown error".

progress_simple(total)
progress(total, 40, '#', '-')

Verzeichnis temporär wechseln (Exception-safe)

cd_exception_safe.py
from contextlib import contextmanager
import os
 
@contextmanager
def cd(newdir):
    """Temporarily change the working directory to newdir ('~' is expanded).

    The directory that was current on entry is restored when the with block
    ends, no matter whether it finishes normally or raises.
    """
    origin = os.getcwd()
    target = os.path.expanduser(newdir)
    os.chdir(target)
    try:
        yield
    finally:
        # runs on normal exit and on exceptions alike
        os.chdir(origin)
 
# Demo: cd() is a context manager built with the @contextmanager decorator;
# the previous directory is restored even after an exception is thrown.
 
os.chdir('/home')
 
with cd('/tmp'):
    # ...
    raise Exception("There's no place like home.")
# The working directory is back to '/home' here; the exception itself still
# propagates out of the with block.

Liste mit Inhalt einer anderen Liste filtern

# Remove from flags every entry that contains one of the sed-style expressions
# found in flags themselves:
#   1. regex  = the unquoted entries that start with 's/'
#   2. flags  = only those entries in which none of these expressions occurs
flags = ['"s/regex/looking/g"', 'options', '"s/another/regex/"', '--more flags']
regex = [expr for expr in map(unquote, flags) if expr.startswith('s/')]
flags = [flag for flag in flags if not any(expr in flag for expr in regex)]

Ausgiebige Tests, ob Ordner oder Dateien existieren, inklusive Rechte

path_checks.py
import os
import errno
from os.path import abspath, dirname, join as pjoin
 
def check_path(path, create=False, write=True):
    """Check that a directory exists and is usable.

    path   -- directory to check; a leading '~' is expanded
    create -- if true and the directory does not exist, try to create it
              (including missing parents)
    write  -- if true, an existing directory must also be writable

    Returns True if the directory exists (or was created) and passes the
    checks, False otherwise; the reason is reported via print_error.
    Raises OSError for creation failures other than missing permission.
    """
    path = os.path.expanduser(path)

    if os.path.isdir(path):
        if not is_readable(path):
            print_error("[ERROR] Directory '%s' is not readable" % path)
            return False
        if write and not is_writable(path):
            print_error("[ERROR] Directory '%s' is not writable" % path)
            return False
        return True

    if not create:
        print_error("[ERROR] Directory '%s' does not exist" % path)
        return False

    print("Directory '%s' does not exist, it will be created now" % path)
    # try to create the directory; if it should exist for whatever reason,
    # ignore it, otherwise report the error
    try:
        os.makedirs(path)
    except OSError as exception:
        if exception.errno == errno.EACCES:
            print_error("[ERROR] You don't have the permission to create directories in '%s'" % dirname(path))
            return False
        elif exception.errno != errno.EEXIST:
            raise
    # EEXIST is only harmless when the existing entry really is a directory;
    # a regular file of that name cannot serve as one.
    if not os.path.isdir(path):
        print_error("[ERROR] '%s' exists but is not a directory" % path)
        return False
    return True
 
def check_file(path, file=None):
    """Check if a file exists and is readable.

    path -- directory containing the file, or the file itself when file is None;
            a leading '~' is expanded
    file -- optional file name inside path; when given, the absolute path of
            path/file (via get_path) is checked

    Returns True if the file exists and is readable, otherwise reports the
    problem via print_error and returns False.
    """
    path = os.path.expanduser(path)
    if file is not None:
        path = get_path(path, file)

    if not os.path.isfile(path):
        print_error("[ERROR] The file '%s' does not exist!" % path)
        return False
    if not is_readable(path):
        print_error("[ERROR] The file '%s' is not readable!" % path)
        return False
    return True
 
def check_permission(path, permission):
    """Return whether path exists and os.access grants the given permission.

    permission is one of the os.*_OK mode constants; a non-existing path
    always yields False.
    """
    if not os.path.exists(path):
        return False
    return os.access(path, permission)
 
def is_readable(path):
    """Check if path is readable (check_permission with os.R_OK)"""
    return check_permission(path, os.R_OK)
 
def is_writable(path):
    """Check if path is writable (check_permission with os.W_OK)"""
    return check_permission(path, os.W_OK)
 
def is_executable(path):
    """Check if path is executable (check_permission with os.X_OK)"""
    return check_permission(path, os.X_OK)
 
def get_path(path, file=None):
    """Build an absolute path from path and, if given, a file name below it.

    A leading '~' is expanded; a falsy file (None or '') means path itself
    is resolved.
    """
    target = pjoin(path, file) if file else path
    return abspath(os.path.expanduser(target))

Ausführliches Beispiel für Verwendung von Argumenten

argparse_example.py
import sys
import os.path
import argparse
 
class Settings():
    """Placeholder for the settings container of this example.

    main() calls read_config(), set() and print() on its settings object;
    this stub implements none of them.
    """
    pass
 
def get_path(path):
    """Stub that returns None; main() uses its result as the resolved output path."""
    return
 
def check_path(path, force=False):
    """Stub that returns None; main() treats a falsy result as 'directory cannot be used'."""
    return
 
def is_valid_file(parser, arg):
    """argparse type helper: accept arg only if it names an existing file.

    Returns arg unchanged on success; otherwise the problem is reported
    through parser.error().
    """
    if os.path.isfile(arg):
        return arg
    parser.error('The file %s does not exist!' % arg)
 
def is_valid_dir(parser, arg):
    """argparse type helper: accept arg only if it names an existing directory.

    Returns the path with a leading '~' expanded on success; otherwise the
    problem is reported through parser.error().
    """
    expanded = os.path.expanduser(arg)
    if os.path.isdir(expanded):
        return expanded
    parser.error('The directory %s does not exist!' % arg)
 
def main():
    """Main function, argparse demonstration

    Parses the command line, handles the options that do one thing and exit
    (--example-config, --list) and otherwise applies the given options to the
    settings object.

    NOTE: Settings, get_path and check_path are stubs in this example and
    list_file_amount is not defined here; real implementations have to be
    supplied before the example runs end to end.
    """
    parser = argparse.ArgumentParser(description='Example method to demonstrate '
                                     'some of the capabilities of argparse')
    # Not declared with required=True: --example-config has to work without a
    # config file, so the requirement is enforced manually further down.
    parser.add_argument('-c', '--config', nargs=1, metavar='config_file',
                        dest='config',
                        type=lambda x: is_valid_file(parser, x),
                        help='Required: Specify a config file')
    parser.add_argument('-o', '--output', nargs=1, metavar='output_directory',
                        type=lambda x: is_valid_dir(parser, x),
                        help='Optional: Custom output directory')
    # adjacent string literals are concatenated as-is, so every fragment
    # needs its own trailing space
    parser.add_argument('-l', '--list', nargs='?', const=True,
                        help='List the amount of existing entries in files '
                        'in the output directory and exit; if "all" is specified '
                        'as an optional argument, the amount of entries will be listed')
    parser.add_argument('-e', '--example-config', nargs='?', const='example.config',
                        help='Export an example config file and exit. '
                        'If no file is specified, the output will be written to "example.config"')
    parser.add_argument('-n', '--number', type=int, nargs=1, metavar='integer',
                        help='Number of entries')
    parser.add_argument('-q', '--queue', type=str, nargs=1, metavar='queue',
                        help='The used queue system')
    parser.add_argument('-f', '--force', action='store_true',
                        help='Force creation of directories if they do not exist '
                        'or overwrite existing files')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Print additional output')

    args = parser.parse_args()
    verbose = args.verbose
    force = args.force

    if args.example_config:
        print('[INFO] Write example config to file "%s"' % args.example_config)
        sys.exit(0)

    if not args.config:
        parser.error('the following arguments are required: -c/--config')
    # nargs=1 stores a one-element list, not the bare value
    config = args.config[0]
    settings = Settings()
    if verbose:
        print('Use config file %s' % config)
    with open(config, 'r') as conf:
        settings = settings.read_config(conf)

    if args.list:
        if verbose and args.list == 'all':
            print('Trying to determine the total amount file entries')
        if args.output:
            settings.set('OUTPUT_PATH', get_path(args.output[0]))
        list_file_amount(settings, args.list == 'all')
        sys.exit(0)

    if args.queue:
        if verbose:
            print('Use the queue', args.queue[0])
        settings.set('QUEUE', args.queue[0])

    if args.number:
        if verbose:
            # '%' applies the format; a comma would print the raw format
            # string followed by the number
            print('%d entries specified' % args.number[0])
        settings.set('NUMBER', args.number[0])

    if verbose:
        print('The following settings will be used:')
        settings.print()

    if args.output:
        if not check_path(args.output[0], force):
            sys.exit('The output directory %s cannot be used' % args.output[0])
        output = get_path(args.output[0])
        print('Setting custom output directory: %s' % output)

Farbige Ausgaben und Logging

colored_output.py
#!/usr/bin/env python
# vim: set ai ts=4 sw=4 sts=4 noet fileencoding=utf-8 ft=python
 
'''
This module provides functions to color string in the terminal output
and provides a logging class with colored strings
'''
 
__version__ = '1.0'
 
# Colored output
# Color numbers 0-7; the foreground code is 30 plus the number, the background code 40 plus the number
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 
# These are the escape sequences needed to get colored output
RESET_SEQ = "\033[0m"
# attribute 1 (the same one BOLD_SEQ sets) combined with a color code filled in via %d
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
 
# Maps logging level names and plain color names to a color number
COLORS = {
    'WARNING': YELLOW,
    'INFO': WHITE,
    'DEBUG': BLUE,
    'CRITICAL': YELLOW,
    'ERROR': RED,
    'RED': RED,
    'GREEN': GREEN,
    'YELLOW': YELLOW,
    'BLUE': BLUE,
    'MAGENTA': MAGENTA,
    'CYAN': CYAN,
    'WHITE': WHITE
}
 
def color_string(string, color):
    """Wrap string in the escape sequences for the given foreground color.

    color may be a key of COLORS (e.g. 'RED', 'WARNING') or a color number
    0-7; for anything else the string is returned unchanged.
    """
    if color in COLORS:
        number = COLORS[color]
    elif color in range(8):
        number = color
    else:
        return string
    return COLOR_SEQ % (30 + number) + string + RESET_SEQ
 
def bold_string(string):
    """Return string enclosed in BOLD_SEQ and RESET_SEQ."""
    return BOLD_SEQ + string + RESET_SEQ
 
def print_color(string, color):
    """Print string colored via color_string(); color is passed through unchanged."""
    print(color_string(string, color))
 
import sys
 
def print_error(string):
    """Print string in red to stderr."""
    print(color_string(string, RED), file=sys.stderr)
 
import logging
 
class ColoredFormatter(logging.Formatter):
    """Formatter that expands color placeholders in the formatted message.

    Supported placeholders: $RESET, $BOLD, $COLOR (color of the record's
    level) and, for every key K of COLORS, $K (foreground) as well as $BGK
    and $BG-K (background). The level name is centered to 8 characters and
    RESET_SEQ is appended to every message.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def format(self, record):
        """Return the formatted record with all placeholders replaced."""
        levelname = record.levelname
        # level names without an entry in COLORS (custom levels) fall back to
        # WHITE instead of raising KeyError
        color = COLOR_SEQ % (30 + COLORS.get(levelname, WHITE))
        record.levelname = levelname.center(8)
        try:
            message = super().format(record)
        finally:
            # the record object is handed to every handler; undo the padding
            # so other formatters see the original level name
            record.levelname = levelname
        message = (message.replace("$RESET", RESET_SEQ)
                          .replace("$BOLD", BOLD_SEQ)
                          .replace("$COLOR", color))
        for k, v in COLORS.items():
            message = (message.replace("$" + k, COLOR_SEQ % (v + 30))
                              .replace("$BG" + k, COLOR_SEQ % (v + 40))
                              .replace("$BG-" + k, COLOR_SEQ % (v + 40)))
        return message + RESET_SEQ
 
# Logger subclass that brings its own colored console handler
class ColoredLogger(logging.Logger):
    """Logger at DEBUG level whose console output is formatted by ColoredFormatter."""

    FORMAT = "[%(asctime)s][$COLOR%(levelname)s$RESET]  $BOLD%(message)s$RESET (%(filename)s:%(lineno)d)"
    # Alternative format with 3-digit milliseconds separated by a dot instead of a comma;
    # it needs DATEFORMAT below so that asctime itself carries no milliseconds:
    #FORMAT = "[%(asctime)s.%(msecs).03d][$COLOR%(levelname)s$RESET]  $BOLD%(message)s$RESET (%(filename)s:%(lineno)d)"
    DATEFORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, name):
        super().__init__(name, logging.DEBUG)

        handler = logging.StreamHandler()
        # To hide milliseconds use instead:
        #   ColoredFormatter(fmt=self.FORMAT, datefmt=self.DATEFORMAT)
        handler.setFormatter(ColoredFormatter(self.FORMAT))
        self.addHandler(handler)
 
 
# Loggers created from here on are instances of ColoredLogger
logging.setLoggerClass(ColoredLogger)
 
# Demo: one message per level, each colored according to the COLORS entry of its level name
logger = logging.getLogger('Simulation')
logger.setLevel(logging.DEBUG)
logger.warning("This is a warning")
logger.error("ERROR!")
logger.info("Just an info")
logger.critical("Oh oh")
logger.debug("Debug message")
 
# The color can be given by name (a key of COLORS) or as a color number constant
print_color("just a cyan colored info", 'CYAN')
print_color("and a magenta/purple one, too", MAGENTA)
 
print_error('[ERROR] This will be written to stderr')
 
# Disabled snippet: would print a test string once in each of the 8 colors
#str = "this is a test string"
#for i in range(8):
#        print(COLOR_SEQ % (30 + i) + str + RESET_SEQ)

subprocess Beispiele

subprocess_example.py
#!/usr/bin/env python
 
import subprocess
 
# use sequence of arguments
ret = subprocess.call(['echo', 'Hello world!'])
# shell=True allows to use simple string for the command
ret = subprocess.call('echo Hello world!', shell=True)
print()

command = 'date'
print(command)
# checks output, raises error if nonzero return code
output = subprocess.check_output(command, shell=True)

# Popen + communicate() gives access to stdout, stderr and the return code separately
p = subprocess.Popen('date', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = p.communicate()
returncode = p.returncode

#return_code = p.wait()

# both results are bytes and have to be decoded for printing
print(output.decode('utf-8'))
# 'out' is the stdout captured by communicate() (the original printed 'output' twice)
print('out:', out.decode('utf-8'))
print('err:', err.decode('utf-8'))
print('status:', returncode)
 
 
# write to stdout and logfile
import sys

# the with block closes the logfile even if the loop fails
with open('logfile', 'w') as logfile:
    # stderr is merged into stdout; universal_newlines=True makes the pipe yield
    # str lines, which sys.stdout.write() and the text-mode logfile require
    # (bytes would raise TypeError)
    proc = subprocess.Popen(['cat', 'file'], stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, universal_newlines=True)
    for line in proc.stdout:
        sys.stdout.write(line)
        logfile.write(line)
    proc.wait()