1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#!/usr/bin/env python3
# Used by Test_channel_dap_mode in test_channel.mnv to test DAP functionality.
import json
import socket
import threading
import time
try:
    import socketserver
except ImportError:
    # Fall back to the capitalized module name when the lowercase one
    # cannot be imported; the rest of the file only uses "socketserver".
    import SocketServer as socketserver
def make_dap_message(obj):
    """Serialize *obj* to JSON and frame it for transmission.

    The returned bytes consist of an ASCII ``Content-Length`` header giving
    the size of the payload in bytes, an empty line, and then the UTF-8
    encoded JSON payload itself.
    """
    body = json.dumps(obj).encode("utf-8")
    prefix = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
    return b"".join((prefix, body))
def parse_messages(buffer):
    """Extract every complete framed JSON message from a receive buffer.

    Each message is a block of CRLF-separated header lines, an empty line,
    and a JSON body whose size in bytes is given by the ``Content-Length``
    header (matched case-insensitively).

    Args:
        buffer: Bytes received so far; may end in the middle of a message.

    Returns:
        A ``(messages, remaining)`` tuple: the decoded JSON values in
        arrival order, and the unconsumed tail of ``buffer`` that should be
        prepended to the next chunk of received data.

    Raises:
        ValueError: If a complete body is not valid UTF-8 or not valid JSON
            (raised as ``UnicodeDecodeError`` / ``json.JSONDecodeError``).
    """
    separator = b"\r\n\r\n"
    messages = []
    pos = 0  # offset of the first byte not consumed yet
    while True:
        hdr_end = buffer.find(separator, pos)
        if hdr_end == -1:
            break  # header block not complete yet
        body_start = hdr_end + len(separator)

        header = buffer[pos:hdr_end].decode("ascii", errors="ignore")
        content_length = None
        for line in header.split("\r\n"):
            if line.lower().startswith("content-length:"):
                try:
                    content_length = int(line.split(":")[1].strip())
                except ValueError:
                    content_length = None

        if content_length is None or content_length < 0:
            # Without a usable length the body cannot be delimited.  Leaving
            # the header in the buffer would make every later call stop at
            # this same spot, so drop it and resynchronize on the next
            # header block instead.
            pos = body_start
            continue

        body_end = body_start + content_length
        if len(buffer) < body_end:
            break  # body not complete yet

        messages.append(json.loads(buffer[body_start:body_end].decode("utf-8")))
        pos = body_end

    return messages, buffer[pos:]
class DAPHandler(socketserver.BaseRequestHandler):
    """Per-connection handler that answers DAP requests.

    Every request receives a response.  An "initialize" request is
    additionally followed by an "initialized" event.
    """

    def setup(self):
        """Enable TCP_NODELAY on the connection and start the seq counter."""
        self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Sequence number stamped on the next message sent by the server.
        self.seq = 1

    def send(self, obj):
        """Stamp *obj* with the next sequence number and transmit it.

        Note that *obj* is modified in place: its "seq" key is set.
        """
        obj["seq"], self.seq = self.seq, self.seq + 1
        self.request.sendall(make_dap_message(obj))

    def send_response(self, request, body=None, success=True):
        """Send the response belonging to *request*.

        A missing or empty *body* is sent as an empty object.
        """
        reply = {
            "type": "response",
            "request_seq": request["seq"],
            "success": success,
            "command": request["command"],
        }
        reply["body"] = body or {}
        self.send(reply)

    def send_event(self, event, body=None):
        """Send an event named *event*; an empty *body* becomes ``{}``."""
        notification = {"type": "event", "event": event}
        notification["body"] = body or {}
        self.send(notification)

    def handle_request(self, msg):
        """Answer one request message.

        Returns True to tell the caller to keep the connection open.
        """
        if msg.get("command") != "initialize":
            # Any other command is simply acknowledged with an empty body.
            self.send_response(msg)
            return True

        self.send_response(msg, {"supportsConfigurationDoneRequest": True})
        self.send_event("initialized")
        return True

    def handle(self):
        """Read from the socket until the peer closes it, serving requests."""
        pending = b""
        # recv() returning b"" means the peer closed the connection.
        for chunk in iter(lambda: self.request.recv(4096), b""):
            pending += chunk
            decoded, pending = parse_messages(pending)
            for message in decoded:
                if message.get("type") != "request":
                    continue
                if not self.handle_request(message):
                    return
class ThreadedDAPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that serves each client connection in its own thread."""

    # Let the server rebind its address right after a previous run.
    allow_reuse_address = True

    # Handler threads stay blocked reading from the socket for as long as a
    # client is connected.  Making them daemon threads keeps a lingering
    # connection from holding the process open once the server is stopped.
    daemon_threads = True
def write_port_to_file(port, filename="Xportnr"):
    """Write *port*, converted to text, as the entire content of *filename*.

    Any existing file of that name is overwritten.
    """
    with open(filename, "w") as port_file:
        port_text = str(port)
        port_file.write(port_text)
def main():
    """Run the DAP test server on an OS-assigned port until interrupted.

    The assigned port number is written to the port file through
    ``write_port_to_file`` so that the client/test can find the server.
    The function returns when the serving thread ends or on Ctrl-C.
    """
    # Port 0 asks the OS for any free port.  Using the server as a context
    # manager guarantees the listening socket is closed on the way out.
    with ThreadedDAPServer(("localhost", 0), DAPHandler) as server:
        # server_address is (host, port); publish the actual assigned port.
        write_port_to_file(server.server_address[1])

        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()

        try:
            # Join with a timeout so the main thread keeps reacting to
            # KeyboardInterrupt while the server is running.
            while thread.is_alive():
                thread.join(1)
        except KeyboardInterrupt:
            server.shutdown()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
|