scp.py 8.5 KB


  1. # greaseweazle/image/scp.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. class SCPOpts:
  11. """legacy_ss: Set to True to generate (incorrect) legacy single-sided
  12. SCP image.
  13. """
  14. def __init__(self):
  15. self.legacy_ss = False
  16. class SCP:
  17. # 40MHz
  18. sample_freq = 40000000
  19. def __init__(self, start_cyl, nr_sides):
  20. self.opts = SCPOpts()
  21. self.nr_sides = nr_sides
  22. self.nr_revs = None
  23. self.track_list = [(None,None)] * (start_cyl*2)
  24. @classmethod
  25. def to_file(cls, start_cyl, nr_sides):
  26. scp = cls(start_cyl, nr_sides)
  27. return scp
  28. @classmethod
  29. def from_file(cls, dat):
  30. header = struct.unpack("<3s9BI", dat[0:16])
  31. (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
  32. error.check(sig == b"SCP", "SCP: Bad signature")
  33. index_cued = flags & 1 or nr_revs == 1
  34. if not index_cued:
  35. nr_revs -= 1
  36. # Some tools generate a short TLUT. We handle this by truncating the
  37. # TLUT at the first Track Data Header.
  38. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  39. for i in range(168):
  40. try:
  41. off = trk_offs[i]
  42. except IndexError:
  43. break
  44. if off == 0 or off >= 0x2b0:
  45. continue
  46. off = off//4 - 4
  47. error.check(off >= 0, "SCP: Bad Track Table")
  48. trk_offs = trk_offs[:off]
  49. scp = cls(0, 2)
  50. scp.nr_revs = nr_revs
  51. for trknr in range(len(trk_offs)):
  52. trk_off = trk_offs[trknr]
  53. if trk_off == 0:
  54. scp.track_list.append((None, None))
  55. continue
  56. # Parse the SCP track header and extract the flux data.
  57. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  58. sig, tnr = struct.unpack("<3sB", thdr[:4])
  59. error.check(sig == b"TRK", "SCP: Missing track signature")
  60. error.check(tnr == trknr, "SCP: Wrong track number in header")
  61. _off = 12 if index_cued else 24 # skip first partial rev
  62. s_off, = struct.unpack("<I", thdr[_off:_off+4])
  63. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  64. e_off += e_nr*2
  65. if s_off == e_off:
  66. # FluxEngine creates dummy TDHs for empty tracks.
  67. # Bail on them here.
  68. scp.track_list.append((None, None))
  69. continue
  70. tdat = dat[trk_off+s_off:trk_off+e_off]
  71. scp.track_list.append((thdr[4:], tdat))
  72. # s[side] is True iff there are non-empty tracks on @side
  73. s = []
  74. for i in range(2):
  75. s.append(functools.reduce(lambda x, y: x or (y[1] is not None),
  76. scp.track_list[i::2], False))
  77. # Some tools produce (or used to produce) single-sided images using
  78. # consecutive entries in the TLUT. This needs fixing up.
  79. if single_sided and functools.reduce(lambda x, y: x and y, s):
  80. new_list = []
  81. for t in scp.track_list[:84]:
  82. if single_sided != 1: # Side 1
  83. new_list.append((None, None))
  84. new_list.append(t)
  85. if single_sided == 1: # Side 0
  86. new_list.append((None, None))
  87. scp.track_list = new_list
  88. print('SCP: Imported legacy single-sided image')
  89. return scp
  90. def get_track(self, cyl, side, writeout=False):
  91. off = cyl*2 + side
  92. if off >= len(self.track_list):
  93. return None
  94. tdh, dat = self.track_list[off]
  95. if dat is None:
  96. return None
  97. # Writeout requires only a single revolution
  98. if writeout:
  99. tdh = tdh[:12]
  100. _, nr, _ = struct.unpack("<3I", tdh)
  101. dat = dat[:nr*2]
  102. index_list = []
  103. while tdh:
  104. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  105. index_list.append(ticks)
  106. tdh = tdh[12:]
  107. # Decode the SCP flux data into a simple list of flux times.
  108. flux_list = []
  109. val = 0
  110. for i in range(0, len(dat), 2):
  111. x = dat[i]*256 + dat[i+1]
  112. if x == 0:
  113. val += 65536
  114. continue
  115. flux_list.append(val + x)
  116. val = 0
  117. return Flux(index_list, flux_list, SCP.sample_freq)
  118. def append_track(self, flux):
  119. """Converts a Flux object into a Supercard Pro Track and appends it to
  120. the current image-in-progress.
  121. """
  122. def _append(self, tdh, dat):
  123. self.track_list.append((tdh, dat))
  124. if self.nr_sides == 1:
  125. self.track_list.append((None, None))
  126. nr_revs = len(flux.index_list)
  127. if not self.nr_revs:
  128. self.nr_revs = nr_revs
  129. else:
  130. assert self.nr_revs == nr_revs
  131. factor = SCP.sample_freq / flux.sample_freq
  132. tdh, dat = bytearray(), bytearray()
  133. len_at_index = rev = 0
  134. to_index = flux.index_list[0]
  135. rem = 0.0
  136. for x in flux.list:
  137. # Does the next flux interval cross the index mark?
  138. while to_index < x:
  139. # Append to the TDH for the previous full revolution
  140. tdh += struct.pack("<III",
  141. int(round(flux.index_list[rev]*factor)),
  142. (len(dat) - len_at_index) // 2,
  143. 4 + nr_revs*12 + len_at_index)
  144. # Set up for the next revolution
  145. len_at_index = len(dat)
  146. rev += 1
  147. if rev >= nr_revs:
  148. # We're done: We simply discard any surplus flux samples
  149. _append(self, tdh, dat)
  150. return
  151. to_index += flux.index_list[rev]
  152. # Process the current flux sample into SCP "bitcell" format
  153. to_index -= x
  154. y = x * factor + rem
  155. val = int(round(y))
  156. if (val & 65535) == 0:
  157. val += 1
  158. rem = y - val
  159. while val >= 65536:
  160. dat.append(0)
  161. dat.append(0)
  162. val -= 65536
  163. dat.append(val>>8)
  164. dat.append(val&255)
  165. # Header for last track(s) in case we ran out of flux timings.
  166. while rev < nr_revs:
  167. tdh += struct.pack("<III",
  168. int(round(flux.index_list[rev]*factor)),
  169. (len(dat) - len_at_index) // 2,
  170. 4 + nr_revs*12 + len_at_index)
  171. len_at_index = len(dat)
  172. rev += 1
  173. _append(self, tdh, dat)
  174. def get_image(self):
  175. single_sided = 1 if self.nr_sides == 1 else 0
  176. track_list = self.track_list
  177. if single_sided and self.opts.legacy_ss:
  178. print('SCP: Generated legacy single-sided image')
  179. track_list = track_list[::2]
  180. # Generate the TLUT and concatenate all the tracks together.
  181. trk_offs = bytearray()
  182. trk_dat = bytearray()
  183. for trknr in range(len(track_list)):
  184. tdh, dat = track_list[trknr]
  185. if dat is None:
  186. trk_offs += struct.pack("<I", 0)
  187. else:
  188. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  189. trk_dat += struct.pack("<3sB", b"TRK", trknr) + tdh + dat
  190. trk_offs += bytes(0x2a0 - len(trk_offs))
  191. # Calculate checksum over all data (except 16-byte image header).
  192. csum = 0
  193. for x in trk_offs:
  194. csum += x
  195. for x in trk_dat:
  196. csum += x
  197. # Generate the image header.
  198. header = struct.pack("<3s9BI",
  199. b"SCP", # Signature
  200. 0, # Version
  201. 0x80, # DiskType = Other
  202. self.nr_revs, 0, len(track_list) - 1,
  203. 0x03, # Flags = Index, 96TPI
  204. 0, # 16-bit cell width
  205. single_sided,
  206. 0, # 25ns capture
  207. csum & 0xffffffff)
  208. # Concatenate it all together and send it back.
  209. return header + trk_offs + trk_dat
  210. # Local variables:
  211. # python-indent: 4
  212. # End: