scp.py 8.0 KB


  1. # greaseweazle/image/scp.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. from .image import Image
  11. class SCPOpts:
  12. """legacy_ss: Set to True to generate (incorrect) legacy single-sided
  13. SCP image.
  14. """
  15. def __init__(self):
  16. self.legacy_ss = False
  17. class SCP(Image):
  18. # 40MHz
  19. sample_freq = 40000000
  20. def __init__(self):
  21. self.opts = SCPOpts()
  22. self.nr_revs = None
  23. self.to_track = dict()
  24. def side_count(self):
  25. s = [0,0] # non-empty tracks on each side
  26. for tnr in self.to_track:
  27. s[tnr&1] += 1
  28. return s
  29. @classmethod
  30. def from_file(cls, name):
  31. with open(name, "rb") as f:
  32. dat = f.read()
  33. header = struct.unpack("<3s9BI", dat[0:16])
  34. (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
  35. error.check(sig == b"SCP", "SCP: Bad signature")
  36. index_cued = flags & 1 or nr_revs == 1
  37. if not index_cued:
  38. nr_revs -= 1
  39. # Some tools generate a short TLUT. We handle this by truncating the
  40. # TLUT at the first Track Data Header.
  41. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  42. for i in range(168):
  43. try:
  44. off = trk_offs[i]
  45. except IndexError:
  46. break
  47. if off == 0 or off >= 0x2b0:
  48. continue
  49. off = off//4 - 4
  50. error.check(off >= 0, "SCP: Bad Track Table")
  51. trk_offs = trk_offs[:off]
  52. scp = cls()
  53. scp.nr_revs = nr_revs
  54. for trknr in range(len(trk_offs)):
  55. trk_off = trk_offs[trknr]
  56. if trk_off == 0:
  57. continue
  58. # Parse the SCP track header and extract the flux data.
  59. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  60. sig, tnr = struct.unpack("<3sB", thdr[:4])
  61. error.check(sig == b"TRK", "SCP: Missing track signature")
  62. error.check(tnr == trknr, "SCP: Wrong track number in header")
  63. _off = 12 if index_cued else 24 # skip first partial rev
  64. s_off, = struct.unpack("<I", thdr[_off:_off+4])
  65. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  66. e_off += e_nr*2
  67. if s_off == e_off:
  68. # FluxEngine creates dummy TDHs for empty tracks.
  69. # Bail on them here.
  70. continue
  71. tdat = dat[trk_off+s_off:trk_off+e_off]
  72. scp.to_track[trknr] = (thdr[4:], tdat)
  73. # Some tools produce (or used to produce) single-sided images using
  74. # consecutive entries in the TLUT. This needs fixing up.
  75. s = scp.side_count()
  76. if single_sided and s[0] and s[1]:
  77. new_dict = dict()
  78. for tnr in scp.to_track:
  79. new_dict[tnr*2+single_sided-1] = scp.to_track[tnr]
  80. scp.to_track = new_dict
  81. print('SCP: Imported legacy single-sided image')
  82. return scp
  83. def get_track(self, cyl, side):
  84. tracknr = cyl * 2 + side
  85. if not tracknr in self.to_track:
  86. return None
  87. tdh, dat = self.to_track[tracknr]
  88. index_list = []
  89. while tdh:
  90. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  91. index_list.append(ticks)
  92. tdh = tdh[12:]
  93. # Decode the SCP flux data into a simple list of flux times.
  94. flux_list = []
  95. val = 0
  96. for i in range(0, len(dat), 2):
  97. x = dat[i]*256 + dat[i+1]
  98. if x == 0:
  99. val += 65536
  100. continue
  101. flux_list.append(val + x)
  102. val = 0
  103. return Flux(index_list, flux_list, SCP.sample_freq)
  104. def emit_track(self, cyl, side, track):
  105. """Converts @track into a Supercard Pro Track and appends it to
  106. the current image-in-progress.
  107. """
  108. flux = track.flux()
  109. nr_revs = len(flux.index_list)
  110. if not self.nr_revs:
  111. self.nr_revs = nr_revs
  112. else:
  113. assert self.nr_revs == nr_revs
  114. factor = SCP.sample_freq / flux.sample_freq
  115. tdh, dat = bytearray(), bytearray()
  116. len_at_index = rev = 0
  117. to_index = flux.index_list[0]
  118. rem = 0.0
  119. for x in flux.list:
  120. # Does the next flux interval cross the index mark?
  121. while to_index < x:
  122. # Append to the TDH for the previous full revolution
  123. tdh += struct.pack("<III",
  124. round(flux.index_list[rev]*factor),
  125. (len(dat) - len_at_index) // 2,
  126. 4 + nr_revs*12 + len_at_index)
  127. # Set up for the next revolution
  128. len_at_index = len(dat)
  129. rev += 1
  130. if rev >= nr_revs:
  131. # We're done: We simply discard any surplus flux samples
  132. self.to_track[cyl*2+side] = (tdh, dat)
  133. return
  134. to_index += flux.index_list[rev]
  135. # Process the current flux sample into SCP "bitcell" format
  136. to_index -= x
  137. y = x * factor + rem
  138. val = round(y)
  139. if (val & 65535) == 0:
  140. val += 1
  141. rem = y - val
  142. while val >= 65536:
  143. dat.append(0)
  144. dat.append(0)
  145. val -= 65536
  146. dat.append(val>>8)
  147. dat.append(val&255)
  148. # Header for last track(s) in case we ran out of flux timings.
  149. while rev < nr_revs:
  150. tdh += struct.pack("<III",
  151. round(flux.index_list[rev]*factor),
  152. (len(dat) - len_at_index) // 2,
  153. 4 + nr_revs*12 + len_at_index)
  154. len_at_index = len(dat)
  155. rev += 1
  156. self.to_track[cyl*2+side] = (tdh, dat)
  157. def get_image(self):
  158. # Work out the single-sided byte code
  159. s = self.side_count()
  160. if s[0] and s[1]:
  161. single_sided = 0
  162. elif s[0]:
  163. single_sided = 1
  164. else:
  165. single_sided = 2
  166. to_track = self.to_track
  167. if single_sided and self.opts.legacy_ss:
  168. print('SCP: Generated legacy single-sided image')
  169. to_track = dict()
  170. for tnr in self.to_track:
  171. to_track[tnr//2] = self.to_track[tnr]
  172. ntracks = max(to_track, default=0) + 1
  173. # Generate the TLUT and concatenate all the tracks together.
  174. trk_offs = bytearray()
  175. trk_dat = bytearray()
  176. for tnr in range(ntracks):
  177. if tnr in to_track:
  178. tdh, dat = to_track[tnr]
  179. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  180. trk_dat += struct.pack("<3sB", b"TRK", tnr) + tdh + dat
  181. else:
  182. trk_offs += struct.pack("<I", 0)
  183. error.check(len(trk_offs) <= 0x2a0, "SCP: Too many tracks")
  184. trk_offs += bytes(0x2a0 - len(trk_offs))
  185. # Calculate checksum over all data (except 16-byte image header).
  186. csum = 0
  187. for x in trk_offs:
  188. csum += x
  189. for x in trk_dat:
  190. csum += x
  191. # Generate the image header.
  192. header = struct.pack("<3s9BI",
  193. b"SCP", # Signature
  194. 0, # Version
  195. 0x80, # DiskType = Other
  196. self.nr_revs, 0, ntracks-1,
  197. 0x03, # Flags = Index, 96TPI
  198. 0, # 16-bit cell width
  199. single_sided,
  200. 0, # 25ns capture
  201. csum & 0xffffffff)
  202. # Concatenate it all together and send it back.
  203. return header + trk_offs + trk_dat
  204. # Local variables:
  205. # python-indent: 4
  206. # End: