scp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. # greaseweazle/image/scp.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. from .image import Image
  11. # Names for disktype byte in SCP file header
  12. DiskType = {
  13. 'amiga': 0x04,
  14. 'c64': 0x00,
  15. 'atari800-sd': 0x10,
  16. 'atari800-dd': 0x11,
  17. 'atari800-ed': 0x12,
  18. 'atarist-ss': 0x14,
  19. 'atarist-ds': 0x15,
  20. 'appleII': 0x20,
  21. 'appleIIpro': 0x21,
  22. 'apple-400k': 0x24,
  23. 'apple-800k': 0x25,
  24. 'apple-1m44': 0x26,
  25. 'ibmpc-320k': 0x30,
  26. 'ibmpc-720k': 0x31,
  27. 'ibmpc-1m2': 0x32,
  28. 'ibmpc-1m44': 0x33,
  29. 'trs80_sssd': 0x40,
  30. 'trs80_ssdd': 0x41,
  31. 'trs80_dssd': 0x42,
  32. 'trs80_dsdd': 0x43,
  33. 'ti-99/4a': 0x50,
  34. 'roland-d20': 0x60,
  35. 'amstrad-cpc': 0x70,
  36. 'other-320k': 0x80,
  37. 'other-1m2': 0x81,
  38. 'other-720k': 0x84,
  39. 'other-1m44': 0x85,
  40. 'tape-gcr1': 0xe0,
  41. 'tape-gcr2': 0xe1,
  42. 'tape-mfm': 0xe2,
  43. 'hdd-mfm': 0xf0,
  44. 'hdd-rll': 0xf1
  45. }
  46. class SCPOpts:
  47. """legacy_ss: Set to True to generate (incorrect) legacy single-sided
  48. SCP image.
  49. """
  50. def __init__(self):
  51. self.legacy_ss = False
  52. self._disktype = 0x80 # Other
  53. @property
  54. def disktype(self):
  55. return self._disktype
  56. @disktype.setter
  57. def disktype(self, disktype):
  58. try:
  59. self._disktype = DiskType[disktype.lower()]
  60. except KeyError:
  61. try:
  62. self._disktype = int(disktype, 0)
  63. except ValueError:
  64. raise error.Fatal("Bad SCP disktype: '%s'" % disktype)
  65. class SCPTrack:
  66. def __init__(self, tdh, dat, splice=None):
  67. self.tdh = tdh
  68. self.dat = dat
  69. self.splice = splice
  70. class SCP(Image):
  71. # 40MHz
  72. sample_freq = 40000000
  73. def __init__(self):
  74. self.opts = SCPOpts()
  75. self.nr_revs = None
  76. self.to_track = dict()
  77. self.index_cued = True
  78. def side_count(self):
  79. s = [0,0] # non-empty tracks on each side
  80. for tnr in self.to_track:
  81. s[tnr&1] += 1
  82. return s
  83. @classmethod
  84. def from_file(cls, name):
  85. splices = None
  86. with open(name, "rb") as f:
  87. dat = f.read()
  88. header = struct.unpack("<3s9BI", dat[0:16])
  89. sig, _, disk_type, nr_revs, _, _, flags, _, single_sided, _, _ = header
  90. error.check(sig == b"SCP", "SCP: Bad signature")
  91. index_cued = flags & 1 or nr_revs == 1
  92. # Some tools generate a short TLUT. We handle this by truncating the
  93. # TLUT at the first Track Data Header.
  94. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  95. for i in range(168):
  96. try:
  97. off = trk_offs[i]
  98. except IndexError:
  99. break
  100. if off == 0 or off >= 0x2b0:
  101. continue
  102. off = off//4 - 4
  103. error.check(off >= 0, "SCP: Bad Track Table")
  104. trk_offs = trk_offs[:off]
  105. # Parse the extension block introduced by github:markusC64/g64conv.
  106. # b'EXTS', length, <length bytes: Extension Area>
  107. # Extension Area contains consecutive chunks of the form:
  108. # ID, length, <length bytes: ID-specific data>
  109. ext_sig, ext_len = struct.unpack('<4sI', dat[0x2b0:0x2b8])
  110. min_tdh = min(filter(lambda x: x != 0, trk_offs), default=0)
  111. if ext_sig == b'EXTS' and 0x2b8 + ext_len <= min_tdh:
  112. pos, end = 0x2b8, 0x2b8 + ext_len
  113. while end - pos >= 8:
  114. chk_sig, chk_len = struct.unpack('<4sI', dat[pos:pos+8])
  115. pos += 8
  116. # WRSP: WRite SPlice information block.
  117. # Data is comprised of >= 169 32-bit values:
  118. # 0: Flags (currently unused; must be zero)
  119. # N: Write splice/overlap position for track N, in SCP ticks
  120. # (zero if the track is unused)
  121. if chk_sig == b'WRSP' and chk_len >= 169*4:
  122. # Write-splice positions for writing out SCP tracks
  123. # correctly to disk.
  124. splices = struct.unpack('<168I', dat[pos+4:pos+169*4])
  125. pos += chk_len
  126. scp = cls()
  127. scp.nr_revs = nr_revs
  128. if not index_cued:
  129. scp.nr_revs -= 1
  130. for trknr in range(len(trk_offs)):
  131. trk_off = trk_offs[trknr]
  132. if trk_off == 0:
  133. continue
  134. # Parse the SCP track header and extract the flux data.
  135. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  136. sig, tnr = struct.unpack("<3sB", thdr[:4])
  137. error.check(sig == b"TRK", "SCP: Missing track signature")
  138. error.check(tnr == trknr, "SCP: Wrong track number in header")
  139. thdr = thdr[4:] # Remove TRK header
  140. if not index_cued: # Remove first partial revolution
  141. thdr = thdr[12:]
  142. s_off, = struct.unpack("<I", thdr[8:12])
  143. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  144. e_off += e_nr*2
  145. if s_off == e_off:
  146. # FluxEngine creates dummy TDHs for empty tracks.
  147. # Bail on them here.
  148. continue
  149. tdat = dat[trk_off+s_off:trk_off+e_off]
  150. track = SCPTrack(thdr, tdat)
  151. if splices is not None:
  152. track.splice = splices[trknr]
  153. scp.to_track[trknr] = track
  154. s = scp.side_count()
  155. # C64 images with halftracks are genberated by Supercard Pro using
  156. # consecutive track numbers. That needs fixup here for our layout.
  157. # We re-use the legacy-single-sided fixup below.
  158. if (single_sided == 0 and disk_type == 0
  159. and s[1] and s[0]==s[1]+1 and s[0] < 42):
  160. single_sided = 1
  161. print('SCP: Importing C64 image with halftracks')
  162. # Some tools produce (or used to produce) single-sided images using
  163. # consecutive entries in the TLUT. This needs fixing up.
  164. if single_sided and s[0] and s[1]:
  165. new_dict = dict()
  166. for tnr in scp.to_track:
  167. new_dict[tnr*2+single_sided-1] = scp.to_track[tnr]
  168. scp.to_track = new_dict
  169. print('SCP: Imported legacy single-sided image')
  170. return scp
  171. def get_track(self, cyl, side):
  172. tracknr = cyl * 2 + side
  173. if not tracknr in self.to_track:
  174. return None
  175. track = self.to_track[tracknr]
  176. tdh, dat = track.tdh, track.dat
  177. index_list = []
  178. while tdh:
  179. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  180. index_list.append(ticks)
  181. tdh = tdh[12:]
  182. # Decode the SCP flux data into a simple list of flux times.
  183. flux_list = []
  184. val = 0
  185. for i in range(0, len(dat), 2):
  186. x = dat[i]*256 + dat[i+1]
  187. if x == 0:
  188. val += 65536
  189. continue
  190. flux_list.append(val + x)
  191. val = 0
  192. flux = Flux(index_list, flux_list, SCP.sample_freq)
  193. flux.splice = track.splice if track.splice is not None else 0
  194. return flux
  195. def emit_track(self, cyl, side, track):
  196. """Converts @track into a Supercard Pro Track and appends it to
  197. the current image-in-progress.
  198. """
  199. flux = track.flux()
  200. # External tools and emulators generally seem to work best (or only)
  201. # with index-cued SCP image files. So let's make sure we give them
  202. # what they want.
  203. flux.cue_at_index()
  204. if not flux.index_cued:
  205. self.index_cued = False
  206. nr_revs = len(flux.index_list)
  207. if not self.nr_revs:
  208. self.nr_revs = nr_revs
  209. else:
  210. assert self.nr_revs == nr_revs
  211. factor = SCP.sample_freq / flux.sample_freq
  212. tdh, dat = bytearray(), bytearray()
  213. len_at_index = rev = 0
  214. to_index = flux.index_list[0]
  215. rem = 0.0
  216. for x in flux.list:
  217. # Does the next flux interval cross the index mark?
  218. while to_index < x:
  219. # Append to the TDH for the previous full revolution
  220. tdh += struct.pack("<III",
  221. round(flux.index_list[rev]*factor),
  222. (len(dat) - len_at_index) // 2,
  223. 4 + nr_revs*12 + len_at_index)
  224. # Set up for the next revolution
  225. len_at_index = len(dat)
  226. rev += 1
  227. if rev >= nr_revs:
  228. # We're done: We simply discard any surplus flux samples
  229. self.to_track[cyl*2+side] = SCPTrack(tdh, dat)
  230. return
  231. to_index += flux.index_list[rev]
  232. # Process the current flux sample into SCP "bitcell" format
  233. to_index -= x
  234. y = x * factor + rem
  235. val = round(y)
  236. if (val & 65535) == 0:
  237. val += 1
  238. rem = y - val
  239. while val >= 65536:
  240. dat.append(0)
  241. dat.append(0)
  242. val -= 65536
  243. dat.append(val>>8)
  244. dat.append(val&255)
  245. # Header for last track(s) in case we ran out of flux timings.
  246. while rev < nr_revs:
  247. tdh += struct.pack("<III",
  248. round(flux.index_list[rev]*factor),
  249. (len(dat) - len_at_index) // 2,
  250. 4 + nr_revs*12 + len_at_index)
  251. len_at_index = len(dat)
  252. rev += 1
  253. self.to_track[cyl*2+side] = SCPTrack(tdh, dat)
  254. def get_image(self):
  255. # Work out the single-sided byte code
  256. s = self.side_count()
  257. if s[0] and s[1]:
  258. single_sided = 0
  259. elif s[0]:
  260. single_sided = 1
  261. else:
  262. single_sided = 2
  263. to_track = self.to_track
  264. if single_sided and self.opts.legacy_ss:
  265. print('SCP: Generated legacy single-sided image')
  266. to_track = dict()
  267. for tnr in self.to_track:
  268. to_track[tnr//2] = self.to_track[tnr]
  269. ntracks = max(to_track, default=0) + 1
  270. # Generate the TLUT and concatenate all the tracks together.
  271. trk_offs = bytearray()
  272. trk_dat = bytearray()
  273. for tnr in range(ntracks):
  274. if tnr in to_track:
  275. track = to_track[tnr]
  276. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  277. trk_dat += struct.pack("<3sB", b"TRK", tnr)
  278. trk_dat += track.tdh + track.dat
  279. else:
  280. trk_offs += struct.pack("<I", 0)
  281. error.check(len(trk_offs) <= 0x2a0, "SCP: Too many tracks")
  282. trk_offs += bytes(0x2a0 - len(trk_offs))
  283. # Calculate checksum over all data (except 16-byte image header).
  284. csum = 0
  285. for x in trk_offs:
  286. csum += x
  287. for x in trk_dat:
  288. csum += x
  289. # Generate the image header.
  290. flags = 2 # 96TPI
  291. if self.index_cued:
  292. flags |= 1 # Index-Cued
  293. header = struct.pack("<3s9BI",
  294. b"SCP", # Signature
  295. 0, # Version
  296. self.opts.disktype,
  297. self.nr_revs, 0, ntracks-1,
  298. flags,
  299. 0, # 16-bit cell width
  300. single_sided,
  301. 0, # 25ns capture
  302. csum & 0xffffffff)
  303. # Concatenate it all together and send it back.
  304. return header + trk_offs + trk_dat
  305. # Local variables:
  306. # python-indent: 4
  307. # End: