scp.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
# greaseweazle/image/scp.py
#
# Written & released by Keir Fraser <keir.xen@gmail.com>
#
# This is free and unencumbered software released into the public domain.
# See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, functools
  8. from greaseweazle import error
  9. from greaseweazle.flux import Flux
  10. class SCP:
  11. # 40MHz
  12. sample_freq = 40000000
  13. def __init__(self, start_cyl, nr_sides):
  14. self.nr_sides = nr_sides
  15. self.nr_revs = None
  16. self.track_list = [(None,None)] * (start_cyl*2)
  17. @classmethod
  18. def to_file(cls, start_cyl, nr_sides):
  19. scp = cls(start_cyl, nr_sides)
  20. return scp
  21. @classmethod
  22. def from_file(cls, dat):
  23. header = struct.unpack("<3s9BI", dat[0:16])
  24. (sig, _, _, nr_revs, _, _, flags, _, single_sided, _, _) = header
  25. error.check(sig == b"SCP", "SCP: Bad signature")
  26. # Some tools generate a short TLUT. We handle this by truncating the
  27. # TLUT at the first Track Data Header.
  28. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  29. for i in range(168):
  30. try:
  31. off = trk_offs[i]
  32. except IndexError:
  33. break
  34. if off == 0 or off >= 0x2b0:
  35. continue
  36. off = off//4 - 4
  37. error.check(off >= 0, "SCP: Bad Track Table")
  38. trk_offs = trk_offs[:off]
  39. scp = cls(0, 2)
  40. scp.nr_revs = nr_revs
  41. for trknr in range(len(trk_offs)):
  42. trk_off = trk_offs[trknr]
  43. if trk_off == 0:
  44. scp.track_list.append((None, None))
  45. continue
  46. # Parse the SCP track header and extract the flux data.
  47. thdr = dat[trk_off:trk_off+4+12*nr_revs]
  48. sig, tnr, _, _, s_off = struct.unpack("<3sB3I", thdr[:16])
  49. error.check(sig == b"TRK", "SCP: Missing track signature")
  50. error.check(tnr == trknr, "SCP: Wrong track number in header")
  51. _, e_nr, e_off = struct.unpack("<3I", thdr[-12:])
  52. tdat = dat[trk_off+s_off:trk_off+e_off+e_nr*2]
  53. scp.track_list.append((thdr, tdat))
  54. # s[side] is True iff there are non-empty tracks on @side
  55. s = []
  56. for i in range(2):
  57. s.append(functools.reduce(lambda x, y: x or (y[1] is not None),
  58. scp.track_list[i::2], False))
  59. # Some tools produce (or used to produce) single-sided images using
  60. # consecutive entries in the TLUT. This needs fixing up.
  61. if single_sided and functools.reduce(lambda x, y: x and y, s):
  62. new_list = []
  63. for t in scp.track_list[:84]:
  64. if single_sided != 1: # Side 1
  65. new_list.append((None, None))
  66. new_list.append(t)
  67. if single_sided == 1: # Side 0
  68. new_list.append((None, None))
  69. scp.track_list = new_list
  70. print('SCP: Imported old single-sided image')
  71. return scp
  72. def get_track(self, cyl, side, writeout=False):
  73. off = cyl*2 + side
  74. if off >= len(self.track_list):
  75. return None
  76. tdh, dat = self.track_list[off]
  77. if dat is None:
  78. return None
  79. tdh = tdh[4:]
  80. # Writeout requires only a single revolution
  81. if writeout:
  82. tdh = tdh[:12]
  83. _, nr, _ = struct.unpack("<3I", tdh)
  84. dat = dat[:nr*2]
  85. index_list = []
  86. while tdh:
  87. ticks, _, _ = struct.unpack("<3I", tdh[:12])
  88. index_list.append(ticks)
  89. tdh = tdh[12:]
  90. # Decode the SCP flux data into a simple list of flux times.
  91. flux_list = []
  92. val = 0
  93. for i in range(0, len(dat), 2):
  94. x = dat[i]*256 + dat[i+1]
  95. if x == 0:
  96. val += 65536
  97. continue
  98. flux_list.append(val + x)
  99. val = 0
  100. return Flux(index_list, flux_list, SCP.sample_freq)
  101. # append_track:
  102. # Converts a Flux object into a Supercard Pro Track and appends it to
  103. # the current image-in-progress.
  104. def append_track(self, flux):
  105. def _append(self, tdh, dat):
  106. self.track_list.append((tdh, dat))
  107. if self.nr_sides == 1:
  108. self.track_list.append((None, None))
  109. nr_revs = len(flux.index_list)
  110. if not self.nr_revs:
  111. self.nr_revs = nr_revs
  112. else:
  113. assert self.nr_revs == nr_revs
  114. factor = SCP.sample_freq / flux.sample_freq
  115. trknr = len(self.track_list)
  116. tdh = struct.pack("<3sB", b"TRK", trknr)
  117. dat = bytearray()
  118. len_at_index = rev = 0
  119. to_index = flux.index_list[0]
  120. rem = 0.0
  121. for x in flux.list:
  122. # Does the next flux interval cross the index mark?
  123. while to_index < x:
  124. # Append to the TDH for the previous full revolution
  125. tdh += struct.pack("<III",
  126. int(round(flux.index_list[rev]*factor)),
  127. (len(dat) - len_at_index) // 2,
  128. 4 + nr_revs*12 + len_at_index)
  129. # Set up for the next revolution
  130. len_at_index = len(dat)
  131. rev += 1
  132. if rev >= nr_revs:
  133. # We're done: We simply discard any surplus flux samples
  134. _append(self, tdh, dat)
  135. return
  136. to_index += flux.index_list[rev]
  137. # Process the current flux sample into SCP "bitcell" format
  138. to_index -= x
  139. y = x * factor + rem
  140. val = int(round(y))
  141. if (val & 65535) == 0:
  142. val += 1
  143. rem = y - val
  144. while val >= 65536:
  145. dat.append(0)
  146. dat.append(0)
  147. val -= 65536
  148. dat.append(val>>8)
  149. dat.append(val&255)
  150. # Header for last track(s) in case we ran out of flux timings.
  151. while rev < nr_revs:
  152. tdh += struct.pack("<III",
  153. int(round(flux.index_list[rev]*factor)),
  154. (len(dat) - len_at_index) // 2,
  155. 4 + nr_revs*12 + len_at_index)
  156. len_at_index = len(dat)
  157. rev += 1
  158. _append(self, tdh, dat)
  159. def get_image(self):
  160. single_sided = 1 if self.nr_sides == 1 else 0
  161. # Generate the TLUT and concatenate all the tracks together.
  162. trk_offs = bytearray()
  163. trk_dat = bytearray()
  164. for tdh, dat in self.track_list:
  165. if dat is None:
  166. trk_offs += struct.pack("<I", 0)
  167. else:
  168. trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
  169. trk_dat += tdh + dat
  170. trk_offs += bytes(0x2a0 - len(trk_offs))
  171. # Calculate checksum over all data (except 16-byte image header).
  172. csum = 0
  173. for x in trk_offs:
  174. csum += x
  175. for x in trk_dat:
  176. csum += x
  177. # Generate the image header.
  178. header = struct.pack("<3s9BI",
  179. b"SCP", # Signature
  180. 0, # Version
  181. 0x80, # DiskType = Other
  182. self.nr_revs, 0, len(self.track_list) - 1,
  183. 0x01, # Flags = Index
  184. 0, # 16-bit cell width
  185. single_sided,
  186. 0, # 25ns capture
  187. csum & 0xffffffff)
  188. # Concatenate it all together and send it back.
  189. return header + trk_offs + trk_dat
# Local variables:
# python-indent: 4
# End: