Source code for colosseumrl.matchmaking.MatchmakingServer

import time
import secrets
import grpc
import argparse
import zmq

from queue import Queue, Empty
from concurrent import futures
from threading import Thread, Semaphore
from multiprocessing import Event
from typing import Type, Dict, List
from collections import OrderedDict
from spacetime import Node

from ..match_server import server_app
from ..data_model import ServerState, Player, Observation
from ..config import get_environment, available_environments
from ..BaseEnvironment import BaseEnvironment
from ..util import is_port_in_use
from ..rl_logging import init_logging, get_logger

from .grpc_gen.server_pb2 import QuickMatchReply, QuickMatchRequest
from .grpc_gen.server_pb2_grpc import MatchmakerServicer, add_MatchmakerServicer_to_server
from .RankingDatabase import RankingDatabase


logger = get_logger()

# Global ZMQ context that will be used for all communication between threads.
zmq_context = zmq.Context()


[docs]def match_server_args_factory(tick_rate: int, realtime: bool, observations_only: bool, env_config_string: str): """ Helper factory to make a argument dictionary for servers with varying ports """ def match_server_args(port): arg_dict = { "tick_rate": tick_rate, "port": port, "realtime": realtime, "observations_only": observations_only, "config": env_config_string } return arg_dict return match_server_args
[docs]class MatchMakingHandler(MatchmakerServicer): """ GRPC connection handler. Clients will connect to the server and call this function to request a match. """
[docs] def GetMatch(self, request, context): # Unique identity for this connection identity = request.username.encode() + secrets.token_bytes(8) # Prepare ZeroMQ connection to get added to the queue with zmq_context.socket(zmq.REQ) as socket: socket.connect("inproc://matchmaker_requests") # Wait until we are added to the queue socket.send_multipart((identity, request.SerializeToString())) status, response = socket.recv_multipart() if status == b"FAIL": return QuickMatchReply.FromString(response) # Setup new socket to communicate with matchmaking master with zmq_context.socket(zmq.DEALER) as socket: socket.setsockopt(zmq.IDENTITY, identity) socket.connect("inproc://matchmaker_responses") # Wait until a game has been assigned # Check every once in a while to see if the client is still alive while True: if not socket.poll(timeout=500): if not context.is_active(): socket.send(request.username.encode()) return None else: break response = QuickMatchReply.FromString(socket.recv(flags=zmq.NOBLOCK)) return response
[docs]class MatchProcessJanitor(Thread): """ Simple thread to manage the lifetime of a game server. Will start the game server and close it when the game is finished and release any resources it was holding. """ def __init__(self, match_limit: Semaphore, ports_to_use_queue: Queue, database: RankingDatabase, env_class: Type[BaseEnvironment], match_server_args: Dict, player_list: List, whitelist: List = None): """ Create a janitor thread that will start a game and close it once it is finished. Parameters ---------- match_limit : Semaphore Synchronization semaphore to make sure that we limit the number of simulations matches. ports_to_use_queue : Queue Queue holding available ports for this server. database : RankingDatabase Global database for player ranking. env_class : Type[BaseEnvironment] The game class to launch the server around. match_server_args : Dict Any arguments to pass to the match server as keyword star arguments. player_list : List[str] A list of usernames participating in this game. This will be the whitelist for the server. whitelist : List A list of usernames participating in this game. This will be the whitelist for the server. Again... """ super().__init__() self.match_limit = match_limit self.match_server_args = match_server_args self.env_class = env_class self.ports_to_use_queue = ports_to_use_queue self.database = database self.player_list = player_list self.whitelist = whitelist self.ready = Event()
[docs] def run(self) -> None: port = self.match_server_args['port'] observation_type = Observation(self.env_class.observation_names()) # App blocks until the server has ended app = Node(server_app, server_port=port, Types=[Player, ServerState]) rankings = app.start(self.env_class, observation_type, self.match_server_args, self.whitelist, self.ready) del app # Update player information if isinstance(rankings, dict): self.database.update_ranking(rankings) for user in self.player_list: self.database.logoff(user) # Cleanup self.ports_to_use_queue.put(port) self.match_limit.release()
[docs]class MatchmakingLoginThread(Thread): def __init__(self, connection_queue: Queue, database: RankingDatabase): """ Thread for managing the login system on the matchmaking server. This thread will accept login requests from the GRPC function and add them to the queue of players for a new game. Parameters ---------- connection_queue: Queue The queue to add new players to after they have been successfully logged in. database : RankingDatabase Global database for player ranking and password store. """ super().__init__() self.queue: Queue = connection_queue self.database: RankingDatabase = database self.daemon = True self.socket = zmq_context.socket(zmq.REP) self.socket.bind("inproc://matchmaker_requests") print("Matchmaker Connector thread listening...") def __del__(self): self.socket.close()
[docs] def run(self): while True: identity, serialized_request = self.socket.recv_multipart() request = QuickMatchRequest.FromString(serialized_request) # Login user and handle any errors username, password = request.username, request.password login_result = self.database.login(username, password) if login_result == RankingDatabase.LoginResult.NoUser: self.database.set(username, password) self.database.login(username, password) elif login_result == RankingDatabase.LoginResult.LoginDuplicate: response = QuickMatchReply(username=username, server="FAIL", auth_key="FAIL", ranking=0.0, response="Failed to login: Cannot login twice at the same time.") self.socket.send_multipart((b"FAIL", response.SerializeToString())) continue elif login_result == RankingDatabase.LoginResult.LoginFail: response = QuickMatchReply(username=username, server="FAIL", auth_key="FAIL", ranking=0.0, response="Failed to login: Wrong password.") self.socket.send_multipart((b"FAIL", response.SerializeToString())) continue # Add request to the queue and generate a token for them self.queue.put((identity, request, secrets.token_hex(32))) self.socket.send_multipart((b"SUCCESS", b""))
[docs]class MatchmakingThread(Thread): def __init__(self, starting_port, hostname, max_simultaneous_games, env_class, tick_rate, realtime, observations_only, env_config_string): """ Main matchmaking thread that is responsible for choosing players for each match and assigning a game server to them. Parameters ---------- starting_port : int Port the begin making match server on hostname : str What hostname to start the game servers on. max_simultaneous_games : int Maximum number of game servers that will be running at any given time. env_class : Type[BaseEnvironment] What environment will the server be running. tick_rate : float What frame rate will the servers operate on. realtime : bool Whether or not the games will be realtime or will wait for player actions. observations_only : bool Whether or not we will send out the true server state if supported. env_config_string : str Configuration string to be passed to the server environments. """ super().__init__() self.players_per_game = env_class(env_config_string).min_players self.env_class = env_class self.hostname = hostname self.daemon = True # Prepare our context and sockets self.socket = zmq_context.socket(zmq.ROUTER) self.socket.bind("inproc://matchmaker_responses") logger.info("Matchmaker thread running") # Semaphore for tracking the total number of games running self.match_limit = Semaphore(max_simultaneous_games) # Helper function to make arguments for match threads self.create_match_server_args = match_server_args_factory(tick_rate=tick_rate, realtime=realtime, observations_only=observations_only, env_config_string=env_config_string) # Keep track of the ports we can use and iterate through them as we start new servers self.ports_to_use = Queue() max_port = starting_port + 2 * max_simultaneous_games for port in range(starting_port, max_port): if not is_port_in_use(port): self.ports_to_use.put(port) else: logger.warn("Skipping port {}, already in use.".format(port)) if self.ports_to_use.qsize() < max_simultaneous_games: raise OSError("Port range {} through {} does not have enough unallocated ports " "to hold {} simultaneous games".format(starting_port, max_port, max_simultaneous_games)) self.database = RankingDatabase("test.sqlite") self.connection_queue = Queue() self.connection_thread = MatchmakingLoginThread(self.connection_queue, self.database)
[docs] def start(self) -> None: """ Start this thread along with the related login thread. """ super().start() self.connection_thread.start()
[docs] def select_players(self, requests): """ Select which players will be chosen for an upcoming game. At some point, this should use trueskill to create matches with roughly fairly matched opponents. Parameters ---------- requests : OrderedDict All of the players requested with their username as their key and their request and token as the values. Returns ------- List A list of players that have been chosen with the appropriate data. """ players = [] for _ in range(self.players_per_game): identity, (request, auth) = requests.popitem(last=False) players.append((identity, request, auth)) return players
[docs] def run(self) -> None: requests = OrderedDict() while True: # Wait for any new requests, and always recheck request queue after 5 seconds # This will be useful if we have a robust matchmaking system with rankings try: identity, request, authorization = self.connection_queue.get(timeout=5.0) requests[identity] = (request, authorization) except Empty: pass else: while not self.connection_queue.empty(): identity, request, authorization = self.connection_queue.get() requests[identity] = (request, authorization) # Check if any clients have disconnected while True: try: quitting_identity, quitting_username = self.socket.recv_multipart(flags=zmq.NOBLOCK) except zmq.ZMQError: break else: logger.debug("{} has quit the matchmaking queue unexpectedly.".format(quitting_identity)) self.database.logoff(quitting_username.decode()) requests.pop(quitting_identity, 0) # Once we have enough players for a game, start a game server and send the coordinates if len(requests) >= self.players_per_game: # Limit the number of games so we dont overload server self.match_limit.acquire() # Select Players using arbitrary method new_players = self.select_players(requests) whitelist = [player[2] for player in new_players] usernames = [player[1].username for player in new_players] # Create the game server match_port = self.ports_to_use.get() match_server_args = self.create_match_server_args(port=match_port) match_janitor = MatchProcessJanitor(match_limit=self.match_limit, ports_to_use_queue=self.ports_to_use, database=self.database, env_class=self.env_class, match_server_args=match_server_args, player_list=usernames, whitelist=whitelist) match_janitor.start() database_entries = self.database.get_multi(*usernames) database_entries = {name: ranking for name, _, ranking, _ in database_entries} match_janitor.ready.wait() # Send each player their assigned server. for identity, request, auth_key in new_players: response = QuickMatchReply(username=request.username, server='{}:{}'.format(self.hostname, match_port), auth_key=auth_key, ranking=database_entries[request.username], response="") self.socket.send_multipart((identity, response.SerializeToString()))
[docs]def serve(args): """ Main function for Matchmaking server Parameters ---------- args : Dict Command line arguments """ # Start the separate matchmaking thread matchmaker_thread = MatchmakingThread( hostname=args['hostname'], starting_port=args['game_port'], max_simultaneous_games=args['max_games'], env_class=get_environment(args['environment']), tick_rate=args['tick_rate'], realtime=args['realtime'], observations_only=args['observations_only'], env_config_string=args['config'] ) matchmaker_thread.start() # Start the GRPC callback server server = grpc.server(futures.ThreadPoolExecutor()) add_MatchmakerServicer_to_server(MatchMakingHandler(), server) server.add_insecure_port('[::]:{}'.format(args['matchmaking_port'])) server.start() logger.info("Matchmaking server listening on grpc://{}:{}...".format(args['hostname'], args['matchmaking_port'])) try: one_day = 3600 * 24 while True: time.sleep(one_day) except KeyboardInterrupt: server.stop(0)
[docs]def start_matchmaking_server(environment: str = 'test', hostname: str = 'localhost', matchmaking_port: int = 50051, game_port: int = 21450, max_games: int = 1, tick_rate: int = 60, realtime: bool = False, observations_only: bool = False, config: str = ''): serve(locals())
[docs]def main(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=""" This script launches a matchmaking server for the colosseum framework. The purpose of this system is to allow for an easy way for players to join a match together without needing to coordinate join time, server information, or limiting players. """) parser.add_argument("--environment", "-e", type=str, default="test", help="The name of the environment. Choices are: {}".format(available_environments())) parser.add_argument("--hostname", type=str, default='localhost', help="Hostname to start the matchmaking and game servers on. Defaults to 'localhost'") parser.add_argument("--matchmaking-port", type=int, default=50051, help="Port to start matchmaking server on.") parser.add_argument("--game-port", type=int, default=21450, help="Port to start game servers on. Will use a range starting at this port to this port" "plus the number of games.") parser.add_argument("--max-games", "-m", type=int, default=1, help="Number of games to run in parallel on this server.") parser.add_argument("--tick-rate", "-t", type=int, default=60, help="The max tick rate that the server will run on.") parser.add_argument("--realtime", "-r", action="store_true", help="With this flag on, the server will not wait for all of the clients to respond.") parser.add_argument("--observations-only", '-f', action='store_true', help="With this flag on, the server will not push the true state of the game to the clients " "along with observations") parser.add_argument("--config", '-c', type=str, default="", help="Config string that will be passed into the environment constructor.") command_line_args = parser.parse_args() serve(vars(command_line_args))
if __name__ == '__main__': logger = init_logging() main()