done is better than perfect

自分が学んだことや、作成したプログラムの記事を書きます。

Watch2chをPythonに移植してみた

Watch2chPythonで書きなおしてみました。
Watch2chは、PHPで作られた2chの監視を目的としたプログラムです。 指定したスレの勢いをチェックし、閾値を超えた場合にスレッドの内容を出力します。
中々に面白いソフトです。mecabなどの形態素解析のソフトと組み合わせたりすると、より高度な2ch監視ソフトが作れそうですね。 Watch2chはPHPで書かれており、ソフト本体のソースコードは約300行とコンパクトになっています。コメントも随所に付けられており、かなり丁寧な作りでした。 まずは機能の追加などはせず、そのまま移植してみました。一部エラーチェックなどは適当ですが、今後追加していきたいと思います。

watch2ch.py

# -*- coding: utf-8 -*-
import sys
import json
import re
import requests
import time
import string

class Watch2ch:
    MENU_LIST_URL = "http://menu.2ch.net/bbsmenu.html"
    DEFAULT_OUTPUT_NUM = 5
    _config = {}
    _board_url = ''

    def run(self, config_file):
        self._config = self._readConfigFile(config_file)
        self._board_url = self._getBoardURL(self._config['BoardName'])
        thread_list = self._getThreadList(self._board_url + self._config['BoardName'])
        thread_info = self._searchThreadList(thread_list, self._config['Keyword'])
        forces = self._getForces(thread_info)

        if (forces >= self._config['BorderForces']):
            try:
                output_num = int(self._config['OutputNum'])
            except KeyError:
                output_num = self.DEFAULT_OUTPUT_NUM

            url = self._board_url + self._config['BoardName'] + '/dat/' + thread_info['dat']
            res = self._getCurrentRes(url, output_num)

            if res:
                print(u"{0} ({1}) [{2}]\n\n".format(thread_info['name'], thread_info['res'], forces))
                print(u"{0}".format(res))

    def _readConfigFile(self, config_file):
        try:
            config_text = open(config_file, 'r').read()
        except IOError:
            print("Cannot open : %s", config_file)
            sys.exit()

        config_dict = json.JSONDecoder().decode(config_text)

        require_keys = ['BoardName', 'Keyword', 'BorderForces']

        for require_key in require_keys:
            if not require_key in config_dict:
                print("{key} is required on config file".format(key = require_key))
                sys.exit()

        return config_dict

    def _getBoardURL(self, board_name):
        url_pattern = re.compile(u"^\<A HREF=(.+\.2ch\.net\/){0}\/\>.*$".format(board_name), re.I | re.U)
        menu_list = self._getURL(self.MENU_LIST_URL).split("\n")
        for line in menu_list:
            if url_pattern.search(line):
                return url_pattern.search(line).group(1)

        sys.exit()

    def _getThreadList(self, board_url):
        try:
            thread_list = self._getURL(board_url + "/subject.txt")
            if thread_list.count("\n") < 5:
                raise(-200)
        except Exception as e:
            error_code = e.args[0]
            window_location_pattern = '.*window.location.href="(.+\/)(.+\/".*'
            if (error_code == 302 or error_code == 404 or error_code == -200):
                try:
                    moved_url = self._getURL(board_url + "/index.html")
                    if window_location_pattern.search(moved_url):
                        self.board_url = window_location_pattern.search(moved_url).group(1)
                        return self._getThreadList(self.board_url + window_location_pattern.search(moved_url).group(2))
                except Exception as sub_e:
                    sys.exit()
            else:
                sys.exit()

        return thread_list

    def _searchThreadList(self, thread_list, keyword):
        ret_list = []
        thread_list_list = thread_list.split('\n')
        thread_pattern = re.compile(u"^(.*)<>(.*) \*1 - int(created_time)) / 86400)
        forces = round(forces, 2)

        if thread_info['res'] < 10:
            forces = max(forces, 99.99)
        if thread_info['res'] < 5:
            forces = max(forces, 9.99)
        return forces

    def _getCurrentRes(self, url, num):
        dat_contents = self._getURL(url)
        dat_list = dat_contents.splitlines()

        ret = ""
        start = max(len(dat_list) - num, 0)
        end = len(dat_list)
        for i, line in enumerate(dat_list):
            if i < start:
                continue
            tmp_res = line
            tmp_list = tmp_res.split("<>")
            tmp_list[3] = tmp_list[3].replace("<br>", "\n")
            tmp_list[3] = tmp_list[3].replace("&gt;", ">")
            tmp_list[3] = tmp_list[3].replace("&lt;", "<")
            tmp_list[3] = tmp_list[3].replace("&amp;", "&")
            tmp_list[3] = tmp_list[3].replace("'", "&#39;")
            tmp_list[3] = tmp_list[3].replace('"', "&quot;")
            ret += u"{0} {1}\n{2}\n\n".format(tmp_list[0], tmp_list[2], tmp_list[3])

        return ret

    def _getURL(self, url, last_access = 0):
        r = requests.get(url)
        if r.status_code == 200:
            r.encoding = 'shift-jis'
            return r.text
        else:
            raise(r.status_code)

def main():
    w2ch = Watch2ch()
    w2ch.run(sys.argv[1])

if __name__ == '__main__':
    main()

sample.conf

configファイルは、本家のものからPython用に少し変えてあります。
{
    "BoardName":"gameswf",
    "Keyword":".*ブラウザ.*",
    "BorderForces":"10",
    "OutputNum":"5"
}
BSDライセンスとします。 使い方は
python watch2ch.py sample.conf
です。Mac OSΧ 10.8.2 + Python 2.7.3で動作確認をしています。後でgithubのほうにも挙げておきます。 動作にはrequestsが必要です。Pythonの標準モジュールであるurllibやurllib2はあまり使いやすくありませんが、requestsは非常に高機能で、なおかつシンプルに書けるのでオススメです。pipなどでインストール出来ます。 移植作業をして思ったのは、Webに特化しているだけあってHTML関連の関数がやたら豊富だということです。HTMLのタグ処理などがあまり上手くできていないので、今後そこを改善しておきたいと思います。

*1:[0-9]+)\)$", re.U) # for thread in thread_list_list: for thread in thread_list.splitlines(): if re.search(keyword, thread, re.U): if thread_pattern.search(thread): ret_dict = { 'dat': thread_pattern.search(thread).group(1), 'name': thread_pattern.search(thread).group(2), 'res': thread_pattern.search(thread).group(3) } break return ret_dict def _getForces(self, thread_info): created_time = thread_info['dat'].split(".")[0] forces = int(thread_info['res']) / ((int(time.time(