`
huangyuanmu
  • 浏览: 286782 次
  • 性别: Icon_minigender_1
  • 来自: 龙城
社区版块
存档分类
最新评论
收藏列表
标题 标签 来源
pedometer0330
#-*- coding: utf-8 -*-

'''
Created on 2012-3-26

@author: Jason Done
'''
import time, re, AppError
from bcd_tool import *
from device_hid_api import find_device
from hid.bcd_tool import bcd_decode
from hid.AppError import AppError

def find_pedometer(arg):
    '''
    Find the pedometer device by the given argument
    '''
    a_arg = arg.split(",")
    if len(a_arg) != 2:
        raise AppError('Illegal parameter.')
    else:
        vid = int(a_arg[0], 16)
        pid = int(a_arg[1], 16)
        dev = find_device(vid, pid)
        if dev:
            return pedometer_facade(dev)
        else:
            return None

class pedometer_facade:
    '''
    A facade to manipulate the pedometer device.
    '''
    def __init__(self, dev):
        self.dev = dev



########## Check Link functions begin.
    def is_linked(self):
        '''
        Check whether the pedometer is linked.
        '''
        self.__send_command([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        
        if len(data) != 0:
            if data[0] == 0x06:
                return True
            elif data[0] == 0x02 and data[1] == 0x00 and data[2] == 0x00:
                # Notify the reader that if device put on 
                # then response message back automatically when not linked
                self.__send_command([0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
                return False
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else:
                return False
        else:
            raise AppError("Cann't get the link status.")
                
                    

    def wait_link(self):
        '''
        Wait for link.
        Read data first, if failed then send wait link command and read data again, 
        if failed throw exception, else analysis data returned and return True or False.
        '''
        data = self.dev.read_data(8)
        if len(data) == 0:
            self.__send_command([0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
            data = self.dev.read_data(8)
            if len(data) == 0: raise AppError("Cann't wait for link.")
            else:
                if data[0] == 0x06:
                    return True
                elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                    raise AppError("IR suspend.")
                elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                    raise AppError("Reader timeout.")
                elif data[0] == 0xEE:
                    raise AppError("Command error.")
                else:
                    return False
        else:
            if data[0] == 6:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else:
                return False
########### Check Link functions end.
    
 
 
 
 
    
########### Read functions begin.
    def get_user_id(self):
        '''
        Get user's id from the device(pedometer)
        '''
        self.__send_command([0x05, 0x0C, 0xF4, 0x03, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't get the user id.")
        else:
            if data[0] == 0x05:
                return self.__get6str(data[3], data[4], data[5])
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return None
    
    def get_device_id(self):
        '''
        Get the device id
        '''
        self.__send_command([0x05, 0x0C, 0xFA, 0x03, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't get the device id.")
        else:
            if data[0] == 0x05:
                return self.__get6str(data[3], data[4], data[5])
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return None
            
    def get_spm(self):
        '''
        Get the device SPM, value in 1-9.
        SPM=1 for 80 step/min, SPM=2 for 90 step/min, ….. and SPM=9 for 160 step/min 
        '''
        self.__send_command([0x05, 0x0C, 0xF2, 0x01, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't get the device spm.")
        else:
            if data[0] == 0x05:
                mask = data[3]>>4
                mask = mask<<4
                return data[3] - mask
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return None
                
    def get_clock(self):
        '''
        Get the device clock time,
        return a time string as the format hh:mm:ss
        '''
        self.__send_command([0x05, 0x0C, 0x3A, 0x03, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't get the device spm.")
        else:
            if data[0] == 0x05:
                hour = bcd_decode(data[3]) - 80 + 12 if bcd_decode(data[3]) > 12 else bcd_decode(data[3])
                hour = '0' + str(hour) if hour < 10 else str(hour)
                minutes = bcd_decode(data[4])
                minutes = '0' + str(minutes) if minutes < 10 else str(minutes)
                second = bcd_decode(data[5])
                second = '0' + str(second) if second < 10 else str(second)
                return hour + ':' + minutes + ':' + second
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return None
        
    def get_alldata(self):
        '''
        Get all data from the device.
        '''
        self.__send_command([0x05, 0x0D, 0x00, 0x10, 0x00, 0x30, 0x00, 0x00])
        data = []
        while True:
            buffer = self.dev.read_data(8)
            data = data + buffer
            if len(buffer) == 0 or buffer[0] == 0xFF:
                break
        if len(data) == 0:
            raise AppError("Cann't get all data.")
        elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
        elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
            raise AppError("Reader timeout.")
        elif data[0] == 0xEE:
            raise AppError("Command error.")
        else:
            l = self.__deal_data(data)
            alldata = {}
            alldata['step'] = int(self.__bcd2str(l[0]) + self.__bcd2str(l[1]) + self.__bcd2str(l[2]))
            alldata['mvpa'] = 
            alldata['act_time'] = int(self.__bcd2str(l[4]) + self.__bcd2str(l[5]) + self.__bcd2str(l[6])) + l[7] * 7.8125 / 1000
            alldata['spm80'] =
            alldata['spm90'] =
            alldata['spm100'] =
            alldata['spm110'] =
            alldata['spm120'] =
            alldata['spm130'] =
            alldata['spm140'] =
            alldata['spm150'] =
            alldata['spm160'] =
            return alldata
        
    
    def get_alldata_7days(self):
        '''
        Get all data from the device(7 days).
        '''
        '''
        Get all data from the device.
        '''
        self.__send_command([0x05, 0x0D, 0x00, 0x40, 0x01, 0xB0, 0x00, 0x00])
        data = []
        while True:
            buffer = self.dev.read_data(8)
            data = data + buffer
            if len(buffer) == 0 or buffer[0] == 0xFF:
                break
        if len(data) == 0:
            raise AppError("Cann't get all data.")
        elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
        elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
            raise AppError("Reader timeout.")
        elif data[0] == 0xEE:
            raise AppError("Command error.")
        else:
            return data
########### Read functions end.






########### Write functions begin.     
    def set_user_id(self, uid):
        '''
        Set the user id, if success, return True
        '''
        if not isinstance(uid, basestring):
            raise AppError("id must be a string type.")
        if len(uid) != 6:
            raise AppError("id must be a string type.")
        if not re.match(r'[+-]?\d+$', uid):
            raise AppError("id must be a number string.")
        self.__send_command([0x05, 0x0B, 0xF4, bcd_encode(int(uid[0:2])), 
                             bcd_encode(int(uid[2:4])), bcd_encode(int(uid[4:6])), 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't set the user id.")
        else:
            if data[0] == 0x05:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return False
        
    def set_device_id(self, did):
        '''
        Set the device id, if success, return True.
        '''
        if not isinstance(did, basestring):
            raise AppError("id must be a string type.")
        if len(did) != 6:
            raise AppError("id must be 6 bits.")
        if not re.match(r'[+-]?\d+$', did):
            raise AppError("id must be a number string.")
        self.__send_command([0x05, 0x0B, 0xFA, bcd_encode(int(did[0:2])), 
                             bcd_encode(int(did[2:4])), bcd_encode(int(did[4:6])), 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't set the device id.")
        else:
            if data[0] == 0x05:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return False
        
    def set_spm(self, spm):
        '''
        Set spm, if success, return True.
        '''
        if spm < 1 or spm > 9:
            raise AppError("Illegal parameter, it must between 1 and 9.")
        self.__send_command([0x05, 0x09, 0xF2, spm, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't set the device SPM.")
        else:
            if data[0] == 0x05:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return False
        
    def set_clock(self, hh, mm, ss):
        '''
        Set device clock time.
        '''
        if 23 < hh < 0:
            raise AppError("Illegal parameter, it must between 0 and 23.")
        if 59 < mm < 0:
            raise AppError("Illegal parameter, it must between 0 and 59.")
        if 59 < ss < 0:
            raise AppError("Illegal parameter, it must between 0 and 59.")
        hh = hh - 12 + 80 if hh > 12 else hh
        self.__send_command([0x05, 0x0B, 0x3A, bcd_encode(hh), 
                             bcd_encode(mm), bcd_encode(ss), 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("Cann't set the device SPM.")
        else:
            if data[0] == 0x05:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return False
        
########### Write functions end.






########### Link end function begin.
    def link_end(self, param):
        '''
        End the link to device, release the handle.
        '''
        if param != 1 and param != 14 and param != 15:
            raise AppError("Illegal parameter, it must be 1,14 or 15")
        self.__send_command([0x02, 0x0F, param, 0x00, 0x00, 0x00, 0x00, 0x00])
        data = self.dev.read_data(8)
        if len(data) == 0:
            raise AppError("The link cann't be ended.")
        else:
            if data[0] == 0x02:
                return True
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0x0D:
                raise AppError("IR suspend.")
            elif data[0] == 0x02 and data[1] == 0x0F and data[2] == 0xD0:
                raise AppError("Reader timeout.")
            elif data[0] == 0xEE:
                raise AppError("Command error.")
            else: return False
########### Link end function end.






########### Private functions begin.
    def __send_command(self, data):
        '''
        Send command to device.
        '''
        ret = self.dev.write_data(data)
        if ret < 0:
            raise IOError("Error writing device: " + self.dev.devh.error())
    
    def __get6str(self, data1, data2, data3):
        '''
        Format number to 6 size string.
        '''
        data1 = bcd_decode(data1)
        data2 = bcd_decode(data2)
        data3 = bcd_decode(data3)
        s_data1 = '0' + str(data1) if data1 < 10 else str(data1)
        s_data2 = '0' + str(data2) if data2 < 10 else str(data2)
        s_data3 = '0' + str(data3) if data3 < 10 else str(data3)
        return s_data1 + s_data2 + s_data3
    
    def __deal_data(self, data):
        '''
        Deal the data read from the EEPROM
        '''
        l = []
        if len(data) != 136:
            raise AppError("Illegal parameter, the data length must be 136")
        else:
            for i in range(16):
                l.append(data[i*8 + 3])
                l.append(data[i*8 + 4])
                l.append(data[i*8 + 5])
            return l
        
    def __bcd2str(self, data):
        return '0' + str(data) if data < 10 else str(data)
                
########### Private functions end.






########## Test
if __name__ == "__main__":
    p = find_pedometer('0x1164,0x451C')
    if p:
        if p.is_linked():
            try:
                print "userId: %s" % (p.get_user_id())
                # p.set_device_id('000002')
                print "deviceId: %s" % (p.get_device_id())
                # p.set_spm(5)
                print "SPM: %s" % (p.get_spm())
                print "clock: %s" % (p.get_clock())
                print "all data: %s" % ([bcd_decode(e) for e in p.get_alldata()])
            finally:
                p.link_end(1)
        else:
            while True:
                if p.wait_link():
                    try:
                        print "userId: %s" % (p.get_user_id())
                        # p.set_device_id('000000')
                        print "deviceId: %s" % (p.get_device_id())
                        # p.set_spm(4)
                        print "SPM: %s" % (p.get_spm())
                        print "clock: %s" % (p.get_clock())
                        print "all data: %s" % ([bcd_decode(e) for e in p.get_alldata()])
                    finally:
                        try:
                            p.link_end(1)
                        finally:
                            break
                else:
                    time.sleep(0.2)
    else:
        raise AppError("Can not find the pedometer.")
Global site tag (gtag.js) - Google Analytics