#!/usr/bin/env python3
# OpenPOWER Automated Test Project
#
# Contributors Listed Below - COPYRIGHT 2017
# [+] International Business Machines Corp.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
#
'''
IPMI Torture
------------
Use several threads in `op-test` to poke IPMI concurrently in a number
of "safe" ways, and see when the BMC explodes.
'''
import unittest
import time
import threading
import OpTestConfiguration
from common.OpTestSystem import OpSystemState
from common.OpTestConstants import OpTestConstants as BMC_CONST
[docs]class OobIpmiThread(threading.Thread):
def __init__(self, threadID, name, cmd, execution_time):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.cmd = cmd
self.execution_time = execution_time
conf = OpTestConfiguration.conf
self.cv_IPMI = conf.ipmi()
[docs] def run(self):
print(("Starting " + self.name))
self.oob_ipmi_thread(self.name, self.cmd, self.execution_time)
print(("Exiting " + self.name))
def oob_ipmi_thread(self, threadName, cmd, t):
execution_time = time.time() + 60*t
print(("Starting %s for oob-ipmi %s" % (threadName, cmd)))
while True:
try:
self.cv_IPMI.ipmitool.run(cmd, logcmd=False)
except:
pass
if time.time() > execution_time:
break
time.sleep(2)
[docs]class InbandIpmiThread(threading.Thread):
def __init__(self, threadID, name, ipmi_method, cmd, execution_time):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.ipmi_method = ipmi_method
self.cmd = cmd
self.execution_time = execution_time
conf = OpTestConfiguration.conf
self.cv_HOST = conf.host()
self.cv_SYSTEM = conf.system()
[docs] def run(self):
print(("Starting " + self.name))
self.inband_ipmi_thread(self.name, self.cmd, self.execution_time)
print(("Exiting " + self.name))
def inband_ipmi_thread(self, threadName, cmd, t):
execution_time = time.time() + 60*t
self.c = self.cv_HOST.get_ssh_connection()
print(("Starting %s for inband-ipmi %s" % (threadName, cmd)))
while True:
try:
self.c.run_command(self.ipmi_method + cmd)
except:
pass
if time.time() > execution_time:
break
time.sleep(2)
[docs]class SolConsoleThread(threading.Thread):
def __init__(self, threadID, name, test, execution_time):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.test = test
self.execution_time = execution_time
conf = OpTestConfiguration.conf
self.cv_SYSTEM = conf.system()
[docs] def run(self):
print(("Starting " + self.name))
self.sol_console_thread(self.name, self.execution_time)
print(("Exiting " + self.name))
def sol_console_thread(self, threadName, t):
self.c = self.cv_SYSTEM.console
# Enable kernel logging(printk) to console
self.c.run_command("echo 10 > /proc/sys/kernel/printk")
execution_time = time.time() + 60*self.execution_time
i = 0
while True:
print(("Iteration %s, SOL open/close" % i))
try:
self.c.get_console()
# Execute any host command(for console IO) if system is in runtime
if "runtime" in self.test:
try:
self.c.run_command("ipmitool power status")
# Enable console traffic by printing the processes/tasks to the console
self.c.run_command("echo t > /proc/sysrq-trigger")
except:
pass
self.c.close()
except:
pass
time.sleep(3)
i += 1
if time.time() > execution_time:
break
[docs]class IpmiInterfaceTorture(unittest.TestCase):
[docs] def setUp(self):
conf = OpTestConfiguration.conf
self.cv_HOST = conf.host()
self.cv_IPMI = conf.ipmi()
self.cv_SYSTEM = conf.system()
self.torture_time = 2400
self.bmc_type = conf.args.bmc_type
def runTest(self):
self.setup_test()
self.thread_list = []
# OOB IPMI Torture
torture_time = self.torture_time
cmd_list = ["sdr list", "fru print",
"sel list", "sensor list", "power status"]
for j in range(1, 3):
for idx, cmd in enumerate(cmd_list):
num = j*(idx + 1)
thread = OobIpmiThread(num, "Thread-%s" %
num, cmd, torture_time)
thread.start()
self.thread_list.append(thread)
if "skiroot" in self.test:
return
if self.test == "standby":
return
# In-band IPMI Torture (Open Interface)
cmd_list = ["sdr list", "fru print",
"sel list", "sensor list", "power status"]
for j in range(1, 3):
for idx, cmd in enumerate(cmd_list):
num = j*(idx + 1)
thread = InbandIpmiThread(
num, "Thread-%s" % num, BMC_CONST.IPMITOOL_OPEN, cmd, torture_time)
thread.start()
self.thread_list.append(thread)
if "FSP" in self.bmc_type:
return
# In-band IPMI Torture (USB Interface)
cmd_list = ["sdr list", "fru print",
"sel list", "sensor list", "power status"]
for idx, cmd in enumerate(cmd_list):
thread = InbandIpmiThread(
idx, "Thread-%s" % idx, BMC_CONST.IPMITOOL_USB, cmd, torture_time)
thread.start()
self.thread_list.append(thread)
[docs] def tearDown(self):
# wait for all the threads to finish
for thread in self.thread_list:
thread.join()
[docs]class ConsoleIpmiTorture(unittest.TestCase):
[docs] def setUp(self):
conf = OpTestConfiguration.conf
self.cv_HOST = conf.host()
self.cv_IPMI = conf.ipmi()
self.cv_SYSTEM = conf.system()
self.torture_time = 2400
def ipmi_interface_torture(self):
# OOB IPMI Torture
torture_time = self.torture_time
self.thread_list = []
cmd_list = ["sdr list", "fru print",
"sel list", "sensor list", "power status"]
for j in range(1, 3):
for idx, cmd in enumerate(cmd_list):
num = j*(idx + 1)
thread = OobIpmiThread(num, "Thread-%s" %
num, cmd, torture_time)
thread.start()
self.thread_list.append(thread)
if self.test == "standby":
return
if "skiroot" in self.test:
return
return # Don't enable below inband ipmi torture, console and ssh sessions make the o/p clutter
# In-band IPMI Torture
cmd_list = ["sdr list", "fru print",
"sel list", "sensor list", "power status"]
for j in range(1, 3):
for idx, cmd in enumerate(cmd_list):
num = j*(idx + 1)
thread = InbandIpmiThread(
num, "Thread-%s" % num, BMC_CONST.IPMITOOL_OPEN, cmd, torture_time)
thread.start()
self.thread_list.append(thread)
def console_torture(self):
thread = SolConsoleThread(
1, "SOL-Thread", self.test, self.torture_time)
thread.start()
self.thread_list.append(thread)
def runTest(self):
self.setup_test()
self.ipmi_interface_torture()
self.console_torture()
[docs] def tearDown(self):
# Wait for all the thread to finish
for thread in self.thread_list:
thread.join()
[docs]class SkirootConsoleTorture(ConsoleIpmiTorture):
def setup_test(self):
self.test = "skiroot_runtime"
self.cv_SYSTEM.goto_state(OpSystemState.PETITBOOT_SHELL)
self.c = self.cv_SYSTEM.console
[docs]class SkirootIpmiTorture(IpmiInterfaceTorture):
def setup_test(self):
self.test = "skiroot_runtime"
self.cv_SYSTEM.goto_state(OpSystemState.PETITBOOT_SHELL)
self.c = self.cv_SYSTEM.console
[docs]class RuntimeConsoleTorture(ConsoleIpmiTorture):
def setup_test(self):
self.test = "runtime"
self.cv_SYSTEM.goto_state(OpSystemState.OS)
self.c = self.cv_SYSTEM.console
[docs]class StandbyConsoleTorture(ConsoleIpmiTorture):
def setup_test(self):
self.test = "standby"
self.cv_SYSTEM.goto_state(OpSystemState.OFF)
self.c = self.cv_SYSTEM.console
[docs]class RuntimeIpmiInterfaceTorture(IpmiInterfaceTorture):
def setup_test(self):
self.test = "runtime"
self.cv_SYSTEM.goto_state(OpSystemState.OS)
self.c = self.cv_SYSTEM.console
[docs]class StandbyIpmiInterfaceTorture(IpmiInterfaceTorture):
def setup_test(self):
self.test = "standby"
self.cv_SYSTEM.goto_state(OpSystemState.OFF)
self.c = self.cv_SYSTEM.console