Source code for toxiccore.protocol
# -*- coding: utf-8 -*-
# Copyright 2015, 2017, 2023 Juca Crispim <juca@poraodojuca.net>
# This file is part of toxicbuild.
# toxicbuild is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# toxicbuild is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with toxicbuild. If not, see <http://www.gnu.org/licenses/>.
import asyncio
from asyncio import ensure_future
from collections import OrderedDict
import json
import time
import traceback
from .socks import read_stream, write_stream
from .utils import LoggerMixin, compare_bcrypt_string
[docs]class BaseToxicProtocol(asyncio.StreamReaderProtocol, LoggerMixin):
""" Base protocol for toxicbulid servers. To create your own protocol
extend this class and implement the client_connected method.
Example:
--------
.. code-block:: python
class MyServer(BaseToxicProtocol):
async def client_connected(self):
assert self.action
r = await fn(**self.data)
# code == 0 means everything ok.
# code > 0 means something went wrong
self.send_response(code=0, body={'some': 'thing'})
"""
# This is the token used to authenticate incomming requests.
encrypted_token = None
def __init__(self, loop, connection_lost_cb=None):
""":param loop: An asyncio loop.
:param connection_lost_cb: Callable to be executed when connection
is closed. Gets an exception or None as parameter."""
self.raw_data = None
self.json_data = None
self.action = None
self._connected = False
# make tests easier
self._check_data_future = None
self._client_connected_future = None
self.connection_lost_cb = connection_lost_cb
self.data = None
self.peername = None
self._transport = None
self._writer_lock = asyncio.Lock()
self._reader = asyncio.StreamReader(loop=loop)
super().__init__(self._reader, loop=loop)
def __call__(self):
return self
[docs] def connection_made(self, transport):
""" Called once, when the client connects.
:param transport: transport for asyncio.StreamReader and
asyncio.StreamWriter.
"""
self._transport = transport
self._over_ssl = transport.get_extra_info('sslcontext') is not None
self._stream_reader.set_transport(transport)
self._stream_writer = asyncio.StreamWriter(transport, self,
self._stream_reader,
self._loop)
self._connected = True
self.peername = self._transport.get_extra_info('peername')
self._check_data_future = ensure_future(self.check_data())
self._check_data_future.add_done_callback(self._check_data_cb)
[docs] def connection_lost(self, exc):
"""Called once, when the connection is lost.
:param exc: The exception, if some."""
self._reader = None
self.close_connection()
super().connection_lost(exc)
self.log('Connection lost', level='debug')
if self.connection_lost_cb:
self.log('Executing connection_lost_cb', level='debug')
self.connection_lost_cb(exc)
[docs] async def check_data(self):
""" Checks if the data is valid, it means, checks if has some data,
checks if it is a valid json and checks if it has a ``action`` key
"""
self.data = await self.get_json_data()
if not self.data:
self.log('Bad data', level='warning')
self.log(self.raw_data, level='debug')
msg = 'Something wrong with your data {!r}'.format(self.raw_data)
await self.send_response(code=1, body={'error': msg})
return self.close_connection()
token = self.data.get('token')
if not token:
msg = 'No auth token'
self.log(msg, level='warning')
await self.send_response(code=2, body={'error': msg})
return self.close_connection()
if not compare_bcrypt_string(token, self.encrypted_token):
msg = 'Bad auth token'
self.log(msg, level='warning')
await self.send_response(code=3, body={'error': msg})
return self.close_connection()
self.action = self.data.get('action')
if not self.action:
msg = 'No action found!'
self.log(msg, level='warning')
await self.send_response(code=1, body=msg)
return self.close_connection()
[docs] async def client_connected(self): # pragma no cover
""" Coroutine that handles connections. You must implement this
in your sub-classes. When this method is called, ``self.data``,
containing a dictionary with the data passed in the request and
``self.action``, a string indicating which action to take are already
available.
"""
raise NotImplementedError
[docs] def close_connection(self):
""" Closes the connection with the client
"""
if self._stream_writer:
self._stream_writer.close()
self._connected = False
[docs] async def send_response(self, code, body, timeout=None):
""" Send a response to client formated by the (unknown) toxicbuild
remote build specs.
:param code: code for this message. code == 0 is success and
code > 0 is error.
:param body: response body. It has to be a serializable object.
:param timeout: Timeout for the operation. If None there is no timeout
"""
response = OrderedDict()
response['code'] = code
response['body'] = body
data = json.dumps(response)
# drain() cannot be called concurrently by multiple coroutines:
# http://bugs.python.org/issue29930. Remove this lock when no
# version of Python where this bugs exists is supported anymore.
# patch by @RemiCardona for websockets on github.
async with self._writer_lock:
await write_stream(self._stream_writer, data,
timeout=timeout)
[docs] async def get_raw_data(self, timeout=None):
""" Returns the raw data sent by the client
:param timeout: Timeout for the operation. If None there is no timeout
"""
r = await read_stream(self._stream_reader, timeout=timeout)
return r
[docs] async def get_json_data(self, timeout=None):
"""Returns the json sent by the client.
:param timeout: Timeout for the operation. If None there is no timeout
"""
data = await self.get_raw_data(timeout=timeout)
data = data.decode()
try:
data = json.loads(data)
except Exception:
msg = '{}\n{}'.format(traceback.format_exc(), data)
self.log(msg, level='error')
data = None
return data
def _check_data_cb(self, future):
# The thing here is: run client_connected only if everything ok
# on check_data
if not self._connected: # pragma no cover
self.log('Not connected', level='warning')
return
# wrapping it to log it.
async def logged_cb():
init = (time.time() * 1e3)
try:
status = await self.client_connected()
except ConnectionResetError:
status = 1
msg = 'Connection reset'
self.log(msg, level='debug')
end = (time.time() * 1e3)
self.log('{}: {} {}'.format(self.action, status, (end - init)))
self._client_connected_future = ensure_future(logged_cb())