Python Scripts: Difference between revisions

 
(9 intermediate revisions by the same user not shown)
Line 678:
</syntaxhighlight>
 
= ExtensionsPython correctorDice =
Source: [https://askubuntu.com/questions/631900/correct-file-extensions askubuntu.com]
 
<syntaxhighlight lang="python">
#!/usr/bin/env python3
from tkinter import *
from random import randint
import sys
 
root = Tk()
# Append _(1) if existing file
# Duplicate remover
# Webp Conv failed > do not rm file
# HAUceV0.jpg.jpg => not renamed when image not supported
# use dwebp 1.webp -o - | convert - 1.jpg
# Do not run command recursively
 
header = Label(root, text="Dice App v1.0", font=("Times", 35))
import os, re, sys, glob
header.pack()
import imghdr, shutil
import subprocess, tarfile
from pathlib import Path
 
photo = PhotoImage(file="dice.gif")
try:
 
directory = sys.argv[1]
label = Label(image=photo)
except:
label.pack(side=RIGHT)
print("\nException occured: Full Directory path with images not defined.\n")
 
e = Entry(root, width=10, font=("Helvetica", 30, "bold"), justify=CENTER)
e.pack()
 
e.delete(0, END)
e.insert(0, "Ready!!")
 
def dice():
e.delete(0, END)
e.insert(0, randint(1,6))
 
def exit():
print("Bye Bye!!")
sys.exit()
 
a = Button(root, text="Roll Dice", width=20, command=dice)
# Code for Archiving existing files & save list of files
a.pack()
# Delete Archive if no of files is same
 
f = Button(root, text="Exit App", width=20, fg="Red", command=exit)
print("---=========== Started backup of files =============---")
f.pack()
 
root.geometry("400x200")
tar = tarfile.open("backup.tar.gz", "w:gz")
root.mainloop()
wr = open("file_list.txt", "w")
</syntaxhighlight>
wr.write("List of files:\n\n")
x = 0
 
for name in glob.glob(os.path.join(directory, '*')):
wr.write(name + '\n')
tar.add(name)
x += 1
wr.close()
tar.close()
 
= Weather Parcer =
print ("Initial No of Files: ", x)
*Need to register a account & get API Key from http://openweathermap.org
 
<syntaxhighlight lang="py>
import socket
import json
 
s = socket.socket()
# Code to add Ext if not present & remove double Ext
addr = socket.getaddrinfo('api.openweathermap.org', 80)
 
s.connect(addr[0][4])
print("\n---=========== Started Unique extensions =============---\n")
s.send(b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')
 
html = s.recv(1000)
for root, dirs, files in os.walk(directory):
div = html.split(b'\r\n\r\n')[-1]
for name in files:
data = json.loads(div)
file = root +"/"+ name
 
main = data['weather'][0]['main']
# find double extensions
print("Main =", main)
fn1, ext1 = os.path.splitext(file)
fn2, ext2 = os.path.splitext(fn1)
ftype = imghdr.what(file)
 
desc = data['weather'][0]['description']
if ftype == None:
print("Description =", desc)
print(file, "Unsupported file")
else:
newname = file +"."+ ftype
if not ext1:
filechk2 = Path(newname)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwriting")
else:
shutil.move(file, newname)
print (file, "has no ext, Appending:", ftype)
else:
if not ext2:
# print("1 Extension only")
continue
elif ext2 == ext1:
filechk = file.replace(ext1,ftype)
filechk2 = Path(filechk)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwritting")
else:
shutil.move(file, fn1.replace(ext2,ext1))
print (file, "has 2 Same ext, Removing:", ext2)
elif ext2 != ext1:
filechk = file.replace(ext1,ftype)
filechk2 = Path(filechk)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwritting")
else:
shutil.move(file, fn1.replace(ext2,ext1))
print (file, "has 2 Diff ext, Removing:", ext2)
else:
print ("Something wrong with file or logic")
 
Celcius = data['main']['temp'] - 273.15
rnd = str(round(Celcius,2))
print("Temperature =", rnd, "\xb0C")
 
pres = data['main']['pressure']
# Code to Correct File Extensions
print("Pressure =", pres, "mbar")
 
hum = data['main']['humidity']
print("\n---=============== Started correcting extension ================---\n")
print("Humidity =", hum, "%")
 
vis = int(data['visibility'])/1000
for root, dirs, files in os.walk(directory):
print("Visibility =", vis, "Km?")
for name in files:
file = root+"/"+name
 
speed = data['wind']['speed']
# find the correct extension
try:
ftype = imghdr.what(file)
extdir = os.path.splitext(file)data[1'wind'][1:'deg']
except KeyError:
dir = 0
rnd2 = str(round(dir,2))
print("Wind Speed =", speed, "Km/h?")
print("Wind Direction =", rnd2, "Degree")
</syntaxhighlight>
 
= Plot Temperature using Thingspeak =
# find files with the (incorrect) extension to rename
*Need to register a account & get API Key from http://openweathermap.org
if ext:
*Need to register a account & get API Key from http://thingspeak.com
if ftype != ext:
if ftype != None:
if (ftype == "jpeg") & (ext == "jpg"):
continue
# print(file, "File type is JPG/JPEG, ignoring")
else:
filechk = file.replace(ext,ftype)
filechk2 = Path(filechk)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwritting")
else:
# rename the file
shutil.move(file, file.replace(ext,ftype))
print (file, ext, ("=>"), ftype)
# in case it can't be determined, mention it in the output
else:
if ext == "png":
filechk = file.replace(ext,"jpg")
filechk2 = Path(filechk)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwritting")
else:
shutil.move(file, file.replace(ext,"jpg"))
print (file, "File type not determined for PNG =>", file.replace(ext,"jpg"))
else:
print(file, "Could not determine file type")
else:
continue
# print(file, "Correct Extension")
else:
print(file, "No Extension detected")
 
== Upload Temperature ==
 
<syntaxhighlight lang="py>
# Code to convert WEBP to PNG
import socket
import json
from time import strftime,localtime,sleep
 
while True:
print("\n---================== Started WEBP to PNG conversion ===============---\n")
# Fetch Temperature from openweathermap.org
s = socket.socket()
addr = socket.getaddrinfo('api.openweathermap.org', 80)
 
s.connect(addr[0][4])
for root, dirs, files in os.walk(directory):
s.send(b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')
for name in files:
file = root + "/" + name
fn, ext = os.path.splitext(file)
if ext == ".webp":
fnpng = fn + ".png"
fpath = Path(fnpng)
if fpath.is_file():
print (fnpng, "File already EXISTS, not overwritting")
else:
conv = subprocess.Popen(["dwebp", file, "-o", fnpng], stdout=subprocess.PIPE)
output, err = conv.communicate()
rmfile = subprocess.Popen(["rm", file], stdout=subprocess.PIPE)
output2, err2 = rmfile.communicate()
else:
continue
# print (file, "File is not Webp, Skipping")
 
html = s.recv(1000)
div = html.split(b'\r\n\r\n')[-1]
data = json.loads(div.decode('utf-8'))
 
cel = data['main']['temp'] - 273.15
# Code to replace : with _ in file names
temp = float(round(cel,2))
 
# Upload Temperature Data to thingspeak.com
print("\n---=============== Started Colon replace space =============---\n")
s2 = socket.socket()
addr2 = socket.getaddrinfo('api.thingspeak.com', 80)
 
s2.connect(addr2[0][4])
for root, dirs, files in os.walk(directory):
s2.send(b'POST https://api.thingspeak.com/update?api_key=XB15HC17CZH6KYMV_1&field1=%f HTTP/1.0\r\n\r\n' % temp)
for name in files:
filehtml2 = root + "/" + names2.recv(1000)
div2 = html2.split(b' ')[1]
if re.search(r':', file):
filechk = file.replace(":","_")
filechk2 = Path(filechk)
if filechk2.is_file():
print (filechk2, "File already EXISTS, not overwriting")
else:
# rename the file
shutil.move(file, file.replace(":","_"))
print (file, ("Colon => "), file.replace(":","_"))
else:
continue
# print(name, " Colon Not Found in name")
 
# Check if logged data successfully
print("\n---===================== All Done =======================---\n")
if int(div2) == 200:
currtime = strftime("%H:%M:%S %d-%b-%Y", localtime())
print("Uploaded Data @", currtime,":", "Temp =", temp, "\xb0C")
else:
print("HTTP Error:", int(div2))
 
# Wait for 10 minutes
sleep(600)
</syntaxhighlight>
 
== Bulk Upload Parameters ==
# Code to verify if any file missing, deleted, overwritten
y = 0
 
<syntaxhighlight lang="python">
for name in glob.glob(os.path.join(directory, '*')):
import socket
y += 1
import json
from time import strftime,localtime,sleep
import requests
 
# Fetch Temperature from openweathermap.org
print ("Initial No of Files: ", x)
def weather():
print ("Final No of Files: ", y)
s = socket.socket()
addr = socket.getaddrinfo('api.openweathermap.org', 80)
s.connect(addr[0][4])
s.send(b'GET http://api.openweathermap.org/data/2.5/weather?q=Bangalore&units=metric&appid=84d996853fcc4db149bc40acb09a3ef7_1 HTTP/1.0\r\n\r\n')
html = s.recv(1000)
s.close()
div = html.split(b'\r\n\r\n')[-1]
data = json.loads(div.decode('utf-8'))
return data
 
def thing():
if x == y:
# Call weather function
print("Operation completed successfully")
data = weather()
else:
temp = data['main']['temp']
print("Operation failed, some files missing")
pres = data['main']['pressure']
hum = data['main']['humidity']
vis = int(data['visibility'])/1000
speed = data['wind']['speed']
try:
dir = data['wind']['deg']
except KeyError:
dir = 0
ctime = strftime("%Y-%m-%d %H:%M:%S +0530", localtime())
print("Data Fetched @",ctime,":","Temp(C)=",temp,"\xb0C","Press(hPa)=",pres,"Hum(%)=",hum,"Vis(Km)=",vis,"Speed(m/s)=",speed,"Dir(deg)=",dir)
# Upload Temperature Data to thingspeak.com
payload = {"write_api_key":"G97NZS1XWXCS798C_1","updates":[{"created_at":ctime,"field1":temp,"field2":pres,"field3":hum,"field4":vis,"field5":speed,"field6":dir}]}
url = 'https://api.thingspeak.com/channels/396314/bulk_update.json'
headers = {'content-type': 'application/json'}
response = requests.post(url, data=json.dumps(payload), headers=headers)
return response
 
while True:
response = thing()
 
# Check if logged data successfully
if response.status_code == 202:
print("Data successfully uploaded :", response.status_code)
else:
print("HTTP Error Code:", response.status_code)
 
# Wait for 10 minutes
sleep(600)
</syntaxhighlight>
 
== Slack Webhook POST ==
<syntaxhighlight lang="python">
#!/usr/bin/python
import json
import requests
 
requests.packages.urllib3.disable_warnings()
 
# Create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/services/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'
slack_data = {'text': "Sev1 Incident: Pool is Down :skull:"}
 
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
</syntaxhighlight>
 
 
<br />