123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import subprocess
- import os
- import pty
- import tempfile
- import logging
- import time
- import socketserver
- import threading
- import queue
- import pytest
- LOCALADDR="127.0.5.5"
- @pytest.fixture(scope="module")
- def dropbear(request):
- opt = request.config.option
- if opt.remote:
- yield None
- return
- # split so that "dropbearmulti dropbear" works
- args = opt.dropbear.split() + [
- "-p", LOCALADDR, # bind locally only
- "-r", opt.hostkey,
- "-p", opt.port,
- "-F", "-E",
- ]
- p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
- # Wait until it has started listening
- for l in p.stderr:
- if "Not backgrounding" in l:
- break
- # Check it's still running
- assert p.poll() is None
- # Ready
- yield p
- p.terminate()
- print("Terminated dropbear. Flushing output:")
- for l in p.stderr:
- print(l.rstrip())
- print("Done")
- def dbclient(request, *args, **kwargs):
- opt = request.config.option
- host = opt.remote or LOCALADDR
- # split so that "dropbearmulti dbclient" works
- base_args = opt.dbclient.split() + ["-y", host, "-p", opt.port]
- if opt.user:
- base_args.extend(['-l', opt.user])
- full_args = base_args + list(args)
- bg = kwargs.get("background")
- if "background" in kwargs:
- del kwargs["background"]
- if bg:
- return subprocess.Popen(full_args, **kwargs)
- else:
- kwargs.setdefault("timeout", 10)
- # wait for response
- return subprocess.run(full_args, **kwargs)
- def own_venv_command():
- """ Returns a command to run as a prefix to get the same venv
- as the current running Python. Returns '' on not a virtualenv
- """
- try:
- venv = os.environ['VIRTUAL_ENV']
- except KeyError:
- return ""
- # note: bash/zsh unix specific
- return f"source {venv}/bin/activate"
- class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
- # override TCPServer's default, avoids TIME_WAIT
- allow_reuse_addr = True
- """ Listens for a single incoming request, sends a response if given,
- and returns the inbound data.
- Reponse can be a queue object, in which case each item in the queue will
- be sent as a response, until it receives a None item.
- """
- def __init__(self, port, timeout, response=None):
- super().__init__(('localhost', port), self.Handler)
- self.port = port
- self.timeout = timeout
- self.response = response
- self.sink = None
- class Handler(socketserver.StreamRequestHandler):
- def handle(self):
- if isinstance(self.server.response, queue.Queue):
- while True:
- i = self.server.response.get()
- if i is None:
- break
- self.wfile.write(i)
- elif self.server.response:
- self.wfile.write(self.server.response)
- assert self.server.sink is None, ">1 request sent to handler"
- self.server.sink = self.rfile.read()
- def __enter__(self):
- self.server_thread = threading.Thread(target=self.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
- return self
- def __exit__(self, *exc_stuff):
- self.shutdown()
- self.server_thread.join()
- def inbound(self):
- """ Returns the data sent to the socket """
- return self.sink
- def readall_socket(sock):
- b = []
- while True:
- i = sock.recv(4096)
- if not i:
- break
- b.append(i)
- return b''.join(b)
- # returns a str
- def random_alnum(size):
- r = os.urandom(500 + size*5)
- return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
|