TA的每日心情 | 慵懒 2014-11-28 09:29 |
---|
签到天数: 3 天 连续签到: 1 天 [LV.2]偶尔看看I
|
对话界面如下:
一、获取机器人
首先去图灵机器人官网注册一个账号并新建一个其它类机器人。新建完成后在“我的机器人》机器人详情》接入”页面即可看到每一个机器人的API KEY,如下图所示:
二、测试机器人
上面我们已经有了机器人并拿到了key现在新建一个程序测试下- import requests
- import json
- from datetime import datetime
- key = '***' #替换成你的key
- while True:
- data = str(input('请输入:'))
- data = {"key":key,"info":data}
- postdata = json.dumps(data)
- r = requests.post('http://www.tuling123.com/openapi/api',data=postdata)
- rend_data = r.text
- updata = json.loads(rend_data)
- rend_data = ''
- #根距接收的数据类型选择 say 方式
- if updata['code'] == 100000:
- print( updata['text'])
- if updata['code'] == 200000:
- print( updata['text'])
- print( updata['url'])
- if updata['code'] == 308000:
- temp = (updata['list'])
- for i in range(0,len(temp)):
- client_data = dict(temp[i])
- print(client_data['icon'])
- print(client_data['name'])
- print(client_data['info'])
- print(client_data['detailurl'])
- if updata['code'] == 302000:
- temp = (updata['list'])
- for i in range(0,len(temp)):
- client_data = dict(temp[i])
- print(client_data['article'])
- print(client_data['icon'])
- print(client_data['source'])
- print(client_data['detailurl'])
复制代码 测试效果如下:
三、接入贝壳
1、新建一个文件并保存为config.py 用来保存配置。代码如下:- #!/uer/bin/python3
- # -*- coding:utf-8 -*-
- #===================
- # bitiot.net 配置
- #===================
- class bigiot:
- host = 'www.bigiot.net'
- port = 8181
- deviceid = '***' #设备ID
- apikey = '***' #key
- iputid1 = '***' #数据接口,备用
- checkin = {
- "M":"checkin",
- "ID":deviceid,
- "K":apikey
- }
- #===========================
- # 图灵机器人配置
- # http://www.tuling123.com/
- #===========================
- class tulin:
- key = '***' #机器人key
- host = 'http://www.tuling123.com/openapi/api'
复制代码 2、新建一个文件并保存为function.py 用来存放函数。代码如下:- #!/uer/bin/python3
- # -*- coding:utf-8 -*-
- import requests
- import socket
- import time
- import json
- from config import bigiot as big
- from config import tulin as t
- from datetime import datetime
- address =(big.host, big.port)
- checkin = json.dumps(big.checkin)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- #=============================
- # bigiot
- # 2016/12/21
- #=============================
- class bigiot:
- def big_line(self): #连接贝壳平台
- while True:
- try:
- s.connect(address)
- print(address)
- time.sleep(2)
- except:
- print('连接失败!尝试重新连接...')
- time.sleep(2)
- else:
- s.sendall(checkin.encode('utf-8'))
- s.sendall(b'\n')
- print(checkin)
- break
- def keep_online(self,t): #在线状态检察
- if time.time() - t > 40: #每40S'{"M":"status"}'一次
- s.sendall(b'{"M":"status"}\n')
- print('[{0}]-->check status'.format(datetime.now()))
- return time.time()
- else:
- return t
- def say(self,id,coutent): #发送指令(say方式)
- __coutent = {"M":"say","ID":id, "C":coutent}
- __coutent = json.dumps(__coutent)
- s.sendall(__coutent.encode('utf-8'))
- s.sendall(b'\n')
- def process(self,b_data): #解析平台数据
- global msg
- msg = b_data
- print('[{0}]-->{1}'.format(datetime.now(),msg))
- if msg['M'] == 'connected':
- s.sendall(checkin.encode('utf-8'))
- s.sendall(b'\n')
- if msg['M'] == 'say':
- bigiot.chitchat(msg['C']) #调用机器人函数
- if msg['M'] == 'login':
- bigiot.say(msg['ID'], '你好!我是小冰,请问有什么可以帮你!')
- msg.clear()
- def chitchat(self,data): #机器人
- __data = {"key":t.key,"info":data}
- __postdata = json.dumps(__data)
- r = requests.post(t.host,data=__postdata)
- __rend_data = r.text
- __updata = json.loads(__rend_data)
- __rend_data = ''
- print('[{0}]-->{1}'.format(datetime.now(),__updata['code']))
- #根距接收的数据类型选择 say 方式
- if __updata['code'] == 100000:
- bigiot.say(msg['ID'], __updata['text'])
- if __updata['code'] == 200000:
- bigiot.say(msg['ID'], __updata['text'])
- bigiot.say(msg['ID'], __updata['url'])
- if __updata['code'] == 308000:
- __temp = (__updata['list'])
- for i in range(0,len(__temp)):
- __client_data = dict(__temp[i])
- bigiot.say(msg['ID'], __client_data['detailurl'])
- bigiot.say(msg['ID'], __client_data['info'])
- bigiot.say(msg['ID'], __client_data['name'])
- bigiot.say(msg['ID'], __client_data['icon'])
- if __updata['code'] == 302000:
- __temp = (__updata['list'])
- for i in range(0,len(__temp)):
- __client_data = dict(__temp[i])
- bigiot.say(msg['ID'], __client_data['detailurl'])
- bigiot.say(msg['ID'], __client_data['source'])
- bigiot.say(msg['ID'], __client_data['icon'])
- bigiot.say(msg['ID'], __client_data['article'])
- __updata = ''
- bigiot = bigiot()
复制代码 3、新建一个文件并保存为tcp_client.py 代码如下:- #!/uer/bin/python3
- # -*- coding:utf-8 -*-
- import function
- import time
- fs = function.s
- bigiot = function.bigiot
- bigiot.big_line() #调用big_line()连接bigiot.net
- fs.settimeout(0)
- t1 = t2 =time.time()
- while True:
- try:
- temp = fs.recv(1024)
- t1 = time.time()
- except:
- time.sleep(2)
- t1 = bigiot.keep_online(t1)
- pass
- else:
- try:
- msg = eval(str(temp,encoding = 'utf-8'))
- t2 = time.time()
- except:
- '''
- 如果90s内没有收到平台返回的数据,
- 则认定为掉线,并重新调用big_line()
- 函数
- '''
- if time.time() - t2 > 90:
- bigiot.big_line()
- else:
- bigiot.process(msg)
- msg.clear()
复制代码 把上面三个方件放在一个方件夹内,然后启动tcp_client.py
现在你的机器人也上线了^-^
四、一些问题
1、这套代码在PC的运行正常,树莓派上也没问题;但在树莓派上运行时机器人回答的比较慢,我的是大概5秒才收到返回信息而PC是秒回。至于是什么原因造成不太清楚还望大神指点。
2、在function.py中的process()函数中加入了对{"M":"login"}的返回,如果你现在有两个设备在相互say将有可能陷入无限循环,当然你也可以去掉对{"M":"login"}的处理。
3、由于本人是刚学python所以代码中对错误的处理都比较‘暴力’,当遇到错误时只会跳过并不会输出错误信息;如你要调试代码请用python IDLE 调试或自行更改错误处理方式。这种错误处 理方式极不推荐可其它的我还没学会-_-!
4、不要问我为什么把非要用类,上面说过了,我现在刚学python的类,拿来练手的。
|
|