Source code for Tribler.Core.Modules.restapi.events_endpoint

from twisted.web import server, resource

from Tribler.Core.Modules.restapi.util import convert_db_channel_to_json, convert_search_torrent_to_json, \
    fix_unicode_dict
from Tribler.Core.simpledefs import (NTFY_CHANNELCAST, SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, SIGNAL_TORRENT,
                                     NTFY_UPGRADER, NTFY_STARTED, NTFY_WATCH_FOLDER_CORRUPT_TORRENT, NTFY_INSERT,
                                     NTFY_NEW_VERSION, NTFY_FINISHED, NTFY_TRIBLER, NTFY_UPGRADER_TICK, NTFY_CHANNEL,
                                     NTFY_DISCOVERED, NTFY_TORRENT, NTFY_ERROR, NTFY_DELETE, NTFY_MARKET_ON_ASK,
                                     NTFY_UPDATE, NTFY_MARKET_ON_BID, NTFY_MARKET_ON_TRANSACTION_COMPLETE,
                                     NTFY_MARKET_ON_ASK_TIMEOUT, NTFY_MARKET_ON_BID_TIMEOUT,
                                     NTFY_MARKET_ON_PAYMENT_RECEIVED, NTFY_MARKET_ON_PAYMENT_SENT,
                                     SIGNAL_RESOURCE_CHECK, SIGNAL_LOW_SPACE, NTFY_CREDIT_MINING)
import Tribler.Core.Utilities.json_util as json
from Tribler.Core.version import version_id


class EventsEndpoint(resource.Resource):
    """
    Important events in Tribler are returned over the events endpoint. This connection is held open. Each event is
    pushed over this endpoint in the form of a JSON dictionary. Each JSON dictionary contains a type field that
    indicates the type of the event. Individual events are separated by a newline character (\\n).

    Currently, the following events are implemented:

    - events_start: An indication that the event socket is opened and that the server is ready to push events. This
      includes information about whether Tribler has started already or not and the version of Tribler used.
    - search_result_channel: This event dictionary contains a search result with a channel that has been found.
    - search_result_torrent: This event dictionary contains a search result with a torrent that has been found.
    - upgrader_started: An indication that the Tribler upgrader has started.
    - upgrader_finished: An indication that the Tribler upgrader has finished.
    - upgrader_tick: An indication that the state of the upgrader has changed. The dictionary contains a
      human-readable string with the new state.
    - watch_folder_corrupt_torrent: This event is emitted when a corrupt .torrent file in the watch folder is found.
      The dictionary contains the name of the corrupt torrent file.
    - new_version_available: This event is emitted when a new version of Tribler is available.
    - tribler_started: An indicator that Tribler has completed the startup procedure and is ready to use.
    - channel_discovered: An indicator that Tribler has discovered a new channel. The event contains the name,
      description and dispersy community id of the discovered channel.
    - torrent_discovered: An indicator that Tribler has discovered a new torrent. The event contains the infohash,
      name, list of trackers, list of files with name and size, and the dispersy community id of the discovered
      torrent.
    - torrent_removed_from_channel: An indicator that a torrent has been removed from a channel. The event contains
      the infohash and the dispersy id of the channel which contained the removed torrent.
    - torrent_finished: A specific torrent has finished downloading. The event includes the infohash and name of the
      torrent that has finished downloading.
    - torrent_error: An error has occurred during the download process of a specific torrent. The event includes the
      infohash and a readable string of the error message.
    - tribler_exception: An exception has occurred in Tribler. The event includes a readable string of the error.
    - market_ask: Tribler learned about a new ask in the market. The event includes information about the ask.
    - market_bid: Tribler learned about a new bid in the market. The event includes information about the bid.
    - market_ask_timeout: An ask has expired. The event includes information about the ask.
    - market_bid_timeout: A bid has expired. The event includes information about the bid.
    - market_transaction_complete: A transaction has been completed in the market. The event contains the
      transaction that was completed.
    - market_payment_received: We received a payment in the market. The event contains the payment information.
    - market_payment_sent: We sent a payment in the market. The event contains the payment information.
    - market_iom_input_required: The Internet-of-Money modules requires user input (like a password or challenge
      response). NOTE(review): no handler in this class emits this event; confirm where it is produced.
    - credit_mining_error: An error has been reported by credit mining. The event contains the payload passed along
      with the notification.
    - resource events: Emitted for the SIGNAL_RESOURCE_CHECK subject with the SIGNAL_LOW_SPACE change type. The
      "type" field of the event is the change type of the notification itself.
    """

    def __init__(self, session):
        """
        Create the endpoint and subscribe its handlers to the notifications of the given session.

        :param session: the Tribler session; its ``add_observer`` method is used to register every handler below.
        """
        resource.Resource.__init__(self)
        self.session = session
        # Requests (open connections) that currently receive events.
        self.events_requests = []
        # Identifiers of search results that were already pushed for the current query.
        self.infohashes_sent = set()
        self.channel_cids_sent = set()

        # (handler, subject, change type) triples, registered in this exact order.
        observers = (
            (self.on_search_results_channels, SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS),
            (self.on_search_results_torrents, SIGNAL_TORRENT, SIGNAL_ON_SEARCH_RESULTS),
            (self.on_upgrader_started, NTFY_UPGRADER, NTFY_STARTED),
            (self.on_upgrader_finished, NTFY_UPGRADER, NTFY_FINISHED),
            (self.on_upgrader_tick, NTFY_UPGRADER_TICK, NTFY_STARTED),
            (self.on_watch_folder_corrupt_torrent, NTFY_WATCH_FOLDER_CORRUPT_TORRENT, NTFY_INSERT),
            (self.on_new_version_available, NTFY_NEW_VERSION, NTFY_INSERT),
            (self.on_tribler_started, NTFY_TRIBLER, NTFY_STARTED),
            (self.on_channel_discovered, NTFY_CHANNEL, NTFY_DISCOVERED),
            (self.on_torrent_discovered, NTFY_TORRENT, NTFY_DISCOVERED),
            (self.on_torrent_removed_from_channel, NTFY_TORRENT, NTFY_DELETE),
            (self.on_torrent_finished, NTFY_TORRENT, NTFY_FINISHED),
            (self.on_torrent_error, NTFY_TORRENT, NTFY_ERROR),
            (self.on_market_ask, NTFY_MARKET_ON_ASK, NTFY_UPDATE),
            (self.on_market_bid, NTFY_MARKET_ON_BID, NTFY_UPDATE),
            (self.on_market_ask_timeout, NTFY_MARKET_ON_ASK_TIMEOUT, NTFY_UPDATE),
            (self.on_market_bid_timeout, NTFY_MARKET_ON_BID_TIMEOUT, NTFY_UPDATE),
            (self.on_market_transaction_complete, NTFY_MARKET_ON_TRANSACTION_COMPLETE, NTFY_UPDATE),
            (self.on_market_payment_received, NTFY_MARKET_ON_PAYMENT_RECEIVED, NTFY_UPDATE),
            (self.on_market_payment_sent, NTFY_MARKET_ON_PAYMENT_SENT, NTFY_UPDATE),
            (self.on_resource_event, SIGNAL_RESOURCE_CHECK, SIGNAL_LOW_SPACE),
            (self.on_credit_minig_error, NTFY_CREDIT_MINING, NTFY_ERROR),
        )
        for callback, subject, changetype in observers:
            self.session.add_observer(callback, subject, [changetype])

    def write_data(self, message):
        """
        Write data over the event socket if it's open.

        :param message: a JSON-serializable dictionary; it is written, followed by a newline, to every open request.
        """
        # Nobody is listening: skip the serialization work altogether.
        if not self.events_requests:
            return

        try:
            message_str = json.dumps(message)
        except UnicodeDecodeError:
            # The message contains invalid characters; fix them
            message_str = json.dumps(fix_unicode_dict(message))

        line = message_str + '\n'
        for request in self.events_requests:
            request.write(line)

    def start_new_query(self):
        """
        Forget which search results were already sent, so a new query can report them again.
        """
        self.infohashes_sent = set()
        self.channel_cids_sent = set()

    def on_search_results_channels(self, subject, changetype, objectID, results):
        """
        Returns the channel search results over the events endpoint.

        Channels whose name is flagged by the XXX filter are skipped when the family filter is enabled, and each
        dispersy cid is sent at most once until start_new_query is called.

        :param results: dictionary with a 'keywords' list and a 'result_list' of channels.
        """
        query = ' '.join(results['keywords'])
        family_filter_enabled = self.session.config.get_family_filter_enabled()

        for channel in results['result_list']:
            channel_json = convert_db_channel_to_json(channel, include_rel_score=True)

            if family_filter_enabled and self.session.lm.category.xxx_filter.isXXX(channel_json['name']):
                continue

            dispersy_cid = channel_json['dispersy_cid']
            if dispersy_cid not in self.channel_cids_sent:
                self.write_data({"type": "search_result_channel", "event": {"query": query, "result": channel_json}})
                self.channel_cids_sent.add(dispersy_cid)

    def on_search_results_torrents(self, subject, changetype, objectID, results):
        """
        Returns the torrent search results over the events endpoint.

        Torrents in the 'xxx' category are skipped when the family filter is enabled, and each infohash is sent at
        most once until start_new_query is called. Results without an infohash are not sent.

        :param results: dictionary with a 'keywords' list and a 'result_list' of torrents.
        """
        query = ' '.join(results['keywords'])
        family_filter_enabled = self.session.config.get_family_filter_enabled()

        for torrent in results['result_list']:
            torrent_json = convert_search_torrent_to_json(torrent)

            # Only compute a relevance score when the converted result does not carry one already.
            if 'relevance_score' not in torrent_json:
                torrent_json['relevance_score'] = \
                    self.session.lm.torrent_db.relevance_score_remote_torrent(torrent_json['name'])

            if family_filter_enabled and torrent_json['category'] == 'xxx':
                continue

            if 'infohash' in torrent_json and torrent_json['infohash'] not in self.infohashes_sent:
                self.write_data({"type": "search_result_torrent", "event": {"query": query, "result": torrent_json}})
                self.infohashes_sent.add(torrent_json['infohash'])

    def on_upgrader_started(self, subject, changetype, objectID, *args):
        """Push an upgrader_started event."""
        self.write_data({"type": "upgrader_started"})

    def on_upgrader_finished(self, subject, changetype, objectID, *args):
        """Push an upgrader_finished event."""
        self.write_data({"type": "upgrader_finished"})

    def on_upgrader_tick(self, subject, changetype, objectID, *args):
        """Push an upgrader_tick event; args[0] is sent as the event text."""
        self.write_data({"type": "upgrader_tick", "event": {"text": args[0]}})

    def on_watch_folder_corrupt_torrent(self, subject, changetype, objectID, *args):
        """Push a watch_folder_corrupt_torrent event; args[0] is sent as the name."""
        self.write_data({"type": "watch_folder_corrupt_torrent", "event": {"name": args[0]}})

    def on_new_version_available(self, subject, changetype, objectID, *args):
        """Push a new_version_available event; args[0] is sent as the version."""
        self.write_data({"type": "new_version_available", "event": {"version": args[0]}})

    def on_tribler_started(self, subject, changetype, objectID, *args):
        """Push a tribler_started event."""
        self.write_data({"type": "tribler_started"})

    def on_channel_discovered(self, subject, changetype, objectID, *args):
        """Push a channel_discovered event; args[0] is sent as the event payload."""
        self.write_data({"type": "channel_discovered", "event": args[0]})

    def on_torrent_discovered(self, subject, changetype, objectID, *args):
        """Push a torrent_discovered event; args[0] is sent as the event payload."""
        self.write_data({"type": "torrent_discovered", "event": args[0]})

    def on_torrent_removed_from_channel(self, subject, changetype, objectID, *args):
        """Push a torrent_removed_from_channel event; args[0] is sent as the event payload."""
        self.write_data({"type": "torrent_removed_from_channel", "event": args[0]})

    def on_torrent_finished(self, subject, changetype, objectID, *args):
        """Push a torrent_finished event with the hex-encoded objectID as infohash and args[0] as name."""
        self.write_data({"type": "torrent_finished", "event": {"infohash": objectID.encode('hex'), "name": args[0]}})

    def on_torrent_error(self, subject, changetype, objectID, *args):
        """Push a torrent_error event with the hex-encoded objectID as infohash and args[0] as error."""
        self.write_data({"type": "torrent_error", "event": {"infohash": objectID.encode('hex'), "error": args[0]}})

    def on_tribler_exception(self, exception_text):
        """
        Push a tribler_exception event containing the given text.

        Unlike the other handlers, this one is not registered as an observer in __init__ and takes the text directly.
        """
        self.write_data({"type": "tribler_exception", "event": {"text": exception_text}})

    def on_market_ask(self, subject, changetype, objectID, *args):
        """Push a market_ask event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_ask", "event": args[0]})

    def on_market_bid(self, subject, changetype, objectID, *args):
        """Push a market_bid event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_bid", "event": args[0]})

    def on_market_ask_timeout(self, subject, changetype, objectID, *args):
        """Push a market_ask_timeout event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_ask_timeout", "event": args[0]})

    def on_market_bid_timeout(self, subject, changetype, objectID, *args):
        """Push a market_bid_timeout event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_bid_timeout", "event": args[0]})

    def on_market_transaction_complete(self, subject, changetype, objectID, *args):
        """Push a market_transaction_complete event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_transaction_complete", "event": args[0]})

    def on_market_payment_received(self, subject, changetype, objectID, *args):
        """Push a market_payment_received event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_payment_received", "event": args[0]})

    def on_market_payment_sent(self, subject, changetype, objectID, *args):
        """Push a market_payment_sent event; args[0] is sent as the event payload."""
        self.write_data({"type": "market_payment_sent", "event": args[0]})

    def on_resource_event(self, subject, changetype, objectID, *args):
        """Push a resource event whose type is the received change type; args[0] is sent as the event payload."""
        self.write_data({"type": changetype, "event": args[0]})

    def on_credit_minig_error(self, subject, changetype, ojbectID, *args):
        """Push a credit_mining_error event; args[0] is sent as the event payload."""
        # NOTE(review): the method and parameter names contain typos ("minig", "ojbectID"); they are kept as-is so
        # that existing references to this handler keep working.
        self.write_data({"type": "credit_mining_error", "event": args[0]})

    def render_GET(self, request):
        """
        .. http:get:: /events

        A GET request to this endpoint will open the event connection.

            **Example request**:

                .. sourcecode:: none

                    curl -X GET http://localhost:8085/events
        """
        def on_request_finished(_):
            # Invoked both on normal completion and on failure of the request: stop sending events to it.
            self.events_requests.remove(request)

        self.events_requests.append(request)
        request.notifyFinish().addCallbacks(on_request_finished, on_request_finished)

        # Greet the new client with the current startup state and the Tribler version.
        request.write(json.dumps({"type": "events_start", "event": {
            "tribler_started": self.session.lm.initComplete, "version": version_id}}) + '\n')

        # Keep the connection open; later events are pushed through write_data.
        return server.NOT_DONE_YET