トップPython > C#とPythonのプロセス間通信

C#とPythonのプロセス間通信

1.はじめに

自作地図システムは C# で開発しており、一方、ここで使用しているタイル地図作成は Pythonプログラムである。 これまでは、Zoomと座標値 {zoom}_{x}_{y} のリストをファイルに書きこみ、Pythonプログラムを起動していた。 これが一番シンプルであろう。タイル地図を予め一括更新する場合には、この方法で支障はない。

しかし、個人利用の地図システムでは、毎回、時間をかけて、全タイルを更新しても、実際に参照するタイルは 微々たるものに過ぎない。

このため、実際に、表示した時、タイル地図を更新する方法(以下、JIT更新と呼ぶ)に重点を移してきた。 JIT更新の場合、数タイル〜十数タイル単位の更新となるため、その都度、Pythonプログラムを起動する場合、 起動のオーバヘッドやレスポンスタイムが問題になる。

このため、Pythonプログラムはタイル更新サーバーとして常時動作させておき、 C#からPythonに更新リクエスト{zoom}_{x}_{y}を通信で送る方式に変更した。 タイル作成が完了したときその {zoom}_{x}_{y} を C# プログラムに返送するため、双方向通信である。

しかし、この場合、1対1通信であるから、プロセス間通信を使うまでもなく、 C#でPythonプログラムを起動した際、標準入出力をリダイレクトしておき、リクエストを標準入力に送り、 レスポンスを標準出力から受け取るようにした方がシンプルであろう。

何らかの理由でプロセス間通信に戻る可能性もあり、また、別の所でプロセス間通信を使う可能性もある ことから、プロセス間通信方式の要点をここに記録しておく。

2.Pythonにおけるプロセス間通信

比較的小さいので Pythonプログラム全体を下に示す。通信部分を赤字とした。 最初の約20行が、リクエストを受けるサーバープログラムである(ポート2345)。 末尾近くの 10行がレスポンスを送るクライアントプログラムである(ポート2346)。

#!/usr/bin/env python
# -*- coding: shift_jis -

from math import pi,exp,atan

try:
    import mapnik2 as mapnik
except:
    import mapnik
mapnik.register_fonts('c:/windows/fonts/')

import sys, os
from Queue import Queue
import threading
from socket import socket, AF_INET, SOCK_STREAM

PORT = 2345
HOST = 'localhost'

RAD_TO_DEG = 180/pi
NUM_THREADS = 4

mapfile  = "c:/gis/mapnik/my_osm.xml"
tile_dir = "d:/mapniktiles"
queue = Queue(256)
maxzoom = 19

class GoogleProjection:
    def __init__(self,levels):
        self.Bc = []
        self.Cc = []
        self.zc = []
        self.Ac = []
        c = 256
        for d in range(0,levels):
            e = c/2;
            self.Bc.append(c/360.0)
            self.Cc.append(c/(2 * pi))
            self.zc.append((e,e))
            self.Ac.append(c)
            c *= 2
                
    def fromPixelToLL(self,px,zoom):
         e = self.zc[zoom]
         f = (px[0] - e[0])/self.Bc[zoom]
         g = (px[1] - e[1])/-self.Cc[zoom]
         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
         return (f,h)


class RenderThread:
    def __init__(self, i):
        self.q = queue
        self.tile_dir = tile_dir
        self.m = mapnik.Map(256, 256)

        # Load style XML
        mapnik.load_map(self.m, mapfile, True)

        self.prj = mapnik.Projection(self.m.srs)
        self.tileproj = GoogleProjection(maxzoom+1)


    def render_tile(self, z, x, y, z_x_y):

        # Calculate pixel positions of bottom-left & top-right
        p0 = (x * 256, (y + 1) * 256)
        p1 = ((x + 1) * 256, y * 256)

        # Convert to LatLong (EPSG:4326)
        l0 = self.tileproj.fromPixelToLL(p0, z);
        l1 = self.tileproj.fromPixelToLL(p1, z);

        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))

        # Bounding box for the tile
        bbox = mapnik.Box2d(c0.x, c0.y,  c1.x, c1.y)

        self.m.resize(256, 256)
        self.m.zoom_to_box(bbox)
        if(self.m.buffer_size < 128):
            self.m.buffer_size = 128

        # Render image with default Agg renderer
        im = mapnik.Image(256, 256)
        mapnik.render(self.m, im)

        tile_uri = self.tile_dir + "/" + z_x_y + ".png"
        im.save(tile_uri, "png256")
        send_response(z_x_y)
        print "\r" + z_x_y,


    def loop(self):
        while True:
            #Fetch a tile from the queue and render it
            r = self.q.get()
            if (r == None):
                self.q.task_done()
                break
            else:
                (z, x, y, z_x_y) = r

            self.render_tile(z, x, y, z_x_y)
            self.q.task_done()


def render_tiles():
    renderers = {}
    for i in range(NUM_THREADS):
        renderer = RenderThread(i)
        render_thread = threading.Thread(target=renderer.loop)
        render_thread.start()
        renderers[i] = render_thread

    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind((HOST, PORT))                     # アドレスとポートを指定. 
    sock.listen(5)                              # 同時に接続できる個数
    print "Ready NUM_THREADS = " + str(NUM_THREADS)
    while True:
        conn, addr = sock.accept()
        request = conn.recv(2048)
        try:    
            items = request.split('_')          # {zoom}_{x}_{y}
            z = int(items[0])
            x = int(items[1])
            y = int(items[2])
            if (z == -9): break                 # quit
            r = (z, x, y, request)
            queue.put(r)
        except:
            print "Error: " + request

    conn.close()

    # Signal render threads to exit by sending empty request to queue
    for i in range(NUM_THREADS):
        queue.put(None)

    # wait for pending rendering jobs to complete
    queue.join()
    for i in range(NUM_THREADS):
        renderers[i].join()


def send_response(response):
    while True:
        try:
            sock = socket(AF_INET, SOCK_STREAM)
            sock.connect((HOST, PORT+1))
            sock.send(response.encode())
            sock.close()
            break
        except:
            print "retry send_response: " + response


if __name__ == "__main__":
    NUM_THREADS = int(sys.argv[1])
    render_tiles()

3.C#におけるプロセス間通信

クライアントプログラムとサーバープログラムを下に示す。 C#からPythonへの通信ポートは 2345、その逆の通信ポートは 2346 を用いている。

サーバーにリクエストがたまってしまうとまずいため、 リクエストカウント cntSend とレスポンスカウンタ cntDone を設けて、 256タイル以上滞留しないようにしている。

サーバープログラム ResponseReceiver はレスポンスを キュー queueOSM にいれるだけであり、 別のスレッドがこのキューから {zoom}_{x}_{y} を取り出して、タイル地図の zipアーカイブを更新している。 タイル画像(pngファイル)自体は、予め決められたディレクトリ上に置かれており、 Pythonプログラムが書き込んだものを C# プログラムが読み出し、用済み後、削除している。 プロセス間通信が排他制御も兼ねている。

ある程度エラー処理を入れているが、完ぺきではない。 パソコンの負荷(CPUまたはディスク)が異常に高くなったとき、通信エラーが発生する。

    int cntSend = 0;
    int cntDone = 0;

    public void Send(string z_x_y) {    // タイル更新タスクにリクエストを送る
        while (z_x_y[0] != '-' && cntSend > cntDone + 256) {
            Thread.Sleep(3000);
        }
        byte[] sendBytes = Encoding.UTF8.GetBytes(z_x_y);
        string error = null; 
        int nRetry = 3;
        while (nRetry-- > 0) {
            try {
                TcpClient tcp = new TcpClient("127.0.0.1", 2345);
                NetworkStream ns = tcp.GetStream();
                ns.Write(sendBytes, 0, sendBytes.Length);  // データを送信する
                ns.Close();
                tcp.Close();
                cntSend++;
                break;
            } catch (Exception ex) {
                error = ex.ToString();
                Thread.Sleep(1000);
            }
        }
        if (nRetry == 0) {
            Console.Error.WriteLine("Error: Send " + z_x_y + "\r\n" + error);
        }
    }

    // OSMタイル作成完了通知を受け取る
    void ResponseReceiver() {
        IPAddress localAddr = IPAddress.Parse("127.0.0.1"); 
        TcpListener server = new TcpListener(localAddr, 2346); 
        server.Start(); 
        while (true) {
            try {
                TcpClient client = server.AcceptTcpClient(); 
                NetworkStream stream = client.GetStream(); 
                Byte[] bytes = new Byte[256]; 
                int i; 
                while ((i = stream.Read(bytes, 0, bytes.Length)) != 0) { 
                    string z_x_y = Encoding.UTF8.GetString(bytes, 0, i);
                    string path =  DirMapnik + "/" + z_x_y + ".png";
                    lock (queueOSM) { queueOSM.Enqueue(path); }
                }
                client.Close(); 
            } catch (Exception ex) {
                Console.Error.WriteLine(ex.ToString());
            }
        }
    }

4.パイプによる一方向通信

C#プログラムから Pythonプログラムへリクエストを送る一方向通信だけならば、 パイプを使うのが一番簡単である。 C#プログラムはリクエストを標準出力に送るだけでよい。Pythonプログラムは標準入力から受け取るだけでよい。 エラー処理も特に必要としない。

一括更新の場合、ファイル毎に完了通知をする必要はなく、ひと塊終わったときにそれを知らせる ファイルを作成するだけでよい。C#プログラムは例えば、1秒間隔で、マークファイルが作られるのを監視すればよい。

JIT更新の場合には、ファイル単位の完了通知が望ましいので、タイマースレッドによる監視では、 レスポンスあるいはオーバヘッドの問題が生じるので、この方法は望ましくない。

5.リダイレクトによる双方向通信

Pythonプログラムは、標準入力からリクエストを受け取り、標準出力にレスポンスを送るだけでよい。

C#プログラムは Pythonプログラムの標準入出力をリダイレクトして、リクエストを Pythonプログラムの標準入力に送り、 標準出力からレスポンスを受け取る。

Pythonプログラムを下に示す。通信に置き換わった部分を赤字で示した。 リクエストの受け取りとレスポンスの送出は極めて極めてシンプルで全体では30数行短くなっている。

#!/usr/bin/env python
# -*- coding: shift_jis -

from math import pi,exp,atan

try:
    import mapnik2 as mapnik
except:
    import mapnik
mapnik.register_fonts('c:/windows/fonts/')

import sys, os
from Queue import Queue
import threading

RAD_TO_DEG = 180/pi
NUM_THREADS = 4

mapfile  = "c:/gis/mapnik/my_osm.xml"
tile_dir = "d:/mapniktiles"
queue = Queue(256)
maxzoom = 19

class GoogleProjection:
    def __init__(self,levels):
        self.Bc = []
        self.Cc = []
        self.zc = []
        self.Ac = []
        c = 256
        for d in range(0,levels):
            e = c/2;
            self.Bc.append(c/360.0)
            self.Cc.append(c/(2 * pi))
            self.zc.append((e,e))
            self.Ac.append(c)
            c *= 2
                
    def fromPixelToLL(self,px,zoom):
         e = self.zc[zoom]
         f = (px[0] - e[0])/self.Bc[zoom]
         g = (px[1] - e[1])/-self.Cc[zoom]
         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
         return (f,h)


class RenderThread:
    def __init__(self, i):
        self.q = queue
        self.tile_dir = tile_dir
        self.m = mapnik.Map(256, 256)

        # Load style XML
        mapnik.load_map(self.m, mapfile, True)

        self.prj = mapnik.Projection(self.m.srs)
        self.tileproj = GoogleProjection(maxzoom+1)


    def render_tile(self, z, x, y, z_x_y):

        # Calculate pixel positions of bottom-left & top-right
        p0 = (x * 256, (y + 1) * 256)
        p1 = ((x + 1) * 256, y * 256)

        # Convert to LatLong (EPSG:4326)
        l0 = self.tileproj.fromPixelToLL(p0, z);
        l1 = self.tileproj.fromPixelToLL(p1, z);

        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))

        # Bounding box for the tile
        bbox = mapnik.Box2d(c0.x, c0.y,  c1.x, c1.y)

        self.m.resize(256, 256)
        self.m.zoom_to_box(bbox)
        if(self.m.buffer_size < 128):
            self.m.buffer_size = 128

        # Render image with default Agg renderer
        im = mapnik.Image(256, 256)
        mapnik.render(self.m, im)

        tile_uri = self.tile_dir + "/" + z_x_y + ".png"
        im.save(tile_uri, "png256")
        print z_x_y              # 完了通知


    def loop(self):
        while True:
            #Fetch a tile from the queue and render it
            r = self.q.get()
            if (r == None):
                self.q.task_done()
                break
            else:
                (z, x, y, z_x_y) = r

            self.render_tile(z, x, y, z_x_y)
            self.q.task_done()


def render_tiles():
    renderers = {}
    for i in range(NUM_THREADS):
        renderer = RenderThread(i)
        render_thread = threading.Thread(target=renderer.loop)
        render_thread.start()
        renderers[i] = render_thread

    while True:
        request = raw_input()
        if request == '': break                 # quit
        try:    
            items = request.split('_')          # {zoom}_{x}_{y}
            z = int(items[0])
            x = int(items[1])
            y = int(items[2])
            r = (z, x, y, request)
            queue.put(r)
        except:
            print >> sys.stderr, "Error: " + request 

    # Signal render threads to exit by sending empty request to queue
    for i in range(NUM_THREADS):
        queue.put(None)

    # wait for pending rendering jobs to complete
    queue.join()
    for i in range(NUM_THREADS):
        renderers[i].join()


if __name__ == "__main__":
    #NUM_THREADS = int(sys.argv[1])
    render_tiles()

一方、C#側のプログラムは、予想以上に行数がかかった。 リダイレクトを指定してプロセスを起動するのに、あれこれ設定がいる。

リクエストの送出自体は procPython.StandardInput.WriteLine(z_x_y); であるから簡単である。

レスポンスの受け取りはハンドラーを使う。こちらも極めて簡単である。

全体として、プロセス間通信よりも簡単なので、当面、このプログラムを使用してみる。

    int cntSend = 0;
    int cntDone = 0;

    Process procPython = null;

    public void StartGenerator(int numThreads) {
        if (procPython != null) return;
        Thread t = new Thread(new ParameterizedThreadStart(Generator));
        t.Start(numThreads);                       // スレッドを開始する
        t.IsBackground = true;
    }

    public void Generator(Object obj) {
        int numThreads = (int)obj;
        string cmdpara = @"c:\gis\mapnik\gentile312.py " + numThreads;

        procPython = new Process();
        procPython.StartInfo.FileName = @"c:\Python27\python.exe";  // 実行するファイル 
        procPython.StartInfo.Arguments = cmdpara;           // コマンドパラメータ(引数)
        procPython.StartInfo.CreateNoWindow = true;         // コンソール・ウィンドウを開かない
        procPython.StartInfo.UseShellExecute = false;       // シェル機能を使用しない
        procPython.StartInfo.RedirectStandardInput = true;  // 子プロセスの標準入力をリダイレクトする
        procPython.StartInfo.RedirectStandardOutput = true;     // 子プロセスの標準出力をリダイレクトする
        procPython.StartInfo.RedirectStandardError = true;  // 子プロセスの標準エラーをリダイレクトする

        procPython.OutputDataReceived += ResponseReceiver;
        procPython.ErrorDataReceived  += PrintErrorData;

        procPython.Start();

        procPython.BeginOutputReadLine();
        procPython.BeginErrorReadLine();
        procPython.WaitForExit();
        procPython = null;
    }

    public void StopGenerator() {
        procPython.StandardInput.WriteLine("");
        procPython.StandardInput.Close();
    }  // Pythonプロセスの標準入力を閉じて書き込みを終了する

    public void Send(string z_x_y) {    // Mapnikサーバーにリクエストを送る
        while (z_x_y[0] != '-' && cntSend > cntDone + 256) {
            Thread.Sleep(3000);
        }
        procPython.StandardInput.WriteLine(z_x_y);
        cntSend++;
    }

    // OSMタイル作成完了通知を受け取る
    void ResponseReceiver(object sender, DataReceivedEventArgs e) {
        if (!string.IsNullOrEmpty(e.Data)) {
            lock (queueOSM) { queueOSM.Enqueue(e.Data); }
        }
    }

    void PrintErrorData(object sender, DataReceivedEventArgs e) {
        // 子プロセスの標準エラーから受信した内容を自プロセスの標準エラーに書き込む
        Process p = (Process)sender;
        if (!string.IsNullOrEmpty(e.Data)) {
            Console.Error.WriteLine("[{0};stderr] {1}", p.ProcessName, e.Data);
        }
    }

python.exe はコマンドラインプログラムであるから、画面には何も表示されない。 プロセス間通信を用いた時には、実行するファイルを "C:/Windows/System32/cmd.exe" 、 パラメータを @"/c python c:\gis\mapnik\gentile312.py " + numThreads とした。 この場合はコマンドプロンプト画面が表示される。

リダイレクトする場合、この方法が使えるかどうかは未確認である。 もし、使えるならば、標準エラー出力はリダイレクトせず、コマンドプロンプト画面に表示させる方がよい。 ただし、リダイレクトが分かりづらいため、やはり、python.exe を直接起動する方がよい。

pythonが動いているかどうかはOSのタスクマネージャで確認するのが一番簡単で確実である。 動作中はエクスプローラで生成タイルが置かれるディレクトリを見ればわかる。 また、JIT更新中は逐次、地図が更新されるので、特に、更新状況を表示する必要はないと思う。 あえて必要ならば、ステータスバーかモニター画面に cntSend、cntDone の値が刻々変化するのを表示すればよい。

なお、何もしないと、起動した python.exe は地図システムをクローズしても、動いている。 このため、地図システムの終了処理で空行を送る。これが、タイル生成プログラムの停止コマンドである。 タイル生成プログラムはマルチスレッドで動いており、各スレッドに nullリクエストが送られ、 全てのスレッドが停止したとき、python.exe は終了する。これを受けて c# の Generatorスレッドも終了する。 すなわち、停止コマンド(空行)を送っても、一旦送り込んだリクエストはキャンセルされず、 この処理が全て完了するまで、地図システムも動き続ける。

もちろん、プログラムを改変すれば、未処理のリクエストをキャンセルすることも可能であるが、 地図システムでは、どこまで、実行したかを記録しており、 キャンセルを許すと記録方法も複雑になる。

一括更新の場合、数分を目安として更新しているため、実行中に止める場合、数分は待つことになる。

特に、PostgreSQL は動作中(地図更新中)に停止してはならない。再び動かせなくなり、 PostgreSQL のアップデートもアンインストールさえできなくなる悲劇に見舞われる場合もある。

A.リファレンス

[1] 子プロセスの標準入出力