master
1import asyncio
2import socket
3import etcd3
4from aiohttp import web
5
def find_free_port():
    """Return a TCP port number that was unused at the time of the call.

    A throwaway socket is bound to port 0 so the operating system assigns
    an unused port; the assigned number is read back and the socket is
    released again.

    Returns:
        int: the port number assigned by the operating system.
    """
    # The context manager closes the socket even when bind() or
    # getsockname() raises, so a failure cannot leak the descriptor.
    with socket.socket() as s:
        s.bind(("", 0))
        # NOTE: the port is released on exit, so another process may grab
        # it before the caller binds it.
        return s.getsockname()[1]
12
class ClosingHandler:
    """Request handler that signals its server to close after one request.

    Attributes:
        exit_event: object whose ``set()`` is called when a request has
            been handled.
        next_port: port picked for the follow-up server, or ``None`` as
            long as no request has been handled.
    """

    def __init__(self, exit_event):
        self.exit_event = exit_event
        # Defined up front so that reading the attribute before any request
        # arrives yields None rather than something that is not a port.
        self.next_port = None

    async def default(self, request):
        """Handle a request: pick the next port, signal closing, and reply.

        Args:
            request: the incoming request (not inspected).

        Returns:
            A ``web.HTTPOk`` response whose body names the next port.
        """
        # Choose the port before raising the event so whoever waits on the
        # event always finds next_port populated.
        self.next_port = find_free_port()
        self.exit_event.set()
        return web.HTTPOk(
            body=f"HELLO, GOODBYE {self.next_port}\n"
        )
26
27#TODO traefik class
def setup_traefik_srs(etcd):
    """Reset and write the ``srs`` router/middleware keys in etcd.

    Deletes everything under the ``srs`` router and service prefixes, then
    writes the ``retry502`` middleware expression and the ``srs`` router
    settings, and finally prints a confirmation line.

    Args:
        etcd: client exposing ``delete_prefix(prefix)`` and
            ``put(key, value)``.
    """
    stale_prefixes = (
        "traefik/http/routers/srs/",
        "traefik/http/services/srs/",
    )
    for prefix in stale_prefixes:
        etcd.delete_prefix(prefix)

    # Written in this exact order, one put per entry.
    entries = (
        (
            "traefik/http/middlewares/retry502/buffering/retryExpression",
            "ResponseCode() == 502",
        ),
        ("traefik/http/routers/srs/rule", "Host(`srs.trustme.click`)"),
        ("traefik/http/routers/srs/service", "srs"),
        ("traefik/http/routers/srs/entrypoints/0", "websecure"),
        ("traefik/http/routers/srs/tls/certResolver", "tmc-acme-http"),
        ("traefik/http/routers/srs/middlewares/0", "retry502"),
    )
    for key, value in entries:
        etcd.put(key, value)

    print("etcd srs service setup complete")
38
def add_traefik_srs_url(etcd, address, port):
    """Write the server URL for *port* into the ``srs`` load-balancer keys.

    Args:
        etcd: client exposing ``put(key, value)``.
        address: host part of the stored URL.
        port: port of the server; used both in the key and in the URL.
    """
    etcd.put(
        f"traefik/http/services/srs/loadBalancer/servers/{port}/url",
        f"http://{address}:{port}",
    )
43
def del_traefik_srs_url(etcd, port):
    """Remove the ``srs`` load-balancer server URL stored for *port*.

    Args:
        etcd: client exposing ``delete(key)`` and ``get(key)``.
        port: port whose ``.../servers/{port}/url`` key is removed.
    """
    url_key = f"traefik/http/services/srs/loadBalancer/servers/{port}/url"
    # Delete the key directly: the removal must not hinge on the contents
    # of any other key in the store.
    etcd.delete(url_key)
    # Show the post-delete lookup so the removal is visible in the output.
    print(etcd.get(url_key))
55
56
# Closing events of the servers that are currently up; setting one asks
# the matching server to shut down.
closers = []


# single request server
async def srs(etcd, port=None):
    """Serve single requests forever, moving to a new port after each one.

    Each round starts an aiohttp application on ``localhost:port``,
    registers its URL through ``add_traefik_srs_url``, and waits until the
    round's closing event is set. The URL is then removed again, the runner
    is cleaned up, and the next round starts on the port the handler chose.

    Args:
        etcd: client handed to ``add_traefik_srs_url`` and
            ``del_traefik_srs_url``.
        port: port for the first round; ``None`` picks a free one.

    Returns:
        None, once a closing event is set without the handler having
        chosen a follow-up port (i.e. the close did not come from a
        request).
    """
    # A loop instead of self-recursion: the chain of awaiting coroutines
    # would otherwise grow by one level with every request served.
    while True:
        # create app handler and closing event
        closing_time = asyncio.Event()
        handler = ClosingHandler(closing_time)

        # add handler to new application runner
        app = web.Application()
        app.add_routes([web.get("/", handler.default)])
        runner = web.AppRunner(app)
        await runner.setup()

        closers.append(closing_time)
        try:
            # start app on a specific tcp port
            if port is None:
                port = find_free_port()
            site = web.TCPSite(runner, "localhost", port)
            print(f"{site.name} starting")
            await site.start()

            # setup traefik access
            add_traefik_srs_url(etcd, "localhost", port)
            try:
                # wait for closing event
                await closing_time.wait()
                print(f"{site.name} closing")
            finally:
                # Runs on errors and cancellation too, so the registered
                # URL never outlives the server behind it.
                del_traefik_srs_url(etcd, port)
        finally:
            # Forget the event first so the list only tracks live servers
            # even if the runner cleanup itself fails.
            closers.remove(closing_time)
            await runner.cleanup()

        # Only a handled request yields an integer port; anything else
        # means the event was set from outside, so stop serving.
        port = handler.next_port
        if not isinstance(port, int):
            return
91
92
if __name__ == "__main__":
    # Entry point: start 100 single-request servers and run until
    # interrupted, then give them a chance to clean up.

    # Create the loop explicitly; asking for "the current loop" when none
    # is running is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    etcd = etcd3.client()
    try:
        setup_traefik_srs(etcd)

        tasks = [loop.create_task(srs(etcd)) for _ in range(100)]

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            print("\nexiting...")
        finally:
            # Ask every live server to close. Iterate over a copy because
            # servers may edit the list while they shut down.
            for c in list(closers):
                c.set()
            # The events only take effect while the loop runs, so drive it
            # for a bounded grace period ...
            loop.run_until_complete(asyncio.wait(tasks, timeout=5))
            # ... then cancel whatever is still running and collect the
            # outcomes so no task is left pending.
            for task in tasks:
                task.cancel()
            outcomes = loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True)
            )
            for outcome in outcomes:
                if isinstance(outcome, Exception) and not isinstance(
                    outcome, asyncio.CancelledError
                ):
                    print(f"srs task failed: {outcome!r}")
    finally:
        etcd.close()
        loop.close()
108 etcd.close()