From a6c558e9a1db5ae38c5011d42586188552efc4ab Mon Sep 17 00:00:00 2001 From: Tom Wang Date: Sun, 22 Mar 2020 22:24:55 -0700 Subject: [PATCH] Initial commit --- app_client.py | 72 ++++++++++++++++ app_server.py | 59 +++++++++++++ libclient.py | 215 ++++++++++++++++++++++++++++++++++++++++++++++++ libserver.py | 223 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 569 insertions(+) create mode 100644 app_client.py create mode 100644 app_server.py create mode 100644 libclient.py create mode 100644 libserver.py diff --git a/app_client.py b/app_client.py new file mode 100644 index 0000000..8217df3 --- /dev/null +++ b/app_client.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +""" +Created on Sun Mar 22 10:46:20 2020 + +@author: cpan +""" + +import sys +import socket +import selectors +import traceback + +import libclient + +sel = selectors.DefaultSelector() + + +def create_request(action, value): + if action == "search": + return dict( + type="text/json", + encoding="utf-8", + content=dict(action=action, value=value), + ) + else: + return dict( + type="binary/custom-client-binary-type", + encoding="binary", + content=bytes(action + value, encoding="utf-8"), + ) + + +def start_connection(host, port, request): + addr = (host, port) + print("starting connection to", addr) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + sock.connect_ex(addr) + events = selectors.EVENT_READ | selectors.EVENT_WRITE + message = libclient.Message(sel, sock, addr, request) + sel.register(sock, events, data=message) + + +if len(sys.argv) != 5: + print("usage:", sys.argv[0], " ") + sys.exit(1) + +host, port = sys.argv[1], int(sys.argv[2]) +action, value = sys.argv[3], sys.argv[4] +request = create_request(action, value) +start_connection(host, port, request) + +try: + while True: + events = sel.select(timeout=1) + for key, mask in events: + message = key.data + try: + message.process_events(mask) + except Exception: + print( + "main: error: exception for", + f"{message.addr}:\n{traceback.format_exc()}", + ) + message.close() + # Check for a socket being monitored to continue. + if not sel.get_map(): + break +except KeyboardInterrupt: + print("caught keyboard interrupt, exiting") +finally: + sel.close() \ No newline at end of file diff --git a/app_server.py b/app_server.py new file mode 100644 index 0000000..721d21a --- /dev/null +++ b/app_server.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +""" +Created on Sun Mar 22 10:43:35 2020 + +@author: cpan +""" + +import sys +import socket +import selectors +import traceback + +import libserver + +sel = selectors.DefaultSelector() + + +def accept_wrapper(sock): + conn, addr = sock.accept() # Should be ready to read + print("accepted connection from", addr) + conn.setblocking(False) + message = libserver.Message(sel, conn, addr) + sel.register(conn, selectors.EVENT_READ, data=message) + + +if len(sys.argv) != 3: + print("usage:", sys.argv[0], " ") + sys.exit(1) + +host, port = sys.argv[1], int(sys.argv[2]) +lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Avoid bind() exception: OSError: [Errno 48] Address already in use +lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +lsock.bind((host, port)) +lsock.listen() +print("listening on", (host, port)) +lsock.setblocking(False) +sel.register(lsock, selectors.EVENT_READ, data=None) + +try: + while True: + events = sel.select(timeout=None) + for key, mask in events: + if key.data is None: + accept_wrapper(key.fileobj) + else: + message = key.data + try: + message.process_events(mask) + except Exception: + print( + "main: error: exception for", + f"{message.addr}:\n{traceback.format_exc()}", + ) + message.close() +except KeyboardInterrupt: + print("caught keyboard interrupt, exiting") +finally: + sel.close() \ No newline at end of file diff --git a/libclient.py b/libclient.py new file mode 100644 index 0000000..222f746 --- /dev/null +++ b/libclient.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +""" +Created on Sun Mar 22 10:45:45 2020 + +@author: cpan +""" + +import sys +import selectors +import json +import io +import struct + + +class Message: + def __init__(self, selector, sock, addr, request): + self.selector = selector + self.sock = sock + self.addr = addr + self.request = request + self._recv_buffer = b"" + self._send_buffer = b"" + self._request_queued = False + self._jsonheader_len = None + self.jsonheader = None + self.response = None + + def _set_selector_events_mask(self, mode): + """Set selector to listen for events: mode is 'r', 'w', or 'rw'.""" + if mode == "r": + events = selectors.EVENT_READ + elif mode == "w": + events = selectors.EVENT_WRITE + elif mode == "rw": + events = selectors.EVENT_READ | selectors.EVENT_WRITE + else: + raise ValueError(f"Invalid events mask mode {repr(mode)}.") + self.selector.modify(self.sock, events, data=self) + + def _read(self): + try: + # Should be ready to read + data = self.sock.recv(4096) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + if data: + self._recv_buffer += data + else: + raise RuntimeError("Peer closed.") + + def _write(self): + if self._send_buffer: + print("sending", repr(self._send_buffer), "to", self.addr) + try: + # Should be ready to write + sent = self.sock.send(self._send_buffer) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + self._send_buffer = self._send_buffer[sent:] + + def _json_encode(self, obj, encoding): + return json.dumps(obj, ensure_ascii=False).encode(encoding) + + def _json_decode(self, json_bytes, encoding): + tiow = io.TextIOWrapper( + io.BytesIO(json_bytes), encoding=encoding, newline="" + ) + obj = json.load(tiow) + tiow.close() + return obj + + def _create_message( + self, *, content_bytes, content_type, content_encoding + ): + jsonheader = { + "byteorder": sys.byteorder, + "content-type": content_type, + "content-encoding": content_encoding, + "content-length": len(content_bytes), + } + jsonheader_bytes = self._json_encode(jsonheader, "utf-8") + message_hdr = struct.pack(">H", len(jsonheader_bytes)) + message = message_hdr + jsonheader_bytes + content_bytes + return message + + def _process_response_json_content(self): + content = self.response + result = content.get("result") + print(f"got result: {result}") + + def _process_response_binary_content(self): + content = self.response + print(f"got response: {repr(content)}") + + def process_events(self, mask): + if mask & selectors.EVENT_READ: + self.read() + if mask & selectors.EVENT_WRITE: + self.write() + + def read(self): + self._read() + + if self._jsonheader_len is None: + self.process_protoheader() + + if self._jsonheader_len is not None: + if self.jsonheader is None: + self.process_jsonheader() + + if self.jsonheader: + if self.response is None: + self.process_response() + + def write(self): + if not self._request_queued: + self.queue_request() + + self._write() + + if self._request_queued: + if not self._send_buffer: + # Set selector to listen for read events, we're done writing. + self._set_selector_events_mask("r") + + def close(self): + print("closing connection to", self.addr) + try: + self.selector.unregister(self.sock) + except Exception as e: + print( + f"error: selector.unregister() exception for", + f"{self.addr}: {repr(e)}", + ) + + try: + self.sock.close() + except OSError as e: + print( + f"error: socket.close() exception for", + f"{self.addr}: {repr(e)}", + ) + finally: + # Delete reference to socket object for garbage collection + self.sock = None + + def queue_request(self): + content = self.request["content"] + content_type = self.request["type"] + content_encoding = self.request["encoding"] + if content_type == "text/json": + req = { + "content_bytes": self._json_encode(content, content_encoding), + "content_type": content_type, + "content_encoding": content_encoding, + } + else: + req = { + "content_bytes": content, + "content_type": content_type, + "content_encoding": content_encoding, + } + message = self._create_message(**req) + self._send_buffer += message + self._request_queued = True + + def process_protoheader(self): + hdrlen = 2 + if len(self._recv_buffer) >= hdrlen: + self._jsonheader_len = struct.unpack( + ">H", self._recv_buffer[:hdrlen] + )[0] + self._recv_buffer = self._recv_buffer[hdrlen:] + + def process_jsonheader(self): + hdrlen = self._jsonheader_len + if len(self._recv_buffer) >= hdrlen: + self.jsonheader = self._json_decode( + self._recv_buffer[:hdrlen], "utf-8" + ) + self._recv_buffer = self._recv_buffer[hdrlen:] + for reqhdr in ( + "byteorder", + "content-length", + "content-type", + "content-encoding", + ): + if reqhdr not in self.jsonheader: + raise ValueError(f'Missing required header "{reqhdr}".') + + def process_response(self): + content_len = self.jsonheader["content-length"] + if not len(self._recv_buffer) >= content_len: + return + data = self._recv_buffer[:content_len] + self._recv_buffer = self._recv_buffer[content_len:] + if self.jsonheader["content-type"] == "text/json": + encoding = self.jsonheader["content-encoding"] + self.response = self._json_decode(data, encoding) + print("received response", repr(self.response), "from", self.addr) + self._process_response_json_content() + else: + # Binary or unknown content-type + self.response = data + print( + f'received {self.jsonheader["content-type"]} response from', + self.addr, + ) + self._process_response_binary_content() + # Close when response has been processed + self.close() \ No newline at end of file diff --git a/libserver.py b/libserver.py new file mode 100644 index 0000000..b85391a --- /dev/null +++ b/libserver.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +""" +Created on Sun Mar 22 10:44:50 2020 + +@author: cpan +""" + +import sys +import selectors +import json +import io +import struct + +request_search = { + "morpheus": "Follow the white rabbit. \U0001f430", + "ring": "In the caves beneath the Misty Mountains. \U0001f48d", + "\U0001f436": "\U0001f43e Playing ball! \U0001f3d0", +} + + +class Message: + def __init__(self, selector, sock, addr): + self.selector = selector + self.sock = sock + self.addr = addr + self._recv_buffer = b"" + self._send_buffer = b"" + self._jsonheader_len = None + self.jsonheader = None + self.request = None + self.response_created = False + + def _set_selector_events_mask(self, mode): + """Set selector to listen for events: mode is 'r', 'w', or 'rw'.""" + if mode == "r": + events = selectors.EVENT_READ + elif mode == "w": + events = selectors.EVENT_WRITE + elif mode == "rw": + events = selectors.EVENT_READ | selectors.EVENT_WRITE + else: + raise ValueError(f"Invalid events mask mode {repr(mode)}.") + self.selector.modify(self.sock, events, data=self) + + def _read(self): + try: + # Should be ready to read + data = self.sock.recv(4096) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + if data: + self._recv_buffer += data + else: + raise RuntimeError("Peer closed.") + + def _write(self): + if self._send_buffer: + print("sending", repr(self._send_buffer), "to", self.addr) + try: + # Should be ready to write + sent = self.sock.send(self._send_buffer) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + self._send_buffer = self._send_buffer[sent:] + # Close when the buffer is drained. The response has been sent. + if sent and not self._send_buffer: + self.close() + + def _json_encode(self, obj, encoding): + return json.dumps(obj, ensure_ascii=False).encode(encoding) + + def _json_decode(self, json_bytes, encoding): + tiow = io.TextIOWrapper( + io.BytesIO(json_bytes), encoding=encoding, newline="" + ) + obj = json.load(tiow) + tiow.close() + return obj + + def _create_message( + self, *, content_bytes, content_type, content_encoding + ): + jsonheader = { + "byteorder": sys.byteorder, + "content-type": content_type, + "content-encoding": content_encoding, + "content-length": len(content_bytes), + } + jsonheader_bytes = self._json_encode(jsonheader, "utf-8") + message_hdr = struct.pack(">H", len(jsonheader_bytes)) + message = message_hdr + jsonheader_bytes + content_bytes + return message + + def _create_response_json_content(self): + action = self.request.get("action") + if action == "search": + query = self.request.get("value") + answer = request_search.get(query) or f'No match for "{query}".' + content = {"result": answer} + else: + content = {"result": f'Error: invalid action "{action}".'} + content_encoding = "utf-8" + response = { + "content_bytes": self._json_encode(content, content_encoding), + "content_type": "text/json", + "content_encoding": content_encoding, + } + return response + + def _create_response_binary_content(self): + response = { + "content_bytes": b"First 10 bytes of request: " + + self.request[:10], + "content_type": "binary/custom-server-binary-type", + "content_encoding": "binary", + } + return response + + def process_events(self, mask): + if mask & selectors.EVENT_READ: + self.read() + if mask & selectors.EVENT_WRITE: + self.write() + + def read(self): + self._read() + + if self._jsonheader_len is None: + self.process_protoheader() + + if self._jsonheader_len is not None: + if self.jsonheader is None: + self.process_jsonheader() + + if self.jsonheader: + if self.request is None: + self.process_request() + + def write(self): + if self.request: + if not self.response_created: + self.create_response() + + self._write() + + def close(self): + print("closing connection to", self.addr) + try: + self.selector.unregister(self.sock) + except Exception as e: + print( + f"error: selector.unregister() exception for", + f"{self.addr}: {repr(e)}", + ) + + try: + self.sock.close() + except OSError as e: + print( + f"error: socket.close() exception for", + f"{self.addr}: {repr(e)}", + ) + finally: + # Delete reference to socket object for garbage collection + self.sock = None + + def process_protoheader(self): + hdrlen = 2 + if len(self._recv_buffer) >= hdrlen: + self._jsonheader_len = struct.unpack( + ">H", self._recv_buffer[:hdrlen] + )[0] + self._recv_buffer = self._recv_buffer[hdrlen:] + + def process_jsonheader(self): + hdrlen = self._jsonheader_len + if len(self._recv_buffer) >= hdrlen: + self.jsonheader = self._json_decode( + self._recv_buffer[:hdrlen], "utf-8" + ) + self._recv_buffer = self._recv_buffer[hdrlen:] + for reqhdr in ( + "byteorder", + "content-length", + "content-type", + "content-encoding", + ): + if reqhdr not in self.jsonheader: + raise ValueError(f'Missing required header "{reqhdr}".') + + def process_request(self): + content_len = self.jsonheader["content-length"] + if not len(self._recv_buffer) >= content_len: + return + data = self._recv_buffer[:content_len] + self._recv_buffer = self._recv_buffer[content_len:] + if self.jsonheader["content-type"] == "text/json": + encoding = self.jsonheader["content-encoding"] + self.request = self._json_decode(data, encoding) + print("received request", repr(self.request), "from", self.addr) + else: + # Binary or unknown content-type + self.request = data + print( + f'received {self.jsonheader["content-type"]} request from', + self.addr, + ) + # Set selector to listen for write events, we're done reading. + self._set_selector_events_mask("w") + + def create_response(self): + if self.jsonheader["content-type"] == "text/json": + response = self._create_response_json_content() + else: + # Binary or unknown content-type + response = self._create_response_binary_content() + message = self._create_message(**response) + self.response_created = True + self._send_buffer += message \ No newline at end of file