"""Functions and classes related to the serialization of application data. """ from .data import Items, Species, Spe, Kind from . import Item, Monster, Node, Region, Global, Map, g import types, struct, collections, weakref, random OPCODE_LIST = 'Li' OPCODE_SET = 'Se' OPCODE_TUPLE = 'Tu' OPCODE_DICT = 'Di' OPCODE_TRUE = 'Tr' OPCODE_FALSE = 'Fa' OPCODE_NONE = 'No' OPCODE_INT = 'In' OPCODE_BYTE = 'By' OPCODE_LONG = 'Lo' OPCODE_FLOAT = 'Fl' OPCODE_STRING = 'St' OPCODE_UNICODE = 'Un' OPCODE_MONSTER = 'Mn' OPCODE_ITEM = 'It' OPCODE_SPECIES = 'Sp' OPCODE_KIND = 'Ki' OPCODE_NODE = 'Nd' OPCODE_LEVEL = 'Le' OPCODE_G = 'Go' OPCODE_GRID = 'Gr' OPCODE_WEAKREF = 'Wf' OPCODE_RNG = 'Rn' OPCODE_SETSTATE = 'SS' OPCODE_MKGRID = 'GM' OPCODE_MEMOIZE = 'Mo' OPCODE_MEMOIZED = 'Me' OPCODE_SMEM = 'SM' OPCODE_SMEMMED = 'SE' OPCODE_SETATTR = 'SA' OPCODE_SETITEM = 'SI' OPCODE_APPEND = 'AP' OPCODE_EXTEND = 'EX' OPCODE_UPDATE = 'UP' OPCODE_END = 'En' OPCODE_VERSION = 'Vr' VERSION = 4 COMPAT_VERSION = 3 serializers = {} def packs(type): def deco(func): serializers[type] = func return func return deco class Serializer(object): def __init__(self, file): self.memo = {} self.string_memo = {} self.file = file def memoize(self, obj): i = id(obj) if i in self.memo: raise ValueError('%r already memoized' % obj) index = len(self.memo) self.memo[i] = index self.file.write(OPCODE_MEMOIZE) self.file.write(struct.pack('!H', index)) def memoize_string(self, string): i = self.string_memo.get(string) if i is not None: self.file.write(OPCODE_SMEMMED) else: i = self.string_memo[string] = len(self.string_memo) self.pack_string(string) self.file.write(OPCODE_SMEM) self.file.write(struct.pack('!H', i)) def dump(self, obj): self.file.write(OPCODE_VERSION) self.file.write(chr(VERSION)) callstack = [self.pack(obj)] while callstack: try: result = callstack[-1].next() except StopIteration: callstack.pop() else: if result is not None: callstack.append(result) self.file.write(OPCODE_END) def pack(self, obj): mem = self.memo.get(id(obj)) if mem is not None: self.file.write(OPCODE_MEMOIZED) self.file.write(struct.pack('!H', mem)) return obj_type = type(obj) while obj_type not in serializers: obj_type = obj_type.__bases__[0] yield serializers[obj_type](self, obj) @packs(types.ListType) def pack_list(self, l): self.file.write(OPCODE_LIST) self.memoize(l) yield self.pack_tuple(l) self.file.write(OPCODE_EXTEND) @packs(types.TupleType) def pack_tuple(self, t): e = -1 for e, itm in enumerate(t): yield self.pack(itm) self.pack_int(e + 1) self.file.write(OPCODE_TUPLE) @packs(types.DictType) def pack_dict(self, d): self.file.write(OPCODE_DICT) self.memoize(d) yield self.pack_tuple(d.iteritems()) self.file.write(OPCODE_UPDATE) @packs(set) def pack_set(self, s): self.file.write(OPCODE_SET) self.memoize(s) yield self.pack_tuple(s) self.file.write(OPCODE_UPDATE) @packs(types.BooleanType) def pack_bool(self, b): if b is True: self.file.write(OPCODE_TRUE) elif b is False: self.file.write(OPCODE_FALSE) else: raise ValueError('what is it? should be bool: %r' % b) @packs(types.NoneType) def pack_none(self, n): self.file.write(OPCODE_NONE) @packs(types.IntType) def pack_int(self, i): if 0 <= i <= 255: self.file.write(OPCODE_BYTE) self.file.write(chr(i)) else: self.file.write(OPCODE_INT) self.file.write(struct.pack('!i', i)) @packs(types.LongType) def pack_long(self, l): self.file.write(OPCODE_LONG) self.file.write(struct.pack('!q', l)) @packs(types.FloatType) def pack_float(self, f): self.file.write(OPCODE_FLOAT) self.file.write(struct.pack('!d', f)) @packs(types.StringType) def pack_string(self, s): self.file.write(OPCODE_STRING) self.file.write(struct.pack('!H', len(s))) self.file.write(s) @packs(types.UnicodeType) def pack_unicode(self, s): self.pack_string(s.encode('utf8')) self.file.write(OPCODE_UNICODE) def pack_object(self, obj, **modifiers): for k, v in obj.__dict__.iteritems(): self.memoize_string(k) if k in modifiers: yield self.pack(modifiers.pop(k)) else: yield self.pack(v) self.file.write(OPCODE_SETATTR) for k, v in modifiers.iteritems(): self.memoize_string(k) yield self.pack(v) self.file.write(OPCODE_SETATTR) @packs(Monster.Monster) def pack_monster(self, m): self.file.write(OPCODE_MONSTER) self.memoize(m) yield self.pack_object(m) @packs(Item.Item) def pack_item(self, i): self.file.write(OPCODE_ITEM) self.memoize(i) yield self.pack_object(i) @packs(Spe.Spe) def pack_species(self, s): self.pack_string(type(s).__name__) self.file.write(OPCODE_SPECIES) self.memoize(s) yield self.pack_object(s) @packs(Kind.Kind) def pack_kind(self, k): self.pack_string(type(k).__name__) self.file.write(OPCODE_KIND) self.memoize(k) yield self.pack_object(k) @packs(Node.Node) def pack_node(self, n): self.pack_string(type(n).__name__) self.pack_string(type(n).__module__) self.file.write(OPCODE_NODE) self.memoize(n) yield self.pack_object(n, dijkstra=None) @packs(Monster.Level) def pack_level(self, l): self.file.write(OPCODE_LEVEL) self.memoize(l) yield self.pack_object(l) @packs(Map.Grid) def pack_grid(self, g): self.file.write(OPCODE_GRID) self.memoize(g) yield self.pack_dict(g.grid) self.file.write(OPCODE_MKGRID) @packs(Global) def pack_global(self, g): self.file.write(OPCODE_G) self.memoize(g) yield self.pack_object(g) @packs(weakref.ref) def pack_weakref(self, w): obj = w() if obj is None: self.pack_none(None) else: yield self.pack(obj) self.file.write(OPCODE_WEAKREF) def pack_stateful(self, obj): yield self.pack(obj.__getstate__()) self.file.write(OPCODE_SETSTATE) @packs(random.Random) def pack_rng(self, r): self.file.write(OPCODE_RNG) self.memoize(r) yield self.pack_stateful(r) @packs(types.ObjectType) @packs(types.InstanceType) def invalid_type(self, obj): raise ValueError('no serializer found: %r %r' % (obj, type(obj))) unserializers = {} def unpacks(opcode): def deco(func): unserializers[opcode] = func return func return deco class Unserializer(object): def __init__(self, file): self.memo = {} self.string_memo = {} self.file = file self.stack = [] def load(self): self.unpack() return self.stack.pop() def unpack(self): while True: opcode = self.file.read(2) if len(opcode) < 2: raise EOFError('unexpected end of file') elif opcode == OPCODE_END: break func = unserializers.get(opcode) if func is None: raise ValueError('no unserializer for opcode %s' % opcode) func(self) @unpacks(OPCODE_VERSION) def check_version(self): version = ord(self.file.read(1)) if VERSION < version < COMPAT_VERSION: raise ValueError( 'version mismatch: mine is %d, compat %d, other %d' % ( VERSION, COMPAT_VERSION, version)) @unpacks(OPCODE_LIST) def unpack_list(self): self.stack.append([]) @unpacks(OPCODE_TUPLE) def unpack_tuple(self): length = self.stack.pop() if length == 0: self.stack.append(()) else: l = self.stack[-length:] self.stack[-length:] = [tuple(l)] @unpacks(OPCODE_DICT) def unpack_dict(self): self.stack.append({}) @unpacks(OPCODE_SET) def unpack_set(self): self.stack.append(set()) @unpacks(OPCODE_TRUE) def unpack_true(self): self.stack.append(True) @unpacks(OPCODE_FALSE) def unpack_false(self): self.stack.append(False) @unpacks(OPCODE_NONE) def unpack_none(self): self.stack.append(None) @unpacks(OPCODE_INT) def unpack_int(self): i = self.file.read(4) self.stack.append(*struct.unpack('!i', i)) @unpacks(OPCODE_BYTE) def unpack_byte(self): self.stack.append(ord(self.file.read(1))) @unpacks(OPCODE_LONG) def unpack_long(self): l = self.file.read(8) self.stack.append(*struct.unpack('!q', l)) @unpacks(OPCODE_FLOAT) def unpack_float(self): f = self.file.read(8) self.stack.append(*struct.unpack('!d', f)) @unpacks(OPCODE_STRING) def unpack_string(self): length, = struct.unpack('!H', self.file.read(2)) self.stack.append(self.file.read(length)) @unpacks(OPCODE_UNICODE) def unpack_unicode(self): self.stack[-1] = self.stack[-1].decode('utf8') @unpacks(OPCODE_MONSTER) def unpack_monster(self): self.stack.append(Monster.Monster()) @unpacks(OPCODE_ITEM) def unpack_item(self): self.stack.append(Item.Item()) @unpacks(OPCODE_SPECIES) def unpack_species(self): name = self.stack.pop() self.stack.append(getattr(Species, name)()) @unpacks(OPCODE_KIND) def unpack_kind(self): name = self.stack.pop() self.stack.append(getattr(Items, name)()) @unpacks(OPCODE_NODE) def unpack_node(self): n = Node.Node() module, name = self.stack.pop(), self.stack.pop() module = __import__(module, fromlist=[True]) n.__class__ = getattr(module, name) self.stack.append(n) @unpacks(OPCODE_G) def unpack_global(self): self.stack.append(g) @unpacks(OPCODE_GRID) def unpack_grid(self): self.stack.append(Map.Grid()) @unpacks(OPCODE_MKGRID) def setup_grid(self): dd = collections.defaultdict(lambda: None) dd.update(self.stack.pop()) self.stack[-1].grid = dd @unpacks(OPCODE_LEVEL) def unpack_level(self): self.stack.append(object.__new__(Monster.Level)) @unpacks(OPCODE_WEAKREF) def unpack_weakref(self): self.stack[-1] = weakref.ref(self.stack[-1]) @unpacks(OPCODE_RNG) def unpack_rng(self): self.stack.append(random.Random()) @unpacks(OPCODE_SETSTATE) def unpack_setstate(self): state = self.stack.pop() self.stack[-1].__setstate__(state) @unpacks(OPCODE_SETATTR) def unpack_setattr(self): v, k = self.stack.pop(), self.stack.pop() setattr(self.stack[-1], k, v) @unpacks(OPCODE_SETITEM) def unpack_setitem(self): v, k = self.stack.pop(), self.stack.pop() self.stack[-1][k] = v @unpacks(OPCODE_APPEND) def unpack_append(self): i = self.stack.pop() self.stack[-1].append(i) @unpacks(OPCODE_EXTEND) def unpack_extend(self): i = self.stack.pop() self.stack[-1].extend(i) @unpacks(OPCODE_UPDATE) def unpack_update(self): i = self.stack.pop() self.stack[-1].update(i) @unpacks(OPCODE_MEMOIZE) def memoize(self): index, = struct.unpack('!H', self.file.read(2)) self.memo[index] = self.stack[-1] @unpacks(OPCODE_MEMOIZED) def unmemoize(self): index, = struct.unpack('!H', self.file.read(2)) self.stack.append(self.memo[index]) @unpacks(OPCODE_SMEM) def memoize_string(self): index, = struct.unpack('!H', self.file.read(2)) self.string_memo[index] = self.stack[-1] @unpacks(OPCODE_SMEMMED) def unmemoize(self): index, = struct.unpack('!H', self.file.read(2)) self.stack.append(self.string_memo[index])