| input | process1 | process2 or reverse | output |
google AI studio https://makersuite.google.com/app/apikey https://medium.com/international-school-of-ai-data-science/transforming-text-and-image-processing-with-gemini-ai-25d1dc88c27f https://github.com/FareedKhan-dev/Gemini-AI-chatbot/blob/master/app.py | API keys:key_llm AIzaSyD308Gx1Y_vkvsz9_MpeS_SBRB_5yCNrG0 | https://csvkit.readthedocs.io/en/1.0.7/scripts/csvsql.html https://learnsql.com/blog/import-csv-mysql-database/ | import google.generativeai as genai model = genai.GenerativeModel('gemini-pro') my_api_key_gemini = "AIzaSyD308Gx1Y_vkvsz9_MpeS_SBRB_5yCNrG0" genai.configure(api_key=my_api_key_gemini) | ⚠ NOTE(review): this Google AI Studio API key is stored (twice) in plain text in a shared notes file — treat it as compromised: revoke it in the API-key console and load it at runtime from an environment variable (e.g. os.environ["GEMINI_API_KEY"]) instead of hard-coding it.
| database | ===pipebear.net===cpanel hostinger ### remote dbConn = pymysql.Connect( host='srv473.hstgr.io', port=3306, user='u284407381_bear', passwd='1234Sander', db='u284407381_db01', charset='utf8' ) | ===tpri.site===cpanel hostinger ### remote conn_mysql = pymysql.Connect( host='sql456.main-hosting.eu',#sql132.main-hosting.eu', port=3306, user='u929718468_bear', passwd='1234Sander', db='u929718468_db01',#u284407381_db01', charset='utf8' ) ### local //set the database connection $server = "mysql.hostinger.com"; $database = "u929718468_db01"; $user = "u929718468_bear"; $password = "1234Sander";
?> | ===pibear.net===my.fastcomet.com ### remote dbConn = pymysql.Connect( host='sg7.fcomet.com', port=3306, user='pibearne_user', passwd='001234@fcomet', db='pibearne_dbfc', charset='utf8' ) ### localconn_mysql = pymysql.Connect( host='localhost', port=3306, user='pibearne_user', passwd='001234@fcomet', db='pibearne_dbfc', charset='utf8' ) | ⚠ NOTE(review): database hosts, user names and passwords for several live environments are recorded above in plain text — rotate these credentials and move them out of notes into environment variables or a secrets manager; connect with e.g. passwd=os.environ["DB_PASSWORD"].
### image file — loading an image whose file name contains non-ASCII (Chinese) characters.

# --- cv2 with a Chinese file name ---
# cv2.imread() cannot decode non-ASCII paths on some platforms (notably
# Windows), so read the file as raw bytes and decode with cv2.imdecode().
#IMAGE_SIZE = [224, 224]  # Default image size for VGG16
#img_size = 224
stream = open("mat_norm01中文.jpg", "rb")
bytes = bytearray(stream.read())   # NOTE(review): shadows the builtin `bytes`; rename if reused
numpyarray = np.asarray(bytes, dtype=np.uint8)
img_bgr = cv2.imdecode(numpyarray, cv2.IMREAD_UNCHANGED)
img_arr = img_bgr
#img_arr = cv2.cvtColor(img_bgr, cv2.COLOR_GRAY2RGB)
#img_bgr = cv2.imread(os.path.join(path, ui_img))  # read all the image files
# convert the BGR image to an RGB image
#img_arr = img_bgr[:,:,::-1]#[:,::-1]#
print(img_arr.shape)
print(len(img_arr.shape))
print(img_arr)
plt.imshow(img_arr)
plt.show()
#resized_arr = cv2.resize(img_arr, (img_size, img_size))  # Reshaping images to preferred size

# --- CV2 only (fails for this file name) ---
img_arr = cv2.imread("mat_norm01中文.jpg" )
print(img_arr.shape)
print(len(img_arr.shape))
print(img_arr)
plt.imshow(img_arr)
plt.show()
# The call above produced this (pasted notebook output):
#   AttributeError                            Traceback (most recent call last)
#   2 img_arr = cv2.imread("mat_norm01中文.jpg" )
#   3
#   ----> 4 print(img_arr.shape)
#   5 print(len(img_arr.shape))
#   6 print(img_arr)
#   AttributeError: 'NoneType' object has no attribute 'shape'
# Explanation: cv2.imread() returns None (instead of raising) when it cannot
# open or decode the path — e.g. the non-ASCII file name here — so the later
# `.shape` attribute access is what actually fails.

# --- scikit-image way (handles the unicode path fine) ---
from skimage import io
img_arr = io.imread("mat_norm01中文.jpg" , as_gray=False)
print(img_arr.shape)
print(len(img_arr.shape))
print(img_arr)
plt.imshow(img_arr)
plt.show()
### -> img_arr
### list <-> file helpers (notes).

def read_class_names(class_file_name):
    """Load class names from a text file.

    Args:
        class_file_name: path to a file with one class name per line.

    Returns:
        dict mapping the 0-based line index (int) to the class name (str)
        with its trailing newline stripped.
    """
    names = {}
    with open(class_file_name, 'r') as data:
        for ID, name in enumerate(data):
            names[ID] = name.strip('\n')
    return names


if __name__ == '__main__':
    # --- write list to file, line by line ---
    with open(dir1 + 'voc_classes.txt', 'w') as f:
        for line in lstClass:
            f.write(f"{line}\n")

    # --- read and edit example ---
    strMessage = 'waiting'
    strProgress = '0'
    # FIX(review): the original opened "progress.txt" for reading, then rebound
    # the same variable to a write handle without closing the first one — a
    # file-handle leak. `with` blocks close both deterministically.
    with open("progress.txt", "r") as a_file:
        lines = a_file.readlines()
    lines[0] = strMessage + "\n"
    lines[1] = strProgress + "\n"
    with open("progress.txt", "w") as a_file:
        a_file.writelines(lines)

    # --- read a class list (YOLO config; cfg defined elsewhere) ---
    with open(cfg.YOLO.CLASSES, 'r') as f:
        classes = f.readlines()
    num_classes = len(classes)

    # --- read labels, stripping trailing whitespace ---
    labels = []
    with open('./label_cnn/'+'label_carditube2.txt') as file:
        labels = [line.rstrip() for line in file]
### predict — two variants of running an image classifier.

# --- tf.keras case ---
# Load a saved VGG19 model and predict every jpg/png in ./carditube2_test/.
# File names look like "<label>_xxx.jpg"; the prefix before the first '_' is
# used as the ground-truth label.
from tensorflow.keras.models import load_model
model=load_model('./model_cnn/'+"carditube2vgg19.h5")
from skimage import io
from skimage.transform import resize
n_size = 224#128#256#64;
n_channel = 3
labels = ['bad', 'good']
lstImg = []
lstName = []
dir1 = './carditube2_test/'
for imgFile in os.listdir(dir1):
    if 'jpg' in imgFile or 'JPG' in imgFile or 'png' in imgFile:
        img = io.imread(dir1 + imgFile , as_gray=False)
        img = resize(img, (n_size, n_size, n_channel))   # also rescales pixels to [0, 1]
        lstImg.append(img)
        lstGot = imgFile.split('_')
        lstName.append(lstGot[0])                        # label prefix of the file name
x_ = np.array(lstImg)
y_pred=model.predict(x_,batch_size=1)
###[[9.9376833e-01, 6.2317080e-03], [9.9998426e-01, 1.5723252e-05], …
y_pred = np.argmax(y_pred, axis=1)   # class index per sample
###[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
y_test = [labels.index(x) for x in lstName]   # ground truth from file names
###[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# --- tf1 session case ---
# Single uploaded image: convert RGBA png to RGB, resize, normalize, then run
# the graph (x_in / y_pred / y_pred_softmax / sess / dicClass2 defined elsewhere).
lstImg = []
#img = io.imread(upload_path, as_gray=False)
#check if png, then convert to jpg and save
if '.png' in strFileName:
    img_arr = io.imread(upload_path, as_gray=False)
    img_arr = color.rgba2rgb(img_arr)   # drop the alpha channel
else:
    img_arr = io.imread(upload_path, as_gray=False)
img = resize(img_arr, (n_size, n_size, 3))#, n_channel))
lstImg.append(img)
x_test = np.array(lstImg)
x_test = normalize(x_test)
lstClass = []
lstMax = []
pred, pred_max = sess.run([tf.argmax(y_pred, 1), y_pred_softmax],
                          feed_dict={x_in: x_test})
lstClass.extend(list(pred))     # predicted class indices
lstMax.extend(list(pred_max))   # softmax probability vectors
# Display string: "<class label>:<probabilities>".
strRes = dicClass2[str(lstClass[0])]+":"+str(lstMax[0])
#dRes = 0
#for i in range(n_label):
#    dRes += float(dicClass2[str(i)][-3:])*float(lstMax[0][i])
#dRes = round(dRes, 3)
#strRes = dicClass2[str(lstClass[0])][:4] + ":" + str(dRes) + " ppb"
'''
dRes = 0
for i in range(n_label):
    dRes += float(dicClass2[str(i)])*float(lstMax[0][i])
dRes = round(dRes, 3)
strRes = str(dRes)
'''
# NOTE(review): the triple-quoted block above is a disabled alternative
# (probability-weighted average) kept verbatim; it is a no-op string statement.
### -> result (strRes)
### youtube — save periodic frames from a YouTube video via pafy + OpenCV.
import cv2
import pafy#pafy first, pip install youtube-dl==2020.12.2
url = "https://www.youtube.com/watch?v=i4ZSMDWNXTg"
video = pafy.new(url)
best = video.getbest(preftype="mp4")   # best-quality mp4 stream URL
capture = cv2.VideoCapture()
capture.open(best.url)
success, frame = capture.read()
c = 1                                  # running frame counter
frameRate = 30                         # Frame number interception interval (one frame is intercepted every 10 frames)
# NOTE(review): the comment above says "every 10 frames" but frameRate is 30 —
# one frame is actually written every 30 frames.
while success:
    #cv2.imshow('frame data', frame)
    if(c % frameRate == 1):
        #print("Start to capture video:" + str(c) + "frame")
        cv2.imwrite("./img_youtube4/img0815_" + str(c) +'.jpg', frame)   # output dir must exist
    c += 1
    if cv2.waitKey(1) & 0xFF == ord('q'):   # allow quitting with 'q'
        break
    success, frame = capture.read()
cv2.destroyAllWindows()
capture.release()
### -> image files
| | | | |
### webcamera / ip camera — Flask app relaying an RTSP feed as MJPEG.
from flask import Flask, render_template, Response
import cv2

app = Flask(__name__)
# WARNING(review): RTSP user/password are hard-coded in this URL — move them
# to configuration / environment variables.
camera = cv2.VideoCapture('rtsp://Davidtsao:Ayqmgkds1@192.168.68.114:554/stream2')  # for local webcam use cv2.VideoCapture(0)

def gen_frames():  # generate frame by frame from camera
    """Yield JPEG-encoded camera frames as multipart/x-mixed-replace chunks."""
    while True:
        # Capture frame-by-frame
        success, frame = camera.read()  # read the camera frame
        if not success:
            break
        else:
            ret, buffer = cv2.imencode('.jpg', frame)
            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')  # concat frame one by one

@app.route('/video_feed')
def video_feed():
    #Video streaming route. Put this in the src attribute of an img tag
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
def index():
    """Video streaming home page."""
    return render_template('web_rtsp.html')

if __name__ == '__main__':
    app.run(debug=True)
### web_rtsp.html as below
Live Streaming — NOTE(review): the HTML of templates/web_rtsp.html was stripped during extraction (only the stray "}}" survived). It was a minimal page embedding the feed, essentially: <img src="{{ url_for('video_feed') }}"> — confirm against the original project.
| web streaming |
### webcamera / ip camera — MJPEG HTTP streaming server (RTSP source).
# source code from
# https://picamera.readthedocs.io/en/release-1.13/recipes2.html#web-streaming
import io
#import picamera
import logging
import socketserver
from threading import Condition
from http import server
import cv2
import threading

# NOTE(review): the HTML markup of PAGE was stripped during extraction — only
# the visible text survives. It should be a small page whose <img> src points
# at /stream.mjpg; the string contents below are left exactly as found.
PAGE="""\
PiCamera MJPEG Streaming Demo

"""
class StreamingOutput(object):
    """File-like sink for an MJPEG encoder.

    Each completed JPEG is published into ``frame``; reader threads block on
    ``condition`` and are woken whenever a new frame becomes available.
    """

    def __init__(self):
        self.frame = None             # last complete JPEG, as bytes
        self.buffer = io.BytesIO()    # bytes of the frame currently arriving
        self.condition = Condition()  # readers wait here for the next frame

    def write(self, buf):
        """Accept encoder output; publish the buffered frame at each new SOI."""
        if not buf.startswith(b'\xff\xd8'):
            # Continuation of the current frame — just keep buffering.
            return self.buffer.write(buf)
        # b'\xff\xd8' is the JPEG start-of-image marker: a new frame begins,
        # so everything buffered so far is one complete JPEG. Publish it to
        # waiting clients, rewind, then start accumulating the new frame.
        self.buffer.truncate()
        with self.condition:
            self.frame = self.buffer.getvalue()
            self.condition.notify_all()
        self.buffer.seek(0)
        return self.buffer.write(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
    """Serve '/', '/index.html', and the endless multipart '/stream.mjpg'."""

    def do_GET(self):
        if self.path == '/':
            # Redirect bare root to the index page.
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            # Endless multipart/x-mixed-replace response: one JPEG per part,
            # parts separated by the FRAME boundary.
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    # Block until StreamingOutput publishes the next frame
                    # (module-level `output`, created elsewhere in this file).
                    with output.condition:
                        output.condition.wait()
                        frame = output.frame
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(frame))
                    self.end_headers()
                    self.wfile.write(frame)
                    self.wfile.write(b'\r\n')
            except Exception as e:
                # Client disconnects surface here as write errors; log and
                # let the handler thread exit.
                logging.warning(
                    'Removed streaming client %s: %s',
                    self.client_address, str(e))
        else:
            self.send_error(404)
            self.end_headers()


class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """Threaded HTTP server: one daemon thread per connected client."""
    allow_reuse_address = True
    daemon_threads = True
# Module-level streaming state: the main thread pushes JPEGs into `output`,
# the HTTP server (port 8000, all interfaces) fans them out to clients.
output = StreamingOutput()
address = ('', 8000)
server = StreamingServer(address, StreamingHandler)
# NOTE(review): this rebinds the name `server`, which was previously the
# `http.server` module imported above. It works because the classes that need
# the module are already defined, but renaming the instance would be clearer.

def server_run():
    # serve_forever() blocks; run it in a worker thread.
    server.serve_forever()

threading.Thread(target=server_run).start()

# Main thread: pull frames from the RTSP camera and feed them to `output`.
# WARNING(review): camera credentials are hard-coded in this URL.
cap = cv2.VideoCapture('rtsp://Davidtsao:Ayqmgkds1@192.168.68.114:554/stream2')
cap.set(3, 640)   # property 3 = frame width
cap.set(4, 480)   # property 4 = frame height
while True:
    ret, frame = cap.read()
    ### modify frame here ###
    data = cv2.imencode('.jpg', frame)[1].tobytes()
    output.write(data)
### pi camera — MJPEG web streaming (official PiCamera recipe).
# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming
import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server
import subprocess
import socket

# NOTE(review): the HTML markup of PAGE was stripped during extraction; only
# the visible text remains. The recipe's page embeds <img src="stream.mjpg">.
PAGE="""\
Raspberry Pi - Surveillance Camera

"""

def get_ip_address():
    """Return this host's outbound LAN IP.

    Connecting a UDP socket to 8.8.8.8 sends no packet; it only selects a
    route, so getsockname() yields the local address other LAN hosts can use.
    """
    ip_address = '';
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8",80))
    ip_address = s.getsockname()[0]
    s.close()
    return ip_address

class StreamingOutput(object):
    """File-like object picamera records into; publishes complete JPEGs."""
    def __init__(self):
        self.frame = None             # last complete JPEG
        self.buffer = io.BytesIO()    # frame currently being written
        self.condition = Condition()  # wakes streaming clients on a new frame
    def write(self, buf):
        if buf.startswith(b'\xff\xd8'):
            # New frame, copy the existing buffer's content and notify all
            # clients it's available
            self.buffer.truncate()
            with self.condition:
                self.frame = self.buffer.getvalue()
                self.condition.notify_all()
            self.buffer.seek(0)
        return self.buffer.write(buf)

class StreamingHandler(server.BaseHTTPRequestHandler):
    """Serve '/', '/index.html' and the endless '/stream.mjpg' response."""
    def do_GET(self):
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    # Wait for the camera to publish the next frame.
                    with output.condition:
                        output.condition.wait()
                        frame = output.frame
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(frame))
                    self.end_headers()
                    self.wfile.write(frame)
                    self.wfile.write(b'\r\n')
            except Exception as e:
                logging.warning(
                    'Removed streaming client %s: %s',
                    self.client_address, str(e))
        else:
            self.send_error(404)
            self.end_headers()

class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    allow_reuse_address = True   # restart-friendly rebinding of port 8000
    daemon_threads = True        # client threads die with the process

with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
    output = StreamingOutput()
    #Uncomment the next line to change your Pi's Camera rotation (in degrees)
    #camera.rotation = 90
    camera.start_recording(output, format='mjpeg')
    try:
        #myIP = subprocess.getoutput('hostname -I')
        #hostname = socket.gethostname()
        myIP = get_ip_address()#socket.gethostbyname(hostname)
        address = (myIP, 8000)
        print('http://'+myIP+':8000')
        #address = ('192.168.1.13', 8000)
        server = StreamingServer(address, StreamingHandler)
        server.serve_forever()
    finally:
        # Always stop the camera, even if the server loop raises.
        camera.stop_recording()
| | | | |
| | | | |
| | | | |
### HTML — parsing cheat-sheet with BeautifulSoup.
# pip install beautifulsoup4
# requests
import requests
from bs4 import BeautifulSoup

# Download the page once and build a parse tree from the response body.
response = requests.get('https://en.wikipedia.org/wiki/Python')
soup = BeautifulSoup(response.text, 'html.parser')

# Title of the document.
title = soup.title.string

# Text of the first paragraph.
first_paragraph = soup.p.string

# Every anchor element on the page.
links = soup.find_all('a')

# Lookup recipes — each line overwrites `elem` with a fresh example:
title = soup.find('a')                                   # first element by tag name
elem = soup.find(class_='mw-headline')                   # by CSS class
elem = soup.find(id='firstHeading')                      # by id
elem = soup.find('a', attrs={'class': 'mw-jump-link'})   # by tag + attribute
elem = soup.select_one('a.mw-jump-link')                 # by CSS selector
elem = soup.find('a').text                               # text of one element
elem = soup.text                                         # text of the whole document
elem = soup.find('a').attrs['href']                      # one attribute value
### -> content
### tblTagWestPV — PV (solar) generation notes: daily power sums.
# Dashboards (id/pw=630448/630448 — WARNING(review): credentials in notes):
#   http://10.216.199.1/anyglass/fs_Assets_Main.gdfx
#   http://10.52.60.5:8081/anyglass/tpri_Assets_Main.gdfx
# Error sources (誤差來源):
#   1. prediction: forecast UVI and cloud cover
#   2. location: Kaohsiung vs. Fengshan site
#   3. model: model applicability
#   4. decay: aging equipment
#   5. level: low generation => relatively larger error
# CSV export endpoint: http://app.pibear.net/tblTagWestPV2CSV

# Sum generated power per calendar day: derive a date column from the
# timestamp, group by it, and write the per-day sums to CSV.
df1 = pd.read_csv('./data_pv.csv', index_col = False)

def getDate(row):
    # 'time' looks like 'YYYY/M/D H:M:S'; keep only the date part before the space.
    strDT = str(row['time'])
    return strDT.split(' ')[0]

df1['fldDate'] = df1.apply(lambda row:getDate(row), axis=1)
# Use GroupBy() to compute the sum
df2 = df1.groupby('fldDate').sum()
df2.to_csv('pv_sum.csv')
### -> sum(power) groupby(Date)
### Merge separate ROC-calendar date/time columns into one datetime string.
# fldDate example: 1101204 (ROC year 110, Dec 4 -> 2021/12/4)
# fldTime example: 020757  (02:07:57)

def mergeDateTime(row):
    """Combine a row's ROC-format `fldDate` and `fldTime` into one string.

    Args:
        row: mapping (e.g. pandas Series) with integer-like 'fldDate'
             (ROC yyyMMdd, e.g. 1101204) and 'fldTime' (HHmmss, e.g. 020757).

    Returns:
        'YYYY/M/D H:M:S' in the Gregorian calendar (ROC year + 1911), with no
        zero padding (pd.to_datetime parses it fine), or '' when the row
        cannot be parsed.
    """
    fldTime = ''
    try:
        # FIX(review): the int() conversions were originally outside the try,
        # so a non-numeric cell raised instead of yielding ''. They are now
        # guarded, and the bare `except: pass` (which swallowed everything,
        # even KeyboardInterrupt) is narrowed to parse/lookup failures.
        nDate = int(row['fldDate'])              # e.g. 1101204
        nTime = int(row['fldTime'])              # e.g. 020757
        nYear = nDate//10000                     # ROC year, e.g. 110
        nMon = (nDate - nYear*10000)//100        # month, e.g. 12
        nDay = nDate - nYear*10000 - nMon*100    # day, e.g. 4
        nHour = nTime//10000                     # e.g. 2
        nMin = (nTime - nHour*10000)//100        # e.g. 7
        nSec = nTime - nHour*10000 - nMin*100    # e.g. 57
        fldTime = (str(nYear+1911) + '/' + str(nMon) + '/' + str(nDay)
                   + ' ' + str(nHour) + ':' + str(nMin) + ':' + str(nSec))
    except (KeyError, TypeError, ValueError):
        pass
    return fldTime


if __name__ == '__main__':
    # Usage example: build a proper datetime column from a SQL query result.
    # (strSql and conn_mysql are defined elsewhere in the notes.)
    import pandas as pd
    df1 = pd.read_sql(strSql, conn_mysql)
    df1['time'] = df1.apply(lambda row:mergeDateTime(row), axis=1)
    df1['time']= pd.to_datetime(df1['time'])
    ### -> DateTime 2021/12/4 2:7:57