test_dropbear.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import subprocess
  2. import os
  3. import pty
  4. import tempfile
  5. import logging
  6. import time
  7. import socketserver
  8. import threading
  9. import queue
  10. import pytest
  11. LOCALADDR="127.0.5.5"
  12. @pytest.fixture(scope="module")
  13. def dropbear(request):
  14. opt = request.config.option
  15. if opt.remote:
  16. yield None
  17. return
  18. # split so that "dropbearmulti dropbear" works
  19. args = opt.dropbear.split() + [
  20. "-p", LOCALADDR, # bind locally only
  21. "-r", opt.hostkey,
  22. "-p", opt.port,
  23. "-F", "-E",
  24. ]
  25. p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
  26. # Wait until it has started listening
  27. for l in p.stderr:
  28. if "Not backgrounding" in l:
  29. break
  30. # Check it's still running
  31. assert p.poll() is None
  32. # Ready
  33. yield p
  34. p.terminate()
  35. print("Terminated dropbear. Flushing output:")
  36. for l in p.stderr:
  37. print(l.rstrip())
  38. print("Done")
  39. def dbclient(request, *args, **kwargs):
  40. opt = request.config.option
  41. host = opt.remote or LOCALADDR
  42. # split so that "dropbearmulti dbclient" works
  43. base_args = opt.dbclient.split() + ["-y", host, "-p", opt.port]
  44. if opt.user:
  45. base_args.extend(['-l', opt.user])
  46. full_args = base_args + list(args)
  47. bg = kwargs.get("background")
  48. if "background" in kwargs:
  49. del kwargs["background"]
  50. if bg:
  51. return subprocess.Popen(full_args, **kwargs)
  52. else:
  53. kwargs.setdefault("timeout", 10)
  54. # wait for response
  55. return subprocess.run(full_args, **kwargs)
  56. def own_venv_command():
  57. """ Returns a command to run as a prefix to get the same venv
  58. as the current running Python. Returns '' on not a virtualenv
  59. """
  60. try:
  61. venv = os.environ['VIRTUAL_ENV']
  62. except KeyError:
  63. return ""
  64. # note: bash/zsh unix specific
  65. return f"source {venv}/bin/activate"
  66. class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
  67. # override TCPServer's default, avoids TIME_WAIT
  68. allow_reuse_addr = True
  69. """ Listens for a single incoming request, sends a response if given,
  70. and returns the inbound data.
  71. Reponse can be a queue object, in which case each item in the queue will
  72. be sent as a response, until it receives a None item.
  73. """
  74. def __init__(self, port, timeout, response=None):
  75. super().__init__(('localhost', port), self.Handler)
  76. self.port = port
  77. self.timeout = timeout
  78. self.response = response
  79. self.sink = None
  80. class Handler(socketserver.StreamRequestHandler):
  81. def handle(self):
  82. if isinstance(self.server.response, queue.Queue):
  83. while True:
  84. i = self.server.response.get()
  85. if i is None:
  86. break
  87. self.wfile.write(i)
  88. elif self.server.response:
  89. self.wfile.write(self.server.response)
  90. assert self.server.sink is None, ">1 request sent to handler"
  91. self.server.sink = self.rfile.read()
  92. def __enter__(self):
  93. self.server_thread = threading.Thread(target=self.serve_forever)
  94. self.server_thread.daemon = True
  95. self.server_thread.start()
  96. return self
  97. def __exit__(self, *exc_stuff):
  98. self.shutdown()
  99. self.server_thread.join()
  100. def inbound(self):
  101. """ Returns the data sent to the socket """
  102. return self.sink
  103. def readall_socket(sock):
  104. b = []
  105. while True:
  106. i = sock.recv(4096)
  107. if not i:
  108. break
  109. b.append(i)
  110. return b''.join(b)
  111. # returns a str
  112. def random_alnum(size):
  113. r = os.urandom(500 + size*5)
  114. return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()