Python Scripts
Code Snippets
Print without next line
import sys

# write() emits exactly the text it is given, so no newline is appended.
sys.stdout.write('.')
You may also need to call flush() to ensure stdout is flushed immediately:
# Push any buffered output to the terminal right away.
sys.stdout.flush()
Regex
- Extract a particular value(like 10.5, 100.0):
# Collect every decimal reading (one to three integer digits, one fractional
# digit, e.g. 10.5 or 100.0) that is immediately followed by whitespace and
# "id".  The dot is escaped so it matches a literal decimal point only; an
# unescaped "." would accept any character in that position.
value = re.findall(r'([0-9]{1,3}\.[0-9])(?=\sid)', output)

# Print all matches
print(value)

# Print 2nd match (only exists when the text held at least two readings)
if len(value) > 1:
    print(value[1])
Arrays
- Using Array
# Iterating the list directly visits each element in order, so the loop no
# longer relies on a hard-coded length that must match the list.
x = [1, 2, 3, 4, 5]
for item in x:
    print(item)
- Append to Array
# Start from an empty list and grow it one element at a time with append().
f = []
for number in range(30):
    f.append(number)

# Show the finished list (0 through 29).
print(f)
Pause
import time

# Suspend execution for this many seconds before carrying on.
PAUSE_SECONDS = 5
time.sleep(PAUSE_SECONDS)
Print File Extensions
import os.path

# splitext() returns (root, ext) with ext still carrying its leading dot;
# slicing from index 1 drops the dot ('' when the name has no extension).
_root, dotted_ext = os.path.splitext(filename)
extension = dotted_ext[1:]
EasySNMP
This snippet needs more testing.
# Each command goes on its own line; fused together, apt-get would treat the
# following words as package names.
sudo apt-get install libsnmp-dev snmp-mibs-downloader
sudo apt-get install gcc python-dev
sudo pip install easysnmp
from easysnmp import snmp_get

# Single SNMP GET for one OID against the target host; the result is not
# stored here, so run this at an interactive prompt to see it.
snmp_get(
    '.1.3.6.1.4.1.2021.10.1.3.1',
    hostname='10.107.88.93',
    community='public',
    version=2,
)
Calculator
#!/usr/bin/python
"""Interactive four-function calculator for the terminal."""

# Python 2 calls the plain-text line reader raw_input(); Python 3 calls it
# input().  Always read text: Python 2's input() evaluates whatever is typed.
try:
    read_line = raw_input
except NameError:
    read_line = input


def evaluate(operation, num1, num2):
    """Return the result line for applying ``operation`` to two numbers.

    operation -- one of '+', '-', '*', '/'
    num1, num2 -- the operands (floats)

    Results keep two decimal places so fractional answers are not truncated.
    An unknown operator or a zero divisor yields an explanatory message
    instead of an exception.
    """
    if operation == '+':
        return "Addition is: %.2f" % (num1 + num2)
    if operation == '-':
        return "Subtraction is: %.2f" % (num1 - num2)
    if operation == '*':
        return "Multiplication is: %.2f" % (num1 * num2)
    if operation == '/':
        if num2 == 0:
            return "Cannot divide by zero"
        return "Division is: %.2f" % (num1 / num2)
    return "Invalid Operator"


def calc():
    """Prompt for an operator and two numbers, print the result, then offer a rerun."""
    operation = read_line('''
Please type operator:
+ for addition
- for subtraction
* for multiplication
/ for division
''').strip()

    try:
        num1 = float(read_line('First Number: '))
        num2 = float(read_line('Second Number: '))
    except ValueError:
        # Non-numeric entry: report it rather than crash
        print("Invalid Input")
    else:
        print(evaluate(operation, num1, num2))

    again()


def again():
    """Ask whether to calculate again, re-asking until the answer is Y or N."""
    while True:
        calc_again = read_line('''
Do you want to calculate again?
Please type Y for Yes or N for No.
''').strip().upper()

        if calc_again == "Y":
            calc()
            return
        if calc_again == "N":
            print('See you later on.. Bye')
            return
        # Anything else: loop and ask once more


def welcome():
    """Print the start-up banner."""
    print('''
Welcome to My Calculator
''')


# Run only when executed as a script, so the functions can be imported.
if __name__ == "__main__":
    welcome()
    calc()
Schedule output
# Install the third-party "schedule" package
sudo pip install schedule
import schedule
import time


def job():
    """Task run by every schedule registered in this script."""
    print("I'm working...")


# Register the same task on four separate cadences.
schedule.every(1).second.do(job)
schedule.every(1).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("21:14").do(job)

# Check once a second for jobs that have come due and run them.
while True:
    schedule.run_pending()
    time.sleep(1)
Fill Routing Table of Remote Server
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('192.168.1.7', username='root', password='pwd@123')

try:
    # One /24 route per x.y.z.0 network: x runs 1..255, y and z run 0..255.
    for x in range(1, 256):
        for y in range(256):
            for z in range(256):
                stdin, stdout, stderr = ssh.exec_command(
                    "route add -net %d.%d.%d.0 netmask 255.255.255.0 gw 192.168.1.1 "
                    % (x, y, z))
                # exec_command() returns as soon as the command is started;
                # wait for it to finish so commands do not pile up as open
                # channels on the connection.
                stdout.channel.recv_exit_status()
finally:
    # Release the connection even if a command raises part-way through
    ssh.close()
Paramiko execute commands
Source: sebastiandahlgren.se
import paramiko
import time
import select
import sys

# NetScaler/Server Parameters
host = '10.107.88.78'
user = 'nsroot'
passwd = 'pwd@123'
cmd = 'show runningconfig'

# Number of connection attempts before giving up
max_attempts = 10


def emit(chunk):
    """Write one received chunk to stdout with nothing added between chunks."""
    if not isinstance(chunk, str):
        # On Python 3 the channel hands back bytes
        chunk = chunk.decode('utf-8', 'replace')
    sys.stdout.write(chunk)


# Try to connect to the host, retry a few times if it fails.
i = 1
while True:
    print("Trying to connect to %s (%i/%i)" % (host, i, max_attempts))

    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=user, password=passwd)
        print("Connected to %s" % host)
        break
    except paramiko.AuthenticationException:
        # Wrong credentials will not fix themselves; do not retry
        print("Authentication failed when connecting to %s" % host)
        sys.exit(1)
    except Exception:
        # Exception (not a bare except) so Ctrl-C still interrupts the wait
        print("Could not SSH to %s, waiting for it to start" % host)
        i += 1
        time.sleep(2)

    # If we could not connect within the allowed number of attempts
    if i > max_attempts:
        print("Could not connect to %s. Giving up" % host)
        sys.exit(1)

try:
    # Send the command (non-blocking)
    stdin, stdout, stderr = ssh.exec_command(cmd)
    channel = stdout.channel

    # Wait for the command to terminate, printing output as it arrives
    while not channel.exit_status_ready():
        # Wait up to a second for data instead of spinning with a zero timeout
        rl, wl, xl = select.select([channel], [], [], 1.0)
        if len(rl) > 0 and channel.recv_ready():
            emit(channel.recv(1024))

    # The exit status can arrive while output is still buffered; drain it
    while channel.recv_ready():
        emit(channel.recv(1024))

    print("\nCommand done, closing SSH connection")
finally:
    # Disconnect from the host
    ssh.close()
Docker add multiple servers remotely
import paramiko
import time

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('10.107.88.93', username='aman', password='pwd@123')

# Start one nginx container per number 44400..44404; the number serves both
# as the container-name suffix and as the published host port.
for port in range(44400, 44405):
    command = "docker run --name docker-nginx%d -p %d:8000 -d nginx" % (port, port)
    ssh.exec_command(command)
    print("Docker server created..")
    time.sleep(1)

print("Done")
ssh.close()
CPU check of Remote Server
import re

# An idle figure such as "98.5 id" (or "98.5%id") on a Cpu(s) summary line.
IDLE_PATTERN = re.compile(r'(\d{1,3}\.\d)[%\s]*id')


def cpu_usage(top_output):
    """Return CPU utilisation (100 minus idle) from ``top | grep "Cpu(s)"`` output.

    top_output -- text or bytes holding one or more Cpu(s) summary lines.

    The idle figure of the last line is used.  The value is located by
    pattern rather than by token position, so a line where top prints
    "%Cpu(s):100.0" without a separating space does not shift the result.
    Returns None when no idle figure is present.
    """
    if isinstance(top_output, bytes):
        top_output = top_output.decode('utf-8', 'replace')
    idle_values = IDLE_PATTERN.findall(top_output)
    if not idle_values:
        return None
    return 100 - float(idle_values[-1])


def main():
    """Print ten CPU utilisation readings taken from the remote server."""
    # Third-party imports live here so cpu_usage() can be used without them.
    import paramiko
    from terminalplot import plot  # noqa: F401 - unused here, kept from the original snippet

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect('10.107.88.93', username='aman', password='pwd@123')
    try:
        for _ in range(10):
            stdin, stdout, stderr = ssh.exec_command('top -bn2 | grep "Cpu(s)"')
            cpu = cpu_usage(stdout.read())
            if cpu is None:
                print("Could not read CPU usage from top output")
            else:
                print(cpu)
    finally:
        # Close the connection even if a reading fails
        ssh.close()


if __name__ == "__main__":
    main()
TCPDump
- Print captured packets
import subprocess as sub

# Run tcpdump and read what it writes through a pipe.
p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE)

# readline() returns b'' once the pipe reaches end-of-file.
while True:
    row = p.stdout.readline()
    if row == b'':
        break
    print(row.rstrip())
- Print Packet Length
import subprocess as sub
import re

# Compiled once, outside the loop.  Raw string so "\d" reaches the regex
# engine unchanged; bytes pattern because lines read from the pipe are bytes.
LENGTH_RE = re.compile(br'length (\d+)')

try:
    p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE)
    for row in iter(p.stdout.readline, b''):
        y = LENGTH_RE.search(row)
        if y:
            x = int(y.group(1))
            # Zero-length packets are not reported
            if x != 0:
                print(x)
except KeyboardInterrupt:
    print("User killed command.")
- Terminal Plot
# Install the third-party "terminalplot" package
sudo pip install terminalplot
from terminalplot import plot
#import gnuplotlib as gp
import subprocess as sub
import re

# Compiled once, outside the loop.  Raw string so "\d" reaches the regex
# engine unchanged; bytes pattern because lines read from the pipe are bytes.
LENGTH_RE = re.compile(br'length (\d+)')

x = 0    # packets collected in the current batch
a = []   # x-axis: position of each packet within the batch
b = []   # y-axis: length of each packet

try:
    p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE)
    for row in iter(p.stdout.readline, b''):
        y = LENGTH_RE.search(row)
        if y:
            z = int(y.group(1))
            # Zero-length packets are skipped
            if z != 0:
                x += 1
                print(x)
                b.append(z)
                a.append(x)
                # Plot each batch of ten packets, then start a new batch
                if x == 10:
                    plot(a, b)
                    print(a)
                    print(b)
                    # Remove below # if you don't want continuous output
                    # break
                    x = 0
                    a = []
                    b = []
except KeyboardInterrupt:
    print("User killed command.")
- Plot using Stdout
import subprocess as sub
import re
import sys

# Compiled once, outside the loop.  Raw string so "\d" reaches the regex
# engine unchanged; bytes pattern because lines read from the pipe are bytes.
LENGTH_RE = re.compile(br'length (\d+)')

# Packets whose reported length reaches this value are not drawn.
MAX_BAR = 190

try:
    p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE)
    for row in iter(p.stdout.readline, b''):
        y = LENGTH_RE.search(row)
        if y:
            z = int(y.group(1))
            if z < MAX_BAR:
                # A bar of z '=' characters ending in '>', built in one
                # write instead of one write per character
                sys.stdout.write('=' * z + '>\n')
except KeyboardInterrupt:
    print("User killed command.")
Speaking Passwords
- eSpeak Password
import os

# Python 2 calls the plain-text line reader raw_input(); Python 3 calls it
# input().  Always read text: Python 2's input() evaluates whatever is typed.
try:
    read_line = raw_input
except NameError:
    read_line = input


def is_correct(entry):
    """Return True when ``entry`` parses as the integer 1234, else False.

    Text that is not a number counts as a wrong password instead of raising.
    """
    try:
        return int(entry) == 1234
    except (TypeError, ValueError):
        return False


def check():
    """Prompt once for the password and speak the verdict through espeak."""
    if is_correct(read_line("Input your password: ")):
        os.system("espeak 'correct password'")
    else:
        os.system("espeak 'wrong password'")


def again():
    """Keep prompting indefinitely (a loop, so the call stack never grows)."""
    while True:
        check()


# Run only when executed as a script, so the functions can be imported.
if __name__ == "__main__":
    check()
    again()
- Using pyttsx
import pyttsx

engine = pyttsx.init()

# Python 2 calls the plain-text line reader raw_input(); Python 3 calls it
# input().  Always read text: Python 2's input() evaluates whatever is typed.
try:
    read_line = raw_input
except NameError:
    read_line = input


def check():
    """Prompt once for the password and speak the verdict through pyttsx."""
    try:
        matched = int(read_line("Input your password: ")) == 1234
    except ValueError:
        # Non-numeric entry counts as a wrong password instead of crashing
        matched = False

    engine.say('correct password' if matched else 'wrong password')
    engine.runAndWait()


def again():
    """Keep prompting indefinitely (a loop, so the call stack never grows)."""
    while True:
        check()


# Run only when executed as a script, so the functions can be imported.
if __name__ == "__main__":
    check()
    again()
NetScaler NITRO REST Server Monitor using SSH
- Monitors CPU Utilization of Backend Server, Disables service if CPU Exceeds 50%
- Requires: NitroRestClient
import paramiko
import sys
import re
import time
from bin import NitroRestClient

# Netscaler Parameters
nsip = "10.107.88.78"
nsun = "nsroot"
nspwd = "pwd@123"
nssrv = "Ubuntu_Server"

# Server Parameters
srvip = "10.107.88.93"
srvun = "aman"
srvpwd = "Passion@123"

# CPU utilisation (percent) at or above which the server is disabled
CPU_LIMIT = 50

# Idle figure such as "98.5 id"; the dot is escaped so it matches only a
# literal decimal point.
IDLE_RE = re.compile(r'([0-9]{1,3}\.[0-9])\sid')

# Attempt SSH Connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(srvip, username=srvun, password=srvpwd)

# Attempt instantiation of NitroRestClient
try:
    Client = NitroRestClient.NitroRestClient(nsip, nsun, nspwd)
except Exception:
    print("Couldn't create Nitro Session, check username and password and network connectivity")
    ssh.close()
    sys.exit(1)

# Check servername for validity
try:
    normalizedservername = Client.servernamecheck(nssrv)
except ValueError as e:
    print("Invalid server name was provided. Please check server name and try again.")
    ssh.close()
    sys.exit(2)

# Loop state: True while this script has the server disabled.  Replaces the
# original x/y pair, which always held opposite values.
disabled = False

# Main Program
try:
    while True:
        stdin, stdout, stderr = ssh.exec_command('top -bn2 | grep "Cpu(s)"')
        output = stdout.read().decode('utf-8', 'replace')
        value = IDLE_RE.findall(output)

        # The second idle figure is the one used; without it there is
        # nothing to act on this round.
        if len(value) < 2:
            print("Could not read CPU usage, retrying..")
            time.sleep(5)
            continue

        cpu = 100 - float(value[1])
        print(cpu)

        if cpu < CPU_LIMIT:
            if not disabled:
                print("CPU is normal..")
                time.sleep(5)
            else:
                print("Server enabled")
                Client.enablephysicalserver(normalizedservername)
                disabled = False
        else:
            if disabled:
                print("Server remains disabled..")
                time.sleep(5)
            else:
                Client.disablephysicalserver(normalizedservername)
                print("Server disabled")
                time.sleep(1)
                disabled = True

except KeyboardInterrupt:
    print("Script Killed by user")
finally:
    # Close the SSH session on Ctrl-C and on unexpected errors alike
    ssh.close()

sys.exit(0)
NetScaler NITRO REST Server Monitor using SNMP
- Monitors CPU Utilization of Backend Server, Disables service if Load Exceeds 0.50
- Requires: NitroRestClient
- Requires SNMP configured on Server & SNMP Utils installed on the PC where script is run.
import time
import subprocess
import sys
import re
from bin import NitroRestClient

# Netscaler Parameters & Credentials
nsip = "10.107.88.78"
nsun = "nsroot"
nspwd = "pwd@123"
nssrv = "Ubuntu_Server"

# Load at or above which the server is disabled.
# NOTE(review): the script's description says 0.50 but the code compared
# against 0.10; the documented value is used here - confirm which is intended.
LOAD_LIMIT = 0.50

# snmpget invocation that prints only the value of the load OID
SNMP_COMMAND = ["snmpget", "-v", "2c", "-O", "qv", "-c", "public",
                "10.107.88.93", ".1.3.6.1.4.1.2021.10.1.3.1"]

# Attempt instantiation of NitroRestClient
try:
    Client = NitroRestClient.NitroRestClient(nsip, nsun, nspwd)
except Exception:
    print("Couldn't create Nitro Session, check username and password and network connectivity")
    sys.exit(1)

# Check servername for validity
try:
    normalizedservername = Client.servernamecheck(nssrv)
except ValueError as e:
    print("Invalid server name was provided. Please check server name and try again.")
    sys.exit(2)

# Loop state: True while this script has the server disabled.  Replaces the
# original x/y pair, which always held opposite values.
disabled = False

# Main Program
try:
    while True:
        snmpload = subprocess.Popen(SNMP_COMMAND, stdout=subprocess.PIPE)
        output, err = snmpload.communicate()

        # Decode the captured bytes, then drop surrounding whitespace and the
        # double quotes wrapped around the value.
        value = output.decode('utf-8', 'replace').strip().strip('"')
        try:
            load = float(value)
        except ValueError:
            # Empty or non-numeric reply: skip this round
            print("Could not read load, retrying..")
            time.sleep(5)
            continue
        print(load)

        if load < LOAD_LIMIT:
            if not disabled:
                print("Load is normal..")
                time.sleep(5)
            else:
                print("Server enabled")
                Client.enablephysicalserver(normalizedservername)
                disabled = False
        else:
            if disabled:
                print("Server remains disabled..")
                time.sleep(5)
            else:
                Client.disablephysicalserver(normalizedservername)
                print("Server disabled")
                time.sleep(1)
                disabled = True

except KeyboardInterrupt:
    print("Script Killed by user")
    sys.exit(0)
Reboot Netscaler via API Call using Curl
This script is under construction.
- Needs Authentication alerts
- Needs NS Parameters to be set globally
- Needs to confirm API execution via HTTP Status Codes
import subprocess as sub
import time

# Nitro API calls using Curl
uptimeapi = (
    'curl', '-s', '-k', '-X', 'GET',
    '-H', 'Content-Type:application/json',
    '--basic', '--user', 'nsroot:pwd@123',
    'http://10.107.88.78/nitro/v1/stat/system?attrs=starttime',
)
rebootapi = (
    'curl', '-s', '-k', '-X', 'POST',
    '-H', 'Content-Type:application/vnd.com.citrix.netscaler.reboot+json',
    '--basic', '--user', 'nsroot:pwd@123',
    '-d', '{"reboot":{"warm":true}}',
    'http://10.107.88.78/nitro/v1/config/reboot',
)
print(uptimeapi)
print(rebootapi)


def run_and_capture(command):
    """Run ``command`` and return everything it wrote to stdout."""
    captured, _ = sub.Popen(command, stdout=sub.PIPE).communicate()
    return captured


# Start-time report before the reboot
output1 = run_and_capture(uptimeapi)
print(output1)

# Send the reboot request without waiting on curl, then pause 50 seconds
sub.Popen(rebootapi, stdout=sub.PIPE)
time.sleep(50)

# Start-time report after the reboot
output2 = run_and_capture(uptimeapi)
print(output2)

# Identical reports before and after are treated as "did not reboot".
if output1 != output2:
    print("Reboot successful")
else:
    print("Reboot unsuccessful")
- Under testing version of above with using parameters
import subprocess as sub
import time

# Netscaler Parameters
host = "10.107.88.78"
username = "nsroot"
passwd = "pwd@123"
# Boolean true means Warm reboot, else false
boolean = "true"


def build_uptime_command(host, username, passwd):
    """Return the curl argument list that fetches the system start time.

    Built as a list so no argument passes through str.split(), which would
    leave literal quote characters inside the header value.
    """
    return [
        'curl', '-s', '-k', '-X', 'GET',
        '-H', 'Content-Type:application/json',
        '--basic', '--user', '%s:%s' % (username, passwd),
        'http://%s/nitro/v1/stat/system?attrs=starttime' % host,
    ]


def build_reboot_command(host, username, passwd, warm):
    """Return the curl argument list that posts a reboot request.

    warm -- True or the string "true" (any letter case) requests a warm
            reboot; any other value sends "warm": false.
    """
    is_warm = str(warm).strip().lower() == "true"
    payload = '{"reboot":{"warm":%s}}' % ("true" if is_warm else "false")
    return [
        'curl', '-s', '-k', '-X', 'POST',
        '-H', 'Content-Type:application/vnd.com.citrix.netscaler.reboot+json',
        '--basic', '--user', '%s:%s' % (username, passwd),
        '-d', payload,
        'http://%s/nitro/v1/config/reboot' % host,
    ]


def run_and_capture(command):
    """Run ``command`` and return everything it wrote to stdout."""
    output, _ = sub.Popen(command, stdout=sub.PIPE).communicate()
    return output


def main():
    """Reboot the appliance and compare its start time before and after."""
    # Nitro API calls using Curl
    uptimeapi = build_uptime_command(host, username, passwd)
    rebootapi = build_reboot_command(host, username, passwd, boolean)
    print(uptimeapi)
    print(rebootapi)

    output1 = run_and_capture(uptimeapi)
    print(output1)

    # Send the reboot request without waiting on curl, then pause 50 seconds
    sub.Popen(rebootapi, stdout=sub.PIPE)
    time.sleep(50)

    output2 = run_and_capture(uptimeapi)
    print(output2)

    # Identical reports before and after are treated as "did not reboot".
    if output1 == output2:
        print("Reboot unsuccessful")
    else:
        print("Reboot successful")


if __name__ == "__main__":
    main()
Reboot Netscaler via API Call using SDK
Download & Install Nitro REST SDK: citrix.co.in
Save config & Reboot
import sys
from nssrc.com.citrix.netscaler.nitro.exception.nitro_exception import nitro_exception
from nssrc.com.citrix.netscaler.nitro.resource.base.base_resource import base_resource
from nssrc.com.citrix.netscaler.nitro.service.nitro_service import nitro_service

ip = "10.107.88.78"
username = "nsroot"
password = "pwd@123"

try:
    # Set up the session handle: HTTP transport, credentials, timeout of 20
    client = nitro_service(ip, "http")
    client.set_credential(username, password)
    client.timeout = 20

    # Save the configuration first, then request the reboot and log out
    client.save_config()
    client.reboot(True)
    client.logout()
    print("Config saved, Rebooting now..")
except nitro_exception as e:
    # Report the SDK's error code and message
    details = "Exception::errorcode=" + str(e.errorcode) + ",message=" + e.message
    print(details)
Save Netscaler config, Reboot & Verify
- Need to remove repeating code lines for connection from each function
import sys
import time
from nssrc.com.citrix.netscaler.nitro.exception.nitro_exception import nitro_exception
from nssrc.com.citrix.netscaler.nitro.resource.base.base_resource import base_resource
from nssrc.com.citrix.netscaler.nitro.service.nitro_service import nitro_service
from nssrc.com.citrix.netscaler.nitro.resource.stat.system.system_stats import system_stats

ip = "10.107.88.78"
username = "nsroot"
password = "pwd@123"


def _connect():
    """Return a new nitro_service handle set up with the credentials above.

    Shared by every operation so the connection settings live in one place.
    Each call builds a fresh handle, as the original per-function code did.
    """
    client = nitro_service(ip, "http")
    client.set_credential(username, password)
    client.timeout = 20
    return client


def uptime():
    """Print the start time reported by each system_stats entry."""
    for i in system_stats.get(_connect()):
        print("Last reboot time is %s" % i.starttime)


def save_cfg():
    """Save the appliance configuration."""
    _connect().save_config()
    print("Config Saved")


def reboot():
    """Request a reboot, passing True as the original call did."""
    _connect().reboot(True)
    print("Reboot Initiated")


try:
    uptime()
    save_cfg()
    reboot()
    # Wait a minute, then print the start time again for comparison
    time.sleep(60)
    uptime()
except nitro_exception as e:
    print("Exception::errorcode=" + str(e.errorcode) + ",message=" + e.message)
Extensions Doctor
Source: askubuntu.com
Need to prevent overwrite of existing files using stackoverflow.com.
#!/usr/bin/env python3
"""Repair file extensions of images under a directory tree.

Pass 1 appends an extension to files that have none and collapses double
extensions (name.a.b becomes name.b).  Pass 2 renames files whose extension
disagrees with the image type detected from their content.

Usage: script.py DIRECTORY
"""
# Open items from the original notes:
#   write list of files before and after operation
#   write modified files to list
#   duplicate remover
#   : replace _
import os
import shutil
import sys

# Extensions that are alternate spellings of a detected type name and must
# not trigger a rename.
EXTENSION_ALIASES = {"jpg": "jpeg", "tif": "tiff"}


def detect_type(path):
    """Return the image type name detected for ``path``, or None.

    None covers both "not a recognised image" and "file could not be read".
    """
    # Imported here because imghdr is no longer part of the standard library
    # from Python 3.13; the path helpers in this file work without it.
    import imghdr
    try:
        return imghdr.what(path)
    except OSError:
        return None


def collapse_double_extension(path):
    """Return ``path`` without its second-to-last extension.

    "dir/name.a.b" gives "dir/name.b".  Returns None when the file name has
    fewer than two extensions.  Only the file name is examined, so dots in
    directory names are never touched.
    """
    fn1, ext1 = os.path.splitext(path)
    fn2, ext2 = os.path.splitext(fn1)
    if not ext1 or not ext2:
        return None
    return fn2 + ext1


def replace_extension(path, ftype):
    """Return ``path`` with its final extension swapped for ``ftype``."""
    return os.path.splitext(path)[0] + "." + ftype


def extension_matches(ext, ftype):
    """Return True when extension ``ext`` (no dot) already denotes ``ftype``.

    The comparison ignores letter case and treats the spellings listed in
    EXTENSION_ALIASES (jpg/jpeg, tif/tiff) as equal.
    """
    ext = ext.lower()
    return EXTENSION_ALIASES.get(ext, ext) == ftype


def safe_move(src, dst):
    """Rename ``src`` to ``dst`` unless ``dst`` exists; return True if moved."""
    if src == dst:
        return False
    if os.path.exists(dst):
        # Never overwrite an existing file
        print(dst, "already exists, skipping:", src)
        return False
    shutil.move(src, dst)
    return True


def fix_missing_and_double_extensions(directory):
    """Pass 1: add missing extensions and collapse double ones."""
    for root, dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            fn1, ext1 = os.path.splitext(path)

            if not ext1:
                ftype = detect_type(path)
                if ftype is None:
                    # Not a recognised image: there is no extension to add
                    print(path, "has no ext, type could not be determined")
                elif safe_move(path, path + "." + ftype):
                    print(path, "has no ext, Appending:", ftype)
                continue

            newname = collapse_double_extension(path)
            if newname is None:
                continue
            ext2 = os.path.splitext(fn1)[1]
            kind = "Same" if ext2 == ext1 else "Diff"
            if safe_move(path, newname):
                print(path, "has 2 %s ext, Removing:" % kind, ext2)
    print("Unique extensions done")


def fix_wrong_extensions(directory):
    """Pass 2: rename files whose extension contradicts the detected type."""
    for root, dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            ext = os.path.splitext(path)[1][1:]
            if not ext:
                print(path, "No Extension detected")
                continue

            # find the correct extension
            ftype = detect_type(path)
            if ftype is None:
                # in case it can't be determined, mention it in the output
                print("could not determine: " + path)
                continue
            if extension_matches(ext, ftype):
                continue

            # rename the file
            if safe_move(path, replace_extension(path, ftype)):
                print(path, ext, "=>", ftype)
    print("Correcting extension done")


def main(directory):
    """Run both passes over ``directory``."""
    fix_missing_and_double_extensions(directory)
    fix_wrong_extensions(directory)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("usage: %s DIRECTORY" % sys.argv[0])
        sys.exit(1)
    main(sys.argv[1])
- References
{{#widget:DISQUS
|id=networkm
|uniqid=Python Scripts
|url=https://aman.awiki.org/wiki/Python_Scripts
}}