# nus/cs2109s/labs/ps3/utils.py
# 2024-02-20 11:30:55 +08:00
#
# 356 lines
# 11 KiB
# Python

import os
import sys
import copy
import time
import traceback
import multiprocessing
import functools
from threading import Thread
from typing import Callable, Any
# Board dimensions (number of rows, number of columns); constant for this game.
ROW, COL = 6, 6
# Large sentinel scores. Not referenced elsewhere in this file
# (presumably consumed by agent/search code -- confirm against callers).
INF = 90129012
WIN = 21092109
# Placeholder "no move" value; play() starts every turn with it so that a
# missing agent answer fails validation and is replaced by a fallback move.
MOVE_NONE = (-1, -1), (-1, -1)
# Timeout, in seconds, passed to the `timeout` decorator that guards test_move.
TIME_LIMIT = 3.05
# A move is ((src_row, src_col), (dst_row, dst_col)).
Move = tuple[tuple[int, int], tuple[int, int]]
# A board is a list of rows, each row a list of one-character strings.
Board = list[list[str]]
# generates initial state
def generate_init_state() -> Board:
    """
    Builds the starting position of the game.

    Returns
    -------
    2D list-of-lists of single characters: "B" (black pawn), "W" (white pawn)
    and "_" (empty cell). Black occupies the first two rows and white the
    last two; every call returns a fresh, independent board.
    """
    layout = (
        "BBBBBB",
        "BBBBBB",
        "______",
        "______",
        "WWWWWW",
        "WWWWWW",
    )
    return [list(rank) for rank in layout]
# prints board
def print_state(board: Board) -> None:
    """
    Prints the board to stdout as an ASCII grid.

    Every cell is rendered five characters wide so that the cell borders line
    up with the "+-----+" horizontal rules; empty cells ("_") are shown as
    blanks.

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.
    """
    cell_width = 5
    # Size the rule from the board being printed; fall back to the module
    # constant only when there is no row to measure.
    n_cols = len(board[0]) if board else COL
    horizontal_rule = "+" + ("-" * cell_width + "+") * n_cols
    for row in board:
        print(horizontal_rule)
        cells = (" " if tile == "_" else tile for tile in row)
        print("|" + "|".join(f"{cell:^{cell_width}}" for cell in cells) + "|")
    print(horizontal_rule)
# inverts board by modifying board state, or returning a new board with updated board state
def invert_board(board: Board, in_place: bool = True) -> Board:
    """
    Flips the board to the other player's point of view.

    The row order is reversed and every pawn changes colour ("B" <-> "W");
    empty cells are left alone. With in_place=True the given board is mutated
    and returned; with in_place=False a deep copy is transformed and the
    argument is left untouched.
    """
    target = board if in_place else copy.deepcopy(board)
    target.reverse()
    opposite = {"W": "B", "B": "W"}
    for row in target:
        for col, tile in enumerate(row):
            if tile in opposite:
                row[col] = opposite[tile]
    return target
# checks if a move made for black is valid or not.
# Move source: src (row, col), move destination: dst (row, col)
def is_valid_move(
    board: Board,
    src: tuple[int, int],
    dst: tuple[int, int]
) -> bool:
    """
    Checks whether the given move is a valid move for black.

    A black pawn moves exactly one row forward (towards higher row indices),
    either straight ahead onto an empty cell, or one column diagonally onto
    any cell that does not hold another black pawn.

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.
    src: tuple[int, int]. Source position of the pawn.
    dst: tuple[int, int]. Destination position of the pawn.

    Returns
    -------
    A boolean indicating whether the given move from `src` to `dst` is valid.
    Positions outside the board (including negative indices) are never valid.
    """
    sr, sc = src
    dr, dc = dst
    n_rows = len(board)
    n_cols = len(board[0]) if n_rows else 0

    # Bounds-check BOTH ends before indexing: a negative source index would
    # otherwise wrap around to the opposite edge of the board, and a too-large
    # one would raise IndexError instead of being reported as invalid.
    if not (0 <= sr < n_rows and 0 <= sc < n_cols):
        return False
    if not (0 <= dr < n_rows and 0 <= dc < n_cols):
        return False
    if board[sr][sc] != "B":
        # only black pawns can be moved
        return False
    if dr != sr + 1:
        # must advance exactly one row
        return False
    if abs(dc - sc) > 1:
        # at most one column sideways
        return False
    if dc == sc:
        # straight ahead requires an empty cell
        return board[dr][dc] == "_"
    # diagonal step: may land on empty or white, never on own pawn
    return board[dr][dc] != "B"
# generates the first available valid move for black
def generate_rand_move(board: Board) -> Move:
    """
    Picks a fallback move for black.

    Despite the name, the choice is deterministic: the board is scanned row by
    row, left to right, and for each black pawn the left diagonal, straight
    and right diagonal steps are tried in that order. The first move accepted
    by `is_valid_move` is returned.

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.

    Returns
    -------
    A tuple ((src_row, src_col), (dst_row, dst_col)):
    src_row, src_col: position of the pawn to move.
    dst_row, dst_col: position to move the pawn to.

    Raises
    ------
    ValueError if no black pawn has a valid move.
    """
    candidates = (
        ((row_idx, col_idx), (row_idx + 1, col_idx + shift))
        for row_idx, row in enumerate(board)
        for col_idx, tile in enumerate(row)
        if tile == "B"
        for shift in (-1, 0, 1)
    )
    for origin, target in candidates:
        if is_valid_move(board, origin, target):
            return origin, target
    raise ValueError("no valid move")
# makes a move effective on the board by modifying board state,
# or returning a new board with updated board state
def state_change(
    board: Board,
    src: tuple[int, int],
    dst: tuple[int, int],
    in_place: bool = True
) -> Board:
    """
    Applies black's move from `src` to `dst` to the board.

    The move is applied only when `is_valid_move` accepts it; an invalid move
    leaves the board contents unchanged (no error is raised).

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.
    src: tuple[int, int]. Source position of the pawn.
    dst: tuple[int, int]. Destination position of the pawn.
    in_place: bool. When True the given `board` is mutated; when False the
    move is applied to a deep copy and `board` is left untouched.

    Returns
    -------
    The board the move was applied to (the argument itself or its copy).
    """
    target = board if in_place else copy.deepcopy(board)
    if not is_valid_move(target, src, dst):
        return target
    (src_row, src_col), (dst_row, dst_col) = src, dst
    target[src_row][src_col] = "_"
    target[dst_row][dst_col] = "B"
    return target
# checks if game is over
def is_game_over(board: Board) -> bool:
    """
    Returns True if game is over.

    The game ends when a black pawn stands on the last row, a white pawn
    stands on the first row, or either colour has no pawns left. The last row
    is taken from the board itself rather than a fixed index, so boards of any
    height are handled; an empty board (no pawns at all) counts as over.

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.

    Returns
    -------
    A bool representing whether the game is over.
    """
    if not board:
        # no rows means no pawns for either side
        return True
    if "B" in board[-1] or "W" in board[0]:
        return True
    tiles = [tile for row in board for tile in row]
    return "B" not in tiles or "W" not in tiles
#############################################
# Utils function for game playing framework #
#############################################
# move making function for game playing
def make_move_job_func(player, board: Board, queue) -> None:
    """
    Asks `player` for a move and reports the outcome through `queue`.

    Intended as the target of a worker process: the result is communicated
    via the queue and the process exit status rather than a return value.

    Parameters
    ----------
    player: object exposing `make_move(board)` returning a (src, dst) pair.
    board: 2D list-of-lists handed to the player.
    queue: object exposing `put`; receives either the `(src, dst)` tuple on
    success or the exception instance raised by the player.

    Raises
    ------
    SystemExit with code 1 after an exception from the player has been queued;
    SystemExit with the default (success) code on KeyboardInterrupt, in which
    case nothing is queued.
    """
    import contextlib  # local import keeps this block self-contained

    # Silence anything the player prints. The context managers close the
    # devnull handle and put back whatever streams were active before,
    # instead of leaking the handle and forcing sys.__stdout__/__stderr__.
    with open(os.devnull, "w") as devnull, \
            contextlib.redirect_stdout(devnull), \
            contextlib.redirect_stderr(devnull):
        try:
            src, dst = player.make_move(board)
            queue.put((src, dst))
        except KeyboardInterrupt:
            sys.exit()
        except Exception as e:
            # NOTE(review): with a multiprocessing queue the exception must be
            # picklable to reach the parent -- confirm for custom exceptions.
            queue.put(e)
            sys.exit(1)
# game playing function. Takes in the initial board
def play(playerAI_A, playerAI_B, board: Board) -> bool:
    """
    Plays a full game between two agents on `board`, mutating it in place.

    `playerAI_A` plays black and moves first; `playerAI_B` plays white. Every
    move is validated from black's point of view, so the board is inverted
    before white moves and inverted back afterwards.

    Black's `make_move` runs in a separate process on a deep copy of the board
    and is given 3 seconds; if it times out, exits abnormally or returns an
    invalid move, a fallback move from `generate_rand_move` is played instead.
    White's `make_move` is called directly in this process; an invalid white
    move is replaced by a fallback move in the same way.

    Parameters
    ----------
    playerAI_A: agent object exposing `make_move(board)`; plays black.
    playerAI_B: agent object exposing `make_move(board)`; plays white.
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.

    Returns
    -------
    True if black made the last move before the game ended, False otherwise
    (including when the game was already over on entry). If black's worker
    process exits with status 1, an error message string is returned instead.
    """
    colors = (black, white) = "Black(Student agent)", "White(Test agent)"
    players = [playerAI_A, playerAI_B]
    move = 0

    # game starts
    color = None
    while not is_game_over(board):
        player = players[move % 2]
        color = colors[move % 2]
        src, dst = MOVE_NONE
        if color == white:
            invert_board(board)
            src, dst = player.make_move(board)
        else:  # black
            result_queue = multiprocessing.Queue()
            worker = multiprocessing.Process(
                target=make_move_job_func,
                args=(player, copy.deepcopy(board), result_queue),
            )
            worker.start()
            worker.join(timeout=3)
            # Read the exit code BEFORE terminating so that a timed-out worker
            # is seen as "no result" (None) rather than as a signal exit.
            exit_code = worker.exitcode
            if worker.is_alive():
                worker.terminate()
                # Reap the terminated child; bounded so a stuck child cannot
                # stall the game loop.
                worker.join(timeout=1)
            if exit_code == 1:
                e = result_queue.get()
                return f"{black} returned err={e} during move"
            if exit_code == 0:
                src, dst = result_queue.get()
            # Any other exit code (None on timeout, or a crash) leaves
            # MOVE_NONE in place, which fails validation below.

        try:
            is_valid = is_valid_move(board, src, dst)
        except KeyboardInterrupt:
            sys.exit()
        except Exception:
            # a malformed move (wrong shape/type) counts as invalid
            is_valid = False
        if not is_valid:
            # if move is invalid or time is exceeded, then we give a fallback move
            src, dst = generate_rand_move(board)
        state_change(board, src, dst)  # makes the move effective on the board
        if color == white:
            invert_board(board)
        move += 1
    return color == black
def wrap_test(func: Callable) -> Callable[..., Any]:
    """
    Decorator that turns exceptions from `func` into a failure message.

    The wrapped function returns whatever `func` returns. If `func` raises an
    `Exception`, the traceback is printed and the string
    "FAILED, reason: <exception>" is returned instead of propagating it.
    The wrapper keeps the name and docstring of `func`.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            traceback.print_exc()
            return f"FAILED, reason: {e}"
    return inner
def _thread_timeout(timeout):
    """
    Thread-based timeout decorator factory.

    The decorated function runs in a daemon thread and the caller waits at
    most `timeout` seconds for it. The function's return value is passed
    through, an `Exception` it raises is re-raised in the caller, and
    `TimeoutError` is raised if no outcome is available once the wait ends.
    The worker thread is not interrupted on timeout; it is merely abandoned.
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Keep value and error in separate slots so a function that
            # legitimately RETURNS an exception object is not mistaken for
            # one that raised it.
            outcome = {}

            def run_func():
                try:
                    outcome["value"] = func(*args, **kwargs)
                except Exception as e:
                    outcome["error"] = e

            worker = Thread(target=run_func, daemon=True)
            worker.start()
            worker.join(timeout)
            if "error" in outcome:
                raise outcome["error"]
            if "value" not in outcome:
                raise TimeoutError(f'Function [{func.__name__}] exceeded timeout of [{timeout} seconds]')
            return outcome["value"]
        return wrapper
    return decorate


if os.name == "nt":
    # Windows always uses the thread-based implementation.
    timeout = _thread_timeout
else:
    try:
        from timeout_decorator import timeout
    except ImportError:
        # Optional dependency missing: fall back to the thread-based version
        # rather than failing at import time.
        timeout = _thread_timeout
@wrap_test
@timeout(TIME_LIMIT)
def test_move(board: Board, playerAI) -> bool:
    """
    Checks that `playerAI` produces a valid move for `board` within 3 seconds.

    The agent receives a deep copy of the board, and the move is validated
    against the original `board`, so changes the agent makes to its copy do
    not affect the check. The call is additionally guarded by the `timeout`
    decorator with TIME_LIMIT seconds.

    Parameters
    ----------
    board: 2D list-of-lists. Contains characters "B", "W", and "_"
    representing black pawn, white pawn, and empty cell respectively.
    playerAI: agent object exposing `make_move(board)`.

    Returns
    -------
    True if the move is valid and was produced in at most 3 seconds, False
    otherwise. Because of `wrap_test`, an exception (including a timeout)
    yields a "FAILED, reason: ..." string instead.
    """
    board_copy = copy.deepcopy(board)
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() can.
    start = time.perf_counter()
    src, dst = playerAI.make_move(board_copy)
    move_time = time.perf_counter() - start
    valid = is_valid_move(board, src, dst)
    return valid and move_time <= 3