# greaseweazle/codec/ibm/fm.py # # Written & released by Keir Fraser # # This is free and unencumbered software released into the public domain. # See the file COPYING for more details, or visit . import binascii import copy, heapq, struct, functools import itertools as it from bitarray import bitarray import crcmod.predefined from greaseweazle.codec.ibm import mfm from greaseweazle.track import MasterTrack, RawTrack default_revs = 2 def sync(dat, clk=0xc7): x = 0 for i in range(8): x <<= 1 x |= (clk >> (7-i)) & 1 x <<= 1 x |= (dat >> (7-i)) & 1 return bytes(struct.pack('>H', x)) sync_prefix = bitarray(endian='big') sync_prefix.frombytes(b'\xaa\xaa' + sync(0xf8)) sync_prefix = sync_prefix[:16+10] iam_sync_bytes = sync(0xfc, 0xd7) iam_sync = bitarray(endian='big') iam_sync.frombytes(b'\xaa\xaa' + iam_sync_bytes) crc16 = crcmod.predefined.Crc('crc-ccitt-false') sec_sz = mfm.sec_sz IDAM = mfm.IDAM DAM = mfm.DAM Sector = mfm.Sector IAM = mfm.IAM class IBM_FM: IAM = 0xfc IDAM = 0xfe DAM = 0xfb DDAM = 0xf8 gap_presync = 6 gapbyte = 0xff def __init__(self, cyl, head): self.cyl, self.head = cyl, head self.sectors = [] self.iams = [] def summary_string(self): nsec, nbad = len(self.sectors), self.nr_missing() s = "IBM FM (%d/%d sectors)" % (nsec - nbad, nsec) if nbad != 0: s += " - %d sectors missing" % nbad return s def has_sec(self, sec_id): return self.sectors[sec_id].crc == 0 def nr_missing(self): return len(list(filter(lambda x: x.crc != 0, self.sectors))) def flux(self, *args, **kwargs): return self.raw_track().flux(*args, **kwargs) def decode_raw(self, track): track.cue_at_index() raw = RawTrack(clock = self.clock, data = track) bits, _ = raw.get_all_data() areas = [] idam = None ## 1. Calculate offsets within dump for offs in bits.itersearch(iam_sync): offs += 16 areas.append(IAM(offs, offs+1*16)) self.has_iam = True for offs in bits.itersearch(sync_prefix): offs += 16 mark = decode(bits[offs:offs+1*16].tobytes())[0] clock = decode(bits[offs-1:offs+1*16-1].tobytes())[0] if clock != 0xc7: continue if mark == IBM_FM.IDAM: s, e = offs, offs+7*16 b = decode(bits[s:e].tobytes()) c,h,r,n = struct.unpack(">x4B2x", b) crc = crc16.new(b).crcValue if idam is not None: areas.append(idam) idam = IDAM(s, e, crc, c=c, h=h, r=r, n=n) elif mark == IBM_FM.DAM or mark == IBM_FM.DDAM: if idam is None or idam.end - offs > 1000: areas.append(DAM(offs, offs+4*16, 0xffff, mark=mark)) else: sz = 128 << idam.n s, e = offs, offs+(1+sz+2)*16 b = decode(bits[s:e].tobytes()) crc = crc16.new(b).crcValue dam = DAM(s, e, crc, mark=mark, data=b[1:-2]) areas.append(Sector(idam, dam)) idam = None else: pass #print("Unknown mark %02x" % mark) if idam is not None: areas.append(idam) # Convert to offsets within track areas.sort(key=lambda x:x.start) index = iter(raw.revolutions) p, n = 0, next(index) for a in areas: if a.start >= n: p = n try: n = next(index) except StopIteration: n = float('inf') a.delta(p) areas.sort(key=lambda x:x.start) # Add to the deduped lists for a in areas: match = False if isinstance(a, IAM): list = self.iams elif isinstance(a, Sector): list = self.sectors else: continue for s in list: if abs(s.start - a.start) < 1000: match = True break if match and isinstance(a, Sector) and s.crc != 0 and a.crc == 0: self.sectors = [x for x in self.sectors if x != a] match = False if not match: list.append(a) def raw_track(self): areas = heapq.merge(self.iams, self.sectors, key=lambda x:x.start) t = bytes() for a in areas: start = a.start//16 - self.gap_presync gap = max(start - len(t)//2, 0) t += encode(bytes([self.gapbyte] * gap)) t += encode(bytes(self.gap_presync)) if isinstance(a, IAM): t += iam_sync_bytes elif isinstance(a, Sector): idam = bytes([self.IDAM, a.idam.c, a.idam.h, a.idam.r, a.idam.n]) idam += struct.pack('>H', crc16.new(idam).crcValue) t += sync(idam[0]) + encode(idam[1:]) start = a.dam.start//16 - self.gap_presync gap = max(start - len(t)//2, 0) t += encode(bytes([self.gapbyte] * gap)) t += encode(bytes(self.gap_presync)) dam = bytes([a.dam.mark]) + a.dam.data dam += struct.pack('>H', crc16.new(dam).crcValue) t += sync(dam[0]) + encode(dam[1:]) # Add the pre-index gap. tlen = int((self.time_per_rev / self.clock) // 16) gap = max(tlen - len(t)//2, 0) t += encode(bytes([self.gapbyte] * gap)) track = MasterTrack( bits = t, time_per_rev = self.time_per_rev) track.verify = self track.verify_revs = default_revs return track class IBM_FM_Formatted(IBM_FM): gap_4a = 40 # Post-Index gap_1 = 26 # Post-IAM gap_2 = 11 # Post-IDAM def __init__(self, cyl, head): super().__init__(cyl, head) self.raw_iams, self.raw_sectors = [], [] def decode_raw(self, track): iams, sectors = self.iams, self.sectors self.iams, self.sectors = self.raw_iams, self.raw_sectors super().decode_raw(track) self.iams, self.sectors = iams, sectors for r in self.raw_sectors: if r.idam.crc != 0: continue for s in self.sectors: if (s.idam.c == r.idam.c and s.idam.h == r.idam.h and s.idam.r == r.idam.r and s.idam.n == r.idam.n): s.idam.crc = 0 if r.dam.crc == 0 and s.dam.crc != 0: s.dam.crc = s.crc = 0 s.dam.data = r.dam.data def set_img_track(self, tdat): pos = 0 self.sectors.sort(key = lambda x: x.idam.r) totsize = functools.reduce(lambda x, y: x + (128<> (7-i)) & 1 encode_list.append(y) def encode(dat): out = bytearray() for x in dat: out += struct.pack('>H', encode_list[x]) return bytes(out) decode = mfm.decode # Local variables: # python-indent: 4 # End: