123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- # greaseweazle/image/scp.py
- #
- # Written & released by Keir Fraser <keir.xen@gmail.com>
- #
- # This is free and unencumbered software released into the public domain.
- # See the file COPYING for more details, or visit <http://unlicense.org>.
- import struct, functools
- from greaseweazle import error
- from greaseweazle.flux import Flux
- from .image import Image
# Symbolic names accepted for the disktype byte in the SCP file header,
# each mapped to the byte value that is stored in the header.
DiskType = dict(amiga=0x04, c64=0x00)
class SCPOpts:
    """Options applied when generating an SCP image.

    legacy_ss: Set to True to generate (incorrect) legacy single-sided
    SCP image.

    disktype: Value of the disktype byte in the SCP file header. Assign
    either a name listed in DiskType (matched case-insensitively) or a
    string holding an integer literal (any base understood by int(x, 0)).
    """

    def __init__(self):
        self.legacy_ss = False
        self._disktype = 0x80  # Other

    @property
    def disktype(self):
        """The numeric disktype byte currently selected."""
        return self._disktype

    @disktype.setter
    def disktype(self, disktype):
        wanted = disktype.lower()
        try:
            # First preference: a symbolic name from the DiskType table.
            code = DiskType[wanted]
        except KeyError:
            # Otherwise the caller must have supplied a numeric literal.
            try:
                code = int(disktype, 0)
            except ValueError:
                raise error.Fatal("Bad SCP disktype: '%s'" % disktype)
        self._disktype = code
class SCPTrack:
    """Raw SCP data for a single track.

    tdh: The track's revolution entries (12 bytes per revolution).
    dat: The track's encoded flux samples.
    splice: Optional write-splice position; None if not known.
    """

    def __init__(self, tdh, dat, splice=None):
        self.tdh, self.dat, self.splice = tdh, dat, splice
class SCP(Image):
    """A Supercard Pro (SCP) flux image held in memory.

    Attributes:
      opts: SCPOpts instance controlling how get_image() builds the file.
      nr_revs: Number of revolutions stored per track (None until known).
      to_track: Dict mapping track number (cyl*2 + side) to SCPTrack.
      index_cued: Cleared by emit_track() when a track could not be index
        cued; selects the Index-Cued flag written by get_image().
    """

    # 40MHz (one SCP tick is 25ns)
    sample_freq = 40000000

    def __init__(self):
        self.opts = SCPOpts()
        self.nr_revs = None
        self.to_track = dict()
        self.index_cued = True

    def side_count(self):
        """Return [n0, n1]: the number of stored tracks on side 0 and side 1.

        The side is the least-significant bit of the track number.
        """
        s = [0, 0]  # non-empty tracks on each side
        for tnr in self.to_track:
            s[tnr & 1] += 1
        return s

    @classmethod
    def from_file(cls, name):
        """Read the SCP image file @name and return it as a new instance.

        Raises error.Fatal if the file is too short to hold the image
        header and track table, if a signature is missing, if the track
        table is malformed, or if a track header is truncated or carries
        the wrong track number.
        """
        splices = None

        with open(name, "rb") as f:
            dat = f.read()

        # The 16-byte image header and the nominal 168-entry TLUT occupy
        # the first 0x2b0 bytes. Report anything shorter cleanly instead
        # of letting the unpacks below fail with a struct.error.
        error.check(len(dat) >= 0x2b0, "SCP: File too short")

        header = struct.unpack("<3s9BI", dat[0:16])
        (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
        error.check(sig == b"SCP", "SCP: Bad signature")

        # Flag bit 0 marks an index-cued image. A single-revolution image
        # is treated as index cued regardless of the flag.
        index_cued = flags & 1 or nr_revs == 1

        # Some tools generate a short TLUT. We handle this by truncating the
        # TLUT at the first Track Data Header.
        trk_offs = struct.unpack("<168I", dat[16:0x2b0])
        i = 0
        while i < len(trk_offs):
            off = trk_offs[i]
            i += 1
            if off == 0 or off >= 0x2b0:
                continue
            # TLUT entry k lives at file offset 16 + 4*k, so a track that
            # starts at @off leaves room for only off//4 - 4 entries.
            nr_valid = off//4 - 4
            error.check(nr_valid >= 0, "SCP: Bad Track Table")
            trk_offs = trk_offs[:nr_valid]

        # Parse the extension block introduced by github:markusC64/g64conv.
        # b'EXTS', length, <length bytes: Extension Area>
        # Extension Area contains consecutive chunks of the form:
        #  ID, length, <length bytes: ID-specific data>
        if len(dat) >= 0x2b8:
            ext_sig, ext_len = struct.unpack('<4sI', dat[0x2b0:0x2b8])
        else:
            # No room for an extension block header after the TLUT.
            ext_sig, ext_len = b'', 0
        # The extension area is only believed if it ends before the first
        # Track Data Header.
        min_tdh = min((off for off in trk_offs if off != 0), default=0)
        if ext_sig == b'EXTS' and 0x2b8 + ext_len <= min_tdh:
            pos, end = 0x2b8, 0x2b8 + ext_len
            while end - pos >= 8:
                chk_sig, chk_len = struct.unpack('<4sI', dat[pos:pos+8])
                pos += 8
                # WRSP: WRite SPlice information block.
                # Data is comprised of >= 169 32-bit values:
                #  0: Flags (currently unused; must be zero)
                #  N: Write splice/overlap position for track N, in SCP ticks
                #     (zero if the track is unused)
                if chk_sig == b'WRSP' and chk_len >= 169*4:
                    # Write-splice positions for writing out SCP tracks
                    # correctly to disk. The leading flags word is skipped.
                    splices = struct.unpack('<168I', dat[pos+4:pos+169*4])
                pos += chk_len

        scp = cls()
        scp.nr_revs = nr_revs
        if not index_cued:
            # The first (partial) revolution of every track is dropped below.
            scp.nr_revs -= 1

        for trknr, trk_off in enumerate(trk_offs):

            if trk_off == 0:
                continue

            # Parse the SCP track header and extract the flux data.
            thdr = dat[trk_off:trk_off+4+12*nr_revs]
            # A header running off the end of the file would otherwise be
            # mis-parsed (the last-revolution entry is taken from the tail).
            error.check(len(thdr) == 4+12*nr_revs,
                        "SCP: Truncated track header")
            sig, tnr = struct.unpack("<3sB", thdr[:4])
            error.check(sig == b"TRK", "SCP: Missing track signature")
            error.check(tnr == trknr, "SCP: Wrong track number in header")
            thdr = thdr[4:]  # Remove TRK header
            if not index_cued:  # Remove first partial revolution
                thdr = thdr[12:]
            # Data runs from the start of the first kept revolution to the
            # end of the last one (offset + 2 bytes per sample).
            s_off, = struct.unpack("<I", thdr[8:12])
            _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])

            e_off += e_nr*2
            if s_off == e_off:
                # FluxEngine creates dummy TDHs for empty tracks.
                # Bail on them here.
                continue

            tdat = dat[trk_off+s_off:trk_off+e_off]
            track = SCPTrack(thdr, tdat)
            if splices is not None:
                track.splice = splices[trknr]
            scp.to_track[trknr] = track

        # Some tools produce (or used to produce) single-sided images using
        # consecutive entries in the TLUT. This needs fixing up.
        s = scp.side_count()
        if single_sided and s[0] and s[1]:
            scp.to_track = {tnr*2+single_sided-1: trk
                            for tnr, trk in scp.to_track.items()}
            print('SCP: Imported legacy single-sided image')

        return scp

    def get_track(self, cyl, side):
        """Return the Flux for the given cylinder and side.

        Returns None if the image holds no such track. Raises error.Fatal
        if the stored flux data is not a whole number of 16-bit samples.
        """
        tracknr = cyl * 2 + side
        if tracknr not in self.to_track:
            return None
        track = self.to_track[tracknr]
        tdh, dat = track.tdh, track.dat

        # The first 32-bit word of each 12-byte TDH entry is the duration
        # of that revolution in SCP ticks.
        index_list = [ticks for ticks, _, _ in struct.iter_unpack("<3I", tdh)]

        # Decode the SCP flux data into a simple list of flux times.
        # Samples are big-endian 16-bit values; a zero sample means "add
        # 65536 ticks to the next non-zero sample".
        error.check(len(dat) % 2 == 0, "SCP: Truncated flux data")
        flux_list = []
        val = 0
        for x, in struct.iter_unpack(">H", dat):
            if x == 0:
                val += 65536
                continue
            flux_list.append(val + x)
            val = 0

        flux = Flux(index_list, flux_list, SCP.sample_freq)
        flux.splice = track.splice if track.splice is not None else 0
        return flux

    def emit_track(self, cyl, side, track):
        """Converts @track into a Supercard Pro Track and appends it to
        the current image-in-progress.

        The result is stored in self.to_track under cyl*2+side. The first
        emitted track fixes self.nr_revs; later tracks must match it.
        """
        flux = track.flux()

        # External tools and emulators generally seem to work best (or only)
        # with index-cued SCP image files. So let's make sure we give them
        # what they want.
        flux.cue_at_index()

        if not flux.index_cued:
            self.index_cued = False

        nr_revs = len(flux.index_list)
        if not self.nr_revs:
            self.nr_revs = nr_revs
        else:
            assert self.nr_revs == nr_revs

        # Scale from the source sample clock to SCP ticks.
        factor = SCP.sample_freq / flux.sample_freq

        tdh, dat = bytearray(), bytearray()

        def tdh_entry(rev, start):
            # 12-byte TDH entry for revolution @rev whose samples begin at
            # byte @start of @dat: duration in SCP ticks, number of 16-bit
            # samples, and data offset measured from the TRK header.
            return struct.pack("<III",
                               round(flux.index_list[rev]*factor),
                               (len(dat) - start) // 2,
                               4 + nr_revs*12 + start)

        len_at_index = rev = 0
        to_index = flux.index_list[0]
        rem = 0.0  # rounding error carried over to the next sample

        for x in flux.list:

            # Does the next flux interval cross the index mark?
            while to_index < x:
                # Append to the TDH for the previous full revolution
                tdh += tdh_entry(rev, len_at_index)
                # Set up for the next revolution
                len_at_index = len(dat)
                rev += 1
                if rev >= nr_revs:
                    # We're done: We simply discard any surplus flux samples
                    self.to_track[cyl*2+side] = SCPTrack(tdh, dat)
                    return
                to_index += flux.index_list[rev]

            # Process the current flux sample into SCP "bitcell" format
            to_index -= x
            y = x * factor + rem
            val = round(y)
            if (val & 65535) == 0:
                # A zero low word would read back as an overflow marker,
                # so nudge the value by one tick.
                val += 1
            rem = y - val
            while val >= 65536:
                # Each zero sample stands for 65536 ticks.
                dat.append(0)
                dat.append(0)
                val -= 65536
            dat.append(val >> 8)
            dat.append(val & 255)

        # Header for last track(s) in case we ran out of flux timings.
        while rev < nr_revs:
            tdh += tdh_entry(rev, len_at_index)
            len_at_index = len(dat)
            rev += 1

        self.to_track[cyl*2+side] = SCPTrack(tdh, dat)

    def get_image(self):
        """Return the complete SCP file contents for the stored tracks.

        Raises error.Fatal if the track numbers do not fit in the
        168-entry TLUT.
        """
        # Work out the single-sided byte code
        s = self.side_count()
        if s[0] and s[1]:
            single_sided = 0
        elif s[0]:
            single_sided = 1
        else:
            single_sided = 2

        to_track = self.to_track
        if single_sided and self.opts.legacy_ss:
            print('SCP: Generated legacy single-sided image')
            # Legacy layout packs the tracks into consecutive TLUT entries.
            to_track = {tnr//2: trk for tnr, trk in self.to_track.items()}

        ntracks = max(to_track, default=0) + 1
        # Checked before packing so that an oversized track number is
        # reported here rather than as a struct.error on the "B" field.
        error.check(ntracks <= 168, "SCP: Too many tracks")

        # Generate the TLUT and concatenate all the tracks together.
        trk_offs = bytearray()
        trk_dat = bytearray()
        for tnr in range(ntracks):
            if tnr in to_track:
                track = to_track[tnr]
                trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
                trk_dat += struct.pack("<3sB", b"TRK", tnr)
                trk_dat += track.tdh + track.dat
            else:
                trk_offs += struct.pack("<I", 0)
        # Pad the TLUT out to its full 168 entries.
        trk_offs += bytes(0x2a0 - len(trk_offs))

        # Calculate checksum over all data (except 16-byte image header).
        csum = sum(trk_offs) + sum(trk_dat)

        # Generate the image header.
        flags = 2  # 96TPI
        if self.index_cued:
            flags |= 1  # Index-Cued
        header = struct.pack("<3s9BI",
                             b"SCP",  # Signature
                             0,       # Version
                             self.opts.disktype,
                             self.nr_revs, 0, ntracks-1,
                             flags,
                             0,       # 16-bit cell width
                             single_sided,
                             0,       # 25ns capture
                             csum & 0xffffffff)

        # Concatenate it all together and send it back.
        return header + trk_offs + trk_dat
- # Local variables:
- # python-indent: 4
- # End:
|