Loading quadtank_calibrate/DESCRIPTION 0 → 100644 +1 −0 Original line number Diff line number Diff line Semi-automatic calibration for quad-tank (/dev/ttyS0) quadtank_calibrate/calibrator 0 → 100755 +466 −0 Original line number Diff line number Diff line #!/usr/bin/python3 import serial import sys import time import numpy import traceback import threading import os class Opcom: """ Layout: Scroll line 1 ... Scroll line N-2 Pressures Action Status """ def __init__(self, f=sys.stdout): self.f = os.fdopen(f.fileno(), 'bw') self.f.write(b'\n') self._pressures = b'' self._action = b'' self._status = b'' def up_left(self, n): self.f.write(b'\033[%dA\r' % n) def down_left(self, n): self.f.write(b'\033[%dB\r' % n) def progress(self, msg): # Row 0..N-2 self.up_left(1) self.f.write(b' ' * len(self._pressures) + b'\r') self.f.write(msg.encode('ascii') + b'\n') self.f.write(b' ' * len(self._action) + b'\r') self.f.write(self._pressures + b'\n') self.f.write(self._action + b' ' * (40 - len(self._action))) self.f.write(self._status + b'\r') self.f.flush() pass def pressures(self, msg): # Row N-1 self.up_left(1) self.f.write(msg.encode('ascii') + b' ' * (len(self._pressures) - len(msg))) self.down_left(1) self._pressures = msg.encode('ascii') self.f.flush() pass def action(self, msg): # Row N, 0..39 self.f.write(b'\r') self.f.write(msg.encode('ascii') + b' ' * (len(self._action) - len(msg)) + b'\r') self._action = msg.encode('ascii') self.f.flush() pass def status(self, msg): # Row N, 40.. self.f.write(b'\r') self.f.write(b' ' * (40 + len(self._status)) + b'\r') self.f.write(self._action + b' ' * (40 - len(self._action))) self.f.write(msg.encode('ascii') + b'\r') self._status = msg.encode('ascii') self.f.flush() pass KIND = { 0: None, 1: "DigitalIn", 2: "DigitalOut", 3: "AnalogIn", 4: "AnalogOut", 5: "Counter" } CMD = { 0: "Bits", 1: "Min", 2: "Max", } class SerialIO: def __init__(self, port): self.config = None self.channel = {} self.tty = serial.Serial(port, 115200) self.cond = threading.Condition() t = threading.Thread(target=self.read) t.setDaemon(True) t.start() self.pollchannel(31) self.cond.acquire() self.cond.wait(2) self.cond.release() def pollchannel(self, index): self.cond.acquire() self.channel[index] = None self.tty.write(bytes([0x60 | index])) self.cond.release() def getchannel(self, index): self.cond.acquire() while self.channel[index] == None: self.cond.wait() result = self.channel[index] self.cond.release() return result def readchannel(self, index): self.pollchannel(index) return self.getchannel(index) def setchannel(self, index, value, bound=0x3ff): value = int(max(0, min(value, bound))) self.cond.acquire() if value >= (1<<30): self.tty.write(bytes([0x80 | ((value >> 30) & 0x03)])) if value >= (1<<23): self.tty.write(bytes([0x80 | ((value >> 23) & 0x7f)])) if value >= (1<<16): self.tty.write(bytes([0x80 | ((value >> 16) & 0x7f)])) if value >= (1<<9): self.tty.write(bytes([0x80 | ((value >> 9) & 0x7f)])) self.tty.write(bytes([0x80 | ((value >> 2) & 0x7f)])) self.tty.write(bytes([((value << 5) & 0x60) | (index & 0x1f)])) self.cond.release() def read(self): value = 0 n = 0 config = {} while True: ch = self.tty.read(1) value = value << 7 | ord(ch) & 0x7f n += 1 if ord(ch) & 0x80 == 0: # Last byte, so lets handle it if n == 1: # Digital I/O or poll pass else: channel = value & 0x1f value = value >> 5 if channel != 31: self.cond.acquire() self.channel[channel] = value self.cond.notifyAll() self.cond.release() else: if self.handleconfig(value, config): self.cond.acquire() self.config = config self.cond.notifyAll() self.cond.release() value = 0 n = 0 def handleconfig(self, value, config): channel = value & 0x1f kind = (value >> 5) & 0x07 cmd = (value >> 8) & 0x03 value = value >> 10 if KIND[kind] == None: return True else: try: config[KIND[kind]] except: config[KIND[kind]] = {} try: config[KIND[kind]][channel] except: config[KIND[kind]][channel] = {} config[KIND[kind]][channel][CMD[cmd]] = value class QuadTank: NOISE = 10 A_PUMP = 0 B_PUMP = 1 A1_LEVEL = 0 A2_LEVEL = 1 B1_LEVEL = 2 B2_LEVEL = 3 A_FLOW = 4 B_FLOW = 5 P_LABEL = [ "A1", "A2", "B1", "B2", "AFlow", "BFlow" ] def __init__(self, port, opcom): self.io = SerialIO(port) self.opcom = opcom self.get_pressures() def set_pump_voltage(self, voltage_A=0, voltage_B=0): self.voltage_A = voltage_A self.voltage_B = voltage_B self.io.setchannel(self.A_PUMP, voltage_A) self.io.setchannel(self.B_PUMP, voltage_B) def get_pressures(self): p = numpy.array([self.io.readchannel(self.A1_LEVEL), self.io.readchannel(self.A2_LEVEL), self.io.readchannel(self.B1_LEVEL), self.io.readchannel(self.B2_LEVEL), self.io.readchannel(self.A_FLOW), self.io.readchannel(self.B_FLOW)]) msg = '' for i in range(len(p)): msg += 'P%d/%s=%4d ' % (i, self.P_LABEL[i], p[i]) self.opcom.pressures(msg) return p def attention(self, msg="", voltage_A=1023, voltage_B=1023): self.opcom.action(msg) self.set_pump_voltage(voltage_A, voltage_B) time.sleep(0.3) self.set_pump_voltage(0, 0) time.sleep(2) def drain(self): # Drained when all deltas are below noise for 2 seconds self.set_pump_voltage(0, 0) old = self.get_pressures() steady = 0 while steady < 4: time.sleep(0.5) new = self.get_pressures() diff = numpy.abs(new - old) old = new self.opcom.status("Draining %d/4 (diff=%d)" % (steady, diff.max())) if diff.max() < self.NOISE and new[0:4].max() < 200: steady = steady + 1 self.opcom.action("") else: steady = 0 msg = "" name = [ "AV3", "AV4", "BV3", "BV4" ] for i in range(0,4): if diff[i] < self.NOISE and new[i] > 200: msg += name[i] + "/" if len(msg): self.attention("Open %s" % msg[0:-1]) else: self.opcom.action("") self.opcom.action("") self.opcom.status("") def check_valves(self): self.opcom.progress("Checking valves") # Fill tanks with some water self.set_pump_voltage(1023, 1023) time.sleep(3) restart = True while restart: restart = False self.drain() targets = [ type("", (), dict(voltage_A = 1023, voltage_B = 0, top = self.A1_LEVEL, bottom = self.A2_LEVEL, decoupled = self.B2_LEVEL, msg_open_upper = "Open AV3", msg_open_lower = "Open AV4", msg_close = "Close V5/BV1")), type("", (), dict(voltage_A = 0, voltage_B = 1023, top = self.B1_LEVEL, bottom = self.B2_LEVEL, decoupled = self.A2_LEVEL, msg_open_upper = "Open BV3", msg_open_lower = "Open BV4", msg_close = "Close V5/AV1"))] for t in targets: self.opcom.action("") if restart: break self.set_pump_voltage(t.voltage_A, t.voltage_B) t0 = time.time() p0 = self.get_pressures() drain_decoupled = False drain_bottom = False while True: time.sleep(0.2) p = self.get_pressures() diff = p - p0 if diff[t.decoupled] > self.NOISE: self.opcom.action(t.msg_close) drain_decoupled = True elif drain_decoupled: if diff[t.decoupled] < self.NOISE: restart = True break elif diff[t.top] > 400 and diff[t.bottom] < self.NOISE: self.opcom.action(t.msg_open_upper) elif not drain_bottom and diff[t.bottom] > 200: drain_bottom = True self.set_pump_voltage() self.opcom.action(t.msg_open_lower) elif drain_bottom: if diff[t.bottom] < self.NOISE: break def calibrated_flow(self): self.opcom.progress("Calibrating flow") voltage_A = 100 voltage_B = 100 done = False while not done: done = True self.set_pump_voltage(voltage_A, voltage_B) time.sleep(1) p = self.get_pressures() if p[self.A_FLOW] < 180: voltage_A = voltage_A + 20 done = False if p[self.B_FLOW] < 180: voltage_B = voltage_B + 20 done = False old = self.get_pressures() steady = 0 sum = numpy.array([0, 0]) n = 0 while steady < 4: new = self.get_pressures() sum = sum + new[4:6] n += 1 if n % 20 == 0: diff = numpy.abs(new - old)[0:4].max() old = new self.opcom.status("Calibrating flow %d/4 (diff=%d)" % (steady, diff)) if diff < self.NOISE: steady = steady + 1 else: steady = 0 time.sleep(0.1) self.opcom.status("") return sum / n def max_levels(self): self.opcom.progress("Getting max levels") old = self.get_pressures()[0:4] voltage_A = self.voltage_A voltage_B = self.voltage_B low = True while True: time.sleep(0.5) low = not low if low: self.set_pump_voltage(voltage_A - 40, voltage_B - 40) else: self.set_pump_voltage(voltage_A, voltage_B) new = self.get_pressures()[0:4] diff = (new - old) if diff[self.A2_LEVEL] < 100 and diff[self.B2_LEVEL] < 100: self.opcom.action("Close AV4/BV4") elif diff[self.A2_LEVEL] < 100: self.opcom.action("Close AV4") elif diff[self.B2_LEVEL] < 100: self.opcom.action("Close BV4") else: break self.opcom.action("") self.set_pump_voltage(voltage_A=1023, voltage_B=1023) old = self.get_pressures()[0:4] steady = 0 while steady < 4: new = self.get_pressures()[0:4] diff = numpy.abs(new - old).max() old = new self.opcom.status("Calibrating top %d/4 (diff=%d)" % (steady, diff)) if diff < self.NOISE: steady = steady + 1 else: steady = 0 time.sleep(0.5) def calibrate(self): # Calibration steps: # 1. Check valves # 2. Drain # 3. Save min levels # 4. Calibrate flows # 5. Save mid levels # 6. Find max levels # 7. Save max levels # 8. Save configuration self.check_valves() self.drain() # All valves except AV2/BV2 are in known positions, tanks are empty min_pressure = self.get_pressures() calibrated_flow = self.calibrated_flow() mid_pressure = self.get_pressures() self.max_levels() max_pressure = self.get_pressures() self.opcom.progress("All measurements done") self.opcom.status("") self.opcom.action("") mid_level = [ 0, 0, 0, 0 ] calib = [] for i in range(0,4): # 0V at 0 mm, 10V at 200 mm min = min_pressure[i] mid = mid_pressure[i] max = max_pressure[i] n = max - min step = 197.0 / n low = -min * step high = 197.0 + (1023 - max) * step mid_level[i] = low + mid * step calib.append([low * 10 / 200, high * 10 / 200]) for i in range(0,2): # Flow sensors follows the same sqrt law as level # calibration values are such that 5V corresponds to the # flow that gives 200 mm level at stationarity min = min_pressure[i + 4] max = calibrated_flow[i] n = max - min step = mid_level[i * 2] / n low = -min * step high = mid_level[i * 2] + (1023 - max) * step calib.append([low * 5 / 200, high * 5 / 200]) for i in range(len(calib)): self.opcom.progress('%d %s' % (i, calib[i])) self.opcom.progress("Min=%s" % min_pressure) self.opcom.progress("Mid=%s" % mid_pressure) self.opcom.progress("Max=%s" % max_pressure) self.opcom.progress("imbalance A %f" % (mid_pressure[0]/mid_pressure[1])) self.opcom.progress("imbalance B %f" % (mid_pressure[2]/mid_pressure[3])) self.drain() for i in range(len(calib)): self.write_calibration(i * 2, calib[i][0]) self.write_calibration(i * 2 + 1, calib[i][1]) self.write_calibration(12, 0) def write_calibration(self, index, value): if value < 0: tmp = (int(-value * 1000) << 8) | 0x80 | index else: tmp = (int(value * 1000) << 8) | index self.opcom.progress("%d %8x" % (index, tmp)) self.io.setchannel(31, tmp, 0xffffffff) if __name__ == "__main__": opcom = Opcom(sys.stdout) tank = QuadTank(sys.argv[1], opcom) try: tank.calibrate() except: traceback.print_exc() tank.io.setchannel(tank.A_PUMP, 0) tank.io.setchannel(tank.B_PUMP, 0) quadtank_calibrate/run 0 → 100755 +2 −0 Original line number Diff line number Diff line #!/bin/sh xterm -e './calibrator /dev/ttyS0 ; $SHELL' Loading
quadtank_calibrate/DESCRIPTION 0 → 100644 +1 −0 Original line number Diff line number Diff line Semi-automatic calibration for quad-tank (/dev/ttyS0)
quadtank_calibrate/calibrator 0 → 100755 +466 −0 Original line number Diff line number Diff line #!/usr/bin/python3 import serial import sys import time import numpy import traceback import threading import os class Opcom: """ Layout: Scroll line 1 ... Scroll line N-2 Pressures Action Status """ def __init__(self, f=sys.stdout): self.f = os.fdopen(f.fileno(), 'bw') self.f.write(b'\n') self._pressures = b'' self._action = b'' self._status = b'' def up_left(self, n): self.f.write(b'\033[%dA\r' % n) def down_left(self, n): self.f.write(b'\033[%dB\r' % n) def progress(self, msg): # Row 0..N-2 self.up_left(1) self.f.write(b' ' * len(self._pressures) + b'\r') self.f.write(msg.encode('ascii') + b'\n') self.f.write(b' ' * len(self._action) + b'\r') self.f.write(self._pressures + b'\n') self.f.write(self._action + b' ' * (40 - len(self._action))) self.f.write(self._status + b'\r') self.f.flush() pass def pressures(self, msg): # Row N-1 self.up_left(1) self.f.write(msg.encode('ascii') + b' ' * (len(self._pressures) - len(msg))) self.down_left(1) self._pressures = msg.encode('ascii') self.f.flush() pass def action(self, msg): # Row N, 0..39 self.f.write(b'\r') self.f.write(msg.encode('ascii') + b' ' * (len(self._action) - len(msg)) + b'\r') self._action = msg.encode('ascii') self.f.flush() pass def status(self, msg): # Row N, 40.. self.f.write(b'\r') self.f.write(b' ' * (40 + len(self._status)) + b'\r') self.f.write(self._action + b' ' * (40 - len(self._action))) self.f.write(msg.encode('ascii') + b'\r') self._status = msg.encode('ascii') self.f.flush() pass KIND = { 0: None, 1: "DigitalIn", 2: "DigitalOut", 3: "AnalogIn", 4: "AnalogOut", 5: "Counter" } CMD = { 0: "Bits", 1: "Min", 2: "Max", } class SerialIO: def __init__(self, port): self.config = None self.channel = {} self.tty = serial.Serial(port, 115200) self.cond = threading.Condition() t = threading.Thread(target=self.read) t.setDaemon(True) t.start() self.pollchannel(31) self.cond.acquire() self.cond.wait(2) self.cond.release() def pollchannel(self, index): self.cond.acquire() self.channel[index] = None self.tty.write(bytes([0x60 | index])) self.cond.release() def getchannel(self, index): self.cond.acquire() while self.channel[index] == None: self.cond.wait() result = self.channel[index] self.cond.release() return result def readchannel(self, index): self.pollchannel(index) return self.getchannel(index) def setchannel(self, index, value, bound=0x3ff): value = int(max(0, min(value, bound))) self.cond.acquire() if value >= (1<<30): self.tty.write(bytes([0x80 | ((value >> 30) & 0x03)])) if value >= (1<<23): self.tty.write(bytes([0x80 | ((value >> 23) & 0x7f)])) if value >= (1<<16): self.tty.write(bytes([0x80 | ((value >> 16) & 0x7f)])) if value >= (1<<9): self.tty.write(bytes([0x80 | ((value >> 9) & 0x7f)])) self.tty.write(bytes([0x80 | ((value >> 2) & 0x7f)])) self.tty.write(bytes([((value << 5) & 0x60) | (index & 0x1f)])) self.cond.release() def read(self): value = 0 n = 0 config = {} while True: ch = self.tty.read(1) value = value << 7 | ord(ch) & 0x7f n += 1 if ord(ch) & 0x80 == 0: # Last byte, so lets handle it if n == 1: # Digital I/O or poll pass else: channel = value & 0x1f value = value >> 5 if channel != 31: self.cond.acquire() self.channel[channel] = value self.cond.notifyAll() self.cond.release() else: if self.handleconfig(value, config): self.cond.acquire() self.config = config self.cond.notifyAll() self.cond.release() value = 0 n = 0 def handleconfig(self, value, config): channel = value & 0x1f kind = (value >> 5) & 0x07 cmd = (value >> 8) & 0x03 value = value >> 10 if KIND[kind] == None: return True else: try: config[KIND[kind]] except: config[KIND[kind]] = {} try: config[KIND[kind]][channel] except: config[KIND[kind]][channel] = {} config[KIND[kind]][channel][CMD[cmd]] = value class QuadTank: NOISE = 10 A_PUMP = 0 B_PUMP = 1 A1_LEVEL = 0 A2_LEVEL = 1 B1_LEVEL = 2 B2_LEVEL = 3 A_FLOW = 4 B_FLOW = 5 P_LABEL = [ "A1", "A2", "B1", "B2", "AFlow", "BFlow" ] def __init__(self, port, opcom): self.io = SerialIO(port) self.opcom = opcom self.get_pressures() def set_pump_voltage(self, voltage_A=0, voltage_B=0): self.voltage_A = voltage_A self.voltage_B = voltage_B self.io.setchannel(self.A_PUMP, voltage_A) self.io.setchannel(self.B_PUMP, voltage_B) def get_pressures(self): p = numpy.array([self.io.readchannel(self.A1_LEVEL), self.io.readchannel(self.A2_LEVEL), self.io.readchannel(self.B1_LEVEL), self.io.readchannel(self.B2_LEVEL), self.io.readchannel(self.A_FLOW), self.io.readchannel(self.B_FLOW)]) msg = '' for i in range(len(p)): msg += 'P%d/%s=%4d ' % (i, self.P_LABEL[i], p[i]) self.opcom.pressures(msg) return p def attention(self, msg="", voltage_A=1023, voltage_B=1023): self.opcom.action(msg) self.set_pump_voltage(voltage_A, voltage_B) time.sleep(0.3) self.set_pump_voltage(0, 0) time.sleep(2) def drain(self): # Drained when all deltas are below noise for 2 seconds self.set_pump_voltage(0, 0) old = self.get_pressures() steady = 0 while steady < 4: time.sleep(0.5) new = self.get_pressures() diff = numpy.abs(new - old) old = new self.opcom.status("Draining %d/4 (diff=%d)" % (steady, diff.max())) if diff.max() < self.NOISE and new[0:4].max() < 200: steady = steady + 1 self.opcom.action("") else: steady = 0 msg = "" name = [ "AV3", "AV4", "BV3", "BV4" ] for i in range(0,4): if diff[i] < self.NOISE and new[i] > 200: msg += name[i] + "/" if len(msg): self.attention("Open %s" % msg[0:-1]) else: self.opcom.action("") self.opcom.action("") self.opcom.status("") def check_valves(self): self.opcom.progress("Checking valves") # Fill tanks with some water self.set_pump_voltage(1023, 1023) time.sleep(3) restart = True while restart: restart = False self.drain() targets = [ type("", (), dict(voltage_A = 1023, voltage_B = 0, top = self.A1_LEVEL, bottom = self.A2_LEVEL, decoupled = self.B2_LEVEL, msg_open_upper = "Open AV3", msg_open_lower = "Open AV4", msg_close = "Close V5/BV1")), type("", (), dict(voltage_A = 0, voltage_B = 1023, top = self.B1_LEVEL, bottom = self.B2_LEVEL, decoupled = self.A2_LEVEL, msg_open_upper = "Open BV3", msg_open_lower = "Open BV4", msg_close = "Close V5/AV1"))] for t in targets: self.opcom.action("") if restart: break self.set_pump_voltage(t.voltage_A, t.voltage_B) t0 = time.time() p0 = self.get_pressures() drain_decoupled = False drain_bottom = False while True: time.sleep(0.2) p = self.get_pressures() diff = p - p0 if diff[t.decoupled] > self.NOISE: self.opcom.action(t.msg_close) drain_decoupled = True elif drain_decoupled: if diff[t.decoupled] < self.NOISE: restart = True break elif diff[t.top] > 400 and diff[t.bottom] < self.NOISE: self.opcom.action(t.msg_open_upper) elif not drain_bottom and diff[t.bottom] > 200: drain_bottom = True self.set_pump_voltage() self.opcom.action(t.msg_open_lower) elif drain_bottom: if diff[t.bottom] < self.NOISE: break def calibrated_flow(self): self.opcom.progress("Calibrating flow") voltage_A = 100 voltage_B = 100 done = False while not done: done = True self.set_pump_voltage(voltage_A, voltage_B) time.sleep(1) p = self.get_pressures() if p[self.A_FLOW] < 180: voltage_A = voltage_A + 20 done = False if p[self.B_FLOW] < 180: voltage_B = voltage_B + 20 done = False old = self.get_pressures() steady = 0 sum = numpy.array([0, 0]) n = 0 while steady < 4: new = self.get_pressures() sum = sum + new[4:6] n += 1 if n % 20 == 0: diff = numpy.abs(new - old)[0:4].max() old = new self.opcom.status("Calibrating flow %d/4 (diff=%d)" % (steady, diff)) if diff < self.NOISE: steady = steady + 1 else: steady = 0 time.sleep(0.1) self.opcom.status("") return sum / n def max_levels(self): self.opcom.progress("Getting max levels") old = self.get_pressures()[0:4] voltage_A = self.voltage_A voltage_B = self.voltage_B low = True while True: time.sleep(0.5) low = not low if low: self.set_pump_voltage(voltage_A - 40, voltage_B - 40) else: self.set_pump_voltage(voltage_A, voltage_B) new = self.get_pressures()[0:4] diff = (new - old) if diff[self.A2_LEVEL] < 100 and diff[self.B2_LEVEL] < 100: self.opcom.action("Close AV4/BV4") elif diff[self.A2_LEVEL] < 100: self.opcom.action("Close AV4") elif diff[self.B2_LEVEL] < 100: self.opcom.action("Close BV4") else: break self.opcom.action("") self.set_pump_voltage(voltage_A=1023, voltage_B=1023) old = self.get_pressures()[0:4] steady = 0 while steady < 4: new = self.get_pressures()[0:4] diff = numpy.abs(new - old).max() old = new self.opcom.status("Calibrating top %d/4 (diff=%d)" % (steady, diff)) if diff < self.NOISE: steady = steady + 1 else: steady = 0 time.sleep(0.5) def calibrate(self): # Calibration steps: # 1. Check valves # 2. Drain # 3. Save min levels # 4. Calibrate flows # 5. Save mid levels # 6. Find max levels # 7. Save max levels # 8. Save configuration self.check_valves() self.drain() # All valves except AV2/BV2 are in known positions, tanks are empty min_pressure = self.get_pressures() calibrated_flow = self.calibrated_flow() mid_pressure = self.get_pressures() self.max_levels() max_pressure = self.get_pressures() self.opcom.progress("All measurements done") self.opcom.status("") self.opcom.action("") mid_level = [ 0, 0, 0, 0 ] calib = [] for i in range(0,4): # 0V at 0 mm, 10V at 200 mm min = min_pressure[i] mid = mid_pressure[i] max = max_pressure[i] n = max - min step = 197.0 / n low = -min * step high = 197.0 + (1023 - max) * step mid_level[i] = low + mid * step calib.append([low * 10 / 200, high * 10 / 200]) for i in range(0,2): # Flow sensors follows the same sqrt law as level # calibration values are such that 5V corresponds to the # flow that gives 200 mm level at stationarity min = min_pressure[i + 4] max = calibrated_flow[i] n = max - min step = mid_level[i * 2] / n low = -min * step high = mid_level[i * 2] + (1023 - max) * step calib.append([low * 5 / 200, high * 5 / 200]) for i in range(len(calib)): self.opcom.progress('%d %s' % (i, calib[i])) self.opcom.progress("Min=%s" % min_pressure) self.opcom.progress("Mid=%s" % mid_pressure) self.opcom.progress("Max=%s" % max_pressure) self.opcom.progress("imbalance A %f" % (mid_pressure[0]/mid_pressure[1])) self.opcom.progress("imbalance B %f" % (mid_pressure[2]/mid_pressure[3])) self.drain() for i in range(len(calib)): self.write_calibration(i * 2, calib[i][0]) self.write_calibration(i * 2 + 1, calib[i][1]) self.write_calibration(12, 0) def write_calibration(self, index, value): if value < 0: tmp = (int(-value * 1000) << 8) | 0x80 | index else: tmp = (int(value * 1000) << 8) | index self.opcom.progress("%d %8x" % (index, tmp)) self.io.setchannel(31, tmp, 0xffffffff) if __name__ == "__main__": opcom = Opcom(sys.stdout) tank = QuadTank(sys.argv[1], opcom) try: tank.calibrate() except: traceback.print_exc() tank.io.setchannel(tank.A_PUMP, 0) tank.io.setchannel(tank.B_PUMP, 0)
quadtank_calibrate/run 0 → 100755 +2 −0 Original line number Diff line number Diff line #!/bin/sh xterm -e './calibrator /dev/ttyS0 ; $SHELL'