Procházet zdrojové kódy

python: New modules for encapsulating Flux and Bitcells

Keir Fraser před 5 roky
rodič
revize
5fe094e653

+ 1 - 1
scripts/greaseweazle/USB.py

@@ -6,7 +6,7 @@
 # See the file COPYING for more details, or visit <http://unlicense.org>.
 
 import struct, collections
-from . import version
+from greaseweazle import version
 
 ## Control-Path command set
 class ControlCmd:

+ 93 - 0
scripts/greaseweazle/bitcell.py

@@ -0,0 +1,93 @@
+# greaseweazle/bitcell.py
+#
+# Written & released by Keir Fraser <keir.xen@gmail.com>
+#
+# This is free and unencumbered software released into the public domain.
+# See the file COPYING for more details, or visit <http://unlicense.org>.
+
+from bitarray import bitarray
+import binascii
+
+class Bitcell:
+
+    def __init__(self):
+        self.clock = 2 / 1000000
+        self.clock_max_adj = 0.10
+        self.pll_period_adj = 0.05
+        self.pll_phase_adj = 0.60
+
+    def __str__(self):
+        s = ""
+        rev = 0
+        for b, t in self.revolution_list:
+            s += "Revolution %u: " % rev
+            s += str(binascii.hexlify(b.tobytes())) + "\n"
+            rev += 1
+        return s[:-1]
+
+    def read_flux(self, flux):
+
+        index_list, freq = flux.index_list, flux.sample_freq
+
+        clock = self.clock
+        clock_min = self.clock * (1 - self.clock_max_adj)
+        clock_max = self.clock * (1 + self.clock_max_adj)
+        ticks = 0.0
+
+        # Per-revolution list of bitcells and bitcell times.
+        self.revolution_list = []
+        
+        # Initialise bitcell lists for the first revolution.
+        bits, times = bitarray(), []
+        to_index = index_list[0] / freq
+        index_list = index_list[1:]
+
+        for x in flux.list:
+
+            # Gather enough ticks to generate at least one bitcell.
+            ticks += x / freq
+            if ticks < clock/2:
+                continue
+
+            # Clock out zero or more 0s, followed by a 1.
+            zeros = 0
+            while True:
+
+                # Check if we cross the index mark.
+                to_index -= clock
+                if to_index < 0:
+                    self.revolution_list.append((bits,times))
+                    if not index_list:
+                        return
+                    bits, times = bitarray(), []
+                    to_index = index_list[0] / freq
+                    index_list = index_list[1:]
+
+                ticks -= clock
+                times.append(clock)
+                if ticks >= clock/2:
+                    zeros += 1
+                    bits.append(False)
+                else:
+                    bits.append(True)
+                    break
+
+            # PLL: Adjust clock frequency according to phase mismatch.
+            if zeros <= 3:
+                # In sync: adjust clock by a fraction of the phase mismatch.
+                clock += ticks * self.pll_period_adj
+            else:
+                # Out of sync: adjust clock towards centre.
+                clock += (self.clock - clock) * self.pll_period_adj
+            # Clamp the clock's adjustment range.
+            clock = min(max(clock, clock_min), clock_max)
+            # PLL: Adjust clock phase according to mismatch.
+            new_ticks = ticks * (1 - self.pll_phase_adj)
+            times[-1] += ticks - new_ticks
+            ticks = new_ticks
+            
+        self.revolution_list.append((bits, times))
+
+# Local variables:
+# python-indent: 4
+# End:

+ 26 - 0
scripts/greaseweazle/flux.py

@@ -0,0 +1,26 @@
+# greaseweazle/flux.py
+#
+# Written & released by Keir Fraser <keir.xen@gmail.com>
+#
+# This is free and unencumbered software released into the public domain.
+# See the file COPYING for more details, or visit <http://unlicense.org>.
+
+class Flux:
+
+    def __init__(self, index_list, flux_list, sample_freq):
+        self.index_list = index_list
+        self.list = flux_list
+        self.sample_freq = sample_freq
+
+    def __str__(self):
+        s = "Sample Frequency: %f MHz\n" % (self.sample_freq/1000000)
+        s += "Total Flux: %u\n" % len(self.list)
+        rev = 0
+        for t in self.index_list:
+            s += "Revolution %u: %.2fms\n" % (rev, t*1000/self.sample_freq)
+            rev += 1
+        return s[:-1]
+
+# Local variables:
+# python-indent: 4
+# End:

+ 17 - 14
scripts/gw.py

@@ -11,7 +11,10 @@ import crcmod.predefined
 import sys, struct, argparse, serial
 from timeit import default_timer as timer
 
-from greaseweazle import version, USB
+from greaseweazle import version
+from greaseweazle import USB
+from greaseweazle.bitcell import Bitcell
+from greaseweazle.flux import Flux
 
 # 40MHz
 scp_freq = 40000000
@@ -19,18 +22,18 @@ scp_freq = 40000000
 # flux_to_scp:
 # Converts Greaseweazle flux samples into a Supercard Pro Track.
 # Returns the Track Data Header (TDH) and the SCP "bitcell" array.
-def flux_to_scp(usb, flux, index_times, track, nr_revs):
+def flux_to_scp(flux, track, nr_revs):
 
-    factor = scp_freq / usb.sample_freq
+    factor = scp_freq / flux.sample_freq
 
     tdh = struct.pack("<3sB", b"TRK", track)
     dat = bytearray()
 
     len_at_index = rev = 0
-    to_index = index_times[0]
+    to_index = flux.index_list[0]
     rem = 0.0
 
-    for x in flux:
+    for x in flux.list:
 
         # Are we processing initial samples before the first revolution?
         if rev == 0:
@@ -40,13 +43,13 @@ def flux_to_scp(usb, flux, index_times, track, nr_revs):
                 continue
             # Now starting the first full revolution
             rev = 1
-            to_index += index_times[rev]
+            to_index += flux.index_list[rev]
 
         # Does the next flux interval cross the index mark?
         while to_index < x:
             # Append to the Track Data Header for the previous full revolution
             tdh += struct.pack("<III",
-                               int(round(index_times[rev]*factor)),
+                               int(round(flux.index_list[rev]*factor)),
                                (len(dat) - len_at_index) // 2,
                                4 + nr_revs*12 + len_at_index)
             # Set up for the next revolution
@@ -55,7 +58,7 @@ def flux_to_scp(usb, flux, index_times, track, nr_revs):
             if rev > nr_revs:
                 # We're done: We simply discard any surplus flux samples
                 return tdh, dat
-            to_index += index_times[rev]
+            to_index += flux.index_list[rev]
 
         # Process the current flux sample into SCP "bitcell" format
         to_index -= x
@@ -74,7 +77,7 @@ def flux_to_scp(usb, flux, index_times, track, nr_revs):
     # Header for last track(s) in case we ran out of flux timings.
     while rev <= nr_revs:
         tdh += struct.pack("<III",
-                           int(round(index_times[rev]*factor)),
+                           int(round(flux.index_list[rev]*factor)),
                            (len(dat) - len_at_index) // 2,
                            4 + nr_revs*12 + len_at_index)
         len_at_index = len(dat)
@@ -101,15 +104,15 @@ def read_to_scp(usb, args):
         trk_offs.append(len(trk_dat))
         usb.seek(cyl, side)
         for retry in range(1, 5):
-            ack, index_times, enc_flux = usb.read_track(args.revs+1)
+            ack, index_list, enc_flux = usb.read_track(args.revs+1)
             if ack == USB.Ack.Okay:
                 break
             elif ack == USB.Ack.FluxOverflow and retry < 5:
                 print("Retry #%u..." % (retry))
             else:
                 raise CmdError(ack)
-        flux = usb.decode_flux(enc_flux)
-        tdh, dat = flux_to_scp(usb, flux, index_times, track, args.revs)
+        flux = Flux(index_list, usb.decode_flux(enc_flux), usb.sample_freq)
+        tdh, dat = flux_to_scp(flux, track, args.revs)
         trk_dat += tdh
         trk_dat += dat
     print()
@@ -151,12 +154,12 @@ def write_from_scp(usb, args):
         # @drive_ticks is the time in Gresaeweazle ticks between index pulses.
         # We will adjust the flux intervals per track to allow for this.
         for retry in range(1, 5):
-            ack, index_times, _ = usb.read_track(3)
+            ack, index_list, _ = usb.read_track(3)
             if ack == USB.Ack.Okay:
                 break
             elif ack != USB.Ack.FluxOverflow or retry >= 5:
                 raise CmdError(ack)
-        drive_ticks = (index_times[1] + index_times[2]) / 2
+        drive_ticks = (index_list[1] + index_list[2]) / 2
     else:
         # Simple ratio between the Greaseweazle and SCP sample frequencies.
         factor = usb.sample_freq / scp_freq