Loading catkin_ws/src/gbs/scripts/lc_client.py +58 −59 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ class ResponseMismatch(Exception): self.expected = expected self.received = received def __str__(self): return "Unexpected response: expected %s, got %s" % (self.expected, self.received) return "Unexpected response: expected %s, got %s (response type, sequence number)" % (self.expected, self.received) class test_client(object): Loading Loading @@ -73,42 +73,43 @@ class test_client(object): def close(self): self.shutdown() def add_response(self, type, val): def add_response(self, type, seq, val): self.response.acquire() self.responses.append((type,val)) self.responses.append((type, seq, val)) self.response.notifyAll() self.response.release() def wait_response(self, type): def wait_response(self, type, seq): self.response.acquire() while len(self.responses) < 1: self.response.wait() (t,res) = self.responses.popleft() (t, s, res) = self.responses.popleft() self.response.release() if t == type: if t == type and s == seq: return res else: raise ResponseMismatch(type,t) raise ResponseMismatch((type,seq), (t,s)) def handle_COMMAND_RECEIVED(self, val): print "got COMMAND_RECEIVED: %s" % val self.add_response(COMMAND_RECEIVED, val) def handle_COMMAND_RECEIVED(self, seq): print "got COMMAND_RECEIVED: %s" % seq self.add_response(COMMAND_RECEIVED, seq, 0) def handle_RESP_NAK(self, val): print "got RESP_NAK: %s" % val self.add_response(RESP_NAK, val) def handle_RESP_NAK(self, seq): print "got RESP_NAK: %s" % seq self.add_response(RESP_NAK, seq, 0) def handle_RESP_ACK(self, val): print "got RESP_ACK: %s" % val self.add_response(RESP_ACK, val) def handle_RESP_ACK(self, seq): print "got RESP_ACK: %s" % seq self.add_response(RESP_ACK, seq, 0) def handle_RESP_READ_PTP(self, val): print "got RESP_READ_PTP: %s" % val self.add_response(RESP_READ_PTP, val) def handle_RESP_READ_PTP(self, resp): print "got RESP_READ_PTP: %s" % resp self.add_response(RESP_READ_PTP, resp.seq, resp.val ) def handle_RESP_MOVE_MEASURE(self, val): print "got RESP_MOVE_MEASURE: %s" % val self.add_response(RESP_MOVE_MEASURE, val) def handle_RESP_MOVE_MEASURE(self, resp): print "got RESP_MOVE_MEASURE: %s" % resp val = dict((k, resp[k]) for k in ('x1', 'x2', 'y1')) self.add_response(RESP_MOVE_MEASURE, resp.seq, val) def handle_shutdown(self, val): print "got shutdown: %s" % val Loading @@ -123,116 +124,114 @@ class test_client(object): def CMD_MOVE_LIN(self, coordinate, angle, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=dict(pos=dict( pos=coordinate, rot=angle), S_Ac=dict(speed=speed, acc=acc))), commands.CMD_MOVE_LIN.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_MOVE_PTP(self, j, s, a): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=dict(j=j, S_Ac=dict(speed=s, acc=a))), commands.CMD_MOVE_PTP.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_NO_OPERATION(self): self.send_command(self.cmd_seq, commands.CMD_NO_OPERATION.signature) return GBS_OK return WHAT def CMD_READ_LIB(self): self.send_command(self.cmd_seq, commands.CMD_READ_LIB.signature) return WHAT def CMD_READ_PTP(self): self.send_command(self.cmd_seq, commands.CMD_READ_PTP.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_READ_PTP.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_READ_PTP) res = self.wait_response(RESP_READ_PTP, seq) print("... RESP_READ_PTP: %s"%res) return res.val return res def CMD_LCK_TL(self): self.send_command(self.cmd_seq, commands.CMD_LCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_LCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK, seq) print("... LCK_TL: RESP_ACK: %s"%res) return GBS_OK def CMD_UNLCK_TL(self): self.send_command(self.cmd_seq, commands.CMD_UNLCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_UNLCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_NAIL_LIN(self, id, bstop, start, finish, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, id=id, bstop=bstop, S_Ac=dict(speed=speed, acc=acc), start=start, finish=finish), commands.CMD_NAIL_LIN.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_MOVE_MEASURE(self, id, coordinate, angle, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, id=id, pos=dict(pos=coordinate, rot=angle), S_Ac=dict(speed=speed, acc=acc)), commands.CMD_MOVE_MEASURE.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_MOVE_MEASURE) res = self.wait_response(RESP_MOVE_MEASURE, seq) print("... RESP_MOVE_MEASURE: %s"%res) ans = dict((k, res[k]) for k in ('x1', 'x2', 'y1')) return ans return res def CMD_MOVE_ARC(self, posI, posF, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq =self.send_command(dict(seq=self.cmd_seq, cmd=dict( posI=posI, posF=posF, S_Ac=dict(speed=speed, acc=acc))), commands.CMD_MOVE_ARC.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK #TODO def CMD_MOVE(self, moves): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=moves), commands.CMD_MOVE.signature) return GBS_OK Loading Loading
catkin_ws/src/gbs/scripts/lc_client.py +58 −59 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ class ResponseMismatch(Exception): self.expected = expected self.received = received def __str__(self): return "Unexpected response: expected %s, got %s" % (self.expected, self.received) return "Unexpected response: expected %s, got %s (response type, sequence number)" % (self.expected, self.received) class test_client(object): Loading Loading @@ -73,42 +73,43 @@ class test_client(object): def close(self): self.shutdown() def add_response(self, type, val): def add_response(self, type, seq, val): self.response.acquire() self.responses.append((type,val)) self.responses.append((type, seq, val)) self.response.notifyAll() self.response.release() def wait_response(self, type): def wait_response(self, type, seq): self.response.acquire() while len(self.responses) < 1: self.response.wait() (t,res) = self.responses.popleft() (t, s, res) = self.responses.popleft() self.response.release() if t == type: if t == type and s == seq: return res else: raise ResponseMismatch(type,t) raise ResponseMismatch((type,seq), (t,s)) def handle_COMMAND_RECEIVED(self, val): print "got COMMAND_RECEIVED: %s" % val self.add_response(COMMAND_RECEIVED, val) def handle_COMMAND_RECEIVED(self, seq): print "got COMMAND_RECEIVED: %s" % seq self.add_response(COMMAND_RECEIVED, seq, 0) def handle_RESP_NAK(self, val): print "got RESP_NAK: %s" % val self.add_response(RESP_NAK, val) def handle_RESP_NAK(self, seq): print "got RESP_NAK: %s" % seq self.add_response(RESP_NAK, seq, 0) def handle_RESP_ACK(self, val): print "got RESP_ACK: %s" % val self.add_response(RESP_ACK, val) def handle_RESP_ACK(self, seq): print "got RESP_ACK: %s" % seq self.add_response(RESP_ACK, seq, 0) def handle_RESP_READ_PTP(self, val): print "got RESP_READ_PTP: %s" % val self.add_response(RESP_READ_PTP, val) def handle_RESP_READ_PTP(self, resp): print "got RESP_READ_PTP: %s" % resp self.add_response(RESP_READ_PTP, resp.seq, resp.val ) def handle_RESP_MOVE_MEASURE(self, val): print "got RESP_MOVE_MEASURE: %s" % val self.add_response(RESP_MOVE_MEASURE, val) def handle_RESP_MOVE_MEASURE(self, resp): print "got RESP_MOVE_MEASURE: %s" % resp val = dict((k, resp[k]) for k in ('x1', 'x2', 'y1')) self.add_response(RESP_MOVE_MEASURE, resp.seq, val) def handle_shutdown(self, val): print "got shutdown: %s" % val Loading @@ -123,116 +124,114 @@ class test_client(object): def CMD_MOVE_LIN(self, coordinate, angle, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=dict(pos=dict( pos=coordinate, rot=angle), S_Ac=dict(speed=speed, acc=acc))), commands.CMD_MOVE_LIN.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_MOVE_PTP(self, j, s, a): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=dict(j=j, S_Ac=dict(speed=s, acc=a))), commands.CMD_MOVE_PTP.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_NO_OPERATION(self): self.send_command(self.cmd_seq, commands.CMD_NO_OPERATION.signature) return GBS_OK return WHAT def CMD_READ_LIB(self): self.send_command(self.cmd_seq, commands.CMD_READ_LIB.signature) return WHAT def CMD_READ_PTP(self): self.send_command(self.cmd_seq, commands.CMD_READ_PTP.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_READ_PTP.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_READ_PTP) res = self.wait_response(RESP_READ_PTP, seq) print("... RESP_READ_PTP: %s"%res) return res.val return res def CMD_LCK_TL(self): self.send_command(self.cmd_seq, commands.CMD_LCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_LCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK, seq) print("... LCK_TL: RESP_ACK: %s"%res) return GBS_OK def CMD_UNLCK_TL(self): self.send_command(self.cmd_seq, commands.CMD_UNLCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED) seq = self.send_command(self.cmd_seq, commands.CMD_UNLCK_TL.signature) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_NAIL_LIN(self, id, bstop, start, finish, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, id=id, bstop=bstop, S_Ac=dict(speed=speed, acc=acc), start=start, finish=finish), commands.CMD_NAIL_LIN.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK def CMD_MOVE_MEASURE(self, id, coordinate, angle, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, id=id, pos=dict(pos=coordinate, rot=angle), S_Ac=dict(speed=speed, acc=acc)), commands.CMD_MOVE_MEASURE.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_MOVE_MEASURE) res = self.wait_response(RESP_MOVE_MEASURE, seq) print("... RESP_MOVE_MEASURE: %s"%res) ans = dict((k, res[k]) for k in ('x1', 'x2', 'y1')) return ans return res def CMD_MOVE_ARC(self, posI, posF, speed, acc): self.send_command(dict(seq=self.cmd_seq, seq =self.send_command(dict(seq=self.cmd_seq, cmd=dict( posI=posI, posF=posF, S_Ac=dict(speed=speed, acc=acc))), commands.CMD_MOVE_ARC.signature) res = self.wait_response(COMMAND_RECEIVED) res = self.wait_response(COMMAND_RECEIVED, seq) print("... COMMAND_RECEIVED: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) res = self.wait_response(RESP_ACK) res = self.wait_response(RESP_ACK, seq) print("... RESP_ACK: %s"%res) return GBS_OK #TODO def CMD_MOVE(self, moves): self.send_command(dict(seq=self.cmd_seq, seq = self.send_command(dict(seq=self.cmd_seq, cmd=moves), commands.CMD_MOVE.signature) return GBS_OK Loading