scp.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # greaseweazle/image/scp.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. from .image import Image
  11. class SCPOpts:
  12. """legacy_ss: Set to True to generate (incorrect) legacy single-sided
  13. SCP image.
  14. """
  15. def __init__(self):
  16. self.legacy_ss = False
  17. class SCPTrack:
  18. def __init__(self, tdh, dat, splice=None):
  19. self.tdh = tdh
  20. self.dat = dat
  21. self.splice = splice
  22. class SCP(Image):
  23. # 40MHz
  24. sample_freq = 40000000
  25. def __init__(self):
  26. self.opts = SCPOpts()
  27. self.nr_revs = None
  28. self.to_track = dict()
  29. self.index_cued = True
  30. def side_count(self):
  31. s = [0,0] # non-empty tracks on each side
  32. for tnr in self.to_track:
  33. s[tnr&1] += 1
  34. return s
  35. @classmethod
  36. def from_file(cls, name):
  37. splices = None
  38. with open(name, "rb") as f:
  39. dat = f.read()
  40. header = struct.unpack("<3s9BI", dat[0:16])
  41. (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
  42. error.check(sig == b"SCP", "SCP: Bad signature")
  43. index_cued = flags & 1 or nr_revs == 1
  44. # Some tools generate a short TLUT. We handle this by truncating the
  45. # TLUT at the first Track Data Header.
  46. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  47. for i in range(168):
  48. try:
  49. off = trk_offs[i]
  50. except IndexError:
  51. break
  52. if off == 0 or off >= 0x2b0:
  53. continue
  54. off = off//4 - 4
  55. error.check(off >= 0, "SCP: Bad Track Table")
  56. trk_offs = trk_offs[:off]
  57. # Parse the extension block introduced by github:markusC64/g64conv.
  58. # b'EXTS', length, <length byte Extension Area>
  59. # Extension Area contains consecutive chunks of the form:
  60. # ID, length, <length bytes of ID-specific dat>
  61. ext_sig, ext_len = struct.unpack('<4sI', dat[0x2b0:0x2b8])
  62. min_tdh = min(filter(lambda x: x != 0, trk_offs), default=0)
  63. if ext_sig == b'EXTS' and 0x2b8 + ext_len <= min_tdh:
  64. pos, end = 0x2b8, 0x2b8 + ext_len
  65. while end - pos >= 8:
  66. chk_sig, chk_len = struct.unpack('<4sI', dat[pos:pos+8])
  67. pos += 8
  68. if chk_sig == b'WRSP' and chk_len >= 169*4:
  69. # Write-splice positions for writing out SCP tracks
  70. # correctly to disk.
  71. splices = struct.unpack('<168I', dat[pos+4:pos+169*4])
  72. pos += chk_len
  73. scp = cls()
  74. scp.nr_revs = nr_revs
  75. if not index_cued:
  76. scp.nr_revs -= 1
  77. for trknr in range(len(trk_offs)):
  78. trk_off = trk_offs[trknr]
  79. if trk_off == 0:
  80. continue
  81. # Parse the SCP track header and extract the flux data.
  82. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  83. sig, tnr = struct.unpack("<3sB", thdr[:4])
  84. error.check(sig == b"TRK", "SCP: Missing track signature")
  85. error.check(tnr == trknr, "SCP: Wrong track number in header")
  86. thdr = thdr[4:] # Remove TRK header
  87. if not index_cued: # Remove first partial revolution
  88. thdr = thdr[12:]
  89. s_off, = struct.unpack("<I", thdr[8:12])
  90. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  91. e_off += e_nr*2
  92. if s_off == e_off:
  93. # FluxEngine creates dummy TDHs for empty tracks.
  94. # Bail on them here.
  95. continue
  96. tdat = dat[trk_off+s_off:trk_off+e_off]
  97. track = SCPTrack(thdr, tdat)
  98. if splices is not None:
  99. track.splice = splices[trknr]
  100. scp.to_track[trknr] = track
  101. # Some tools produce (or used to produce) single-sided images using
  102. # consecutive entries in the TLUT. This needs fixing up.
  103. s = scp.side_count()
  104. if single_sided and s[0] and s[1]:
  105. new_dict = dict()
  106. for tnr in scp.to_track:
  107. new_dict[tnr*2+single_sided-1] = scp.to_track[tnr]
  108. scp.to_track = new_dict
  109. print('SCP: Imported legacy single-sided image')
  110. return scp
  111. def get_track(self, cyl, side):
  112. tracknr = cyl * 2 + side
  113. if not tracknr in self.to_track:
  114. return None
  115. track = self.to_track[tracknr]
  116. tdh, dat = track.tdh, track.dat
  117. index_list = []
  118. while tdh:
  119. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  120. index_list.append(ticks)
  121. tdh = tdh[12:]
  122. # Decode the SCP flux data into a simple list of flux times.
  123. flux_list = []
  124. val = 0
  125. for i in range(0, len(dat), 2):
  126. x = dat[i]*256 + dat[i+1]
  127. if x == 0:
  128. val += 65536
  129. continue
  130. flux_list.append(val + x)
  131. val = 0
  132. flux = Flux(index_list, flux_list, SCP.sample_freq)
  133. flux.splice = track.splice if track.splice is not None else 0
  134. return flux
  135. def emit_track(self, cyl, side, track):
  136. """Converts @track into a Supercard Pro Track and appends it to
  137. the current image-in-progress.
  138. """
  139. flux = track.flux()
  140. # External tools and emulators generally seem to work best (or only)
  141. # with index-cued SCP image files. So let's make sure we give them
  142. # what they want.
  143. flux.cue_at_index()
  144. if not flux.index_cued:
  145. self.index_cued = False
  146. nr_revs = len(flux.index_list)
  147. if not self.nr_revs:
  148. self.nr_revs = nr_revs
  149. else:
  150. assert self.nr_revs == nr_revs
  151. factor = SCP.sample_freq / flux.sample_freq
  152. tdh, dat = bytearray(), bytearray()
  153. len_at_index = rev = 0
  154. to_index = flux.index_list[0]
  155. rem = 0.0
  156. for x in flux.list:
  157. # Does the next flux interval cross the index mark?
  158. while to_index < x:
  159. # Append to the TDH for the previous full revolution
  160. tdh += struct.pack("<III",
  161. round(flux.index_list[rev]*factor),
  162. (len(dat) - len_at_index) // 2,
  163. 4 + nr_revs*12 + len_at_index)
  164. # Set up for the next revolution
  165. len_at_index = len(dat)
  166. rev += 1
  167. if rev >= nr_revs:
  168. # We're done: We simply discard any surplus flux samples
  169. self.to_track[cyl*2+side] = SCPTrack(tdh, dat)
  170. return
  171. to_index += flux.index_list[rev]
  172. # Process the current flux sample into SCP "bitcell" format
  173. to_index -= x
  174. y = x * factor + rem
  175. val = round(y)
  176. if (val & 65535) == 0:
  177. val += 1
  178. rem = y - val
  179. while val >= 65536:
  180. dat.append(0)
  181. dat.append(0)
  182. val -= 65536
  183. dat.append(val>>8)
  184. dat.append(val&255)
  185. # Header for last track(s) in case we ran out of flux timings.
  186. while rev < nr_revs:
  187. tdh += struct.pack("<III",
  188. round(flux.index_list[rev]*factor),
  189. (len(dat) - len_at_index) // 2,
  190. 4 + nr_revs*12 + len_at_index)
  191. len_at_index = len(dat)
  192. rev += 1
  193. self.to_track[cyl*2+side] = SCPTrack(tdh, dat)
  194. def get_image(self):
  195. # Work out the single-sided byte code
  196. s = self.side_count()
  197. if s[0] and s[1]:
  198. single_sided = 0
  199. elif s[0]:
  200. single_sided = 1
  201. else:
  202. single_sided = 2
  203. to_track = self.to_track
  204. if single_sided and self.opts.legacy_ss:
  205. print('SCP: Generated legacy single-sided image')
  206. to_track = dict()
  207. for tnr in self.to_track:
  208. to_track[tnr//2] = self.to_track[tnr]
  209. ntracks = max(to_track, default=0) + 1
  210. # Generate the TLUT and concatenate all the tracks together.
  211. trk_offs = bytearray()
  212. trk_dat = bytearray()
  213. for tnr in range(ntracks):
  214. if tnr in to_track:
  215. track = to_track[tnr]
  216. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  217. trk_dat += struct.pack("<3sB", b"TRK", tnr)
  218. trk_dat += track.tdh + track.dat
  219. else:
  220. trk_offs += struct.pack("<I", 0)
  221. error.check(len(trk_offs) <= 0x2a0, "SCP: Too many tracks")
  222. trk_offs += bytes(0x2a0 - len(trk_offs))
  223. # Calculate checksum over all data (except 16-byte image header).
  224. csum = 0
  225. for x in trk_offs:
  226. csum += x
  227. for x in trk_dat:
  228. csum += x
  229. # Generate the image header.
  230. flags = 2 # 96TPI
  231. if self.index_cued:
  232. flags |= 1 # Index-Cued
  233. header = struct.pack("<3s9BI",
  234. b"SCP", # Signature
  235. 0, # Version
  236. 0x80, # DiskType = Other
  237. self.nr_revs, 0, ntracks-1,
  238. flags,
  239. 0, # 16-bit cell width
  240. single_sided,
  241. 0, # 25ns capture
  242. csum & 0xffffffff)
  243. # Concatenate it all together and send it back.
  244. return header + trk_offs + trk_dat
  245. # Local variables:
  246. # python-indent: 4
  247. # End: