Python Scripts

From Network Security Wiki


Code Snippets

Print without next line

import sys
sys.stdout.write('.')

You may also need to call flush() to ensure stdout is flushed immediately:

sys.stdout.flush()

Regex

  • Extract a particular value(like 10.5, 100.0):
import re

# Match 1-3 integer digits, a literal decimal point and one fractional digit
# (e.g. 10.5, 100.0), but only when followed by whitespace and "id".
# The dot is escaped: unescaped it matches ANY character, so a plain run of
# digits such as "12345 id" was wrongly accepted as a value.
# `output` is the text to search and must be defined before this runs.
value = re.findall(r'([0-9]{1,3}\.[0-9])(?=\sid)', output)
# Print all matches
print(value)
# Print 2nd match (guarded: indexing raises IndexError with fewer than two matches)
if len(value) > 1:
    print(value[1])

Arrays

  • Using Array
x = [1, 2, 3, 4, 5]
# Walk the elements directly instead of indexing through a hard-coded
# range(5), which silently goes wrong as soon as the list length changes.
for item in x:
    # Parenthesised so the line runs on both Python 2 and Python 3.
    print(item)
  • Append to Array
# Start with an empty list and grow it one element at a time.
f = []
for i in range(30):
    f.append(i)
# Parenthesised so the line runs on both Python 2 and Python 3
# (the bare "print f" statement is a syntax error on Python 3).
print(f)

Pause

import time
time.sleep(5)

Print File Extensions

import os.path
# splitext() returns a (root, ext) pair; [1] selects the extension including
# its leading dot and [1:] strips that dot, e.g. "report.txt" -> "txt".
# `filename` must already be defined when this line runs.
extension = os.path.splitext(filename)[1][1:]

EasySNMP

        This snippet needs more testing.
sudo apt-get install libsnmp-dev snmp-mibs-downloader
sudo apt-get install gcc python-dev
sudo pip install easysnmp
from easysnmp import snmp_get
# Single SNMP GET (version=2) against the host with the 'public' community.
# NOTE(review): the same OID is parsed as a load figure by the SNMP monitor
# script further down this page - presumably a load average; confirm.
snmp_get('.1.3.6.1.4.1.2021.10.1.3.1', hostname='10.107.88.93', community='public', version=2)

Calculator

#!/usr/bin/python

#Define Function
def calc():
    """Prompt for an operator and two numbers, print the result, then call again().

    The operator and both operands are read from standard input. Non-numeric
    operands print "Invalid Input", an unknown operator prints
    "Invalid Operator" and dividing by zero prints an error instead of
    raising. again() is always called last; it asks whether to run another
    calculation.
    """
    # raw_input() exists only on Python 2, where input() would eval() whatever
    # the user types; on Python 3 input() already returns the raw string.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    operation = read_line('''
 Please type operator:
 + for addition
 - for subtraction
 * for multiplication
 / for division
 ''').strip()

    try:
        num1 = float(read_line('First Number: '))
        num2 = float(read_line('Second Number: '))
    except ValueError:
        print("Invalid Input")
        again()
        return

    # Sums, differences and products are shown truncated to whole numbers,
    # as in the original; only the quotient keeps two decimals.
    if operation == '+':
        print("Addition is: %d" % int(num1 + num2))
    elif operation == '-':
        print("Subtraction is: %d" % int(num1 - num2))
    elif operation == '*':
        print("Multiplication is: %d" % int(num1 * num2))
    elif operation == '/':
        if num2 == 0:
            print("Cannot divide by zero")
        else:
            print("Division is: %.2f" % (num1 / num2))
    else:
        print("Invalid Operator")

    again()

def again():
    """Ask whether to run another calculation and act on the answer.

    "Y" (any case) runs calc() again, "N" prints a farewell and returns.
    Any other answer repeats the question.
    """
    # raw_input() is Python 2 only; on Python 3 input() is the equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # Loop rather than recurse so repeated invalid answers cannot exhaust
    # the call stack.
    while True:
        calc_again = read_line('''
 Do you want to calculate again?
 Please type Y for Yes or N for No.
 ''')
        answer = calc_again.upper()
        if answer == "Y":
            calc()
            return
        if answer == "N":
            print('See you later on..  Bye')
            return

def welcome():
    """Print the calculator's greeting, framed by blank lines."""
    banner = "\nWelcome to My Calculator\n"
    print(banner)

#Call Function
welcome()
calc()

Schedule output

sudo pip install schedule
import schedule
import time

def job():
    """Task run by every schedule registered below; it only prints a marker."""
    print("I'm working...")

# Register the same job on four independent schedules:
# every second, every minute, every hour and daily at 21:14.
schedule.every(1).second.do(job)
schedule.every(1).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("21:14").do(job)

# Poll once per second; run_pending() executes whichever jobs are due.
while 1:
    schedule.run_pending()
    time.sleep(1)

Fill Routing Table of Remote Server

import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect( '192.168.1.7', username = 'root', password = 'pwd@123' )

# Add one /24 route per x.y.z.0 network (x in 1..255, y and z in 0..255).
# The original innermost "for a in range(1)" loop only ever produced 0, so
# the last octet is written as a literal instead.
try:
    for x in range(1, 256):
        for y in range(256):
            for z in range(256):
                stdin, stdout, stderr = ssh.exec_command(
                    "route add -net %d.%d.%d.0 netmask 255.255.255.0 gw 192.168.1.1" % (x, y, z))
                # exec_command() returns as soon as the command has been
                # started; wait for it to finish so that millions of channels
                # are not opened on the connection at once.
                stdout.channel.recv_exit_status()
finally:
    # Release the SSH connection even if a command fails or the run is interrupted.
    ssh.close()

Paramiko execute commands

Source: sebastiandahlgren.se

import paramiko
import time
import select
import sys

# NetScaler/Server Parameters
host = '10.107.88.78'
user = 'nsroot'
passwd = 'pwd@123'
cmd = 'show runningconfig'

# Connection retry policy
max_attempts = 10
retry_delay = 2  # seconds to wait between attempts

# Try to connect to the host, retrying a few times if it fails.
for i in range(1, max_attempts + 1):
    print("Trying to connect to %s (%i/%i)" % (host, i, max_attempts))

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host, username=user, password=passwd)
    except paramiko.AuthenticationException:
        # Wrong credentials will not fix themselves, so do not retry.
        print("Authentication failed when connecting to %s" % host)
        ssh.close()
        sys.exit(1)
    except Exception as e:
        # Deliberately broad (network errors, SSH negotiation errors, ...),
        # but unlike a bare "except:" it lets Ctrl-C and sys.exit() through.
        print("Could not SSH to %s (%s), waiting for it to start" % (host, e))
        # Drop the failed client; a fresh one is created for the next attempt.
        ssh.close()
        if i < max_attempts:
            time.sleep(retry_delay)
    else:
        print("Connected to %s" % host)
        break
else:
    # Every attempt failed. The original counter check gave up after only
    # nine attempts although the progress message promised ten.
    print("Could not connect to %s. Giving up" % host)
    sys.exit(1)

# Send the command (non-blocking)
stdin, stdout, stderr = ssh.exec_command( cmd )
channel = stdout.channel

try:
    # recv() blocks until data is available and returns an empty string once
    # the stream has closed. Reading to end-of-stream replaces the original
    # "until exit status is ready" polling loop, which spun at full CPU and
    # could stop while output was still buffered in the channel.
    while True:
        chunk = channel.recv(1024)
        if not chunk:
            break
        if not isinstance(chunk, str):
            # Python 3 delivers bytes; Python 2 already has str here.
            chunk = chunk.decode('utf-8', 'replace')
        # write() instead of "print chunk," so no space is inserted between chunks.
        sys.stdout.write(chunk)
    sys.stdout.flush()
    print("\nCommand done, closing SSH connection")
finally:
    # Disconnect from the host
    ssh.close()

Docker add multiple servers remotely

import paramiko
import time

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect( '10.107.88.93', username = 'aman', password = 'pwd@123' )

try:
    # One nginx container per port; the port number doubles as the name suffix.
    for i in range(44400, 44405):
        stdin, stdout, stderr = ssh.exec_command(
            "docker run --name docker-nginx%d -p %d:8000 -d nginx" % (i, i))
        # exec_command() does not wait for the command. Block until docker
        # returns so success is only reported when it actually succeeded.
        if stdout.channel.recv_exit_status() == 0:
            print("Docker server created..")
        else:
            reason = stderr.read().decode('utf-8', 'replace').strip()
            print("Docker server %d failed: %s" % (i, reason))
        time.sleep(1)

    print("Done")
finally:
    # Close the connection even when a command raises.
    ssh.close()

CPU check of Remote Server

import paramiko
from terminalplot import plot

def cpu_usage_from_top(output):
    """Return CPU usage in percent (100 - idle) from `top -bn2 | grep "Cpu(s)"` output.

    `output` may be bytes or text. The idle figure of the last "Cpu(s)" line
    is used; the original read token 24 of the whitespace-split output, which
    is the idle field of the second line only while every field is separated
    by spaces. Raises ValueError when no idle figure is present.
    """
    import re

    if isinstance(output, bytes):
        output = output.decode('utf-8', 'replace')
    # Accepts both "98.5 id" and the older "98.5%id" layout.
    idle_values = re.findall(r'([0-9]+(?:\.[0-9]+)?)[ \t]*%?[ \t]*id', output)
    if not idle_values:
        raise ValueError("no idle CPU figure found in top output")
    return 100 - float(idle_values[-1])


def main():
    """Sample the remote server's CPU usage ten times over SSH and print each value."""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect( '10.107.88.93', username = 'aman', password = 'pwd@123' )

    try:
        for _ in range(10):
            stdin, stdout, stderr = ssh.exec_command( 'top -bn2 | grep "Cpu(s)"' )
            print(cpu_usage_from_top(stdout.read()))
    finally:
        # Close the connection even if a sample cannot be parsed.
        ssh.close()


if __name__ == '__main__':
    main()

TCPDump

Print captured packets
import subprocess as sub

# universal_newlines=True makes the pipe yield text lines on both Python 2
# and Python 3, so the end-of-stream sentinel is the empty string.
p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE, universal_newlines=True)
try:
    for row in iter(p.stdout.readline, ''):
        print(row.rstrip())
except KeyboardInterrupt:
    print("User killed command.")
finally:
    # Close our end of the pipe and reap the child so it is not left behind.
    p.stdout.close()
    p.wait()
Print Packet Length
import subprocess as sub
import re

# Raw string: '\d' inside a normal string literal is an invalid escape
# sequence. Compiled once because it is applied to every captured line.
length_re = re.compile(r'length (\d+)')

# Text mode (universal_newlines=True) so the str pattern can be applied to
# each line on Python 3 as well; bytes lines would raise TypeError.
p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE, universal_newlines=True)
try:
    for row in iter(p.stdout.readline, ''):
        y = length_re.search(row)
        if y:
            x = int(y.group(1))
            # Zero-length packets are not reported.
            if x != 0:
                print(x)
except KeyboardInterrupt:
    print("User killed command.")
finally:
    # Close our end of the pipe and reap the child so it is not left behind.
    p.stdout.close()
    p.wait()
Terminal Plot
sudo pip install terminalplot
from terminalplot import plot
#import gnuplotlib as gp
import subprocess as sub
import re

samples_per_plot = 10
# Raw string: '\d' inside a normal string literal is an invalid escape sequence.
length_re = re.compile(r'length (\d+)')

a = []  # x axis: sample number within the current batch
b = []  # y axis: packet length

# Text mode so the str pattern works on Python 3 as well as Python 2.
p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE, universal_newlines=True)
try:
    for row in iter(p.stdout.readline, ''):
        y = length_re.search(row)
        if not y:
            continue
        z = int(y.group(1))
        if z == 0:
            continue
        b.append(z)
        a.append(len(b))
        print(len(b))
        if len(b) == samples_per_plot:
            plot(a, b)
            print(a)
            print(b)
            # Put a "break" here if you don't want continuous output.
            # Start a new batch for the next plot.
            a = []
            b = []
except KeyboardInterrupt:
    print("User killed command.")
finally:
    # Close our end of the pipe and reap the child so it is not left behind.
    p.stdout.close()
    p.wait()
Plot using Stdout
import subprocess as sub
import re
import sys

max_bar = 190  # packets at or above this length are not drawn
# Raw string: '\d' inside a normal string literal is an invalid escape sequence.
length_re = re.compile(r'length (\d+)')

# Text mode so the str pattern works on Python 3 as well as Python 2.
p = sub.Popen(('sudo', 'tcpdump', '-l'), stdout=sub.PIPE, universal_newlines=True)
try:
    for row in iter(p.stdout.readline, ''):
        y = length_re.search(row)
        if y:
            z = int(y.group(1))
            if z < max_bar:
                # One '=' per unit of packet length, finished with an arrow head.
                print('=' * z + '>')
except KeyboardInterrupt:
    print("User killed command.")
finally:
    # Close our end of the pipe and reap the child so it is not left behind.
    p.stdout.close()
    p.wait()

Speaking Passwords

eSpeak Password
import os
def check():
    """Prompt for a numeric password and speak the verdict with espeak.

    Input that is not a whole number counts as a wrong password instead of
    stopping the script with ValueError.
    """
    # raw_input() is Python 2 only, where plain input() would eval() the text.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    try:
        x = int(read_line("Input your password: "))
    except ValueError:
        x = None

    if x == 1234:
        os.system("espeak 'correct password'")
    else:
        os.system("espeak 'wrong password'")

def again():
    """Keep asking for the password until the script is interrupted."""
    # The original called itself after every check, which ends in a
    # RecursionError after enough attempts; a loop repeats without growing
    # the call stack.
    while True:
        check()

check()
again()
Using pyttsx
import pyttsx
engine = pyttsx.init()

def check():
    """Prompt for a numeric password and speak the verdict through pyttsx.

    Input that is not a whole number counts as a wrong password instead of
    stopping the script with ValueError.
    """
    # raw_input() is Python 2 only, where plain input() would eval() the text.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    try:
        x = int(read_line("Input your password: "))
    except ValueError:
        x = None

    verdict = 'correct password' if x == 1234 else 'wrong password'
    engine.say(verdict)
    # runAndWait() blocks until the queued phrase has been spoken.
    engine.runAndWait()

def again():
    """Keep asking for the password until the script is interrupted."""
    # The original called itself after every check, which ends in a
    # RecursionError after enough attempts; a loop repeats without growing
    # the call stack.
    while True:
        check()

check()
again()

NetScaler NITRO REST Server Monitor using SSH

  • Monitors CPU Utilization of Backend Server, Disables service if CPU Exceeds 50%
  • Requires: NitroRestClient
import paramiko
import sys
import re
import time
from bin import NitroRestClient

# Netscaler Parameters
nsip = "10.107.88.78"
nsun = "nsroot"
nspwd = "pwd@123"
nssrv = "Ubuntu_Server"

# Server Parameters
srvip = "10.107.88.93"
srvun = "aman"
srvpwd = "Passion@123"

# Loop Logic variables
x = 1
y = 0

# Attempt SSH Connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect( srvip, username = srvun, password = srvpwd )

# Attempt instantiation of NitroRestClient
try:
    Client = NitroRestClient.NitroRestClient(nsip, nsun, nspwd)
except Exception:
    # "except Exception" rather than a bare "except:" so that Ctrl-C is not
    # reported as a login problem.
    print("Couldn't create Nitro Session, check username and password and network connectivity")
    # The SSH session is already open at this point; do not leak it.
    ssh.close()
    sys.exit(1)

# Check servername for validity
try:
    normalizedservername = Client.servernamecheck(nssrv)
except ValueError:
    print("Invalid server name was provided. Please check server name and try again.")
    ssh.close()
    sys.exit(2)

# Main Program
# Matches the idle figure in top's "Cpu(s)" line, e.g. "97.5 id". The decimal
# point is escaped; unescaped it matched any character.
idle_re = re.compile(r'([0-9]{1,3}\.[0-9])\sid')
# Single flag in place of the mirrored x/y pair; the script starts out
# treating the server as enabled (x = 1, y = 0).
server_enabled = True

try:
    while True:
        stdin, stdout, stderr = ssh.exec_command( 'top -bn2 | grep "Cpu(s)"' )
        output = stdout.read()
        if not isinstance(output, str):
            # bytes on Python 3; the str pattern needs text.
            output = output.decode('utf-8', 'replace')
        value = idle_re.findall(output)
        if len(value) < 2:
            # The second sample is the one used below; skip this round
            # instead of crashing on value[1].
            print("Could not read CPU usage from server, retrying..")
            time.sleep(5)
            continue
        cpu = 100 - float(value[1])
        print(cpu)

        if cpu < 50:
            if server_enabled:
                print("CPU is normal..")
                time.sleep(5)
            else:
                print("Server enabled")
                Client.enablephysicalserver(normalizedservername)
                server_enabled = True
        else:
            if not server_enabled:
                print("Server remains disabled..")
                time.sleep(5)
            else:
                Client.disablephysicalserver(normalizedservername)
                print("Server disabled")
                time.sleep(1)
                server_enabled = False

except KeyboardInterrupt:
    print("Script Killed by user")
    sys.exit(0)
finally:
    # Runs on Ctrl-C and on unexpected errors alike.
    ssh.close()

NetScaler NITRO REST Server Monitor using SNMP

  • Monitors CPU Utilization of Backend Server, Disables service if Load Exceeds 0.50
  • Requires: NitroRestClient
  • Requires SNMP configured on Server & SNMP Utils installed on the PC where script is run.
import time
import subprocess
import sys
import re
from bin import NitroRestClient

# Netscaler Parameters & Credentials
nsip = "10.107.88.78"
nsun = "nsroot"
nspwd = "pwd@123"
nssrv = "Ubuntu_Server"

# Loop Logic variables
x = 1
y = 0

# Attempt instantiation of NitroRestClient
try:
    Client = NitroRestClient.NitroRestClient(nsip, nsun, nspwd)
except Exception:
    # "except Exception" rather than a bare "except:" so that Ctrl-C is not
    # reported as a login problem.
    print("Couldn't create Nitro Session, check username and password and network connectivity")
    sys.exit(1)

# Check servername for validity
try:
    normalizedservername = Client.servernamecheck(nssrv)
except ValueError:
    print("Invalid server name was provided. Please check server name and try again.")
    sys.exit(2)

# Main Program
# NOTE(review): the description of this script says the service is disabled
# above a load of 0.50, but the code has always compared against 0.10 -
# confirm which value is intended.
load_threshold = 0.10
snmpget_cmd = ["snmpget", "-v", "2c", "-O", "qv", "-c", "public", "10.107.88.93", ".1.3.6.1.4.1.2021.10.1.3.1"]
# Single flag in place of the mirrored x/y pair; the script starts out
# treating the server as enabled (x = 1, y = 0).
server_enabled = True

try:
    while True:
        # Text mode so the reply is str on Python 3 as well as Python 2.
        snmpload = subprocess.Popen(snmpget_cmd, stdout=subprocess.PIPE, universal_newlines=True)
        output, _ = snmpload.communicate()
        # Drop surrounding whitespace and the double quotes around the value.
        value = output.strip().strip('"')
        try:
            load = float(value)
        except ValueError:
            # snmpget failed or returned something unexpected; try again later.
            print("Could not read load via SNMP, retrying..")
            time.sleep(5)
            continue
        print(load)

        if load < load_threshold:
            if server_enabled:
                print("Load is normal..")
                time.sleep(5)
            else:
                print("Server enabled")
                Client.enablephysicalserver(normalizedservername)
                server_enabled = True
        else:
            if not server_enabled:
                print("Server remains disabled..")
                time.sleep(5)
            else:
                Client.disablephysicalserver(normalizedservername)
                print("Server disabled")
                time.sleep(1)
                server_enabled = False

except KeyboardInterrupt:
    print("Script Killed by user")
    sys.exit(0)

Reboot Netscaler via API Call using Curl

        This script is under construction.
  • Needs Authentication alerts
  • Needs NS Parameters to be set globally
  • Needs to confirm API execution via HTTP Status Codes
import subprocess as sub
import time

# Nitro API calls using Curl
uptimeapi = ('curl', '-s', '-k', '-X', 'GET', '-H', 'Content-Type:application/json', '--basic', '--user', 'nsroot:pwd@123', 'http://10.107.88.78/nitro/v1/stat/system?attrs=starttime')
rebootapi = ('curl', '-s', '-k', '-X', 'POST', '-H', 'Content-Type:application/vnd.com.citrix.netscaler.reboot+json', '--basic', '--user', 'nsroot:pwd@123', '-d', '{"reboot":{"warm":true}}', 'http://10.107.88.78/nitro/v1/config/reboot')


def run_curl(command):
    """Run `command`, wait for it to finish and return its stdout as text."""
    process = sub.Popen(command, stdout=sub.PIPE, universal_newlines=True)
    output, _ = process.communicate()
    return output


# NOTE: these lines echo the credentials embedded in the commands.
print(uptimeapi)
print(rebootapi)

output1 = run_curl(uptimeapi)
print(output1)

# run_curl() waits for curl and reaps it; the original started the reboot
# request and never collected the process.
run_curl(rebootapi)
time.sleep(50)

output2 = run_curl(uptimeapi)
print(output2)

if not output1 or not output2:
    # An empty reply means the appliance could not be queried (for example
    # it is still booting), so the two start times cannot be compared. The
    # original reported this case as a successful reboot.
    print("Could not read start time, reboot state unknown")
elif output1 == output2:
    print("Reboot unsuccessful")
else:
    print("Reboot successful")
Version of the above using parameters (still under testing)
import json
import subprocess as sub
import time

# Netscaler Parameters
host = "10.107.88.78"
username = "nsroot"
passwd = "pwd@123"
# Boolean true means Warm reboot, else false
boolean = "true"


def uptime_command(host, username, passwd):
    """Return the curl argument list that reads the appliance start time."""
    return [
        'curl', '-s', '-k', '-X', 'GET',
        '-H', 'Content-Type:application/json',
        '--basic', '--user', '%s:%s' % (username, passwd),
        'http://%s/nitro/v1/stat/system?attrs=starttime' % host,
    ]


def reboot_command(host, username, passwd, warm):
    """Return the curl argument list that requests a reboot.

    `warm` selects a warm reboot when true. The argument list is built
    directly: the original formatted one string (three %s placeholders for
    four values, which raises TypeError) and split it on whitespace, which
    also left the shell-style quotes inside the arguments.
    """
    payload = json.dumps({"reboot": {"warm": bool(warm)}})
    return [
        'curl', '-s', '-k', '-X', 'POST',
        '-H', 'Content-Type:application/vnd.com.citrix.netscaler.reboot+json',
        '--basic', '--user', '%s:%s' % (username, passwd),
        '-d', payload,
        'http://%s/nitro/v1/config/reboot' % host,
    ]


def run_curl(command):
    """Run `command`, wait for it to finish and return its stdout as text."""
    process = sub.Popen(command, stdout=sub.PIPE, universal_newlines=True)
    output, _ = process.communicate()
    return output


def main():
    """Reboot the appliance and compare its start time before and after."""
    uptimeapi = uptime_command(host, username, passwd)
    rebootapi = reboot_command(host, username, passwd, boolean == "true")

    # NOTE: these lines echo the credentials embedded in the commands.
    print(uptimeapi)
    print(rebootapi)

    output1 = run_curl(uptimeapi)
    print(output1)

    run_curl(rebootapi)
    time.sleep(50)

    output2 = run_curl(uptimeapi)
    print(output2)

    if not output1 or not output2:
        # An empty reply means the appliance could not be queried, so the
        # two start times cannot be compared.
        print("Could not read start time, reboot state unknown")
    elif output1 == output2:
        print("Reboot unsuccessful")
    else:
        print("Reboot successful")


if __name__ == '__main__':
    main()

Reboot Netscaler via API Call using SDK

Download & Install Nitro REST SDK: citrix.co.in

Save config & Reboot

import sys
from nssrc.com.citrix.netscaler.nitro.exception.nitro_exception import nitro_exception
from nssrc.com.citrix.netscaler.nitro.resource.base.base_resource import base_resource
from nssrc.com.citrix.netscaler.nitro.service.nitro_service import nitro_service

ip = "10.107.88.78"
username = "nsroot"
password = "pwd@123"

# Save the running configuration, reboot the appliance, then log out.
# Any nitro_exception raised by these calls is printed with its error code.
try:
   # Session against `ip` over plain HTTP, using the credentials set above.
   client = nitro_service(ip,"http")
   client.set_credential(username,password)
   client.timeout = 20
   client.save_config()
   # NOTE(review): True presumably requests a warm reboot - confirm against the SDK.
   client.reboot(True)
   client.logout()
   # Only reached when save, reboot and logout all returned without raising.
   print "Config saved, Rebooting now.."
except nitro_exception as e:
   print("Exception::errorcode="+str(e.errorcode)+",message="+ e.message)

Save Netscaler config, Reboot & Verify

  • Need to remove repeating code lines for connection from each function
import sys
import time
from nssrc.com.citrix.netscaler.nitro.exception.nitro_exception import nitro_exception
from nssrc.com.citrix.netscaler.nitro.resource.base.base_resource import base_resource
from nssrc.com.citrix.netscaler.nitro.service.nitro_service import nitro_service
from nssrc.com.citrix.netscaler.nitro.resource.stat.system.system_stats import system_stats

class ns_edit:
    """Small wrapper around one NITRO session: show the start time and reboot."""

    def __init__(self, ip, username, password):
        """Store the connection details and prepare a NITRO client over HTTP.

        A nitro_exception raised while preparing the client is printed, not
        propagated.
        """
        self.ip = ip
        self.username = username
        self.password = password
        try:
            self.client = nitro_service(self.ip, "http")
            self.client.set_credential(self.username, self.password)
        except nitro_exception as e:
            self._report(e)

    @staticmethod
    def _report(e):
        """Print a nitro_exception in the one format shared by all methods."""
        print("Exception::errorcode=" + str(e.errorcode) + ",message=" + e.message)

    def uptime(self):
        """Print the start time of every entry returned by system_stats."""
        try:
            for node in system_stats.get(self.client):
                # Function-call form so the line also runs on Python 3.
                print("Last reboot time is %s" % node.starttime)
        except nitro_exception as e:
            self._report(e)

    def reboot(self):
        """Ask the appliance to reboot and confirm that the request was sent."""
        try:
            # NOTE(review): True presumably requests a warm reboot - confirm against the SDK.
            self.client.reboot(True)
            print("Node rebooted")
        except nitro_exception as e:
            self._report(e)

if __name__ == '__main__':
    # One consistent indentation for the whole block: the original mixed a
    # space-indented line into tab-indented ones, which is a syntax error.
    ns = ns_edit("10.107.88.78", "nsroot", "pwd@123")
    ns.uptime()
    ns.reboot()
    # Give the appliance time to come back before reading its start time again.
    time.sleep(60)
    ns.uptime()

Python Dice

#!/usr/bin/env python3
from tkinter import *
from random import randint
import sys

# Main application window.
root = Tk()

header = Label(root, text="Dice App v1.0", font=("Times", 35))
header.pack()

# Image file is looked up by relative path, so dice.gif must be reachable
# from the directory the script is started in.
photo = PhotoImage(file="dice.gif")

# Picture placed on the right-hand side of the window.
label = Label(image=photo)
label.pack(side=RIGHT)

# Text field that shows the current roll.
e = Entry(root, width=10, font=("Helvetica", 30, "bold"), justify=CENTER)
e.pack()

# Initial text shown before the first roll.
e.delete(0, END)
e.insert(0, "Ready!!")

def dice():
    """Replace the entry's text with a random whole number from 1 to 6."""
    e.delete(0, END)
    e.insert(0, randint(1,6))

# NOTE: this name shadows the builtin exit(); the "Exit App" button below
# refers to it, so it cannot be renamed in isolation.
def exit():
    """Print a goodbye message and end the program."""
    print("Bye Bye!!")
    sys.exit()

a = Button(root, text="Roll Dice", width=20, command=dice)
a.pack()

f = Button(root, text="Exit App", width=20, fg="Red", command=exit)
f.pack()

# Fixed window size, then hand control to the Tk event loop.
root.geometry("400x200")
root.mainloop()


Weather Parser

import socket
import json

s = socket.socket()
addr = socket.getaddrinfo('api.openweathermap.org', 80)

try:
    # Do not wait forever if the server stops answering.
    s.settimeout(10)
    s.connect(addr[0][4])
    s.sendall(b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')

    # One recv() may deliver only part of the reply. Read until the server
    # closes the connection (it does after an HTTP/1.0 response) so the JSON
    # body is never cut short.
    chunks = []
    while True:
        chunk = s.recv(1000)
        if not chunk:
            break
        chunks.append(chunk)
finally:
    # The original never closed the socket.
    s.close()

html = b''.join(chunks)
# Everything after the blank line that ends the HTTP headers is the JSON body.
div = html.split(b'\r\n\r\n')[-1]
data = json.loads(div.decode('utf-8'))

main = data['weather'][0]['main']
print("Main =", main)

desc = data['weather'][0]['description']
print("Description =", desc)

# No "units" parameter is sent, so the temperature arrives in Kelvin.
celsius = data['main']['temp'] - 273.15
rnd = str(round(celsius, 2))
print("Temperature =", rnd, "\xb0C")

pres = data['main']['pressure']
print("Pressure =", pres, "mbar")

hum = data['main']['humidity']
print("Humidity =", hum, "%")

# Visibility is reported in metres; shown here in kilometres.
vis = int(data['visibility'])/1000
print("Visibility =", vis, "Km")

speed = data['wind']['speed']
# "deg" can be missing from the wind section; fall back to 0 as before.
# Named wind_dir so the builtin dir() is not shadowed.
wind_dir = data['wind'].get('deg', 0)
rnd2 = str(round(wind_dir, 2))
# Wind speed is delivered in metres per second for the default units.
print("Wind Speed =", speed, "m/s")
print("Wind Direction =", rnd2, "Degree")

Plot Temperature using Thingspeak

Upload Temperature

import socket
import json
from time import strftime,localtime,sleep

def http_request(host, request):
    """Send the raw `request` bytes to `host` on port 80 and return the whole reply.

    The reply is read until the server closes the connection, and the socket
    is always closed afterwards. The original opened two sockets per loop
    iteration and never closed either of them.
    """
    addr = socket.getaddrinfo(host, 80)
    s = socket.socket()
    try:
        # Do not wait forever if the server stops answering.
        s.settimeout(10)
        s.connect(addr[0][4])
        s.sendall(request)
        chunks = []
        while True:
            chunk = s.recv(1000)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        s.close()
    return b''.join(chunks)


while True:
    # Fetch Temperature from openweathermap.org
    html = http_request('api.openweathermap.org', b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')
    # Everything after the blank line that ends the HTTP headers is the JSON body.
    div = html.split(b'\r\n\r\n')[-1]
    data = json.loads(div.decode('utf-8'))

    # No "units" parameter is sent, so the temperature arrives in Kelvin.
    cel = data['main']['temp'] - 273.15
    temp = float(round(cel,2))

    # Upload Temperature Data to thingspeak.com
    html2 = http_request('api.thingspeak.com', b'POST https://api.thingspeak.com/update?api_key=XB15HC17CZH6KYMV_1&field1=%f HTTP/1.0\r\n\r\n' % temp)
    # Second space-separated token of the status line is the HTTP status code.
    div2 = html2.split(b' ')[1]

    # Check if logged data successfully
    if int(div2) == 200:
        currtime = strftime("%H:%M:%S %d-%b-%Y", localtime())
        print("Uploaded Data @", currtime,":", "Temp =", temp, "\xb0C")
    else:
        print("HTTP Error:", int(div2))

    # Wait for 10 minutes
    sleep(600)

Bulk Upload Parameters

import socket
import json
from time import strftime,localtime,sleep
import requests

# Fetch Temperature from openweathermap.org
def weather():
    """Fetch the current weather for Bangalore and return the decoded JSON reply.

    Sends a plain HTTP/1.0 GET to api.openweathermap.org with units=metric
    and returns the parsed response body.
    """
    s = socket.socket()
    addr = socket.getaddrinfo('api.openweathermap.org', 80)
    try:
        # Do not wait forever if the server stops answering.
        s.settimeout(10)
        s.connect(addr[0][4])
        s.sendall(b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&units=metric&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')
        # One recv() may deliver only part of the reply; read until the
        # server closes the connection so the JSON body is never cut short.
        chunks = []
        while True:
            chunk = s.recv(1000)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # Close the socket on errors too, not only after a successful read.
        s.close()
    html = b''.join(chunks)
    # Everything after the blank line that ends the HTTP headers is the JSON body.
    div = html.split(b'\r\n\r\n')[-1]
    return json.loads(div.decode('utf-8'))

def thing():
    """Fetch the current weather and send it to ThingSpeak as one bulk update.

    Returns the response object of the upload so the caller can inspect its
    status code.
    """
    # Call weather function
    data = weather()
    temp = data['main']['temp']
    pres = data['main']['pressure']
    hum = data['main']['humidity']
    vis = int(data['visibility'])/1000
    speed = data['wind']['speed']
    # "deg" can be missing from the wind section; fall back to 0 as before.
    # Named wind_dir so the builtin dir() is not shadowed.
    wind_dir = data['wind'].get('deg', 0)
    # %z emits the machine's real UTC offset. The original appended a fixed
    # "+0530" to local time, which is only right on a host in that zone.
    ctime = strftime("%Y-%m-%d %H:%M:%S %z", localtime())
    print("Data Fetched @",ctime,":","Temp(C)=",temp,"\xb0C","Press(hPa)=",pres,"Hum(%)=",hum,"Vis(Km)=",vis,"Speed(m/s)=",speed,"Dir(deg)=",wind_dir)
    # Upload Temperature Data to thingspeak.com
    payload = {"write_api_key":"G97NZS1XWXCS798C_1","updates":[{"created_at":ctime,"field1":temp,"field2":pres,"field3":hum,"field4":vis,"field5":speed,"field6":wind_dir}]}
    url = 'https://api.thingspeak.com/channels/396314/bulk_update.json'
    headers = {'content-type': 'application/json'}
    # Without a timeout a stalled connection would block the upload loop forever.
    response = requests.post(url, data=json.dumps(payload), headers=headers, timeout=30)
    return response

# Upload one set of readings, report the outcome, then pause; repeats until
# the script is stopped.
while True:
    response = thing()

    # Check if logged data successfully
    # Only HTTP 202 is treated as success; any other status is reported as an error.
    if response.status_code == 202:
        print("Data successfully uploaded :", response.status_code)
    else:
        print("HTTP Error Code:", response.status_code)

    # Wait for 10 minutes
    sleep(600)

Slack Webhook POST

#!/usr/bin/python
import json
import requests

# Silences urllib3 warnings for the whole process.
requests.packages.urllib3.disable_warnings()

# Create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/services/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'
slack_data = {'text': "Sev1 Incident: Pool is Down :skull:"}

response = requests.post(
    webhook_url, data=json.dumps(slack_data),
    headers={'Content-Type': 'application/json'},
    # Without a timeout an unreachable endpoint would hang the alert forever.
    timeout=10,
)
# Anything other than 200 is treated as a failed delivery.
if response.status_code != 200:
    raise ValueError(
        'Request to slack returned an error %s, the response is:\n%s'
        % (response.status_code, response.text)
    )



References





{{#widget:DISQUS |id=networkm |uniqid=Python Scripts |url=https://aman.awiki.org/wiki/Python_Scripts }}