Source code for taurus.core.tango.tangodevice

#!/usr/bin/env python

#############################################################################
##
## This file is part of Taurus
## 
## http://taurus-scada.org
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
## 
## Taurus is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
## 
## Taurus is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
## 
## You should have received a copy of the GNU Lesser General Public License
## along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
##
#############################################################################

"""This module contains all taurus tango database"""

__all__ = ["TangoDevice"]

__docformat__ = "restructuredtext"

import time
import PyTango

from taurus import Factory
from taurus.core.taurusdevice import TaurusDevice
from taurus.core.taurusbasetypes import TaurusSWDevState, TaurusLockInfo, LockStatus

DFT_TANGO_DEVICE_DESCRIPTION = "A TANGO device"

class _TangoInfo(object):

    def __init__(self):
        self.dev_class = self.dev_type = 'TangoDevice'
        self.doc_url = 'http://www.esrf.fr/computing/cs/tango/tango_doc/ds_doc/'
        self.server_host = 'Unknown'
        self.server_id = 'Unknown'
        self.server_version = 1
                
[docs]class TangoDevice(TaurusDevice): def __init__(self, name, **kw): """Object initialization.""" self.call__init__(TaurusDevice, name, **kw) #-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # TaurusModel necessary overwrite #-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # helper class property that stores a reference to the corresponding factory _factory = None @classmethod
[docs] def factory(cls): if cls._factory is None: cls._factory = Factory(scheme='tango') return cls._factory
#-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # TaurusDevice necessary overwrite #-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _createHWObject(self): try: return PyTango.DeviceProxy(self.getFullName()) except PyTango.DevFailed, e: self.warning('Could not create HW object: %s' % (e[0].desc)) self.traceback()
[docs] def isValidDev(self): '''see: :meth:`TaurusDevice.isValid`''' return self._deviceObj is not None
[docs] def lock(self, force=False): li = self.getLockInfo() if force: if self.getLockInfo().status == TaurusLockInfo.Locked: self.unlock(force=True) return self.getHWObj().lock()
[docs] def unlock(self, force=False): return self.getHWObj().unlock(force)
[docs] def getLockInfo(self, cache=False): lock_info = self._lock_info if cache and lock_info.status != LockStatus.Unknown: return lock_info try: dev = self.getHWObj() li = PyTango.LockerInfo() locked = dev.get_locker(li) msg = "%s " % self.getSimpleName() if locked: lock_info.id = pid = li.li lock_info.language = li.ll lock_info.host = host = li.locker_host lock_info.klass = li.locker_class if dev.is_locked_by_me(): status = LockStatus.LockedMaster msg += "is locked by you!" else: status = LockStatus.Locked msg += "is locked by PID %s on %s" % (pid, host) else: lock_info.id = None lock_info.language = None lock_info.host = host = None lock_info.klass = None status = LockStatus.Unlocked msg += "is not locked" lock_info.status = status lock_info.status_msg = msg except: self._lock_info = lock_info = TaurusLockInfo() return lock_info
#-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # Protected implementation #-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _server_state(self): state = None try: self.dev.ping() state = TaurusSWDevState.Running except: try: if self.dev.import_info().exported: state = TaurusSWDevState.Crash else: state = TaurusSWDevState.Shutdown except: state = TaurusSWDevState.Shutdown return state
[docs] def decode(self, event_value): if isinstance(event_value, PyTango.DeviceAttribute): new_sw_state = TaurusSWDevState.Running elif isinstance(event_value, PyTango.DevFailed): new_sw_state = self._handleExceptionEvent(event_value) elif isinstance(event_value, int): # TaurusSWDevState new_sw_state = event_value else: self.info("Unexpected value to decode: %s" % str(event_value)) new_sw_state = TaurusSWDevState.Crash value = PyTango.DeviceAttribute() value.value = new_sw_state return value
def _handleExceptionEvent(self, event_value): """Handles the tango error event and returns the proper SW state.""" new_sw_state = TaurusSWDevState.Uninitialized reason = event_value[0].reason # API_EventTimeout happens when: # 1 - the server where the device is running shuts down/crashes # 2 - the notifd shuts down/crashes if reason == 'API_EventTimeout': if not self._deviceSwState in self.SHUTDOWNS: serv_state = self._server_state() # if the device is running it means that it must have been # the event system that failed if serv_state == TaurusSWDevState.Running: new_sw_state = TaurusSWDevState.EventSystemShutdown else: new_sw_state = serv_state else: # Keep the old state new_sw_state = self._deviceSwState # API_BadConfigurationProperty happens when: # 1 - at client startup the server where the device is is not # running. elif reason == 'API_BadConfigurationProperty': assert(self._deviceSwState != TaurusSWDevState.Running) new_sw_state = TaurusSWDevState.Shutdown # API_EventChannelNotExported happens when: # 1 - at client startup the server is running but the notifd # is not elif reason == 'API_EventChannelNotExported': new_sw_state = TaurusSWDevState.EventSystemShutdown return new_sw_state def _getDefaultDescription(self): return DFT_TANGO_DEVICE_DESCRIPTION def __pollResult(self, attrs, ts, result, error=False): if error: for attr in attrs.values(): attr.poll(single=False, value=None, error=result, time=ts) return for da in result: if da.has_failed: v, err = None, PyTango.DevFailed(*da.get_err_stack()) else: v, err = da, None attr = attrs[da.name] attr.poll(single=False, value=v, error=err, time=ts) def __pollAsynch(self, attrs): ts = time.time() try: req_id = self.read_attributes_asynch(attrs.keys()) except PyTango.DevFailed as e: return False, e, ts return True, req_id, ts def __pollReply(self, attrs, req_id, timeout=None): ok, req_id, ts = req_id if not ok: self.__pollResult(attrs, ts, req_id, error=True) return if timeout is None: timeout = 0 timeout = int(timeout*1000) result = self.read_attributes_reply(req_id, timeout) self.__pollResult(attrs, ts, result)
[docs] def poll(self, attrs, asynch=False, req_id=None): '''optimized by reading of multiple attributes in one go''' if req_id is not None: return self.__pollReply(attrs, req_id) if asynch: return self.__pollAsynch(attrs) error = False ts = time.time() try: result = self.read_attributes(attrs.keys()) except PyTango.DevFailed as e: error = True result = e self.__pollResult(attrs, ts, result, error=error)
def _repr_html_(self): try: info = self.getHWObj().info() except: info = _TangoInfo() txt = """\ <table> <tr><td>Short name</td><td>{simple_name}</td></tr> <tr><td>Standard name</td><td>{normal_name}</td></tr> <tr><td>Full name</td><td>{full_name}</td></tr> <tr><td>Device class</td><td>{dev_class}</td></tr> <tr><td>Server</td><td>{server_id}</td></tr> <tr><td>Documentation</td><td><a target="_blank" href="{doc_url}">{doc_url}</a></td></tr> </table> """.format(simple_name=self.getSimpleName(), normal_name=self.getNormalName(), full_name=self.getFullName(), dev_class=info.dev_class, server_id=info.server_id, doc_url=info.doc_url) return txt