123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- # greaseweazle/image/edsk.py
- #
- # Some of the code here is heavily inspired by Simon Owen's SAMdisk:
- # https://simonowen.com/samdisk/
- #
- # Written & released by Keir Fraser <keir.xen@gmail.com>
- #
- # This is free and unencumbered software released into the public domain.
- # See the file COPYING for more details, or visit <http://unlicense.org>.
- import binascii, math, struct
- import itertools as it
- from bitarray import bitarray
- from greaseweazle import error
- from greaseweazle.codec.ibm import mfm
- from greaseweazle.track import MasterTrack, RawTrack
- from .image import Image
class SR1:
    """Bit flags of the first per-sector status byte (printed as ST1).

    Values follow the uPD765 status register 1 layout.
    """
    SUCCESS = 0x00
    CANNOT_FIND_ID_ADDRESS = 0x01
    WRITE_PROTECT_DETECTED = 0x02
    CANNOT_FIND_SECTOR_ID = 0x04
    RESERVED1 = 0x08
    OVERRUN = 0x10
    CRC_ERROR = 0x20
    RESERVED2 = 0x40
    END_OF_CYLINDER = 0x80


class SR2:
    """Bit flags of the second per-sector status byte (printed as ST2).

    Values follow the uPD765 status register 2 layout.
    """
    SUCCESS = 0x00
    MISSING_ADDRESS_MARK = 0x01
    BAD_CYLINDER = 0x02
    SCAN_COMMAND_FAILED = 0x04
    SCAN_COMMAND_EQUAL = 0x08
    WRONG_CYLINDER_DETECTED = 0x10
    CRC_ERROR_IN_SECTOR_DATA = 0x20
    SECTOR_WITH_DELETED_DATA = 0x40
    RESERVED = 0x80


class SectorErrors:
    """Decode a sector's (ST1, ST2) status pair into boolean error flags.

    Attributes set by the constructor:
      id_crc_error   -- ST1 CRC flag set and no data CRC error flagged
      data_not_found -- ST2 missing-address-mark flag set
      data_crc_error -- ST2 data CRC flag set
      deleted_dam    -- ST2 deleted-data flag set
    """

    # Status combinations that are accepted without a diagnostic.
    _EXPECTED_STATUS = frozenset([
        # normal data
        (SR1.SUCCESS, SR2.SUCCESS),
        # deleted data
        (SR1.SUCCESS, SR2.SECTOR_WITH_DELETED_DATA),
        # end of track
        (SR1.END_OF_CYLINDER, SR2.SUCCESS),
        # id crc error
        (SR1.CRC_ERROR, SR2.SUCCESS),
        # normal data crc error
        (SR1.CRC_ERROR, SR2.CRC_ERROR_IN_SECTOR_DATA),
        # deleted data crc error
        (SR1.CRC_ERROR, (SR2.CRC_ERROR_IN_SECTOR_DATA
                         | SR2.SECTOR_WITH_DELETED_DATA)),
        # data field missing (some FDCs set AM in ST1)
        (SR1.CANNOT_FIND_ID_ADDRESS, SR2.MISSING_ADDRESS_MARK),
        # data field missing (some FDCs don't)
        (SR1.SUCCESS, SR2.MISSING_ADDRESS_MARK),
        # CHRN mismatch
        (SR1.CANNOT_FIND_SECTOR_ID, SR2.SUCCESS),
        # CHRN mismatch, including wrong cylinder
        (SR1.CANNOT_FIND_SECTOR_ID, SR2.WRONG_CYLINDER_DETECTED),
    ])

    def __init__(self, sr1, sr2):
        """Decode status bytes *sr1* and *sr2* (integers).

        Prints a diagnostic line when the pair is not one of the
        recognised combinations; the flags are still decoded.
        """
        self.id_crc_error = (sr1 & SR1.CRC_ERROR) != 0
        self.data_not_found = (sr2 & SR2.MISSING_ADDRESS_MARK) != 0
        self.data_crc_error = (sr2 & SR2.CRC_ERROR_IN_SECTOR_DATA) != 0
        self.deleted_dam = (sr2 & SR2.SECTOR_WITH_DELETED_DATA) != 0
        if self.data_crc_error:
            # uPD765 sets both id and data flags for data CRC errors
            self.id_crc_error = False
        if (sr1, sr2) not in self._EXPECTED_STATUS:
            print('Unusual status flags (ST1=%02X ST2=%02X)' % (sr1, sr2))
class EDSKTrack:
    """A single track built from an EDSK image, held as a bitcell stream.

    `bits` holds the track's bitcells and `weak` a list of
    (start, length) weak ranges in bitcell units. `verify_len` is not set
    here; verify_track() reads it, so the code that builds the track must
    assign it first.
    """

    gap_presync = 12 # Zero bytes preceding each sync mark
    gap_4a = 80 # Post-Index
    gap_1 = 50 # Post-IAM
    gap_2 = 22 # Post-IDAM

    gapbyte = 0x4e

    def __init__(self):
        self.time_per_rev = 0.2
        self.clock = 2e-6
        self.bits, self.weak = [], []

    def raw_track(self):
        """Return a MasterTrack for this track, with this object
        attached as its single-revolution verifier."""
        track = MasterTrack(
            bits = self.bits,
            time_per_rev = self.time_per_rev,
            weak = self.weak)
        track.verify = self
        track.verify_revs = 1
        return track

    def _find_sync(self, bits, sync, start):
        """Return the first offset >= start at which `sync` occurs in
        `bits`, or None if there is no such occurrence."""
        for offs in bits.itersearch(sync):
            if offs >= start:
                return offs
        return None

    def verify_track(self, flux):
        """Compare a flux dump against this (pristine) track.

        Both streams are walked region by region. The first region starts
        at the IAM pre-sync and each later one at the next sync mark.
        Weak ranges are skipped rather than compared.

        Returns True only if every region of the pristine track was
        reached and matched the dump; otherwise False.
        """
        flux.cue_at_index()
        raw = RawTrack(clock = self.clock, data = flux)
        bits, _ = raw.get_all_data()
        # A sentinel range just past the verified area terminates the
        # weak list. NOTE(review): assumes self.weak is in ascending order.
        weak_iter = it.chain(self.weak, [(self.verify_len+1,1)])
        weak = next(weak_iter)

        # Start checking from the IAM sync
        dump_start = self._find_sync(bits, mfm.iam_sync, 0)
        self_start = self._find_sync(self.bits, mfm.iam_sync, 0)

        # Without an IAM in both streams there is nothing to line up.
        if dump_start is None or self_start is None:
            return False

        # Include the IAM pre-sync header
        dump_start -= self.gap_presync * 16
        self_start -= self.gap_presync * 16

        while self_start is not None and dump_start is not None:

            # Find the weak areas immediately before and after the current
            # region to be checked.
            s,n = None,None
            while self_start > weak[0]:
                s,n = weak
                weak = next(weak_iter)

            # If there is a weak area preceding us, move the start point to
            # immediately follow the weak area.
            if s is not None:
                delta = self_start - (s + n + 16)
                self_start -= delta
                dump_start -= delta

            # Truncate the region at the next weak area, or the last sector.
            self_end = max(self_start, min(weak[0], self.verify_len+1))
            dump_end = dump_start + self_end - self_start

            # Extract the corresponding areas from the pristine track and
            # from the dump, and check that they match.
            if bits[dump_start:dump_end] != self.bits[self_start:self_end]:
                return False

            # Find the next A1A1A1 sync pattern
            dump_start = self._find_sync(bits, mfm.sync, dump_end)
            self_start = self._find_sync(self.bits, mfm.sync, self_end)

        # Did we verify all regions in the pristine track?
        return self_start is None
class EDSK(Image):
    """Read-only loader for CPC DSK and Extended CPC DSK image files.

    Each track found in the file is rebuilt as an MFM bitcell stream and
    stored in `to_track`, keyed by (cylinder, head).
    """

    read_only = True
    default_format = 'ibm.mfm'

    def __init__(self):
        # Maps (cyl, head) -> EDSKTrack
        self.to_track = dict()

    # Currently only finds one weak range.
    @staticmethod
    def find_weak_ranges(dat, size):
        """Locate the bytes that differ between copies of a sector.

        `dat` holds consecutive copies of the sector, each `size` bytes
        long. Every later copy is compared with the first one.

        Returns a list holding one (start, length) byte range that spans
        from the lowest to the highest differing offset, or an empty
        list if all copies agree.
        """
        first = dat[:size]
        lo, hi = size, 0
        for i in range(1, len(dat)//size):
            copy = dat[size*i:size*(i+1)]
            differing = [idx for idx, (x, y) in enumerate(zip(first, copy))
                         if x != y]
            if differing:
                lo, hi = min(lo, differing[0]), max(hi, differing[-1])
        return [(lo, hi-lo+1)] if lo <= hi else []

    @classmethod
    def from_file(cls, name):
        """Load the DSK/EDSK image file `name` and return a new instance.

        Raises error.Fatal if the file signature is not recognised.
        Other structural problems (missing track header, duplicate track,
        track too long even with GAP3 reduced to zero) are reported
        through error.check().
        """

        with open(name, "rb") as f:
            dat = f.read()

        edsk = cls()

        sig, _creator, ncyls, nsides, track_sz = struct.unpack(
            '<34s14s2BH', dat[:52])
        if sig[:8] == b'MV - CPC':
            extended = False
        elif sig[:16] == b'EXTENDED CPC DSK':
            extended = True
        else:
            raise error.Fatal('Unrecognised CPC DSK file: bad signature')

        if extended:
            # Per-track size table: one byte per track, in 256-byte units.
            track_sizes = [x*256 for x in dat[52:52+ncyls*nsides]]
        else:
            track_sizes = [track_sz] * (ncyls * nsides)

        o = 256 # skip disk header and track-size table
        for track_size in track_sizes:
            if track_size == 0:
                continue
            sig, cyl, head, sec_sz, nsecs, gap_3, _filler = struct.unpack(
                '<12s4x2B2x4B', dat[o:o+24])
            error.check(sig == b'Track-Info\r\n',
                        'EDSK: Missing track header')
            error.check((cyl, head) not in edsk.to_track,
                        'EDSK: Track specified twice')
            # Build the track; if it comes out too long, retry from
            # scratch with a smaller GAP3.
            while True:
                track = EDSKTrack()
                t = bytes()
                # Post-index gap
                t += mfm.encode(bytes([track.gapbyte] * track.gap_4a))
                # IAM
                t += mfm.encode(bytes(track.gap_presync))
                t += mfm.iam_sync_bytes
                t += mfm.encode(bytes([mfm.IBM_MFM.IAM]))
                t += mfm.encode(bytes([track.gapbyte] * track.gap_1))
                secs = dat[o+24:o+24+8*nsecs]
                data_pos = o + 256 # skip track header and sector-info table
                while secs:
                    c, h, r, n, stat1, stat2, data_size = struct.unpack(
                        '<6BH', secs[:8])
                    secs = secs[8:]
                    native_size = mfm.sec_sz(n)
                    weak = []
                    errs = SectorErrors(stat1, stat2)
                    if not extended:
                        # Use the size code from the track header instead
                        # of the per-sector size field.
                        data_size = mfm.sec_sz(sec_sz)
                    sec_data = dat[data_pos:data_pos+data_size]
                    data_pos += data_size
                    if (extended
                        and data_size > native_size
                        and errs.data_crc_error
                        and (data_size % native_size == 0
                             or data_size == 49152)):
                        # Several copies of a bad sector are stored: keep
                        # the first and mark the differing bytes as weak.
                        num_copies = (3 if data_size == 49152
                                      else data_size // native_size)
                        data_size //= num_copies
                        weak = cls.find_weak_ranges(sec_data, data_size)
                        sec_data = sec_data[:data_size]
                    if data_size < native_size:
                        # Pad short data
                        sec_data += bytes(native_size - data_size)
                    # IDAM
                    t += mfm.encode(bytes(track.gap_presync))
                    t += mfm.sync_bytes
                    am = bytes([0xa1, 0xa1, 0xa1, mfm.IBM_MFM.IDAM,
                                c, h, r, n])
                    crc = mfm.crc16.new(am).crcValue
                    if errs.id_crc_error:
                        # Corrupt the CRC on purpose to reproduce the error
                        crc ^= 0x5555
                    am += struct.pack('>H', crc)
                    t += mfm.encode(am[3:])
                    t += mfm.encode(bytes([track.gapbyte] * track.gap_2))
                    # DAM
                    if errs.id_crc_error or errs.data_not_found:
                        continue
                    t += mfm.encode(bytes(track.gap_presync))
                    t += mfm.sync_bytes
                    # Convert weak byte ranges within the sector into
                    # bitcell ranges within the track.
                    track.weak += [((start+len(t)//2+4)*16, length*16)
                                   for start, length in weak]
                    dmark = (mfm.IBM_MFM.DDAM if errs.deleted_dam
                             else mfm.IBM_MFM.DAM)
                    am = bytes([0xa1, 0xa1, 0xa1, dmark]) + sec_data
                    if data_size > native_size:
                        # Long data includes CRC and GAP
                        if (sec_data[-13] != 0
                            and all(v == 0 for v in sec_data[-12:])):
                            # Includes next pre-sync: Clip it.
                            am = am[:-12]
                        t += mfm.encode(am[3:])
                        continue
                    crc = mfm.crc16.new(am).crcValue
                    if errs.data_crc_error:
                        # Corrupt the CRC on purpose to reproduce the error
                        crc ^= 0x5555
                    am += struct.pack('>H', crc)
                    t += mfm.encode(am[3:])
                    t += mfm.encode(bytes([track.gapbyte] * gap_3))

                # Some EDSK images have bogus GAP3 values. If the track is too
                # long to comfortably fit in 300rpm at double density, shrink
                # GAP3 as far as necessary.
                tracklen = int((track.time_per_rev / track.clock) / 16)
                overhang = int(len(t)//2 - tracklen*0.99)
                if overhang <= 0:
                    break
                new_gap_3 = gap_3 - math.ceil(overhang / nsecs)
                error.check(new_gap_3 >= 0,
                            'EDSK: Track %d.%d is too long '
                            '(%d bits @ GAP3=%d; %d bits @ GAP3=0)'
                            % (cyl, head, len(t)*8, gap_3,
                               (len(t)//2-gap_3*nsecs)*16))
                gap_3 = new_gap_3

            # Pre-index gap
            track.verify_len = len(t)*8
            gap = tracklen - len(t)//2
            t += mfm.encode(bytes([track.gapbyte] * gap))

            track.bits = bitarray(endian='big')
            track.bits.frombytes(mfm.mfm_encode(t))
            edsk.to_track[cyl,head] = track
            o += track_size

        return edsk

    def get_track(self, cyl, side):
        """Return the track at (cyl, side) as produced by
        EDSKTrack.raw_track(), or None if the image has no such track."""
        if (cyl,side) not in self.to_track:
            return None
        return self.to_track[cyl,side].raw_track()
- # Local variables:
- # python-indent: 4
- # End: