USB.py 7.8 KB


  1. # greaseweazle/USB.py
  2. #
  3. # Written & released by Keir Fraser <keir.xen@gmail.com>
  4. #
  5. # This is free and unencumbered software released into the public domain.
  6. # See the file COPYING for more details, or visit <http://unlicense.org>.
  7. import struct, collections
  8. ## Control-Path command set
  9. class ControlCmd:
  10. ClearComms = 10000
  11. Normal = 9600
  12. ## Command set
  13. class Cmd:
  14. GetInfo = 0
  15. Seek = 1
  16. Side = 2
  17. SetParams = 3
  18. GetParams = 4
  19. Motor = 5
  20. ReadFlux = 6
  21. WriteFlux = 7
  22. GetFluxStatus = 8
  23. GetIndexTimes = 9
  24. Select = 10
  25. # Bootloader specific:
  26. Update = 1
  27. ## Command responses/acknowledgements
  28. class Ack:
  29. Okay = 0
  30. BadCommand = 1
  31. NoIndex = 2
  32. NoTrk0 = 3
  33. FluxOverflow = 4
  34. FluxUnderflow = 5
  35. Wrprot = 6
  36. Max = 6
  37. ## Cmd.{Get,Set}Params indexes
  38. class Params:
  39. Delays = 0
  40. ## CmdError: Encapsulates a command acknowledgement.
  41. class CmdError(Exception):
  42. str = [ "Okay", "Bad Command", "No Index (No disk?)", "Track 0 not found",
  43. "Flux Overflow", "Flux Underflow", "Disk is Write Protected" ]
  44. def __init__(self, cmd, code):
  45. self.cmd = cmd
  46. self.code = code
  47. def __str__(self):
  48. if self.code <= Ack.Max:
  49. return self.str[self.code]
  50. return "Unknown Error (%u)" % self.code
  51. class Unit:
  52. ## Unit information, instance variables:
  53. ## major, minor: Greaseweazle firmware version number
  54. ## max_index: Maximum index timings for Cmd.ReadFlux
  55. ## max_cmd: Maximum Cmd number accepted by this unit
  56. ## sample_freq: Resolution of all time values passed to/from this unit
  57. ## Unit(ser):
  58. ## Accepts a Pyserial instance for Greaseweazle communications.
  59. def __init__(self, ser):
  60. self.ser = ser
  61. self.reset()
  62. # Initialise the delay properties with current firmware values.
  63. self.send_cmd(struct.pack("4B", Cmd.GetParams, 4, Params.Delays, 10))
  64. (self._select_delay, self._step_delay,
  65. self._seek_settle_delay, self._motor_delay,
  66. self._auto_off_delay) = struct.unpack("<5H", self.ser.read(10))
  67. # Copy firmware info to instance variables (see above for definitions).
  68. self.send_cmd(struct.pack("4B", Cmd.GetInfo, 4, 0, 8))
  69. (self.major, self.minor, self.max_index,
  70. self.max_cmd, self.sample_freq) = struct.unpack("<4BI", self.ser.read(8))
  71. ## reset:
  72. ## Resets communications with Greaseweazle.
  73. def reset(self):
  74. self.ser.reset_output_buffer()
  75. self.ser.baudrate = ControlCmd.ClearComms
  76. self.ser.baudrate = ControlCmd.Normal
  77. self.ser.reset_input_buffer()
  78. ## send_cmd:
  79. ## Send given command byte sequence to Greaseweazle.
  80. ## Raise a CmdError if command fails.
  81. def send_cmd(self, cmd):
  82. self.ser.write(cmd)
  83. (c,r) = struct.unpack("2B", self.ser.read(2))
  84. assert c == cmd[0]
  85. if r != 0:
  86. raise CmdError(c,r)
  87. ## seek:
  88. ## Seek currently-selected drive's heads to the specified track (cyl, side).
  89. def seek(self, cyl, side):
  90. self.send_cmd(struct.pack("3B", Cmd.Seek, 3, cyl))
  91. self.send_cmd(struct.pack("3B", Cmd.Side, 3, side))
  92. ## drive_select:
  93. ## Select/deselect the drive.
  94. def drive_select(self, state):
  95. self.send_cmd(struct.pack("3B", Cmd.Select, 3, int(state)))
  96. ## drive_motor:
  97. ## Turn the selected drive's motor on/off.
  98. def drive_motor(self, state):
  99. self.send_cmd(struct.pack("3B", Cmd.Motor, 3, int(state)))
  100. ## get_index_times:
  101. ## Get index timing values for the last .read_track() command.
  102. def get_index_times(self, nr):
  103. self.send_cmd(struct.pack("4B", Cmd.GetIndexTimes, 4, 0, nr))
  104. x = struct.unpack("<%dI" % nr, self.ser.read(4*nr))
  105. return x
  106. ## update_firmware:
  107. ## Update Greaseweazle to the given new firmware.
  108. def update_firmware(self, dat):
  109. self.send_cmd(struct.pack("<2BI", Cmd.Update, 6, len(dat)))
  110. self.ser.write(dat)
  111. (ack,) = struct.unpack("B", self.ser.read(1))
  112. return ack
  113. ## decode_flux:
  114. ## Decode the Greaseweazle data stream into a list of flux samples.
  115. def decode_flux(self, dat):
  116. flux = []
  117. while dat:
  118. i = dat.popleft()
  119. if i < 250:
  120. flux.append(i)
  121. elif i == 255:
  122. val = (dat.popleft() & 254) >> 1
  123. val += (dat.popleft() & 254) << 6
  124. val += (dat.popleft() & 254) << 13
  125. val += (dat.popleft() & 254) << 20
  126. flux.append(val)
  127. else:
  128. val = (i - 249) * 250
  129. val += dat.popleft() - 1
  130. flux.append(val)
  131. assert flux[-1] == 0
  132. return flux[:-1]
  133. ## encode_flux:
  134. ## Convert the given flux timings into an encoded data stream.
  135. def encode_flux(self, flux):
  136. dat = bytearray()
  137. for val in flux:
  138. if val == 0:
  139. pass
  140. elif val < 250:
  141. dat.append(val)
  142. else:
  143. high = val // 250
  144. if high <= 5:
  145. dat.append(249+high)
  146. dat.append(1 + val%250)
  147. else:
  148. dat.append(255)
  149. dat.append(1 | (val<<1) & 255)
  150. dat.append(1 | (val>>6) & 255)
  151. dat.append(1 | (val>>13) & 255)
  152. dat.append(1 | (val>>20) & 255)
  153. dat.append(0) # End of Stream
  154. return dat
  155. ## read_track:
  156. ## Read flux timings as encoded data stream for the current track.
  157. def read_track(self, nr_idx):
  158. dat = collections.deque()
  159. self.send_cmd(struct.pack("3B", Cmd.ReadFlux, 3, nr_idx))
  160. while True:
  161. dat += self.ser.read(1)
  162. dat += self.ser.read(self.ser.in_waiting)
  163. if dat[-1] == 0:
  164. break
  165. try:
  166. self.send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
  167. except CmdError as error:
  168. del dat
  169. return (error.code, None)
  170. return (Ack.Okay, dat)
  171. ## write_track:
  172. ## Write the given data stream to the current track via Greaseweazle.
  173. def write_track(self, dat):
  174. self.send_cmd(struct.pack("<2BIB", Cmd.WriteFlux, 7, 0, 1))
  175. self.ser.write(dat)
  176. self.ser.read(1) # Sync with Greaseweazle
  177. try:
  178. self.send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
  179. except CmdError as error:
  180. return error.code
  181. return Ack.Okay
  182. ##
  183. ## Delay-property public getters and setters:
  184. ## select_delay: Delay (usec) after asserting drive select
  185. ## step_delay: Delay (usec) after issuing a head-step command
  186. ## seek_settle_delay: Delay (msec) after completing a head-seek operation
  187. ## motor_delay: Delay (msec) after turning on drive spindle motor
  188. ## auto_off_delay: Timeout (msec) since last command upon which all
  189. ## drives are deselected and spindle motors turned off
  190. ##
  191. def _set_delays(self):
  192. self.send_cmd(struct.pack("<3B5H", Cmd.SetParams,
  193. 3+5*2, Params.Delays,
  194. self._select_delay, self._step_delay,
  195. self._seek_settle_delay,
  196. self._motor_delay, self._auto_off_delay))
  197. @property
  198. def select_delay(self):
  199. return self._select_delay
  200. @select_delay.setter
  201. def select_delay(self, select_delay):
  202. self._select_delay = select_delay
  203. self._set_delays()
  204. @property
  205. def step_delay(self):
  206. return self._step_delay
  207. @step_delay.setter
  208. def step_delay(self, step_delay):
  209. self._step_delay = step_delay
  210. self._set_delays()
  211. @property
  212. def seek_settle_delay(self):
  213. return self._seek_settle_delay
  214. @seek_settle_delay.setter
  215. def seek_settle_delay(self, seek_settle_delay):
  216. self._seek_settle_delay = seek_settle_delay
  217. self._set_delays()
  218. @property
  219. def motor_delay(self):
  220. return self._motor_delay
  221. @motor_delay.setter
  222. def motor_delay(self, motor_delay):
  223. self._motor_delay = motor_delay
  224. self._set_delays()
  225. @property
  226. def auto_off_delay(self):
  227. return self._auto_off_delay
  228. @auto_off_delay.setter
  229. def auto_off_delay(self, auto_off_delay):
  230. self._auto_off_delay = auto_off_delay
  231. self._set_delays()