# -*- coding: utf-8 -*-
# Copyright 2015-2019, 2023 Juca Crispim <juca@poraodojuca.net>
# This file is part of toxicbuild.
# toxicbuild is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# toxicbuild is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with toxicbuild. If not, see <http://www.gnu.org/licenses/>.
import base64
from concurrent.futures import ThreadPoolExecutor
import copy
from datetime import datetime, timezone, timedelta
import fnmatch
import importlib
import logging
import os
import random
import tempfile
import string
import sys
import time
import warnings
import bcrypt
from .exceptions import ConfigError
from .socks import (read_stream as read_sock_stream,
write_stream as write_sock_stream)
DTFORMAT = '%w %m %d %H:%M:%S %Y %z'
_THREAD_EXECUTOR = ThreadPoolExecutor()
WRITE_CHUNK_LEN = 4096
READ_CHUNK_LEN = 1024
logger = logging.getLogger('toxicbuild')
# copy from mongomotor
[docs]class MonkeyPatcher:
def __init__(self):
self.patched = {}
# if the original patched object is a dict, indicates if
# we should merge the original dict with the dict existing
# when leaving the context manager.
self._update_original_dict = False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
for obj, patches in self.patched.items():
for attr, origobj in patches.items():
if self._update_original_dict: # pragma no cover
current_obj = getattr(obj, attr)
if hasattr(current_obj, 'update'):
origobj.update(current_obj)
setattr(obj, attr, origobj)
[docs] def patch_item(self, obj, attr, newitem, undo=True):
"""Sets ``attr`` in ``obj`` with ``newitem``.
If not ``undo`` the item will continue patched
after leaving the context manager"""
NONE = object()
olditem = getattr(obj, attr, NONE)
if undo and olditem is not NONE: # pragma no branch
self.patched.setdefault(obj, {}).setdefault(attr, olditem)
setattr(obj, attr, newitem)
[docs]class SourceSuffixesPatcher(MonkeyPatcher):
"""We must to path ``SOURCE_SUFFIXES`` in the python ``importlib`` so
we can import source code files with extension other than `.py`, in
this case, namely `.conf`"""
SOURCE_SUFFIXES = ['.py', '.conf']
[docs] def patch_source_suffixes(self):
"""Patches the ``SOURCE_SUFFIXES`` constant in the module
:mod:`importlib._bootstrap_external`."""
self.patch_item(importlib._bootstrap_external, 'SOURCE_SUFFIXES',
self.SOURCE_SUFFIXES)
# patching it now!
patcher = SourceSuffixesPatcher()
patcher.patch_source_suffixes()
# class SettingsPatcher(MonkeyPatcher):
# """Patches the settings from pyrocumulus to use the same settings
# as toxibuild."""
# def patch_pyro_settings(self, settings):
# from pyrocumulus import conf as pyroconf
# self.patch_item(pyroconf, 'settings', settings)
[docs]def interpolate_dict_values(to_return, valued, base):
"""Interpolates the values of ``valued`` with values of ``base``.
:param to_return: A dictionary that will be updated with interpolated
values.
:param valued: A dict with values needing interpolation.
:param base: A dict with the base values to use in interpolation.
"""
for var, value in valued.items():
if var in value:
current = base.get(var, '')
value = value.replace(var, current)
to_return[var] = value
return to_return
[docs]def get_envvars(envvars, use_local_envvars=True):
"""Returns environment variables to be used in shell. Does the
interpolation of values using the current values from the envvar
and the values passed as parameters. """
if use_local_envvars:
newvars = copy.copy(os.environ)
else:
newvars = {}
newvars = interpolate_dict_values(newvars, envvars, os.environ)
return newvars
[docs]def load_module_from_content(module_content):
"""Loads a module from a string that is the contents of the module.
:param module_content: A string with the module content.
"""
# This is a shitty hack. I don't want to remove python build config
# but I don't want spend time on it. :P
with tempfile.NamedTemporaryFile(suffix='.py') as fd:
fd.write(module_content.encode())
fd.flush()
mod = load_module_from_file(fd.name)
return mod
[docs]def load_module_from_file(filename):
""" Load a module from a source file
:param filename: full path for file to be loaded.
"""
fname = filename.rsplit('.', 1)[0]
fname = fname.rsplit(os.sep, 1)[-1]
spec = importlib.util.spec_from_file_location(fname, filename)
module = importlib.util.module_from_spec(spec)
# source_file = importlib.machinery.SourceFileLoader(fname, filename)
try:
# module = source_file.load_module()
spec.loader.exec_module(module)
except FileNotFoundError:
err_msg = 'Config file "%s" does not exist!' % (filename)
raise FileNotFoundError(err_msg)
except Exception as e:
err_msg = 'There is something wrong with your file. '
err_msg += 'The original exception was:\n{}'.format(e.args[0])
raise ConfigError(err_msg)
return module
[docs]def set_loglevel(loglevel):
stdout_handler = logging.StreamHandler(sys.stdout)
# stderr_handler = logging.StreamHandler(sys.stderr)
logger.addHandler(stdout_handler)
# logger.addHandler(stderr_handler)
loglevel = getattr(logging, loglevel.upper())
logger.setLevel(loglevel)
for h in logger.handlers:
h.setLevel(loglevel)
[docs]def log(msg, level='info'):
log = getattr(logger, level)
dt = now().strftime('%Y-%m-%d %H:%M:%S')
lvl = level.upper()
msg = '[{}] {} - {}'.format(lvl, dt, msg)
log(msg)
[docs]class LoggerMixin:
"""A simple mixin to use log on a class."""
[docs] @classmethod
def log_cls(cls, msg, level='info'):
log('[{}] {} '.format(cls.__name__, msg), level)
[docs] def log(self, msg, level='info'):
"""Appends the class name before the log message. """
type(self).log_cls(msg, level)
[docs]def datetime2string(dt, dtformat=DTFORMAT):
"""Transforms a datetime object into a formated string.
:param dt: The datetime object.
:param dtformat: The format to use."""
if dt.utcoffset() is None:
tz = timezone(timedelta(seconds=0))
dt = dt.replace(tzinfo=tz)
return datetime.strftime(dt, dtformat)
[docs]def string2datetime(dtstr, dtformat=DTFORMAT):
"""Transforns a string into a datetime object acording to ``dtformat``.
:param dtstr: The string containing the formated date.
:param dtformat: The format of the formated date.
"""
return datetime.strptime(dtstr, dtformat)
[docs]def utc2localtime(utcdatetime):
"""Transforms a utc datetime object into a datetime object
in local time.
:param utcdatetime: A datetime object"""
off = time.localtime().tm_gmtoff
td = timedelta(seconds=off)
tz = timezone(td)
local = utcdatetime + td
localtime = datetime(local.year, local.month, local.day,
local.hour, local.minute, local.second,
local.microsecond,
tzinfo=tz)
return localtime
[docs]def localtime2utc(localdatetime):
"""Transforms a local datetime object into a datetime object
in utc time.
:param localdatetime: A datetime object."""
off = time.localtime().tm_gmtoff
td = timedelta(seconds=off)
utc = localdatetime - td
utctz = timezone(timedelta(seconds=0))
utctime = datetime(utc.year, utc.month, utc.day,
utc.hour, utc.minute, utc.second,
utc.microsecond,
tzinfo=utctz)
return utctime
[docs]def now():
""" Returns the localtime with timezone info. """
off = time.localtime().tm_gmtoff
tz = timezone(timedelta(seconds=off))
return datetime.now(tz=tz)
[docs]def set_tzinfo(dt, tzoff):
"""Sets a timezone info to a datetime object.
:param dt: A datetime object.
:para tzoff: The timezone offset from utc in seconds"""
tz = timezone(timedelta(seconds=tzoff))
tztime = datetime(dt.year, dt.month, dt.day,
dt.hour, dt.minute, dt.second,
dt.microsecond,
tzinfo=tz)
return tztime
# inherit_docs thanks to Raymond Hettinger on stackoverflow
# stackoverflow.com/questions/8100166/inheriting-methods-docstrings-in-python
[docs]def inherit_docs(cls):
""" Inherit docstrings from base classes' methods.
Can be used as a decorator
:param cls: Class that will inherit docstrings from its parents.
"""
for name, func in vars(cls).items():
if not func.__doc__:
for parent in cls.__bases__:
try:
parfunc = getattr(parent, name)
except Exception:
continue
if parfunc and getattr(parfunc, '__doc__', None):
func.__doc__ = parfunc.__doc__
break
return cls
[docs]def bcrypt_string(src_string, salt=None):
encoding = sys.getdefaultencoding()
if not salt:
salt = bcrypt.gensalt(12)
if isinstance(salt, str):
salt = salt.encode(encoding)
encrypted = bcrypt.hashpw(src_string.encode(encoding), salt)
return encrypted.decode()
[docs]def compare_bcrypt_string(original, encrypted):
"""Compares if a un-encrypted string matches an encrypted one.
:param original: An un-encrypted string.
:param encrypted: An bcrypt encrypted string."""
return bcrypt.checkpw(original.encode(), encrypted.encode())
[docs]def create_random_string(length):
valid_chars = string.ascii_letters + string.digits
random_str = ''.join([line for i in range(length)
for line in random.choice(valid_chars)])
return random_str
[docs]def create_validation_string(secret):
"""Creates a random string that can be used to validate
against it. The algorithm is as follows:
Given a secret, a random string is generated, then <secret>-<random-str>
are encrypted using bcrypt. Finally <encrypted-str>:<random-str>
are base64 encoded.
"""
random_str = create_random_string(12)
enc = bcrypt_string('{}-{}'.format(secret, random_str))
final = base64.encodebytes(
'{}:{}'.format(enc, random_str).encode('utf-8')).decode()
return final
[docs]def validate_string(b64_str, secret):
"""Validates a string created with
:func:`~..utils.create_validation_string`.
Given a base64 string the validation is as follows:
First decodes the base64 string in <encrypted-string>:<random-str> then
bcrypt-compare <secret>-<random-str> with <encrypted-string>
"""
try:
real = base64.decodebytes(b64_str.encode()).decode()
enc, random_sr = real.split(':')
except Exception as e:
log('Error validating string: {}'.format(str(e)), level='error')
return False
else:
return compare_bcrypt_string('{}-{}'.format(secret, random_sr), enc)
[docs]class changedir(object):
def __init__(self, path):
self.old_dir = os.getcwd()
self.current_dir = path
def __enter__(self):
os.chdir(self.current_dir)
def __exit__(self, *a, **kw):
os.chdir(self.old_dir)
[docs]def match_string(smatch, filters):
"""Checks if a string match agains a list
of filters containing wildcards.
:param smatch: String to test against the filters
:param filters: Filter to match a string."""
return any([fnmatch.fnmatch(smatch, f) for f in filters])
[docs]class MatchKeysDict(dict):
"""A dictionary that returns the values matching the keys using
:meth:`..utils.match_string`.
.. code-block:: python
>>> d = MatchKeysDict()
>>> d['k*'] = 1
>>> d['key']
1
>>> k['keyboard']
1
"""
def __getitem__(self, key):
for k in self.keys():
if match_string(key, [k]):
return super().__getitem__(k)
return super().__getitem__(key)
[docs] def get(self, key, default=None):
try:
r = self[key]
except KeyError:
r = default
return r
[docs]async def run_in_thread(fn, *args, **kwargs):
"""Runs a callable in a background thread.
:param fn: A callable to be executed in a thread.
:param args: Positional arguments to ``fn``
:param kwargs: Named arguments to ``fn``
Usage
.. code-block:: python
r = await run_in_thread(call, 1, bla='a')
"""
f = _THREAD_EXECUTOR.submit(fn, *args, **kwargs)
return f
[docs]async def read_file(filename):
"""Reads the contents of a file asynchronously.
:param filename: The path of the file."""
def _read(filename):
with open(filename) as fd:
contents = fd.read()
return contents
contents = (await run_in_thread(_read, filename)).result()
return contents
# deprecated stuff
[docs]async def exec_cmd(cmd, cwd, timeout=3600, out_fn=None, **envvars):
""" Executes a shell command. Raises with the command output
if return code > 0.
:param cmd: command to run.
:param cwd: Directory to execute the command.
:param timeout: How long we should wait some output. Default
is 3600.
:param out_fn: A coroutine that receives each line of the step
output. The coroutine signature must be in the form:
mycoro(line_index, line).
:param envvars: Environment variables to be used in the command.
DEPRECATED: Use :func:`toxiccore.shell.exec_cmd`
"""
m = "toxiccore.utils.exec_cmd is deprecated. Use toxiccore.shell.exec_cmd"
warnings.warn(m, DeprecationWarning)
from .shell import exec_cmd as exec_sh_cmd
r = await exec_sh_cmd(cmd, cwd, timeout=timeout, out_fn=out_fn, **envvars)
return r
[docs]async def read_stream(reader, timeout=None): # pragma no cover
""" Reads the input stream. First reads the bytes until the first "\\n".
These first bytes are the length of the full message.
:param reader: An instance of :class:`asyncio.StreamReader`
:param timeout: Timeout for the operation. If None there is no timeout
DEPRECATED: Use :func:`toxiccore.socks.read_stream`
"""
m = "toxiccore.utils.read_string is deprecated. "
m += "Use toxiccore.socks.read_stream"
warnings.warn(m, DeprecationWarning)
r = await read_sock_stream(reader, timeout=timeout)
return r
[docs]async def write_stream(writer, data, timeout=None): # pragma no cover
""" Writes ``data`` to output. Encodes data to utf-8 and prepend the
lenth of the data before sending it.
:param writer: An instance of asyncio.StreamWriter
:param data: String data to be sent.
:param timeout: Timeout for the write operation. If None there is
no timeout
DEPRECATED: Use :func:`toxiccore.socks.write_stream`
"""
m = "toxiccore.utils.write_string is deprecated. "
m += "Use toxiccore.socks.write_stream"
warnings.warn(m, DeprecationWarning)
r = await write_sock_stream(writer, data, timeout=timeout)
return r
# Sorry, but not willing to test a daemonizer.
[docs]def daemonize(call, cargs, ckwargs, stdout, stderr,
workdir, pidfile): # pragma: no cover
""" Run a callable as a daemon
:param call: a callable.
:param cargs: args to ``call``.
:param ckwargs: kwargs to ``call``.
:param stdout: daemon's stdout.
:param stderr: daemon's stderr.
:param workdir: daemon's workdir
:param pidfile: pidfile's path
"""
_create_daemon(stdout, stderr, workdir)
pid = os.getpid()
with open(pidfile, 'w') as f:
f.write(str(pid))
call(*cargs, **ckwargs)
def _create_daemon(stdout, stderr, workdir): # pragma: no cover
_fork_off_and_die()
os.setsid()
_fork_off_and_die()
os.umask(0)
os.chdir(workdir)
_redirect_file_descriptors(stdout, stderr)
def _fork_off_and_die(): # pragma: no cover
pid = os.fork()
if pid != 0:
sys.exit(0)
def _redirect_file_descriptors(stdout, stderr): # pragma: no cover
for fd in sys.stdout, sys.stderr:
fd.flush()
sys.stdout = open(stdout, 'a', 1)
sys.stderr = open(stderr, 'a', 1)