Sunday 6 March 2022

How to convert video to gif using python opencv

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import cv2
from sklearn.metrics import mean_squared_error
from math import sqrt
import numpy as np
import traceback as tb
import images_to_gif as ig
from PIL import Image

cap = cv2.VideoCapture('..\\test\\videoplayback.mp4')
# Check if camera opened successfully
if (cap.isOpened()== False): 
  print("Error opening video  file")
farmes_list = list()
while(cap.isOpened()):
	ret, frame = cap.read()
	ret, frame = cap.read()
	if ret == True:
		# Display the resulting frame
		cv2.imshow('Frame', frame)
		farmes_list.append(frame)
	else:
		break
	# Press Q on keyboard to  exit
	if cv2.waitKey(25) & 0xFF == ord('q'):
		break

print(f'length of the frame list is= {len(farmes_list)}')
i = 0
new_frame = list()
for img in farmes_list:
	try:
		frame = img
		# Open image in bwDir - The searched image
		searchedImageBw = np.array(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
		# Open image to be compared
		inx = i
		if inx != len(farmes_list):
			cmpImage = np.array(cv2.cvtColor(farmes_list[inx+1], cv2.COLOR_BGR2GRAY))
			rms = sqrt(mean_squared_error(searchedImageBw, cmpImage))
			print(f'rms= {rms}')
			if rms>3:
				#farmes_list.remove(frame)
				new_frame.append(frame)


	except Exception as e:
		print(e)
		tb.print_exc()
		pass
	i = i+1
   

print(f'length of the frame list is= {len(new_frame)}')

pil_frame = [ Image.fromarray(img) for img in new_frame]
bytesio_object = ig.frame_gif(pil_frame)
ig.save(bytesio_object, path = "videotogif.gif")
cap.release()
cv2.destroyAllWindows()

Sunday 23 January 2022

Fire off function without waiting for answer (Python)

Here is sample code for thread based method invocation additionally desired threading.stack_size can be added to boost the performance. Also its important to invoke Garbage collector if the number of threaded invocation is greater in number.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import requests
import gc
#The stack size set by threading.stack_size is the amount of memory to allocate for the call stack in threads.
threading.stack_size(524288)

def alpha_gun(url, json, headers):
    #r=requests.post(url, data=json, headers=headers)
    r=requests.get(url)
    print(r.text)


def trigger(url, json, headers):
    threading.Thread(target=alpha_gun, args=(url, json, headers)).start()


url = "https://raw.githubusercontent.com/jyotiprakash-work/Live_Video_steaming/master/README.md"
payload="{}"
headers = {
  'Content-Type': 'application/json'
}

for i in range(10):
    print(i)
    #for condition 
    if i==5:
        trigger(url=url, json =payload, headers=headers)
        gc.collect()
        print('invoked')
    

Saturday 22 January 2022

Face Recognition With python and face-net model

 Here i have added face recognition code in a flask app. The Face app contains registration, training model and recognition where each having a separate end point.




To access the endpoints please follow cUrls-

For Respiration- 
curl -X POST \
  http://127.0.0.1:5000/upload \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -H 'postman-token: 2bf86477-f928-9ff2-d677-9b42a802e381' \
  -F file=@WIN_20220123_00_56_49_Pro.jpg \
  -F id=jp

For training-
curl -X POST \
  http://127.0.0.1:5000/train \
  -H 'cache-control: no-cache' \
  -H 'postman-token: 374826ca-15b5-7508-5052-f3ec43b1ca07'

For Recognition-
curl -X POST \
  http://127.0.0.1:5000/recognize \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -H 'postman-token: 4183a550-5278-a7fe-c618-f100d164c7f8' \
  -F file=@WIN_20220123_00_57_28_Pro.jpg 

Sunday 26 September 2021

Video Steaming python Websockets


#Server.py


import http.server as http
import asyncio
import websockets
import socketserver
import multiprocessing
import cv2
import sys
from datetime import datetime as dt

# Keep track of our generated processes
PROCESSES = []

def log(message):
    print("[LOG] " + str(dt.now()) + " - " + message)
#Feed function, here processing code feed can be placed
def camera(man):
    log("Starting camera")
    vc = cv2.VideoCapture(0)

    if vc.isOpened():
        r, f = vc.read()
    else:
        r = False

    while r:
        cv2.waitKey(20)
        r, f = vc.read()
        f = cv2.resize(f, (640, 480))
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 65]
        man[0] = cv2.imencode('.jpg', f, encode_param)[1]

# HTTP server handler
def server():
    server_address = ('0.0.0.0', 8000)
    if sys.version_info[1] < 7:
        class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.HTTPServer):
            pass
        httpd = ThreadingHTTPServer(server_address, http.SimpleHTTPRequestHandler)
    else:
        httpd = http.ThreadingHTTPServer(server_address, http.SimpleHTTPRequestHandler)
    log("Server started")
    httpd.serve_forever()

#Socket handler
def socket(man):
    # Will handle our websocket connections
    async def handler(websocket, path):
        log("Socket opened")
        try:
            while True:
                await asyncio.sleep(0.033) # 30 fps
                await websocket.send(man[0].tobytes())
        except websockets.exceptions.ConnectionClosed:
            log("Socket closed")

    log("Starting socket handler")
    # Create the awaitable object
    start_server = websockets.serve(ws_handler=handler, host='0.0.0.0', port=8585)
    # Start the server, add it to the event loop
    asyncio.get_event_loop().run_until_complete(start_server)
    # Registered our websocket connection handler, thus run event loop forever
    asyncio.get_event_loop().run_forever()


def main():
    manager = multiprocessing.Manager()
    lst = manager.list()
    lst.append(None)
    # Host the page, invoking server
    http_server = multiprocessing.Process(target=server)
    # Set up our websocket handler
    socket_handler = multiprocessing.Process(target=socket, args=(lst,))
    # Set up our camera-feed
    camera_handler = multiprocessing.Process(target=camera, args=(lst,))
    # Add 'em to our list
    PROCESSES.append(camera_handler)
    PROCESSES.append(http_server)
    PROCESSES.append(socket_handler)
    for p in PROCESSES:
        p.start()
    # Wait forever
    while True:
        pass

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        for p in PROCESSES:
            p.terminate()

#scripts.js

openSocket = () => {
    let uri = "ws://" + window.location.hostname + ":8585";
    socket = new WebSocket(uri);
    let msg = document.getElementById("msg");
    socket.addEventListener('open', (e) => {
        document.getElementById("status").innerHTML = "Opened";
    });
    socket.addEventListener('message', (e) => {
        let ctx = msg.getContext("2d");
        let image = new Image();
        image.src = URL.createObjectURL(e.data);
        image.addEventListener("load", (e) => {
            ctx.drawImage(image, 0, 0, msg.width, msg.height);
        });
    });
}
#index.html
<!DOCTYPE html>
<html>
    <head>
        <title>Video Steam</title>
        <link rel="stylesheet" type="text/css" href="style.css">
        <script type="text/javascript" src="script.js" ></script>
    </head>
    <body onload="openSocket()">
        <div id="status">
            Connection failed. May socket is broken.
        </div>
        <div style="text-align: center">
            <canvas id="msg" width="460" height="420" style="display:inline-block" />
        </div>
        
    </body>
</html>
Here is git link for full project.

Saturday 20 June 2020

One way Live Video Steaming(Audio and Video) with Python Flask


Here in this code Both video and audio steaming is implemented. 
App.py holds the flask code for steaming and cemara.py contains
video capture method. download full code here

#APP.py

from flask import Flask, Response,render_template
import pyaudio
from camera import VideoCamera
import cv2

app = Flask(__name__)


FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
CHUNK = 1024
RECORD_SECONDS = 5


audio1 = pyaudio.PyAudio()



def genHeader(sampleRate, bitsPerSample, channels):
    datasize = 2000*10**6
    o = bytes("RIFF",'ascii')                                               # (4byte) Marks file as RIFF
    o += (datasize + 36).to_bytes(4,'little')                               # (4byte) File size in bytes excluding this and RIFF marker
    o += bytes("WAVE",'ascii')                                              # (4byte) File type
    o += bytes("fmt ",'ascii')                                              # (4byte) Format Chunk Marker
    o += (16).to_bytes(4,'little')                                          # (4byte) Length of above format data
    o += (1).to_bytes(2,'little')                                           # (2byte) Format type (1 - PCM)
    o += (channels).to_bytes(2,'little')                                    # (2byte)
    o += (sampleRate).to_bytes(4,'little')                                  # (4byte)
    o += (sampleRate * channels * bitsPerSample // 8).to_bytes(4,'little')  # (4byte)
    o += (channels * bitsPerSample // 8).to_bytes(2,'little')               # (2byte)
    o += (bitsPerSample).to_bytes(2,'little')                               # (2byte)
    o += bytes("data",'ascii')                                              # (4byte) Data Chunk Marker
    o += (datasize).to_bytes(4,'little')                                    # (4byte) Data size in bytes
    return o

@app.route('/audio')
def audio():
    # start Recording
    def sound():

        CHUNK = 1024
        sampleRate = 44100
        bitsPerSample = 16
        channels = 2
        wav_header = genHeader(sampleRate, bitsPerSample, channels)

        stream = audio1.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,input_device_index=1,
                        frames_per_buffer=CHUNK)
        print("recording...")
        #frames = []
        first_run = True
        while True:
           if first_run:
               data = wav_header + stream.read(CHUNK)
               first_run = False
           else:
               data = stream.read(CHUNK)
           yield(data)

    return Response(sound())

def gen(camera):
    while True:
        try:
            frame = camera.get_frame()
            yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
        except:
            frame = cv2.imread('loading.jpg')
            ret, jpeg = cv2.imencode('.jpg', frame)
            frame = jpeg.tobytes()
            yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

@app.route('/video_feed')
def video_feed():
    return Response(gen(VideoCamera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
def index():
    """Video streaming home page."""
    return render_template('index.html')


if __name__ == "__main__":
    app.run(host='0.0.0.0', debug=True, threaded=True,port=5000)
camera.py
import cv2
import imutils
import numpy as np
import time

ds_factor=0.6

#net = cv2.dnn.readNetFromCaffe('deploy.prototxt.txt', 'res10_300x300_ssd_iter_140000.caffemodel')

class VideoCamera(object):
    def __init__(self):
        self.video = cv2.VideoCapture(0)
    
    def __del__(self):
        self.video.release()
    
    def get_frame(self):
        success, image = self.video.read()
        #time.sleep(2.0)
        frame = image
        frame = imutils.resize(frame, width=400)
        (h, w) = frame.shape[:2]
        ret, jpeg = cv2.imencode('.jpg', frame)
        return jpeg.tobytes()

index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
    <style type="text/css">
    img {
  	display: block;
  	margin-left: auto;
  	margin-right: auto;
	width: 60%;
	height:60%;
	}
	div{
	display: block;
  	margin-left: auto;
  	margin-right: auto;
	width: 60%;
	height:60%; 
	}
    </style>
</head>
<body>
      <h1 align="center">Video Streaming Demonistration</h1>
    <img id="bg" class="center" src="{{ url_for('video_feed') }}">
    <div class="center">
    <audio style="width: 60%;" controls>
        <source src="{{ url_for('audio') }}" type="audio/x-wav;codec=pcm">
        Your browser does not support the audio element.
    </audio>
    </div>
</body>
</html>

Output


Friday 29 May 2020

Remove Duplicates From a Python List




list_data = [1,1,2,2,2,3,4,5]

list_data = list(dict.fromkeys(list_data))
print(list_data)

list_data = ['a','a','b','c']
list_data = list(dict.fromkeys(list_data))
print(list_data)


Out Put

Saturday 6 July 2019

Video to Image Converter using OpenCV Python

import cv2
import os

def video_to_frames(video, path_output_dir):
    # extract frames from a video and save to directory as 'x.png' where 
    # x is the frame index
    vidcap = cv2.VideoCapture(video)
    count = 0
    while vidcap.isOpened():
        success, image = vidcap.read()
        if success:
            cv2.imwrite(os.path.join(path_output_dir, '%d.png') % count, image)
            count += 1
        else:
            break
    cv2.destroyAllWindows()
    vidcap.release()

video_to_frames('VID_20190706_222120.mp4', './out')

OutPut Images