Source code for testcases.OpTestPCI

#!/usr/bin/env python3
# IBM_PROLOG_BEGIN_TAG
# This is an automatically generated prolog.
#
# $Source: op-test-framework/testcases/OpTestPCI.py $
#
# OpenPOWER Automated Test Project
#
# Contributors Listed Below - COPYRIGHT 2015,2017
# [+] International Business Machines Corp.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
#
# IBM_PROLOG_END_TAG

'''
OpTestPCI: PCI checks
-------------------------------

Perform various PCI validations and checks

--run-suite BasicPCI (includes skiroot_suite and host_suite)
--run-suite pci-regression

Sample naming conventions below, see each test method for
the applicable options per method.

--run testcases.OpTestPCI.PCISkiroot.pcie_link_errors
      ^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^ ^^^^^^^^^^^^^^^^
          module name      subclass    test method

--run testcases.OpTestPCI.PCIHost.pcie_link_errors
      ^^^^^^^^^^^^^^^^^^^ ^^^^^^^ ^^^^^^^^^^^^^^^^
          module name     subclass   test method

'''

import unittest
import logging
import pexpect
import time
import re
import difflib
from distutils.version import LooseVersion

import OpTestConfiguration
import OpTestLogger
from common.OpTestSystem import OpSystemState
from common.Exceptions import CommandFailed, UnexpectedCase

# Module-wide logger obtained from the shared op-test logger object.
log = OpTestLogger.optest_logger_glob.get_logger(__name__)
# Set to 1 by compare_boot_devices once the skiroot / host lspci output
# has been captured; set back to 0 after the pair has been compared.
skiroot_done = 0
host_done = 0
# Output of get_lspci() captured by compare_boot_devices for each
# environment; None until captured and after a comparison completes.
skiroot_lspci = None
host_lspci = None
# Set to 1 by driver_bind (which switches the shared console via
# set_console); refresh_console sets it back to 0.
reset_console = 0


class OpClassPCI(unittest.TestCase):
    '''
    Main Parent class

    We cannot guarantee a soft boot entry, so need to force to PS or OS
    '''

    @classmethod
    def setUpClass(cls, desired=None, power_cycle=0):
        '''
        Main setUpClass, this is shared across all subclasses.
        This is called once when the subclass is instantiated.

        :param desired: OpSystemState the tests should run in; defaults to
                        OpSystemState.PETITBOOT_SHELL when None.
        :param power_cycle: when 1 the system is first taken to
                            OpSystemState.OFF (hard boot cases).
        '''
        if desired is None:
            cls.desired = OpSystemState.PETITBOOT_SHELL
        else:
            cls.desired = desired
        cls.power_cycle = power_cycle
        cls.conf = OpTestConfiguration.conf
        cls.cv_SYSTEM = cls.conf.system()
        cls.cv_HOST = cls.conf.host()
        cls.my_connect = None
        if cls.power_cycle == 1:
            cls.cv_SYSTEM.goto_state(OpSystemState.OFF)
            cls.power_cycle = 0
        try:
            if cls.desired == OpSystemState.OS:
                # set bootdev for reboot cases
                cls.cv_SYSTEM.sys_set_bootdev_no_override()
                cls.cv_SYSTEM.goto_state(OpSystemState.OS)
                cls.c = cls.cv_SYSTEM.host().get_ssh_connection()
            else:
                cls.cv_SYSTEM.sys_set_bootdev_setup()
                cls.cv_SYSTEM.goto_state(OpSystemState.PETITBOOT_SHELL)
                cls.c = cls.cv_SYSTEM.console
            # The raw console is needed in both states: the softboot
            # subclasses (host and skiroot) send "reboot" through cls.pty.
            cls.pty = cls.cv_SYSTEM.console.get_console()
        except Exception as e:
            log.debug("Unable to find cls.desired, probably a test code problem")
            # keep the cause in the debug log instead of dropping it
            log.debug("setUpClass Exception={}".format(e))
            cls.cv_SYSTEM.goto_state(OpSystemState.OS)

    @classmethod
    def tearDownClass(cls):
        '''
        Main tearDownClass, this is shared across all subclasses.
        This is called once when the subclass is taken down.

        Restores the shared console if a test left it overridden.
        '''
        if reset_console == 1:
            cls.refresh_console()

    @classmethod
    def set_console(cls):
        '''
        This method allows setting the shared class console to the real
        console when needed, i.e. driver_bind tests which unbind the
        ethernet drivers.
        '''
        cls.c = cls.cv_SYSTEM.console

    @classmethod
    def refresh_console(cls):
        '''
        This method is used to set the shared class console back to the
        proper object (this gets set to the real console when we unbind
        the ethernet) in the driver_bind test as an example.
        '''
        # this done after a reboot
        global reset_console
        if cls.cv_SYSTEM.get_state() == OpSystemState.PETITBOOT_SHELL:
            cls.c = cls.cv_SYSTEM.console
        else:
            cls.c = cls.cv_SYSTEM.host().get_ssh_connection()
        reset_console = 0

    def setUp(self):
        '''
        All variables common to a subclass need to be defined here since
        this method gets called before each subclass test
        '''
        pass

    def tearDown(self):
        '''
        This is done at the end of each subclass test.
        '''
        if reset_console == 1:
            self.refresh_console()

    def get_lspci(self):
        '''
        Usually used internally, can be run for query of system

        Returns the output of "lspci -mm -n" as returned by the shared
        console's run_command.

        Case A --run testcases.OpTestPCI.PCISkiroot.get_lspci
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.get_lspci
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.get_lspci
        Case D --run testcases.OpTestPCI.PCIHost.get_lspci
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.get_lspci
        Case F --run testcases.OpTestPCI.PCIHostHardboot.get_lspci
        '''
        return self.c.run_command("lspci -mm -n")

    def check_commands(self):
        '''
        Checks for general capability to run commands

        Case A --run testcases.OpTestPCI.PCISkiroot.check_commands
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.check_commands
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.check_commands
        Case D --run testcases.OpTestPCI.PCIHost.check_commands
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.check_commands
        Case F --run testcases.OpTestPCI.PCIHostHardboot.check_commands
        '''
        list_pci_devices_commands = ["lspci -mm -n",
                                     "lspci -m",
                                     "lspci -t",
                                     "lspci -n",
                                     "lspci -nn",
                                     "cat /proc/bus/pci/devices",
                                     "ls --color=never /sys/bus/pci/devices/ -l",
                                     "lspci -vvxxx",
                                     ]
        for cmd in list_pci_devices_commands:
            self.c.run_command(cmd, timeout=300)

        list_usb_devices_commands = ["lsusb",
                                     "lsusb -t",
                                     "lsusb -v",
                                     ]
        for cmd in list_usb_devices_commands:
            self.c.run_command(cmd)

        # Test that we do not EEH on reading all config space
        self.c.run_command(
            "hexdump -C /sys/bus/pci/devices/*/config", timeout=600)

    def get_lspci_file(self):
        '''
        Usually used internally, can be run for query of system

        Returns the lines of the configured lspci file, or None when no
        file is configured.

        Case A --run testcases.OpTestPCI.PCISkiroot.get_lspci_file
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.get_lspci_file
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.get_lspci_file
        Case D --run testcases.OpTestPCI.PCIHost.get_lspci_file
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.get_lspci_file
        Case F --run testcases.OpTestPCI.PCIHostHardboot.get_lspci_file
        '''
        lspci_file = self.conf.lspci_file()
        if not lspci_file:
            return None
        with open(lspci_file, 'r') as f:
            file_content = f.read().splitlines()
        log.debug("file_content={}".format(file_content))
        return file_content

    def _diff_my_devices(self,
                         listA=None,
                         listA_name=None,
                         listB=None,
                         listB_name=None):
        '''
        Performs unified diff of two lists

        Empty entries are dropped from both lists before diffing; a list
        passed as None is treated as empty. Returns the diff lines as a
        list (empty when the inputs match).
        '''
        unified_output = difflib.unified_diff(
            [entry for entry in (listA or []) if entry],
            [entry for entry in (listB or []) if entry],
            fromfile=listA_name,
            tofile=listB_name,
            lineterm="")
        unified_list = list(unified_output)
        log.debug("unified_list={}".format(unified_list))
        return unified_list

    def compare_boot_devices(self):
        '''
        This is best leveraged in the suite pci-regression,
        where both the skiroot/host softboot and the
        skiroot/host hardboot get done in the same wave,
        so that the global variables carry over to compare.

        If both skiroot and host lspci completed, will
        compare lspci results.

        If you want to compare against an input file, use
        compare_live_devices.

        Case A --run testcases.OpTestPCI.PCISkiroot.compare_boot_devices
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.compare_boot_devices
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.compare_boot_devices
        Case D --run testcases.OpTestPCI.PCIHost.compare_boot_devices
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.compare_boot_devices
        Case F --run testcases.OpTestPCI.PCIHostHardboot.compare_boot_devices
        '''
        global skiroot_done
        global host_done
        global skiroot_lspci
        global host_lspci
        lspci_output = self.get_lspci()
        if self.cv_SYSTEM.get_state() == OpSystemState.PETITBOOT_SHELL:
            skiroot_lspci = lspci_output
            skiroot_done = 1
        else:
            host_lspci = lspci_output
            host_done = 1
        if host_done and skiroot_done:
            skiroot_snapshot = skiroot_lspci
            host_snapshot = host_lspci
            # refresh so next pair can be matched up, i.e. soft or hard;
            # done before asserting so a failed comparison cannot leave
            # stale results behind for the next pair
            skiroot_done = 0
            host_done = 0
            skiroot_lspci = None
            host_lspci = None
            compare_results = self._diff_my_devices(
                listA=skiroot_snapshot,
                listA_name="skiroot_lspci",
                listB=host_snapshot,
                listB_name="host_lspci")
            self.assertEqual(
                len(compare_results), 0,
                "skiroot_lspci and host_lspci devices differ:\n{}"
                .format('\n'.join(compare_results)))

    def compare_live_devices(self):
        '''
        Compares the live system lspci against an input file, host-lspci
        provided either in conf file or via command line.

        "ssh user@host lspci -mm -n > host-lspci.txt"

        --host-lspci host-lspci.txt on command line
                        or
        host_lspci=host-lspci.txt in conf file

        Nothing is compared when no input file is configured.

        Case A --run testcases.OpTestPCI.PCISkiroot.compare_live_devices
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.compare_live_devices
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.compare_live_devices
        Case D --run testcases.OpTestPCI.PCIHost.compare_live_devices
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.compare_live_devices
        Case F --run testcases.OpTestPCI.PCIHostHardboot.compare_live_devices
        '''
        active_lspci = self.get_lspci()
        file_lspci = self.get_lspci_file()
        if file_lspci:
            compare_results = self._diff_my_devices(
                listA=file_lspci,
                listA_name=self.conf.lspci_file(),
                listB=active_lspci,
                listB_name="Live System")
            log.debug("compare_results={}".format(compare_results))
            self.assertEqual(
                len(compare_results), 0,
                "Stored ({}) and Active PCI devices differ:\n{}"
                .format(self.conf.lspci_file(),
                        '\n'.join(compare_results)))

    def _get_list_of_pci_devices(self):
        '''
        Returns the entries of /sys/bus/pci/devices/ as returned by the
        console's run_command.
        '''
        cmd = "ls --color=never /sys/bus/pci/devices/ | awk {'print $1'}"
        return self.c.run_command(cmd)

    def _get_driver(self, pe):
        '''
        Returns the name from the "Kernel driver in use:" line of
        "lspci -ks <pe>", or None when there is no such line.
        '''
        cmd = "lspci -ks {}".format(pe)
        output = self.c.run_command(cmd, timeout=120)
        for line in output or []:
            if 'Kernel driver in use:' in line:
                # strip all surrounding whitespace so a trailing CR from
                # the console cannot leak into sysfs paths built later
                return line.rsplit(":")[1].strip()
        return None

    def _get_list_of_slots(self):
        '''
        Returns the entries of /sys/bus/pci/slots/ as returned by the
        console's run_command.
        '''
        cmd = "ls --color=never /sys/bus/pci/slots/ -1"
        return self.c.run_command(cmd)

    def _get_root_pe_address(self):
        '''
        Derives an address token for the device backing /boot: takes the
        /dev/ name reported by df, looks it up in /dev/disk/by-path/ and
        returns the second "-" separated field of the first match.
        '''
        cmd = "df -h /boot | awk 'END {print $1}'"
        res = self.c.run_command(cmd)
        boot_disk = ''.join(res).split("/dev/")[1]
        boot_disk = boot_disk.replace("\r\n", "")
        awk_string = "awk '{print $(NF-2)}'"
        pre_cmd = "ls --color=never -l /dev/disk/by-path/ | grep {} | ".format(
            boot_disk)
        cmd = pre_cmd + awk_string
        res = self.c.run_command(cmd)
        root_pe = res[0].split("-")[1]
        return root_pe

    def _gather_errors(self):
        '''
        Runs the kernel and OPAL log greps so their output is captured by
        the console; a grep that fails (CommandFailed) is ignored on
        purpose, this is best-effort log collection.
        '''
        # Gather all errors from kernel and opal logs
        try:
            self.c.run_command("dmesg -r|grep '<[4321]>'")
        except CommandFailed:
            pass
        try:
            # raw string: "\]" is not a valid Python escape sequence
            self.c.run_command(r"grep ',[0-4]\]' /sys/firmware/opal/msglog")
        except CommandFailed:
            pass

    def driver_bind(self):
        '''
        Unbind and then bind the devices

        Case A --run testcases.OpTestPCI.PCISkiroot.driver_bind
        Case B --run testcases.OpTestPCI.PCISkirootSoftboot.driver_bind
        Case C --run testcases.OpTestPCI.PCISkirootHardboot.driver_bind
        Case D --run testcases.OpTestPCI.PCIHost.driver_bind
        Case E --run testcases.OpTestPCI.PCIHostSoftboot.driver_bind
        Case F --run testcases.OpTestPCI.PCIHostHardboot.driver_bind

        Special note on unbinding shared bmc ethernet ports, caution.
        '''
        # since we will be unbinding ethernet drivers, override the console
        global reset_console
        reset_console = 1
        self.set_console()
        if self.cv_SYSTEM.get_state() == OpSystemState.PETITBOOT_SHELL:
            # placeholder, presumably chosen so it matches no device address
            root_pe = "xxxx"
        else:
            root_pe = self._get_root_pe_address()
        self.c.run_command("dmesg -D")
        devices = self._get_list_of_pci_devices()
        failure_list = {}
        for slot in devices:
            # never unbind the device holding the boot disk
            if root_pe in slot:
                continue
            driver = self._get_driver(slot)
            if driver is None:
                continue
            rc = 0
            index = "{}_{}".format(driver, slot)
            driver_dir = "/sys/bus/pci/drivers/{}".format(driver)
            path = "{}/{}".format(driver_dir, slot)
            list_cmd = 'ls --color=never {}'.format(driver_dir)

            cmd = "echo -n {} > {}/unbind".format(slot, driver_dir)
            log.debug("unbind driver={} slot={} cmd={}".format(
                driver, slot, cmd))
            try:
                self.c.run_command(cmd)
            except CommandFailed:
                msg = "Driver unbind operation failed for driver {}, slot {}".format(
                    driver, slot)
                failure_list[index] = msg
            time.sleep(5)
            self.c.run_command(list_cmd)
            try:
                self.c.run_command("test -d {}".format(path))
                # directory still present, the device is still bound
                rc = 1
            except CommandFailed:
                pass

            cmd = "echo -n {} > {}/bind".format(slot, driver_dir)
            log.debug("bind driver={} slot={} cmd={}".format(
                driver, slot, cmd))
            try:
                self.c.run_command(cmd)
            except CommandFailed:
                msg = "Driver bind operation failed for driver {}, slot {}".format(
                    driver, slot)
                failure_list[index] = msg
            time.sleep(5)
            self.c.run_command(list_cmd)
            try:
                self.c.run_command("test -d {}".format(path))
            except CommandFailed:
                # directory missing, the device did not come back
                rc = 2

            self._gather_errors()

            if rc == 1:
                msg = "{} not unbound for driver {}".format(slot, driver)
                failure_list[index] = msg
            if rc == 2:
                msg = "{} not bound back for driver {}".format(slot, driver)
                failure_list[index] = msg
        self.assertEqual(failure_list, {},
                         "Driver bind/unbind failures {}".format(failure_list))

    def hot_plug_host(self):
        '''
        NEEDS TESTING

        Powers each hotplug capable PCI slot off and back on through
        /sys/bus/pci/slots/<slot>/power and checks with lspci that the
        device disappears and reappears.

        Case A --run testcases.OpTestPCI.PCIHost.hot_plug_host
        Case B --run testcases.OpTestPCI.PCIHostSoftboot.hot_plug_host
        Case C --run testcases.OpTestPCI.PCIHostHardboot.hot_plug_host
        '''
        # Currently this feature enabled only for fsp systems
        if "FSP" not in self.conf.args.bmc_type:
            log.debug(
                "Skipping test, currently only OPAL FSP Platform supported for hot_plug_host")
            self.skipTest(
                "Skipping test, currently only OPAL FSP Platform supported for hot_plug_host")
        res = self.c.run_command("uname -r")[-1].split("-")[0]
        if LooseVersion(res) < LooseVersion("4.10.0"):
            log.debug(
                "Skipping test, Kernel does not support hotplug {}".format(res))
            self.skipTest(
                "Skipping test, Kernel does not support hotplug={}".format(res))
        self.cv_HOST.host_load_module("pnv_php")
        device_list = self._get_list_of_pci_devices()
        root_pe = self._get_root_pe_address()
        # the slot listing is not used further in this method
        self._get_list_of_slots()
        self.c.run_command("dmesg -D")
        pair = {}  # Pair of device vs slot location code
        for device in device_list:
            cmd = "lspci -k -s {} -vmm".format(device)
            res = self.c.run_command(cmd)
            for line in res:
                obj = re.match('PhySlot:\t(.*)', line)
                if obj:
                    pair[device] = obj.group(1)
        failure_list = {}
        for device, phy_slot in pair.items():
            # never power off the slot holding the boot disk
            if root_pe in device:
                continue
            index = "{}_{}".format(device, phy_slot)
            path = "/sys/bus/pci/slots/{}/power".format(phy_slot)
            try:
                self.c.run_command("test -f {}".format(path))
            except CommandFailed:
                log.debug("Slot {} does not support hotplug".format(phy_slot))
                continue  # slot does not support hotplug
            try:
                self.c.run_command("echo 0 > {}".format(path))
            except CommandFailed:
                msg = "PCI device/slot power off operation failed"
                failure_list[index] = msg
            time.sleep(5)
            cmd = "lspci -k -s {}".format(device)
            res = self.c.run_command(cmd)
            if device in "\n".join(res):
                msg = "PCI device failed to remove after power off operation"
                failure_list[index] = msg
            try:
                self.c.run_command("echo 1 > {}".format(path))
            except CommandFailed:
                msg = "PCI device/slot power on operation failed"
                failure_list[index] = msg
            res = self.c.run_command(cmd)
            if device not in "\n".join(res):
                msg = "PCI device failed to attach back after power on operation"
                failure_list[index] = msg
            self._gather_errors()
        self.assertEqual(failure_list, {},
                         "PCI Hotplug failures {}".format(failure_list))
class PCISkirootSoftboot(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests in the Petitboot shell after a soft
    reboot issued from the console.
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Performs the shared setup, then sends "reboot" on the console and
        waits for the Petitboot shell again.
        '''
        super().setUpClass()
        system = cls.cv_SYSTEM
        cls.pty.sendline("reboot")
        system.set_state(OpSystemState.IPLing)
        # clear the states since we rebooted outside the state machine
        system.util.clear_state(system)
        system.goto_state(OpSystemState.PETITBOOT_SHELL)

    @classmethod
    def tearDownClass(cls):
        '''
        Delegates to the shared class teardown.
        '''
        super().tearDownClass()

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
class PCISkirootHardboot(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests in the Petitboot shell after a power
    cycle requested through the shared setup.
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Performs the shared setup with power_cycle=1.
        '''
        super().setUpClass(power_cycle=1)

    @classmethod
    def tearDownClass(cls):
        '''
        Delegates to the shared class teardown.
        '''
        super().tearDownClass()

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
class PCISkiroot(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests with the parent's default class setup
    (no setUpClass override here).
    '''

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
class PCIHostSoftboot(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests in the host OS after a soft reboot
    issued from the console.
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Performs the shared setup for OpSystemState.OS, then sends
        "reboot" on the console and waits for the OS again.
        '''
        super().setUpClass(desired=OpSystemState.OS)
        system = cls.cv_SYSTEM
        cls.pty.sendline("reboot")
        system.set_state(OpSystemState.BOOTING)
        # clear the states since we rebooted outside the state machine
        system.util.clear_state(system)
        system.goto_state(OpSystemState.OS)

    @classmethod
    def tearDownClass(cls):
        '''
        Delegates to the shared class teardown.
        '''
        super().tearDownClass()

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
class PCIHostHardboot(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests in the host OS after a power cycle
    requested through the shared setup.
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Performs the shared setup for OpSystemState.OS with power_cycle=1.
        '''
        super().setUpClass(desired=OpSystemState.OS, power_cycle=1)

    @classmethod
    def tearDownClass(cls):
        '''
        Delegates to the shared class teardown.
        '''
        super().tearDownClass()

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
class PCIHost(OpClassPCI, unittest.TestCase):
    '''
    Runs the parent class tests in the host OS, without forcing a
    soft or hard boot first.
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Performs the shared setup for OpSystemState.OS.
        '''
        super().setUpClass(desired=OpSystemState.OS)

    def setUp(self):
        '''
        Placeholder for per test setUp; delegates to the parent.
        '''
        super().setUp()
def skiroot_softboot_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCISkirootSoftboot test cases.

    --run-suite pci-regression
    --run testcases.OpTestPCI.skiroot_softboot_suite
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'compare_boot_devices']
    cases = [PCISkirootSoftboot(method) for method in method_names]
    return unittest.TestSuite(cases)
def skiroot_hardboot_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCISkirootHardboot test cases.

    --run-suite pci-regression
    --run testcases.OpTestPCI.skiroot_hardboot_suite
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'compare_boot_devices']
    cases = [PCISkirootHardboot(method) for method in method_names]
    return unittest.TestSuite(cases)
def skiroot_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCISkiroot test cases.

    --run-suite BasicPCI
    --run testcases.OpTestPCI.skiroot_suite

    This suite does not care on soft vs hard boot
    '''
    # NOTE(review): pcie_link_errors is not defined in the class body
    # visible here -- confirm it exists.
    method_names = ['pcie_link_errors', 'compare_live_devices']
    cases = [PCISkiroot(method) for method in method_names]
    return unittest.TestSuite(cases)
def skiroot_full_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCISkiroot test cases, including
    driver_bind.

    --run testcases.OpTestPCI.skiroot_full_suite

    This suite does not care on soft vs hard boot
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'driver_bind']
    cases = [PCISkiroot(method) for method in method_names]
    return unittest.TestSuite(cases)
def host_softboot_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCIHostSoftboot test cases.

    --run-suite pci-regression
    --run testcases.OpTestPCI.host_softboot_suite
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'compare_boot_devices',
                    'driver_bind', 'hot_plug_host']
    cases = [PCIHostSoftboot(method) for method in method_names]
    return unittest.TestSuite(cases)
def host_hardboot_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCIHostHardboot test cases.

    --run-suite pci-regression
    --run testcases.OpTestPCI.host_hardboot_suite
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'compare_boot_devices',
                    'driver_bind', 'hot_plug_host']
    cases = [PCIHostHardboot(method) for method in method_names]
    return unittest.TestSuite(cases)
def host_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCIHost test cases.

    --run-suite BasicPCI
    --run testcases.OpTestPCI.host_suite

    This suite does not care on soft vs hard boot
    '''
    # NOTE(review): pcie_link_errors is not defined in the class body
    # visible here -- confirm it exists.
    method_names = ['pcie_link_errors', 'compare_live_devices']
    cases = [PCIHost(method) for method in method_names]
    return unittest.TestSuite(cases)
def host_full_suite():
    '''
    Function used to prepare a test suite (see op-test)

    Builds a unittest.TestSuite of PCIHost test cases, including
    driver_bind and hot_plug_host.

    --run testcases.OpTestPCI.host_full_suite

    This suite does not care on soft vs hard boot
    '''
    # NOTE(review): pcie_link_errors and pci_link_check are not defined in
    # the class body visible here -- confirm they exist.
    method_names = ['pcie_link_errors', 'compare_live_devices',
                    'pci_link_check', 'driver_bind', 'hot_plug_host']
    cases = [PCIHost(method) for method in method_names]
    return unittest.TestSuite(cases)