Nonblocking update listener
This commit is contained in:
@@ -1,10 +1,34 @@
|
||||
from flask.blueprints import Blueprint
|
||||
from flask.helpers import url_for
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.web import RequestHandler, asynchronous
|
||||
from werkzeug.utils import redirect
|
||||
|
||||
api = Blueprint('api', __name__)
|
||||
api_docs = {}
|
||||
api_docs_missing = []
|
||||
api_nonblock = {}
|
||||
|
||||
|
||||
class NonBlockHandler(RequestHandler):
|
||||
stoppers = []
|
||||
|
||||
@asynchronous
|
||||
def get(self, route):
|
||||
start, stop = api_nonblock[route]
|
||||
self.stoppers.append(stop)
|
||||
|
||||
start(self.on_new_messages, last_id = self.get_argument("last_id", None))
|
||||
|
||||
def on_new_messages(self, response):
|
||||
if self.request.connection.stream.closed():
|
||||
return
|
||||
self.finish(response)
|
||||
|
||||
def on_connection_close(self):
|
||||
for stop in self.stoppers:
|
||||
stop(self.on_new_messages)
|
||||
|
||||
|
||||
def addApiView(route, func, static = False, docs = None, **kwargs):
|
||||
api.add_url_rule(route + ('' if static else '/'), endpoint = route.replace('.', '::') if route else 'index', view_func = func, **kwargs)
|
||||
@@ -13,6 +37,14 @@ def addApiView(route, func, static = False, docs = None, **kwargs):
|
||||
else:
|
||||
api_docs_missing.append(route)
|
||||
|
||||
def addNonBlockApiView(route, func_tuple, docs = None, **kwargs):
|
||||
api_nonblock[route] = func_tuple
|
||||
|
||||
if docs:
|
||||
api_docs[route[4:] if route[0:4] == 'api.' else route] = docs
|
||||
else:
|
||||
api_docs_missing.append(route)
|
||||
|
||||
""" Api view """
|
||||
def index():
|
||||
index_url = url_for('web.index')
|
||||
|
||||
@@ -114,7 +114,6 @@ class Core(Plugin):
|
||||
log.debug('Save to shutdown/restart')
|
||||
|
||||
try:
|
||||
Env.get('httpserver').stop()
|
||||
IOLoop.instance().stop()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from couchpotato import get_session
|
||||
from couchpotato.api import addApiView
|
||||
from couchpotato.api import addApiView, addNonBlockApiView
|
||||
from couchpotato.core.event import addEvent
|
||||
from couchpotato.core.helpers.encoding import toUnicode
|
||||
from couchpotato.core.helpers.request import jsonified, getParam
|
||||
@@ -8,14 +8,19 @@ from couchpotato.core.logger import CPLog
|
||||
from couchpotato.core.notifications.base import Notification
|
||||
from couchpotato.core.settings.model import Notification as Notif
|
||||
from sqlalchemy.sql.expression import or_
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
log = CPLog(__name__)
|
||||
|
||||
|
||||
class CoreNotifier(Notification):
|
||||
|
||||
m_lock = threading.RLock()
|
||||
messages = []
|
||||
listeners = []
|
||||
|
||||
listen_to = [
|
||||
'movie.downloaded', 'movie.snatched',
|
||||
'updater.available', 'updater.updated',
|
||||
@@ -46,8 +51,17 @@ class CoreNotifier(Notification):
|
||||
}"""}
|
||||
})
|
||||
|
||||
addNonBlockApiView('notification.listener', (self.addListener, self.removeListener))
|
||||
addApiView('notification.listener', self.listener)
|
||||
|
||||
|
||||
def test():
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
addEvent('app.load', test)
|
||||
|
||||
|
||||
def markAsRead(self):
|
||||
ids = [x.strip() for x in getParam('ids').split(',')]
|
||||
|
||||
@@ -107,25 +121,79 @@ class CoreNotifier(Notification):
|
||||
ndict = n.to_dict()
|
||||
ndict['type'] = 'notification'
|
||||
ndict['time'] = time.time()
|
||||
self.messages.append(ndict)
|
||||
|
||||
self.frontend(type = listener, data = data)
|
||||
|
||||
#db.close()
|
||||
return True
|
||||
|
||||
def frontend(self, type = 'notification', data = {}):
|
||||
self.messages.append({
|
||||
|
||||
self.m_lock.acquire()
|
||||
message = {
|
||||
'id': str(uuid.uuid4()),
|
||||
'time': time.time(),
|
||||
'type': type,
|
||||
'data': data,
|
||||
})
|
||||
}
|
||||
self.messages.append(message)
|
||||
|
||||
while True and not self.shuttingDown():
|
||||
try:
|
||||
listener, last_id = self.listeners.pop()
|
||||
listener({
|
||||
'success': True,
|
||||
'result': [message],
|
||||
})
|
||||
except:
|
||||
break
|
||||
|
||||
self.m_lock.release()
|
||||
|
||||
self.cleanMessages()
|
||||
|
||||
def addListener(self, callback, last_id = None):
|
||||
|
||||
if last_id:
|
||||
messages = self.getMessages(last_id)
|
||||
if len(messages) > 0:
|
||||
return callback({
|
||||
'success': True,
|
||||
'result': messages,
|
||||
})
|
||||
|
||||
self.listeners.append((callback, last_id))
|
||||
|
||||
def removeListener(self, callback):
|
||||
for list_tuple in self.listeners:
|
||||
try:
|
||||
listener, last_id = list_tuple
|
||||
if listener == callback:
|
||||
self.listeners.remove(list_tuple)
|
||||
except:
|
||||
pass
|
||||
|
||||
def cleanMessages(self):
|
||||
|
||||
for message in self.messages:
|
||||
if message['time'] < (time.time() - 15):
|
||||
self.messages.remove(message)
|
||||
|
||||
def getMessages(self, last_id):
|
||||
self.m_lock.acquire()
|
||||
recent = []
|
||||
index = 0
|
||||
for i in xrange(len(self.messages)):
|
||||
index = len(self.messages) - i - 1
|
||||
if self.messages[index]["id"] == last_id: break
|
||||
recent = self.messages[index + 1:]
|
||||
|
||||
self.m_lock.release()
|
||||
return recent or []
|
||||
|
||||
def listener(self):
|
||||
|
||||
messages = []
|
||||
for message in self.messages:
|
||||
#delete message older then 15s
|
||||
if message['time'] > (time.time() - 15):
|
||||
messages.append(message)
|
||||
|
||||
# Get unread
|
||||
if getParam('init'):
|
||||
@@ -139,9 +207,6 @@ class CoreNotifier(Notification):
|
||||
ndict['type'] = 'notification'
|
||||
messages.append(ndict)
|
||||
|
||||
#db.close()
|
||||
|
||||
self.messages = []
|
||||
return jsonified({
|
||||
'success': True,
|
||||
'result': messages,
|
||||
|
||||
@@ -8,8 +8,7 @@ var NotificationBase = new Class({
|
||||
self.setOptions(options);
|
||||
|
||||
// Listener
|
||||
App.addEvent('load', self.startInterval.bind(self));
|
||||
App.addEvent('unload', self.stopTimer.bind(self));
|
||||
App.addEvent('unload', self.stopPoll.bind(self));
|
||||
App.addEvent('notification', self.notify.bind(self));
|
||||
|
||||
// Add test buttons to settings page
|
||||
@@ -30,7 +29,11 @@ var NotificationBase = new Class({
|
||||
'href': App.createUrl('notifications'),
|
||||
'text': 'Show older notifications'
|
||||
})); */
|
||||
})
|
||||
});
|
||||
|
||||
window.addEvent('load', function(){
|
||||
self.startInterval()
|
||||
});
|
||||
|
||||
},
|
||||
|
||||
@@ -85,35 +88,55 @@ var NotificationBase = new Class({
|
||||
|
||||
startInterval: function(){
|
||||
var self = this;
|
||||
|
||||
if(self.stopped) return;
|
||||
|
||||
self.request = Api.request('notification.listener', {
|
||||
'initialDelay': 100,
|
||||
'delay': 1500,
|
||||
Api.request('notification.listener', {
|
||||
'data': {'init':true},
|
||||
'onSuccess': self.processData.bind(self)
|
||||
})
|
||||
|
||||
self.request.startTimer()
|
||||
}).send()
|
||||
|
||||
},
|
||||
|
||||
startTimer: function(){
|
||||
if(this.request)
|
||||
this.request.startTimer()
|
||||
startPoll: function(){
|
||||
var self = this;
|
||||
|
||||
if(self.stopped || (self.request && self.request.isRunning()))
|
||||
return;
|
||||
|
||||
self.request = Api.request('nonblock/notification.listener', {
|
||||
'onSuccess': self.processData.bind(self),
|
||||
'data': {
|
||||
'last_id': self.last_id
|
||||
},
|
||||
'onFailure': function(){
|
||||
self.startPoll.delay(2000, self)
|
||||
}
|
||||
}).send()
|
||||
|
||||
},
|
||||
|
||||
stopTimer: function(){
|
||||
stopPoll: function(){
|
||||
if(this.request)
|
||||
this.request.stopTimer()
|
||||
this.request.cancel()
|
||||
this.stopped = true;
|
||||
},
|
||||
|
||||
processData: function(json){
|
||||
var self = this;
|
||||
|
||||
self.request.options.data = {}
|
||||
Array.each(json.result, function(result){
|
||||
App.fireEvent(result.type, result)
|
||||
})
|
||||
|
||||
// Process data
|
||||
if(json){
|
||||
Array.each(json.result, function(result){
|
||||
App.fireEvent(result.type, result)
|
||||
})
|
||||
|
||||
self.last_id = json.result.getLast().id
|
||||
}
|
||||
|
||||
// Restart poll
|
||||
self.startPoll()
|
||||
},
|
||||
|
||||
addTestButtons: function(){
|
||||
|
||||
@@ -127,9 +127,6 @@ class LibraryPlugin(Plugin):
|
||||
|
||||
library_dict = library.to_dict(self.default_dict)
|
||||
|
||||
fireEvent('notify.frontend', type = 'library.update.%s' % identifier, data = library_dict)
|
||||
|
||||
#db.close()
|
||||
return library_dict
|
||||
|
||||
def updateReleaseDate(self, identifier):
|
||||
|
||||
@@ -239,6 +239,7 @@ class MoviePlugin(Plugin):
|
||||
db = get_session()
|
||||
|
||||
for id in getParam('id').split(','):
|
||||
fireEvent('notify.frontend', type = 'movie.busy.%s' % id, data = True)
|
||||
movie = db.query(Movie).filter_by(id = id).first()
|
||||
|
||||
# Get current selected title
|
||||
|
||||
@@ -17,14 +17,16 @@ var Movie = new Class({
|
||||
self.parent(self, options);
|
||||
|
||||
App.addEvent('movie.update.'+data.id, self.update.bind(self));
|
||||
App.addEvent('searcher.started.'+data.id, self.searching.bind(self));
|
||||
App.addEvent('searcher.ended.'+data.id, self.searching.bind(self));
|
||||
App.addEvent('movie.busy.'+data.id, function(notification){
|
||||
if(notification.data)
|
||||
self.busy(true)
|
||||
});
|
||||
},
|
||||
|
||||
searching: function(notification){
|
||||
busy: function(set_busy){
|
||||
var self = this;
|
||||
|
||||
if(notification && notification.type.indexOf('ended') > -1){
|
||||
if(!set_busy){
|
||||
if(self.spinner){
|
||||
self.mask.fade('out');
|
||||
setTimeout(function(){
|
||||
@@ -72,8 +74,11 @@ var Movie = new Class({
|
||||
|
||||
self.data = notification.data;
|
||||
self.container.destroy();
|
||||
|
||||
self.profile = Quality.getProfile(self.data.profile_id) || {};
|
||||
self.create();
|
||||
|
||||
self.busy(false);
|
||||
},
|
||||
|
||||
create: function(){
|
||||
|
||||
@@ -23,7 +23,6 @@ class Env(object):
|
||||
_deamonize = False
|
||||
_desktop = None
|
||||
_session = None
|
||||
_httpserver = None
|
||||
|
||||
''' Data paths and directories '''
|
||||
_app_dir = ""
|
||||
|
||||
+11
-9
@@ -1,13 +1,13 @@
|
||||
from argparse import ArgumentParser
|
||||
from couchpotato import web
|
||||
from couchpotato.api import api
|
||||
from couchpotato.api import api, NonBlockHandler
|
||||
from couchpotato.core.event import fireEventAsync, fireEvent
|
||||
from couchpotato.core.helpers.variable import getDataDir, tryInt
|
||||
from logging import handlers
|
||||
from tornado import autoreload
|
||||
from tornado.httpserver import HTTPServer
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.web import RequestHandler
|
||||
from tornado.web import RequestHandler, Application, FallbackHandler
|
||||
from tornado.wsgi import WSGIContainer
|
||||
from werkzeug.contrib.cache import FileSystemCache
|
||||
import locale
|
||||
@@ -227,20 +227,22 @@ def runCouchPotato(options, base_path, args, data_dir = None, log_dir = None, En
|
||||
# Go go go!
|
||||
web_container = WSGIContainer(app)
|
||||
web_container._log = _log
|
||||
http_server = HTTPServer(web_container, no_keep_alive = True)
|
||||
Env.set('httpserver', http_server)
|
||||
loop = IOLoop.instance()
|
||||
|
||||
application = Application([
|
||||
(r'%s/api/%s/nonblock/(.*)/' % (url_base, api_key), NonBlockHandler),
|
||||
(r'.*', FallbackHandler, dict(fallback = web_container)),
|
||||
],
|
||||
log_function = lambda x : None,
|
||||
debug = config['use_reloader']
|
||||
)
|
||||
|
||||
try_restart = True
|
||||
restart_tries = 5
|
||||
|
||||
while try_restart:
|
||||
try:
|
||||
http_server.listen(config['port'], config['host'])
|
||||
|
||||
if config['use_reloader']:
|
||||
autoreload.start(loop)
|
||||
|
||||
application.listen(config['port'], config['host'], no_keep_alive = True)
|
||||
loop.start()
|
||||
except Exception, e:
|
||||
try:
|
||||
|
||||
@@ -137,7 +137,6 @@ window.addEvent('domready', function(){
|
||||
var self = this;
|
||||
(e).preventDefault();
|
||||
|
||||
self.movie.searching();
|
||||
Api.request('movie.refresh', {
|
||||
'data': {
|
||||
'id': self.movie.get('id')
|
||||
|
||||
Reference in New Issue
Block a user