# Source code for watson_developer_cloud.websocket.recognize_listener
# coding: utf-8
# Copyright 2018 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import websocket
import json
import time
try:
import thread
except ImportError:
import _thread as thread
# Number of bytes requested from a file-like audio source per read.
ONE_KB = 1024
# Prefix of the "error" text that marks an inactivity timeout; such messages
# are reported through on_inactivity_timeout() instead of on_error().
TIMEOUT_PREFIX = "No speech detected for"
# Pause (seconds, passed to time.sleep) between sends/polls while streaming.
TEN_MILLISECONDS = 0.01
# Keys and values of the JSON control messages exchanged with the server.
STATE = "state"
ACTION = "action"
START = "start"
STOP = "stop"
class RecognizeListener(object):
    """
    WebSocket client that streams audio to the server and relays the
    server's JSON responses to a callback object.

    The callback object is expected to provide ``on_connected``,
    ``on_listening``, ``on_inactivity_timeout``, ``on_error``,
    ``on_transcription``, ``on_hypothesis``, ``on_data`` and ``on_close``.
    """

    def __init__(self, audio_source, options, callback, url, headers,
                 http_proxy_host=None, http_proxy_port=None):
        """
        Store the configuration, create the WebSocket app and run it.

        The constructor ends by calling ``run_forever()``, so it does not
        return until that call returns.

        :param audio_source: object exposing ``input``, ``is_buffer`` and
            ``is_recording``; ``input`` is read with ``read()`` when
            ``is_buffer`` is false and polled with ``empty()``/``get()``
            otherwise
        :param options: dict sent to the server as the start message
        :param callback: object receiving the ``on_*`` notifications
        :param url: WebSocket URL to connect to
        :param headers: headers passed to the WebSocket handshake
        :param http_proxy_host: optional proxy host passed to ``run_forever``
        :param http_proxy_port: optional proxy port passed to ``run_forever``
        """
        self.audio_source = audio_source
        self.options = options
        self.callback = callback
        self.url = url
        self.headers = headers
        self.http_proxy_host = http_proxy_host
        self.http_proxy_port = http_proxy_port
        # Flipped to True on the first "state" message from the server.
        self.isListening = False

        # websocket.enableTrace(True)

        self.ws_client = websocket.WebSocketApp(
            self.url,
            header=self.headers,
            on_open=self.on_open,
            on_data=self.on_data,
            on_error=self.on_error,
            on_close=self.on_close,
        )

        self.ws_client.run_forever(http_proxy_host=self.http_proxy_host,
                                   http_proxy_port=self.http_proxy_port)

    @classmethod
    def build_start_message(cls, options):
        """
        Build the start message from the recognition options.

        Works on a shallow copy so the caller's dict is not modified.

        :param options: dict of recognition options
        :return: new dict holding the options plus the start action
        """
        message = dict(options)
        message[ACTION] = START
        return message

    @classmethod
    def build_closing_message(cls):
        """
        Build the message that tells the server no more audio follows.

        :return: UTF-8 encoded JSON bytes of the stop action
        """
        return json.dumps({ACTION: STOP}).encode('utf8')

    @classmethod
    def extract_transcripts(cls, alternatives):
        """
        Reduce recognition alternatives to their transcript fields.

        NOTE(review): ``on_data`` calls this method but no definition was
        visible in this file; confirm that this shape is what
        ``callback.on_transcription`` expects.

        :param alternatives: iterable of dicts with a ``transcript`` key and
            an optional ``confidence`` key
        :return: list of dicts holding ``transcript`` and, when present,
            ``confidence``
        """
        transcripts = []
        for alternative in alternatives:
            transcript = {'transcript': alternative['transcript']}
            if 'confidence' in alternative:
                transcript['confidence'] = alternative['confidence']
            transcripts.append(transcript)
        return transcripts

    def send(self, data, opcode=websocket.ABNF.OPCODE_TEXT):
        """
        Send message to server.

        :param data: message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        :param opcode: operation code of data. default is OPCODE_TEXT.
        """
        self.ws_client.send(data, opcode)

    def send_audio(self, ws):
        """
        Stream audio to server on a background thread.

        :param ws: Websocket client (unused; ``self.ws_client`` is used)
        """
        def run(*args):
            """Background process to stream the data"""
            if not self.audio_source.is_buffer:
                # File-like source: read fixed-size chunks until exhausted.
                # The input is closed even if a send fails part-way.
                try:
                    while True:
                        chunk = self.audio_source.input.read(ONE_KB)
                        if not chunk:
                            break
                        self.ws_client.send(chunk, websocket.ABNF.OPCODE_BINARY)
                        time.sleep(TEN_MILLISECONDS)
                finally:
                    self.audio_source.input.close()
            else:
                # Queue-like source: drain what is available and keep
                # polling for as long as the source reports it is recording.
                while True:
                    try:
                        if not self.audio_source.input.empty():
                            chunk = self.audio_source.input.get()
                            self.ws_client.send(chunk, websocket.ABNF.OPCODE_BINARY)
                            time.sleep(TEN_MILLISECONDS)
                        if self.audio_source.input.empty():
                            if self.audio_source.is_recording:
                                time.sleep(TEN_MILLISECONDS)
                            else:
                                break
                    except Exception:
                        # Best effort: retry while recording, otherwise stop.
                        if self.audio_source.is_recording:
                            time.sleep(TEN_MILLISECONDS)
                        else:
                            break

            time.sleep(TEN_MILLISECONDS)
            # Tell the server that the audio stream is complete.
            self.ws_client.send(self.build_closing_message(), websocket.ABNF.OPCODE_TEXT)

        thread.start_new_thread(run, ())

    def on_open(self, ws):
        """
        Callback executed when a connection is opened to the server.
        Notifies the callback and sends the start message.

        :param ws: Websocket client
        """
        self.callback.on_connected()

        # Send initialization message
        init_data = self.build_start_message(self.options)
        self.ws_client.send(json.dumps(init_data).encode('utf8'), websocket.ABNF.OPCODE_TEXT)

    def on_data(self, ws, message, message_type, fin):
        """
        Callback executed when message is received from the server.

        :param ws: Websocket client
        :param message: utf-8 string which we get from the server.
        :param message_type: Message type which is either ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY
        :param fin: continue flag. If 0, the data continues.
        """
        try:
            json_object = json.loads(message)
        except (ValueError, TypeError):
            self.on_error(ws, 'Unable to parse received message.')
            # Nothing was parsed, so there is nothing to dispatch.
            return

        if 'error' in json_object:
            # Only call on_error() if a real error occurred. The STT service sends
            # {"error" : "No speech detected for 5s"} for valid timeouts, configured by
            # options.inactivity_timeout
            error = json_object['error']
            if error.startswith(TIMEOUT_PREFIX):
                self.callback.on_inactivity_timeout(error)
            else:
                self.on_error(ws, error)

        # if uninitialized, receive the initialization response from the server
        elif STATE in json_object:
            if not self.isListening:
                self.isListening = True
                self.callback.on_listening()
                self.send_audio(ws)
            else:
                # close the connection
                self.callback.on_close()
                ws.close()

        # if in streaming
        elif 'results' in json_object or 'speaker_labels' in json_object:
            results = json_object.get('results')
            # An absent or empty "results" list carries no hypothesis; the
            # raw message is still forwarded through on_data() below.
            if results:
                first_result = results[0]
                alternatives = first_result['alternatives']
                hypothesis = alternatives[0]['transcript']

                if first_result['final'] is True:
                    self.callback.on_transcription(
                        self.extract_transcripts(alternatives))

                self.callback.on_hypothesis(hypothesis)
            self.callback.on_data(json_object)

    def on_error(self, ws, error):
        """
        Callback executed when an error is received

        :param ws: Websocket client
        :param error: Exception object or error message string
        """
        self.callback.on_error(error)

    def on_close(self, ws):
        """
        Callback executed when websocket connection is closed

        :param ws: Websocket client
        """
        self.callback.on_close()