master
Raw Download raw file
  1import asyncio
  2import socket
  3import etcd3
  4from aiohttp import web
  5
  6def find_free_port():
  7    s = socket.socket()
  8    s.bind(("", 0))
  9    port = s.getsockname()[1]
 10    s.close()
 11    return port
 12
 13class ClosingHandler:
 14  def __init__(self, exit_event):
 15    self.exit_event = exit_event
 16
 17  def next_port():
 18    return self.next_port
 19
 20  async def default(self, request):
 21      self.exit_event.set()
 22      self.next_port = find_free_port()
 23      return web.HTTPOk(
 24        body=f"HELLO, GOODBYE {self.next_port}\n"
 25      )
 26
 27#TODO traefik class
 28def setup_traefik_srs(etcd):
 29    etcd.delete_prefix("traefik/http/routers/srs/")
 30    etcd.delete_prefix("traefik/http/services/srs/")
 31    etcd.put("traefik/http/middlewares/retry502/buffering/retryExpression", "ResponseCode() == 502")
 32    etcd.put("traefik/http/routers/srs/rule","Host(`srs.trustme.click`)")
 33    etcd.put("traefik/http/routers/srs/service","srs")
 34    etcd.put("traefik/http/routers/srs/entrypoints/0","websecure")
 35    etcd.put("traefik/http/routers/srs/tls/certResolver","tmc-acme-http")
 36    etcd.put("traefik/http/routers/srs/middlewares/0","retry502")
 37    print("etcd srs service setup complete")
 38
 39def add_traefik_srs_url(etcd, address, port):
 40    url_key = f"traefik/http/services/srs/loadBalancer/servers/{port}/url"
 41    url_value = f"http://{address}:{port}"
 42    etcd.put(url_key, url_value)
 43
 44def del_traefik_srs_url(etcd, port):
 45    url_key = f"traefik/http/services/srs/loadBalancer/servers/{port}/url"
 46    etcd.transaction(
 47      compare=[
 48        etcd.transactions.value("__invalid__") == "__invalid__" 
 49      ], 
 50      success=[],
 51      failure=[
 52        etcd.transactions.delete(url_key)
 53      ], )
 54    print(etcd.get(url_key))
 55
 56
# Closing events registered by srs(), one per server it starts; setting an
# event makes the server that waits on it shut down.
closers = [] 
 58
 59# single request server
 60async def srs(etcd, port=None):
 61
 62    # create app handler and closing event
 63    closing_time = asyncio.Event()
 64    closing_task = asyncio.create_task(closing_time.wait())
 65    closers.append(closing_time)
 66    handler = ClosingHandler(closing_time)
 67
 68    # add handler to new application runner
 69    app = web.Application()
 70    app.add_routes([web.get("/", handler.default)])
 71    runner = web.AppRunner(app)
 72    await runner.setup()
 73
 74    # start app on a specific tcp port
 75    if port == None:
 76      port = find_free_port()
 77    site = web.TCPSite(runner, "localhost", port)
 78    print(f"{site.name} starting")
 79    await site.start()
 80
 81    # setup traefik access
 82    add_traefik_srs_url(etcd, "localhost", port)
 83
 84    # wait for closing event
 85    await closing_task
 86    print(f"{site.name} closing")
 87    del_traefik_srs_url(etcd, port)
 88    await runner.cleanup()
 89
 90    await srs(etcd, handler.next_port)
 91
 92
 93if __name__ == "__main__":
 94    loop = asyncio.get_event_loop()
 95    etcd = etcd3.client() 
 96    setup_traefik_srs(etcd)
 97
 98    for i in range(100):
 99      loop.create_task(srs(etcd))
100
101    try:
102      loop.run_forever()
103    except:
104      print("\nexiting...")
105    finally:
106      for c in closers:
107        c.set() # useful event to also call runner.cleanup
108        etcd.close()