Source code for toxiccore.shell
# -*- coding: utf-8 -*-
# Copyright 2023 Juca Crispim <juca@poraodojuca.net>
# This file is part of toxiccore.
# toxiccore is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# toxiccore is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with toxiccore. If not, see <http://www.gnu.org/licenses/>.
import asyncio
from asyncio import ensure_future
from asyncio.exceptions import LimitOverrunError, IncompleteReadError
import os
import subprocess
from .exceptions import ExecCmdError
from .utils import get_envvars, run_in_thread
[docs]async def exec_cmd(cmd, cwd, timeout=3600, out_fn=None, **envvars):
""" Executes a shell command. Raises with the command output
if return code > 0.
:param cmd: command to run.
:param cwd: Directory to execute the command.
:param timeout: How long we should wait some output. Default
is 3600.
:param out_fn: A coroutine that receives each line of the step
output. The coroutine signature must be in the form:
mycoro(line_index, line).
:param envvars: Environment variables to be used in the command.
"""
proc = await _create_cmd_proc(cmd, cwd, **envvars)
out = []
try:
line_index = 0
done = False
while not done:
outline, done = await asyncio.wait_for(_readline(proc.stdout),
timeout)
outline = outline.decode()
if out_fn:
ensure_future(out_fn(line_index, outline))
line_index += 1
out.append(outline)
output = ''.join(out).strip('\n')
finally:
# we must ensure that all process started by our command are
# dead.
await _kill_group(proc)
await proc.wait()
if int(proc.returncode) > 0:
raise ExecCmdError(output)
return output
async def _create_cmd_proc(cmd, cwd, **envvars):
"""Creates a process that will execute a command in a shell.
:param cmd: command to run.
:param cwd: Directory to execute the command.
:param envvars: Environment variables to be used in the command.
"""
envvars = get_envvars(envvars)
proc = await asyncio.create_subprocess_shell(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd,
env=envvars, preexec_fn=os.setsid)
return proc
async def _kill_group(process):
"""Kills all processes of the group which a process belong.
:param process: A process that belongs to the group you want to kill.
"""
def fn():
try:
pgid = os.getpgid(process.pid)
os.killpg(pgid, 9)
except ProcessLookupError: # pragma no cover
pass
await run_in_thread(fn)
async def _try_readline(stream):
sep = b'\n'
try:
r = await stream.readuntil(sep)
except LimitOverrunError:
sep = b'\r'
r = await stream.readuntil(sep)
return r
async def _readline(stream):
"""Reads a line from the stream buffer. Tries to read a line
ending in '\n'. If no new line found, try to find '\r'.
:param stream: The StreamReader to read from.
"""
# basically taken from asyncio.streams.StreamReader.readline
lf, cr = b'\n', b'\r'
seplen = 1
try:
line = await _try_readline(stream)
except IncompleteReadError as e:
return e.partial, True
except LimitOverrunError as e:
if stream._buffer.startswith(lf, e.consumed) or \
stream._buffer.startswith(cr, e.consumed):
del stream._buffer[:e.consumed + seplen]
else:
stream._buffer.clear()
stream._maybe_resume_transport()
raise ValueError(e.args[0])
return line, False