The P2P Framework Implementation (Python version)
This page walks through a Python implementation of the P2P framework library itself. I will assume that you are familiar with Python. I will also assume you are familiar with the general concepts of socket programming, though you may not necessarily have used the Python networking or threading libraries. The complete source code may be downloaded here: btpeer.py.
Initializing a peer
Let us first examine how a peer node is initialized. As discussed above, the overall operation of a node is managed by the Peer class. The constructor of the class stores the canonical identifier (name) of the peer, the port on which it listens for connections, and the maximum size of the list of known peers that the node will maintain (this can be set to 0 to allow an unlimited number of peers in the list). The constructor also initializes several fields whose use is described in the following subsections: shutdown, handlers, and router. The following definition shows the Python code that accomplishes these tasks:
def __init__( self, maxpeers, serverport, myid=None, serverhost = None ):
    self.debug = 0

    self.maxpeers = int(maxpeers)
    self.serverport = int(serverport)

    # If not supplied, the host name/IP address will be determined
    # by attempting to connect to an Internet host like Google.
    if serverhost: self.serverhost = serverhost
    else: self.__initserverhost()

    # If not supplied, the peer id will be composed of the host address
    # and port number
    if myid: self.myid = myid
    else: self.myid = '%s:%d' % (self.serverhost, self.serverport)

    # list (dictionary/hash table) of known peers
    self.peers = {}

    # used to stop the main loop
    self.shutdown = False

    self.handlers = {}
    self.router = None
    # end constructor
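The body of __initserverhost is not shown on this page. As a rough sketch of the idea mentioned in the constructor's comment (finding the local address by connecting toward an Internet host), one might write something like the following. The function names guess_local_address and default_peer_id, the probe address 8.8.8.8, and the loopback fallback are all assumptions made for illustration, not part of btpeer.py.

```python
import socket

def guess_local_address(probe_host="8.8.8.8", probe_port=80, fallback="127.0.0.1"):
    """Return the local IPv4 address the OS would use to reach probe_host.

    Hypothetical helper: the probe host and the fallback are assumptions.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # connect() on a UDP socket only selects a route; no packet is sent.
        s.connect((probe_host, probe_port))
        return s.getsockname()[0]
    except OSError:
        # No usable route (for example, no network): fall back to loopback.
        return fallback
    finally:
        s.close()

def default_peer_id(host, port):
    """Compose a peer id the same way the constructor does: 'host:port'."""
    return '%s:%d' % (host, port)

local_address = guess_local_address()
example_id = default_peer_id('10.0.0.5', 4567)
```

A UDP socket is used here only to avoid opening a real connection; a TCP connection to a well-known host would reveal the same local address at the cost of a network round trip.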
Every peer node performs operations common to traditional network client and server applications. First, let us walk through the server-related operations, as described in the previous section.
The main loop
The main loop of a peer begins by setting up a socket that listens for incoming connections from other peers. To do this, we must (1) create the socket object, (2) set its options, (3) bind it to a port on which to listen for connections, and (4) actually begin listening for connections. Here is a Python method that accomplishes this, returning the initialized socket:
def makeserversocket( self, port, backlog=5 ):
    s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    s.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
    s.bind( ( '', port ) )
    s.listen( backlog )
    return s
The first statement creates a socket that will communicate using IPv4 (AF_INET) with TCP (SOCK_STREAM). Setting the SO_REUSEADDR option makes the port number of the socket immediately reusable after the socket is closed (otherwise the operating system may prevent the port from being reused after the server exits, until a certain amount of time has passed). Then the socket is bound to the specified port and is set up to receive connections. The backlog parameter indicates how many incoming connections should be queued up. A value of 5 is conventionally used, although with a multithreaded server (as we are building) the parameter's value is not very significant.
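To try the four steps outside the Peer class, the same calls can be wrapped in a standalone function. This is only a sketch: the name make_server_socket and the use of port 0 (which asks the operating system to pick any free port) are choices made for this example.

```python
import socket

def make_server_socket(port, backlog=5):
    """Standalone version of the four steps: create, set options, bind, listen."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('', port))      # '' means: listen on all local interfaces
    s.listen(backlog)
    return s

# Port 0 lets the OS choose a free port, which is handy for experiments.
server = make_server_socket(0)
chosen_port = server.getsockname()[1]
reuse_flag = server.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
server.close()
```

After the call, getsockname() reports the port that was actually bound, and getsockopt() confirms that the reuse option is set.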
Having created a server socket, the main loop of a peer loops continuously, accepting connections. When an incoming connection is accepted, the server will have a new socket object used to send and receive data on the connection. The main loop then calls a separate method to handle communication with this connection in a new thread. A simple form of the main loop would thus look like this:
s = self.makeserversocket( self.serverport )

while 1:
    clientsock, clientaddr = s.accept()

    t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
    t.start()
In reality, we also need to handle any errors that may occur in the process of accepting a connection, and we need to provide a mechanism so that the loop can be terminated cleanly (for example, when the user indicates that the program should exit). To do this, we set up the server socket to time out every 2 seconds (an arbitrary choice) and make the loop termination condition dependent on a boolean instance variable, shutdown. Also, we set up an exception handler to allow the main loop to be stopped by the user pressing the "Ctrl"+"Break" (or "Ctrl"+"c") keys. Here, then, is the complete mainloop method.
def mainloop( self ):
    s = self.makeserversocket( self.serverport )
    s.settimeout(2)
    self.__debug( 'Server started: %s (%s:%d)'
                  % ( self.myid, self.serverhost, self.serverport ) )

    while not self.shutdown:
        try:
            self.__debug( 'Listening for connections...' )
            clientsock, clientaddr = s.accept()
            clientsock.settimeout(None)

            t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
            t.start()
        except KeyboardInterrupt:
            self.shutdown = True
            continue
        except:
            if self.debug:
                traceback.print_exc()
            continue
    # end while loop

    self.__debug( 'Main loop exiting' )
    s.close()
    # end mainloop method
The debug method outputs various messages to an appropriate location, for example the screen or a log file. The myid field is the identifier of the peer, and the serverhost field stores the peer's IP address (or host name). These values are initialized by the constructor for the peer object.
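The timeout-and-flag technique used by mainloop can be tried in isolation. The sketch below is not the Peer class: TinyServer, its accepted counter, and the got_client event are hypothetical names introduced for this example, and a 0.2 second timeout replaces the 2 seconds used above so that the demonstration finishes quickly.

```python
import socket
import threading

class TinyServer:
    """Hypothetical stand-in for the Peer class; only the loop is modeled."""

    def __init__(self, timeout=0.2):
        self.shutdown = False
        self.accepted = 0
        self.got_client = threading.Event()   # lets the demo wait for one accept
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('127.0.0.1', 0))
        self.sock.listen(5)
        self.sock.settimeout(timeout)
        self.port = self.sock.getsockname()[1]

    def mainloop(self):
        while not self.shutdown:
            try:
                clientsock, clientaddr = self.sock.accept()
            except socket.timeout:
                continue          # woke up only to re-check the shutdown flag
            self.accepted += 1
            clientsock.close()
            self.got_client.set()
        self.sock.close()

server = TinyServer()
t = threading.Thread(target=server.mainloop)
t.start()

client = socket.create_connection(('127.0.0.1', server.port))
server.got_client.wait(5)     # make sure the connection was accepted
client.close()

server.shutdown = True        # the loop notices this after at most one timeout
t.join(5)
```

Without the timeout, accept() would block indefinitely and the loop would never get a chance to look at the shutdown flag.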
Handling a peer connection
The handlepeer method takes a newly formed peer connection, reads in a request from it, and dispatches the request to an appropriate handler (function or method) for processing. The particular handlers and types of requests will be specified by the programmer using this framework to implement a particular protocol. The handlepeer method simply looks for the appropriate handler for a message, if there is one registered with the peer object, and calls it.
handlepeer begins by encapsulating the socket connection in a PeerConnection object, to allow easy sending/receiving and encoding/decoding of P2P messages in the system.
host, port = clientsock.getpeername()
peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )
Then, handlepeer attempts to receive some data from the connection and determine what to do with it:
msgtype, msgdata = peerconn.recvdata()
if msgtype: msgtype = msgtype.upper()
if msgtype not in self.handlers:
    self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
else:
    self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
    self.handlers[ msgtype ]( peerconn, msgdata )
The handlers field is a dictionary (hash table) mapping message types (4-character strings) to handler functions. If the message type has a corresponding entry in the dictionary, the function is looked up and invoked, passing it the PeerConnection object and the actual data of the message. Before returning, handlepeer closes the connection. Here, then, is the complete definition of the method:
def __handlepeer( self, clientsock ):
    self.__debug( 'Connected ' + str(clientsock.getpeername()) )

    host, port = clientsock.getpeername()
    peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )

    try:
        msgtype, msgdata = peerconn.recvdata()
        if msgtype: msgtype = msgtype.upper()
        if msgtype not in self.handlers:
            self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
        else:
            self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
            self.handlers[ msgtype ]( peerconn, msgdata )
    except KeyboardInterrupt:
        raise
    except:
        if self.debug:
            traceback.print_exc()

    self.__debug( 'Disconnecting ' + str(clientsock.getpeername()) )
    peerconn.close()
    # end handlepeer method
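The dictionary-based dispatch at the heart of handlepeer can be illustrated without any networking. In this sketch the Dispatcher class, the handle_ping function, and the log list are hypothetical names used only for the example; the upper-casing and the lookup mirror the code above.

```python
class Dispatcher:
    """Hypothetical stand-in that models only the handler table of a peer."""

    def __init__(self):
        self.handlers = {}

    def addhandler(self, msgtype, handler):
        # One handler per message type; a later registration replaces it.
        self.handlers[msgtype] = handler

    def dispatch(self, peerconn, msgtype, msgdata):
        """Return True if a handler was found and called, False otherwise."""
        if msgtype:
            msgtype = msgtype.upper()
        if msgtype not in self.handlers:
            return False
        self.handlers[msgtype](peerconn, msgdata)
        return True

log = []

def handle_ping(peerconn, msgdata):
    # A real handler would usually reply through peerconn; here we just record.
    log.append(('PING', msgdata))

d = Dispatcher()
d.addhandler('PING', handle_ping)

handled = d.dispatch(None, 'ping', 'hello')    # lower case is normalized
unhandled = d.dispatch(None, 'QUIT', '')       # no handler registered
```

Because handlers are ordinary Python callables stored in a dictionary, bound methods and plain functions can be registered interchangeably.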
Routing and sending messages
Using the addrouter method, the programmer may register a routing function (or method) with the Peer class to help decide how messages should be forwarded, given a destination peer id. The routing function should expect as a parameter the name of a peer (which may not necessarily be present in the list of known peers of the node), and decide which of the known peers the message should be routed to next in order to (hopefully) reach the desired peer. The routing function should return a tuple of three values: (next-peer-id, host, port), where the host and port are the IP address and port of the peer identified by next-peer-id. If the message cannot be routed, the next-peer-id should be None.
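As an illustration of the contract described above, here is one possible routing function. It is a sketch under stated assumptions: make_router, the peers dictionary, and the idea of a default "gateway" peer are inventions for this example; the framework only requires that the function accept a peer id and return a (next-peer-id, host, port) tuple.

```python
def make_router(peers, default_peerid=None):
    """Build a routing function over a dict mapping peer id -> (host, port).

    Hypothetical policy: send directly if the destination is known,
    otherwise forward to a designated default peer, otherwise give up.
    """
    def router(peerid):
        if peerid in peers:
            nextpid = peerid
        elif default_peerid in peers:
            nextpid = default_peerid
        else:
            return (None, None, None)     # message cannot be routed
        host, port = peers[nextpid]
        return (nextpid, host, port)
    return router

known = {
    'alice:4000': ('10.0.0.2', 4000),
    'bob:4001': ('10.0.0.3', 4001),
}
route = make_router(known, default_peerid='bob:4001')

direct = route('alice:4000')          # destination is a known peer
forwarded = route('carol:4002')       # unknown, so hand off to the default peer
unroutable = make_router({})('carol:4002')
```

A function built this way could then be registered with addrouter; more elaborate protocols might choose the next hop by distance in an identifier space instead.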
The sendtopeer method takes a message type and data, along with a destination peer id, and uses the routing function to decide where to send the message next. If no routing function has been registered by the programmer, or if the routing function fails for some reason, the method fails. If the routing function successfully returns the next host/port combination to which the message should be sent, sendtopeer calls the connectandsend method to actually make the connection to the peer, package up, and send the data. If the programmer desires to receive a response from the next peer before the communication socket is closed, it will be returned by these methods.
def sendtopeer( self, peerid, msgtype, msgdata, waitreply=True ):
    if self.router:
        nextpid, host, port = self.router( peerid )
    if not self.router or not nextpid:
        self.__debug( 'Unable to route %s to %s' % (msgtype, peerid) )
        return None
    return self.connectandsend( host, port, msgtype, msgdata, pid=nextpid,
                                waitreply=waitreply )
    # end sendtopeer method
The connectandsend method connects and sends a message to a peer at the specified IP address and port. The host's reply, if desired by the caller, will be returned as a list of pairs, where each pair contains (1) the type and (2) the actual data of the message.
def connectandsend( self, host, port, msgtype, msgdata, pid=None, waitreply=True ):
    msgreply = []   # list of replies
    try:
        peerconn = BTPeerConnection( pid, host, port, debug=self.debug )
        peerconn.senddata( msgtype, msgdata )
        self.__debug( 'Sent %s: %s' % (pid, msgtype) )

        if waitreply:
            onereply = peerconn.recvdata()
            while (onereply != (None,None)):
                msgreply.append( onereply )
                self.__debug( 'Got reply %s: %s' % ( pid, str(msgreply) ) )
                onereply = peerconn.recvdata()
        peerconn.close()
    except KeyboardInterrupt:
        raise
    except:
        if self.debug:
            traceback.print_exc()

    return msgreply
    # end connectsend method
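The reply-collection loop in connectandsend relies on recvdata returning the pair (None, None) once no more messages are available. The following sketch isolates that loop; FakeConnection and collect_replies are hypothetical names, and the canned replies stand in for data that would normally arrive over a socket.

```python
class FakeConnection:
    """Hypothetical stand-in for a peer connection that replays canned replies."""

    def __init__(self, replies):
        self._replies = list(replies)

    def recvdata(self):
        # Mimic the convention used above: (None, None) means "no more data".
        if self._replies:
            return self._replies.pop(0)
        return (None, None)

def collect_replies(peerconn):
    """Gather (msgtype, msgdata) pairs until the end marker is seen."""
    msgreply = []
    onereply = peerconn.recvdata()
    while onereply != (None, None):
        msgreply.append(onereply)
        onereply = peerconn.recvdata()
    return msgreply

conn = FakeConnection([('REPL', 'first'), ('REPL', 'second')])
replies = collect_replies(conn)
no_replies = collect_replies(FakeConnection([]))
```

The result has the same shape as the return value described above: a list of (type, data) pairs, which is empty when the peer sends nothing back.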
Additional methods
The Peer class provides methods supporting other fundamental functionalities of a peer node. Briefly, these include:
- startstabilizer(stabilizer, delay): Runs the provided 'stabilizer' function in a separate thread, activating it repeatedly after every delay seconds, until the shutdown flag of the Peer object is set.
- addhandler(msgtype, handler): Registers a handler function for the given message type with the Peer object. Only one handler function may be provided per message type. Message types do not have to be defined in advance of calling this method.
- addrouter(router): Registers a routing function with this peer. Read the section on routing above for details.
- addpeer(peerid, host, port): Adds a peer name and IP address/port mapping to the known list of peers.
- getpeer(peerid): Returns the (host, port) pair for the given peer name.
- removepeer(peerid): Removes the entry corresponding to the supplied peerid from the list of known peers.
- addpeerat(loc, peerid, host, port) / getpeerat(loc) / removepeerat(loc): Analogous to the prior three methods, except that they access direct (numeric) positions within the list of peers (as opposed to using the peerid as a hash key). These functions should not be used concurrently with the prior three.
- getpeerids(): Returns a list of all known peer ids.
- numberofpeers(): Returns the number of currently known peers.
- maxpeersreached(): Returns whether the list of known peers has reached the maxpeers limit (never the case when maxpeers is 0, which means unlimited).
- checklivepeers(): Attempts to connect and send a 'PING' message to all currently known peers to ensure that they are still alive. For any connections that fail, the corresponding entry is removed from the list. This method can be used as a simple 'stabilizer' function for a P2P protocol.
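The behavior described for startstabilizer (run a function repeatedly in its own thread until the shutdown flag is set) might be sketched as follows. StabilizedNode, count_run, and the ran event are hypothetical names for this example, and the loop shown is one plausible reading of the description rather than the code in btpeer.py.

```python
import threading
import time

class StabilizedNode:
    """Hypothetical stand-in that models only the stabilizer mechanism."""

    def __init__(self):
        self.shutdown = False
        self.runs = 0

    def startstabilizer(self, stabilizer, delay):
        def loop():
            # Call the stabilizer, sleep, and repeat until shutdown is set.
            while not self.shutdown:
                stabilizer()
                time.sleep(delay)
        t = threading.Thread(target=loop)
        t.start()
        return t

node = StabilizedNode()
ran = threading.Event()

def count_run():
    # A real stabilizer might call checklivepeers(); here we only count.
    node.runs += 1
    ran.set()

t = node.startstabilizer(count_run, 0.01)
ran.wait(5)               # wait until the stabilizer has run at least once
node.shutdown = True
t.join(5)
```

Because the flag is only checked between runs, shutdown takes effect after at most one further delay period.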
The PeerConnection class
The PeerConnection class encapsulates a socket connection to a peer node. An object of this class may be initialized with an already-connected socket, or with an IP address/port combination that will be used to open a new socket connection.
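The two ways of initializing a connection, together with simple message framing, can be sketched as below. Note the assumptions: the wire format used here (a 4-byte message type, a 4-byte big-endian length, then the payload) is chosen for this example and is not specified on this page, and SketchConnection and _recv_exact are hypothetical names, not the BTPeerConnection API.

```python
import socket
import struct

def _recv_exact(sock, n):
    """Read exactly n bytes, or return None if the peer closed early."""
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return buf

class SketchConnection:
    """Hypothetical connection wrapper; the framing is an assumed format."""

    def __init__(self, host=None, port=None, sock=None):
        if sock is not None:
            self.s = sock                                    # already connected
        else:
            self.s = socket.create_connection((host, port))  # open a new one

    def senddata(self, msgtype, msgdata):
        payload = msgdata.encode('utf-8')
        header = struct.pack('!4sL', msgtype.encode('ascii'), len(payload))
        self.s.sendall(header + payload)

    def recvdata(self):
        header = _recv_exact(self.s, 8)
        if header is None:
            return (None, None)
        rawtype, length = struct.unpack('!4sL', header)
        payload = _recv_exact(self.s, length)
        if payload is None:
            return (None, None)
        return (rawtype.decode('ascii'), payload.decode('utf-8'))

    def close(self):
        self.s.close()

# A connected socket pair stands in for two peers on one machine.
a, b = socket.socketpair()
sender, receiver = SketchConnection(sock=a), SketchConnection(sock=b)
sender.senddata('PING', 'hello')
first = receiver.recvdata()
sender.close()
second = receiver.recvdata()      # the sender has closed its end
receiver.close()
```

Prefixing each message with its length lets the receiver know exactly how many bytes to read, since TCP itself delivers an unstructured byte stream.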