util.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. # greaseweazle/tools/util.py
  2. #
  3. # Greaseweazle control script: Utility functions.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. import argparse, os, sys, serial, struct, time, re, platform
  10. import importlib
  11. import serial.tools.list_ports
  12. from greaseweazle import version
  13. from greaseweazle import error
  14. from greaseweazle import usb as USB
  15. class CmdlineHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
  16. def _get_help_string(self, action):
  17. help = action.help
  18. if '%no_default' in help:
  19. return help.replace('%no_default', '')
  20. if ('%(default)' in help
  21. or action.default is None
  22. or action.default is False
  23. or action.default is argparse.SUPPRESS):
  24. return help
  25. return help + ' (default: %(default)s)'
  26. class ArgumentParser(argparse.ArgumentParser):
  27. def __init__(self, formatter_class=CmdlineHelpFormatter, *args, **kwargs):
  28. return super().__init__(formatter_class=formatter_class,
  29. *args, **kwargs)
  30. def drive_letter(letter):
  31. types = {
  32. 'A': (USB.BusType.IBMPC, 0),
  33. 'B': (USB.BusType.IBMPC, 1),
  34. '0': (USB.BusType.Shugart, 0),
  35. '1': (USB.BusType.Shugart, 1),
  36. '2': (USB.BusType.Shugart, 2)
  37. }
  38. if not letter.upper() in types:
  39. raise argparse.ArgumentTypeError("invalid drive letter: '%s'" % letter)
  40. return types[letter.upper()]
  41. def range_str(l):
  42. if len(l) == 0:
  43. return '<none>'
  44. p, str = None, ''
  45. for i in l:
  46. if p is not None and i == p[1]+1:
  47. p = p[0], i
  48. continue
  49. if p is not None:
  50. str += ('%d,' % p[0]) if p[0] == p[1] else ('%d-%d,' % p)
  51. p = (i,i)
  52. if p is not None:
  53. str += ('%d' % p[0]) if p[0] == p[1] else ('%d-%d' % p)
  54. return str
  55. class TrackSet:
  56. class TrackIter:
  57. """Iterate over a TrackSet in physical <cyl,head> order."""
  58. def __init__(self, ts):
  59. l = []
  60. for c in ts.cyls:
  61. for h in ts.heads:
  62. pc = c*ts.step + ts.h_off[h]
  63. l.append((pc, h, c))
  64. l.sort()
  65. self.l = iter(l)
  66. def __next__(self):
  67. self.physical_cyl, self.head, self.cyl = next(self.l)
  68. return self
  69. def __init__(self, trackspec):
  70. self.cyls = list()
  71. self.heads = list()
  72. self.h_off = [0]*2
  73. self.step = 1
  74. self.trackspec = ''
  75. self.update_from_trackspec(trackspec)
  76. def update_from_trackspec(self, trackspec):
  77. """Update a TrackSet based on a trackspec."""
  78. self.trackspec += trackspec
  79. for x in trackspec.split(':'):
  80. k,v = x.split('=')
  81. if k == 'c':
  82. cyls = [False]*100
  83. for crange in v.split(','):
  84. m = re.match('(\d\d?)(-(\d\d?))?$', crange)
  85. if m is None: raise ValueError()
  86. if m.group(3) is None:
  87. s,e = int(m.group(1)), int(m.group(1))
  88. else:
  89. s,e = int(m.group(1)), int(m.group(3))
  90. for c in range(s, e+1):
  91. cyls[c] = True
  92. self.cyls = []
  93. for c in range(len(cyls)):
  94. if cyls[c]: self.cyls.append(c)
  95. elif k == 'h':
  96. heads = [False]*2
  97. for hrange in v.split(','):
  98. m = re.match('([01])(-([01]))?$', hrange)
  99. if m is None: raise ValueError()
  100. if m.group(3) is None:
  101. s,e = int(m.group(1)), int(m.group(1))
  102. else:
  103. s,e = int(m.group(1)), int(m.group(3))
  104. for h in range(s, e+1):
  105. heads[h] = True
  106. self.heads = []
  107. for h in range(len(heads)):
  108. if heads[h]: self.heads.append(h)
  109. elif re.match('h[01].off$', k):
  110. h = int(re.match('h([01]).off$', k).group(1))
  111. m = re.match('([+-][\d])$', v)
  112. if m is None: raise ValueError()
  113. self.h_off[h] = int(m.group(1))
  114. elif k == 'step':
  115. self.step = int(v)
  116. if self.step <= 0: raise ValueError()
  117. else:
  118. raise ValueError()
  119. def __str__(self):
  120. s = 'c=%s' % range_str(self.cyls)
  121. s += ':h=%s' % range_str(self.heads)
  122. for i in range(len(self.h_off)):
  123. x = self.h_off[i]
  124. if x != 0:
  125. s += ':h%d.off=%s%d' % (i, '+' if x >= 0 else '', x)
  126. if self.step != 1: s += ':step=%d' % self.step
  127. return s
  128. def __iter__(self):
  129. return self.TrackIter(self)
  130. def split_opts(seq):
  131. """Splits a name from its list of options."""
  132. parts = seq.split('::')
  133. name, opts = parts[0], dict()
  134. for x in map(lambda x: x.split(':'), parts[1:]):
  135. for y in x:
  136. try:
  137. opt, val = y.split('=')
  138. except ValueError:
  139. opt, val = y, True
  140. if opt:
  141. opts[opt] = val
  142. return name, opts
  143. def get_image_class(name):
  144. image_types = { '.adf': 'ADF',
  145. '.scp': 'SCP',
  146. '.hfe': 'HFE',
  147. '.ima': 'IMG',
  148. '.img': 'IMG',
  149. '.st' : 'IMG',
  150. '.ipf': 'IPF',
  151. '.dsk': 'EDSK',
  152. '.raw': 'KryoFlux' }
  153. if os.path.isdir(name):
  154. typename = 'KryoFlux'
  155. else:
  156. _, ext = os.path.splitext(name)
  157. error.check(ext.lower() in image_types,
  158. """\
  159. %s: Unrecognised file suffix '%s'
  160. Known suffixes: %s"""
  161. % (name, ext, ', '.join(image_types)))
  162. typename = image_types[ext.lower()]
  163. mod = importlib.import_module('greaseweazle.image.' + typename.lower())
  164. return mod.__dict__[typename]
  165. def with_drive_selected(fn, usb, args, *_args, **_kwargs):
  166. usb.set_bus_type(args.drive[0])
  167. try:
  168. usb.drive_select(args.drive[1])
  169. usb.drive_motor(args.drive[1], _kwargs.pop('motor', True))
  170. fn(usb, args, *_args, **_kwargs)
  171. except KeyboardInterrupt:
  172. print()
  173. usb.reset()
  174. raise
  175. finally:
  176. usb.drive_motor(args.drive[1], False)
  177. usb.drive_deselect()
  178. def valid_ser_id(ser_id):
  179. return ser_id and ser_id.upper().startswith("GW")
  180. def score_port(x, old_port=None):
  181. score = 0
  182. if x.manufacturer == "Keir Fraser" and x.product == "Greaseweazle":
  183. score = 20
  184. elif x.vid == 0x1209 and x.pid == 0x4d69:
  185. # Our very own properly-assigned PID. Guaranteed to be us.
  186. score = 20
  187. elif x.vid == 0x1209 and x.pid == 0x0001:
  188. # Our old shared Test PID. It's not guaranteed to be us.
  189. score = 10
  190. if score > 0 and valid_ser_id(x.serial_number):
  191. # A valid serial id is a good sign unless this is a reopen, and
  192. # the serials don't match!
  193. if not old_port or not valid_ser_id(old_port.serial_number):
  194. score = 20
  195. elif x.serial_number == old_port.serial_number:
  196. score = 30
  197. else:
  198. score = 0
  199. if old_port and old_port.location:
  200. # If this is a reopen, location field must match. A match is not
  201. # sufficient in itself however, as Windows may supply the same
  202. # location for multiple USB ports (this may be an interaction with
  203. # BitDefender). Hence we do not increase the port's score here.
  204. if not x.location or x.location != old_port.location:
  205. score = 0
  206. return score
  207. def find_port(old_port=None):
  208. best_score, best_port = 0, None
  209. for x in serial.tools.list_ports.comports():
  210. score = score_port(x, old_port)
  211. if score > best_score:
  212. best_score, best_port = score, x
  213. if best_port:
  214. return best_port.device
  215. raise serial.SerialException('Cannot find the Greaseweazle device')
  216. def port_info(devname):
  217. for x in serial.tools.list_ports.comports():
  218. if x.device == devname:
  219. return x
  220. return None
  221. def usb_reopen(usb, is_update):
  222. mode = { False: 1, True: 0 }
  223. try:
  224. usb.switch_fw_mode(mode[is_update])
  225. except (serial.SerialException, struct.error):
  226. # Mac and Linux raise SerialException ("... returned no data")
  227. # Win10 pyserial returns a short read which fails struct.unpack
  228. pass
  229. usb.ser.close()
  230. for i in range(10):
  231. time.sleep(0.5)
  232. try:
  233. devicename = find_port(usb.port_info)
  234. new_ser = serial.Serial(devicename)
  235. except serial.SerialException:
  236. # Device not found
  237. pass
  238. else:
  239. new_usb = USB.Unit(new_ser)
  240. new_usb.port_info = port_info(devicename)
  241. new_usb.jumperless_update = usb.jumperless_update
  242. return new_usb
  243. raise serial.SerialException('Could not reopen port after mode switch')
  244. def print_update_instructions(usb):
  245. print("To perform an Update:")
  246. if not usb.jumperless_update:
  247. print(" - Disconnect from USB")
  248. print(" - Install the Update Jumper at pins %s"
  249. % ("RXI-TXO" if usb.hw_model != 1 else "DCLK-GND"))
  250. print(" - Reconnect to USB")
  251. print(" - Run \"gw update\" to install firmware v%u.%u" %
  252. (version.major, version.minor))
  253. def usb_open(devicename, is_update=False, mode_check=True):
  254. if devicename is None:
  255. devicename = find_port()
  256. usb = USB.Unit(serial.Serial(devicename))
  257. usb.port_info = port_info(devicename)
  258. is_win7 = (platform.system() == 'Windows' and platform.release() == '7')
  259. usb.jumperless_update = ((usb.hw_model, usb.hw_submodel) != (1, 0)
  260. and not is_win7)
  261. if not mode_check:
  262. return usb
  263. if usb.update_mode and not is_update:
  264. if usb.jumperless_update and not usb.update_jumpered:
  265. usb = usb_reopen(usb, is_update)
  266. if not usb.update_mode:
  267. return usb
  268. print("ERROR: Greaseweazle is in Firmware Update Mode")
  269. print(" - The only available action is \"gw update\"")
  270. if usb.update_jumpered:
  271. print(" - For normal operation disconnect from USB and remove "
  272. "the Update Jumper at pins %s"
  273. % ("RXI-TXO" if usb.hw_model != 1 else "DCLK-GND"))
  274. else:
  275. print(" - Main firmware is erased: You *must* perform an update!")
  276. sys.exit(1)
  277. if is_update and not usb.update_mode:
  278. if usb.jumperless_update:
  279. usb = usb_reopen(usb, is_update)
  280. error.check(usb.update_mode, """\
  281. Greaseweazle F7 did not change to Firmware Update Mode as requested.
  282. If the problem persists, install the Update Jumper at pins RXI-TXO.""")
  283. return usb
  284. print("ERROR: Greaseweazle is not in Firmware Update Mode")
  285. print_update_instructions(usb)
  286. sys.exit(1)
  287. if not usb.update_mode and usb.update_needed:
  288. print("ERROR: Greaseweazle firmware v%u.%u is unsupported"
  289. % (usb.major, usb.minor))
  290. print_update_instructions(usb)
  291. sys.exit(1)
  292. return usb
  293. # Local variables:
  294. # python-indent: 4
  295. # End: