トップPython > プロセス間通信

プロセス間通信

1.プロセス間通信

現在はC#で記述した地図システムから外部プログラムとして Pythonプログラムを動かしている。 これはプログラム起動オーバヘッドを伴うため、Pythonプログラムをサーバーとして動かしておき、 プロセス間通信でリクエストを送りたい。 まず、手始めに、文献[1]のプログラムの動作を確認した。

次に、サーバープログラムはこのままで、文献[2]を参考にしてクライアントプログラムを C# で記述する。

プログラムを下に示す。 地図システムでは、サーバー側がリクエストをシングルスレッドで実行する場合、 特に、サーバーから返すデータはなく、完了を通知するだけでよい。'\n' を返すだけでもよいので、 クライアント側は一行のリクエストを送り、'\n'が戻るのをまつだけでよいので簡単である。

しかし、現在はタイル作成はマルチスレッドで実行している。この場合はどうすればいいだろうか。 リクエストは一行ではなく、複数行とする。サーバーはこの複数のリクエストをマルチスレッドで処理し、 全て終えたら、終了通知の '\n' を送るのが、現在のプログラムに近いので作りやすい。

より、高度な方法として、非同期通信があるが、それについては次のステップで検討したい。

【サーバー】
# -*- coding: shift_jis -*-
import os
import sys
import threading
from socket import socket, AF_INET, SOCK_STREAM

# AF_INET =  Adress Family : Internet Sockets IPv4 を指す
# SOCK_STREAM = 通信プロトコル TCP を使用。 SOCK_DGRAM だと UDP

port = 12345
host = 'localhost'

stdoutmutex = threading.Lock()

def server():
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind((host, port)) # アドレスとポートを指定. 
    sock.listen(5) # 同時に接続できる個数
    while True:
         conn, addr = sock.accept()
         data = conn.recv(1024) # bufsize
         with stdoutmutex:
             print data

         # 処理

         conn.send("done\n".encode())

if __name__=='__main__':
     sthread = threading.Thread(target=server)
     sthread.daemon = False
     sthread.start()

【クライアント】
using System;
using System.IO;
using System.Net.Sockets;

public class Client {
    public static void Main() {
        string host = "localhost";
        int port = 12345;

        TcpClient tcp = new TcpClient(host, port);
        NetworkStream ns = tcp.GetStream();

        // サーバーにデータを送信する
        System.Text.Encoding enc = System.Text.Encoding.UTF8;
        byte[] sendBytes = enc.GetBytes("hello\n");  // 文字列をByte型配列に変換
        ns.Write(sendBytes, 0, sendBytes.Length);    // データを送信する

        // サーバーから送られたデータを受信する
        MemoryStream ms = new MemoryStream();
        byte[] resBytes = new byte[256];
        do {
            int resSize = ns.Read(resBytes, 0, resBytes.Length);
            if (resSize == 0) {
                Console.WriteLine("サーバーが切断しました。");
                break;
            }
            ms.Write(resBytes, 0, resSize);
        } while (ns.DataAvailable || resBytes[resSize - 1] != '\n');
        string resMsg = enc.GetString(ms.GetBuffer(), 0, (int)ms.Length);
        ms.Close();
        Console.Write(resMsg);

        // 閉じる
        ns.Close();
        tcp.Close();
    }
}

地図システムでの使い方に合わせてプログラムを修正した結果を下に示す。

サーバの受信バッファのサイズは 1024か 2048バイトが良いだろう。4096バイトでもいいかも知れない。 いずれにせよ、末尾に \n を重ねて、これをチェックする方がよい。

下のプログラムでは1秒間隔でしか実行できなかった。何らかの設定値があるのだろう。

やはり、非同期通信を使う必要があるのかも知れない。

同時に二つのクライアントを動かすと、サーバーは1秒間に2リクエストを受け取った。 1秒間隔はクライアント側に起因するようだ。

リンガーオプションで変更できるかと思ったが、少し、試みたところではうまく行かなかった。

実際の応用では、先着順処理がベストではなく、画面がスクロールしたり、Zoomの切り替えが起こるので、 たくさんの待ち行列があった場合、現在の Zoom で、 現在画面に表示されているか、近傍にあるタイルの更新を優先した方がよい。

Python側にキューを作る場合、先着順で処理が終わるわけではないので、Python側のサーバーはリクエストを 受け付けるだけで、何もレスポンスを返さない。C#側にもサーバーを立て、タイル作成が終わったら、 座標またはファイル名を送るようにするのがいいだろう。

しかし、Pythonは遅いので、こちらの処理はなるべく簡単にして、キューはなるべく作らない方がよい。 全体として、 どういう造りがいいかは時間をかけることにして、まずは動かせたい。 このため、とりあえず、Python側にキューを置き、先着順処理とする。

【サーバー】
import os
import sys
import threading
from socket import socket, AF_INET, SOCK_STREAM

port = 12345
host = 'localhost'

def server():
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind((host, port)) # アドレスとポートを指定. 
    sock.listen(1)          # 同時に接続できる個数
    while True:
         conn, addr = sock.accept()
         data = conn.recv(1024) # bufsize
         print data

         conn.send("\n".encode())

if __name__=='__main__':
     sthread = threading.Thread(target=server)
     sthread.daemon = False
     sthread.start()

【クライアント】
using System;
using System.IO;
using System.Net.Sockets;

public class Client {
    public static void Main() {
        System.Text.Encoding enc = System.Text.Encoding.UTF8;
        string host = "localhost";
        int port = 12345;

        for (int n = 0; n < 10; n++) {
            TcpClient tcp = new TcpClient(host, port);
            NetworkStream ns = tcp.GetStream();

            // サーバーにデータを送信する
            string sendStr = n + ": 123\n456\n789\n\n";
            byte[] sendBytes = enc.GetBytes(sendStr);  // 文字列をByte型配列に変換
            ns.Write(sendBytes, 0, sendBytes.Length);    // データを送信する

            // サーバーから送られたデータを受信する
            byte[] resBytes = new byte[256];
            int resSize = ns.Read(resBytes, 0, resBytes.Length);
            if (resSize > 0) {
                if (resBytes[0] == 0x0A) Console.WriteLine("終了コードを受け取りました。");
                else  Console.WriteLine("Error");
            } else {
                Console.WriteLine("サーバーが切断しました。");
            }

            // 閉じる
            ns.Close();
            tcp.Close();
        }
    }
}

2.タイル画像作成プログラム

改造したタイル画像作成プログラムを下に示す。地図システムからのリクエストはソケット通信により受け取る。

リクエストは {zoom}_{x}_{y} で表す。"stop" を受け取るとタイル画像作成プログラムは終了する。

地図システム側でもサーバースレッドを起動し、タイル画像の作成が完了する毎に返却される {zoom}_{x}_{y} を 受信する。タイル画像作成プログラムが終了するときには、直前に "done" を返却するために、これにより、 タイル画像の作成完了を知ることができる。

16_58001_25480
16_58001_25481
stop
#!/usr/bin/env python
# -*- coding: shift_jis -

try:
    import mapnik2 as mapnik
except:
    import mapnik
import sys, os
import threading
from math import pi,exp,atan
from Queue import Queue
from socket import socket, AF_INET, SOCK_STREAM

PORT = 12345
HOST = 'localhost'
RAD_TO_DEG = 180/pi
NUM_THREADS = 4
MAX_ZOOM = 19

request_queue = Queue(32)

class GoogleProjection:
    def __init__(self):
        self.Bc = []
        self.Cc = []
        self.zc = []
        self.Ac = []
        c = 256
        for d in range(0,MAX_ZOOM+1):
            e = c/2;
            self.Bc.append(c/360.0)
            self.Cc.append(c/(2 * pi))
            self.zc.append((e,e))
            self.Ac.append(c)
            c *= 2
                
    def fromPixelToLL(self,px,zoom):
         e = self.zc[zoom]
         f = (px[0] - e[0])/self.Bc[zoom]
         g = (px[1] - e[1])/-self.Cc[zoom]
         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
         return (f,h)

# タイル作成スレッド
class RenderThread:
    def __init__(self, mapfile, q, tile_dir):
        self.q = q
        self.tile_dir = tile_dir
        self.m = mapnik.Map(256, 256)

        # Load style XML
        mapnik.load_map(self.m, mapfile, True)

        self.prj = mapnik.Projection(self.m.srs)
        self.tileproj = GoogleProjection()

    def render_tile(self, z_x_y):
        # Parse request
        items = z_x_y.split('_')
        if (len(items) == 3):
            z = int(items[0])
            x = int(items[1])
            y = int(items[2])
        else:
            print "Error: " + z_x_y
            return

        # Calculate pixel positions of bottom-left & top-right
        p0 = (x * 256, (y + 1) * 256)
        p1 = ((x + 1) * 256, y * 256)

        # Convert to LatLong (EPSG:4326)
        l0 = self.tileproj.fromPixelToLL(p0, z);
        l1 = self.tileproj.fromPixelToLL(p1, z);

        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))

        # Bounding box for the tile
        bbox = mapnik.Box2d(c0.x,c0.y, c1.x,c1.y)

        self.m.resize(256, 256)
        self.m.zoom_to_box(bbox)
        if(self.m.buffer_size < 128):
            self.m.buffer_size = 128

        # Render image with default Agg renderer
        im = mapnik.Image(256, 256)
        mapnik.render(self.m, im)
        tile_uri = self.tile_dir + "/" + z_x_y + ".png"
        im.save(tile_uri, "png256")
        send_response(z_x_y)

    def loop(self):
        while True:
            # Fetch a tile from the request_queue and render it
            req = self.q.get()
            if (req == "stop"):
                self.q.task_done()
                break
            self.render_tile(req)
            self.q.task_done()


def render_tiles(mapfile, tile_dir):
    print "start renderers"
    renderers = {}
    for i in range(NUM_THREADS):
        renderer = RenderThread(mapfile, request_queue, tile_dir)
        render_thread = threading.Thread(target=renderer.loop)
        render_thread.start()
        renderers[i] = render_thread

    # wait for pending rendering jobs to complete
    request_queue.join()
    for i in range(NUM_THREADS):
        renderers[i].join()
    send_response("done")

def server():
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind((HOST, PORT)) # アドレスとポートを指定. 
    sock.listen(5)          # 同時に接続できる個数
    print "wait requests"
    listening = True
    while listening:
        conn, addr = sock.accept()
        lines = conn.recv(2048).split('\n')
        for n in range(len(lines)):
            if (lines[n] == "stop"):
                listening = False
                break
            request_queue.put(lines[n])
    conn.close()
    for i in range(NUM_THREADS):
        request_queue.put("stop")

def send_response(response):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.connect((HOST, PORT+1))
    sock.send(response.encode())
    sock.close()

if __name__ == "__main__":
    mapfile = "c:/gis/mapnik/my_osm.xml"
    tile_dir = "d:/tiles"
    sthread = threading.Thread(target=server)
    sthread.daemon = False
    sthread.start()
    render_tiles(mapfile, tile_dir)

下の C# プログラムはテストプログラムである。 これにより、タイル画像ファイルが作成され、レスポンスが返されることを確認した。

この考え方を tileserver.cs に組込み、格段に応答性が改善されることを確認した。 細部はこれから修正する。

更新リクエストはキューに溜めるのではなく、即座に Pythonプログラムに送るようにした。

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public class Client {
    static string host = "127.0.0.1";
    static int port = 12345;
    static bool fRunServer = true;

    static void ServerThread() {
        IPAddress localAddr = IPAddress.Parse(host); 
        TcpListener server = new TcpListener(localAddr, port+1); 
        server.Start(); 

        while (fRunServer) {
            TcpClient client = server.AcceptTcpClient(); 
            NetworkStream stream = client.GetStream(); 
            Byte[] bytes = new Byte[256]; 
            int i; 

            while ((i = stream.Read(bytes, 0, bytes.Length)) != 0) { 
                 String data = System.Text.Encoding.UTF8.GetString(bytes, 0, i); 
                 Console.WriteLine(String.Format("受信: {0}", data)); 
            } 

            client.Close(); 
        }

        server.Stop();
    }

    public static void Main() {
        Thread threadServer = new Thread(new ThreadStart(ServerThread));
        threadServer.Start();   // スレッドを開始する

        System.Text.Encoding enc = System.Text.Encoding.UTF8;
        TcpClient tcp = new TcpClient(host, port);
        NetworkStream ns = tcp.GetStream();

        // サーバーにデータを送信する
        string sendStr = "16_58001_25480\n16_58001_25481\nstop\n";
        byte[] sendBytes = enc.GetBytes(sendStr);  // 文字列をByte型配列に変換
        ns.Write(sendBytes, 0, sendBytes.Length);  // データを送信する

        // 閉じる
        ns.Close();
        tcp.Close();
    }
}

Pythonの server関数は次のようにリファインした。1接続では1コマンドしたことによる。

def server():
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind((HOST, PORT)) # アドレスとポートを指定. 
    sock.listen(5)          # 同時に接続できる個数
    print "wait requests"
    while True:
        conn, addr = sock.accept()
        request = conn.recv(2048)
        if (request == "stop"): break
        request_queue.put(request)
    conn.close()
    for i in range(NUM_THREADS):
        request_queue.put("stop")

A.リファレンス

[1] python - socket を使ったプロセス間通信
[2] TCPクライアント・サーバープログラムを作成する