scp.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # greaseweazle/image/scp.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. from .image import Image
  11. class SCPOpts:
  12. """legacy_ss: Set to True to generate (incorrect) legacy single-sided
  13. SCP image.
  14. """
  15. def __init__(self):
  16. self.legacy_ss = False
  17. class SCP(Image):
  18. # 40MHz
  19. sample_freq = 40000000
  20. def __init__(self, start_cyl, nr_sides):
  21. self.opts = SCPOpts()
  22. self.nr_sides = nr_sides
  23. self.nr_revs = None
  24. self.track_list = [(None,None)] * (start_cyl*2)
  25. @classmethod
  26. def from_file(cls, name):
  27. with open(name, "rb") as f:
  28. dat = f.read()
  29. header = struct.unpack("<3s9BI", dat[0:16])
  30. (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
  31. error.check(sig == b"SCP", "SCP: Bad signature")
  32. index_cued = flags & 1 or nr_revs == 1
  33. if not index_cued:
  34. nr_revs -= 1
  35. # Some tools generate a short TLUT. We handle this by truncating the
  36. # TLUT at the first Track Data Header.
  37. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  38. for i in range(168):
  39. try:
  40. off = trk_offs[i]
  41. except IndexError:
  42. break
  43. if off == 0 or off >= 0x2b0:
  44. continue
  45. off = off//4 - 4
  46. error.check(off >= 0, "SCP: Bad Track Table")
  47. trk_offs = trk_offs[:off]
  48. scp = cls(0, 2)
  49. scp.nr_revs = nr_revs
  50. for trknr in range(len(trk_offs)):
  51. trk_off = trk_offs[trknr]
  52. if trk_off == 0:
  53. scp.track_list.append((None, None))
  54. continue
  55. # Parse the SCP track header and extract the flux data.
  56. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  57. sig, tnr = struct.unpack("<3sB", thdr[:4])
  58. error.check(sig == b"TRK", "SCP: Missing track signature")
  59. error.check(tnr == trknr, "SCP: Wrong track number in header")
  60. _off = 12 if index_cued else 24 # skip first partial rev
  61. s_off, = struct.unpack("<I", thdr[_off:_off+4])
  62. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  63. e_off += e_nr*2
  64. if s_off == e_off:
  65. # FluxEngine creates dummy TDHs for empty tracks.
  66. # Bail on them here.
  67. scp.track_list.append((None, None))
  68. continue
  69. tdat = dat[trk_off+s_off:trk_off+e_off]
  70. scp.track_list.append((thdr[4:], tdat))
  71. # s[side] is True iff there are non-empty tracks on @side
  72. s = []
  73. for i in range(2):
  74. s.append(functools.reduce(lambda x, y: x or (y[1] is not None),
  75. scp.track_list[i::2], False))
  76. # Some tools produce (or used to produce) single-sided images using
  77. # consecutive entries in the TLUT. This needs fixing up.
  78. if single_sided and functools.reduce(lambda x, y: x and y, s):
  79. new_list = []
  80. for t in scp.track_list[:84]:
  81. if single_sided != 1: # Side 1
  82. new_list.append((None, None))
  83. new_list.append(t)
  84. if single_sided == 1: # Side 0
  85. new_list.append((None, None))
  86. scp.track_list = new_list
  87. print('SCP: Imported legacy single-sided image')
  88. return scp
  89. def get_track(self, cyl, side):
  90. off = cyl*2 + side
  91. if off >= len(self.track_list):
  92. return None
  93. tdh, dat = self.track_list[off]
  94. if dat is None:
  95. return None
  96. index_list = []
  97. while tdh:
  98. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  99. index_list.append(ticks)
  100. tdh = tdh[12:]
  101. # Decode the SCP flux data into a simple list of flux times.
  102. flux_list = []
  103. val = 0
  104. for i in range(0, len(dat), 2):
  105. x = dat[i]*256 + dat[i+1]
  106. if x == 0:
  107. val += 65536
  108. continue
  109. flux_list.append(val + x)
  110. val = 0
  111. return Flux(index_list, flux_list, SCP.sample_freq)
  112. def append_track(self, track):
  113. """Converts @track into a Supercard Pro Track and appends it to
  114. the current image-in-progress.
  115. """
  116. def _append(self, tdh, dat):
  117. self.track_list.append((tdh, dat))
  118. if self.nr_sides == 1:
  119. self.track_list.append((None, None))
  120. flux = track.flux()
  121. nr_revs = len(flux.index_list)
  122. if not self.nr_revs:
  123. self.nr_revs = nr_revs
  124. else:
  125. assert self.nr_revs == nr_revs
  126. factor = SCP.sample_freq / flux.sample_freq
  127. tdh, dat = bytearray(), bytearray()
  128. len_at_index = rev = 0
  129. to_index = flux.index_list[0]
  130. rem = 0.0
  131. for x in flux.list:
  132. # Does the next flux interval cross the index mark?
  133. while to_index < x:
  134. # Append to the TDH for the previous full revolution
  135. tdh += struct.pack("<III",
  136. round(flux.index_list[rev]*factor),
  137. (len(dat) - len_at_index) // 2,
  138. 4 + nr_revs*12 + len_at_index)
  139. # Set up for the next revolution
  140. len_at_index = len(dat)
  141. rev += 1
  142. if rev >= nr_revs:
  143. # We're done: We simply discard any surplus flux samples
  144. _append(self, tdh, dat)
  145. return
  146. to_index += flux.index_list[rev]
  147. # Process the current flux sample into SCP "bitcell" format
  148. to_index -= x
  149. y = x * factor + rem
  150. val = round(y)
  151. if (val & 65535) == 0:
  152. val += 1
  153. rem = y - val
  154. while val >= 65536:
  155. dat.append(0)
  156. dat.append(0)
  157. val -= 65536
  158. dat.append(val>>8)
  159. dat.append(val&255)
  160. # Header for last track(s) in case we ran out of flux timings.
  161. while rev < nr_revs:
  162. tdh += struct.pack("<III",
  163. round(flux.index_list[rev]*factor),
  164. (len(dat) - len_at_index) // 2,
  165. 4 + nr_revs*12 + len_at_index)
  166. len_at_index = len(dat)
  167. rev += 1
  168. _append(self, tdh, dat)
  169. def get_image(self):
  170. single_sided = 1 if self.nr_sides == 1 else 0
  171. track_list = self.track_list
  172. if single_sided and self.opts.legacy_ss:
  173. print('SCP: Generated legacy single-sided image')
  174. track_list = track_list[::2]
  175. # Generate the TLUT and concatenate all the tracks together.
  176. trk_offs = bytearray()
  177. trk_dat = bytearray()
  178. for trknr in range(len(track_list)):
  179. tdh, dat = track_list[trknr]
  180. if dat is None:
  181. trk_offs += struct.pack("<I", 0)
  182. else:
  183. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  184. trk_dat += struct.pack("<3sB", b"TRK", trknr) + tdh + dat
  185. error.check(len(trk_offs) <= 0x2a0, "SCP: Too many tracks")
  186. trk_offs += bytes(0x2a0 - len(trk_offs))
  187. # Calculate checksum over all data (except 16-byte image header).
  188. csum = 0
  189. for x in trk_offs:
  190. csum += x
  191. for x in trk_dat:
  192. csum += x
  193. # Generate the image header.
  194. header = struct.pack("<3s9BI",
  195. b"SCP", # Signature
  196. 0, # Version
  197. 0x80, # DiskType = Other
  198. self.nr_revs, 0, len(track_list) - 1,
  199. 0x03, # Flags = Index, 96TPI
  200. 0, # 16-bit cell width
  201. single_sided,
  202. 0, # 25ns capture
  203. csum & 0xffffffff)
  204. # Concatenate it all together and send it back.
  205. return header + trk_offs + trk_dat
  206. # Local variables:
  207. # python-indent: 4
  208. # End: